From 1864fb2106c29b64af0cb80bebd91209b67d5fd3 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Wed, 12 Feb 2014 21:47:11 +0400 Subject: Add broadcast channel for P2P messaging --- examples/MkTorrent.hs | 3 +- src/Network/BitTorrent/Exchange/Session.hs | 6 +++- src/Network/BitTorrent/Exchange/Wire.hs | 55 ++++++++++++++++++++---------- 3 files changed, 44 insertions(+), 20 deletions(-) diff --git a/examples/MkTorrent.hs b/examples/MkTorrent.hs index e9eb7f1a..87a7d45e 100644 --- a/examples/MkTorrent.hs +++ b/examples/MkTorrent.hs @@ -361,7 +361,8 @@ exchangeTorrent ih addr = do pid <- genPeerId var <- newEmptyMVar let hs = Handshake def (toCaps [ExtExtended]) ih pid - connectWire () hs addr (toCaps [ExtMetadata]) $ do + chan <- newChan + connectWire () hs addr (toCaps [ExtMetadata]) chan $ do infodict <- getMetadata liftIO $ putMVar var infodict takeMVar var diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index dfb0355b..695ac18a 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs @@ -42,6 +42,7 @@ data Session = Session , storage :: Storage , unchoked :: [PeerAddr IP] , connections :: MVar (Map (PeerAddr IP) (Connection Session)) + , broadcast :: Chan Message } newSession :: PeerAddr (Maybe IP) -- ^ /external/ address of this peer; @@ -51,6 +52,7 @@ newSession :: PeerAddr (Maybe IP) -- ^ /external/ address of this peer; newSession addr rootPath dict = do connVar <- newMVar M.empty store <- openInfoDict ReadWriteEx rootPath dict + chan <- newChan return Session { tpeerId = fromMaybe (error "newSession: impossible") (peerId addr) , infohash = idInfoHash dict @@ -59,6 +61,7 @@ newSession addr rootPath dict = do , storage = store , unchoked = [] , connections = connVar + , broadcast = chan } closeSession :: Session -> IO () @@ -72,7 +75,8 @@ insert addr ses @ Session {..} = do let caps = def let ecaps = def let hs = Handshake def caps infohash tpeerId - connectWire ses hs addr ecaps $ do + chan <- dupChan broadcast + connectWire ses hs addr ecaps chan $ do conn <- getConnection -- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn exchange diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs index 0414ebe7..6b6abde1 100644 --- a/src/Network/BitTorrent/Exchange/Wire.hs +++ b/src/Network/BitTorrent/Exchange/Wire.hs @@ -63,6 +63,7 @@ module Network.BitTorrent.Exchange.Wire ) where import Control.Applicative +import Control.Concurrent hiding (yield) import Control.Exception import Control.Monad.Reader import Control.Monad.State @@ -85,6 +86,7 @@ import Network.Socket.ByteString as BS import Text.PrettyPrint as PP hiding (($$), (<>)) import Text.PrettyPrint.Class import Text.Show.Functions +import System.Timeout import Data.BEncode as BE import Data.Torrent @@ -92,7 +94,7 @@ import Data.Torrent.Bitfield import Data.Torrent.InfoHash import Data.Torrent.Piece import Network.BitTorrent.Core -import Network.BitTorrent.Exchange.Message +import Network.BitTorrent.Exchange.Message as Msg -- TODO handle port message? -- TODO handle limits? @@ -600,16 +602,27 @@ trackFlow side = iterM $ do -- Setup -----------------------------------------------------------------------} +-- System.Timeout.timeout multiplier +seconds :: Int +seconds = 1000000 + +sinkChan :: MonadIO m => Chan Message -> Sink Message m () +sinkChan chan = await >>= maybe (return ()) (liftIO . writeChan chan) + +sourceChan :: MonadIO m => Int -> Chan Message -> Source m Message +sourceChan interval chan = do + mmsg <- liftIO $ timeout (interval * seconds) $ readChan chan + yield $ fromMaybe Msg.KeepAlive mmsg + -- | Normally you should use 'connectWire' or 'acceptWire'. -runWire :: Wire s () -> Socket -> Connection s -> IO () -runWire action sock conn = flip runReaderT conn $ runConnected $ +runWire :: Wire s () -> Socket -> Chan Message -> Connection s -> IO () +runWire action sock chan conn = flip runReaderT conn $ runConnected $ sourceSocket sock $= conduitGet S.get $= trackFlow RemotePeer $= action $= - trackFlow ThisPeer $= - conduitPut S.put $$ - sinkSocket sock + trackFlow ThisPeer $$ + sinkChan chan -- | This function will block until a peer send new message. You can -- also use 'await'. @@ -649,8 +662,9 @@ reconnect = undefined -- -- This function can throw 'WireFailure' exception. -- -connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Wire s () -> IO () -connectWire session hs addr extCaps wire = +connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Chan Message + -> Wire s () -> IO () +connectWire session hs addr extCaps chan wire = bracket (peerSocket Stream addr) close $ \ sock -> do hs' <- initiateHandshake sock hs @@ -680,16 +694,21 @@ connectWire session hs addr extCaps wire = , _connMetadata = Nothing } - runWire wire' sock $ Connection - { connProtocol = hsProtocol hs - , connCaps = caps - , connTopic = hsInfoHash hs - , connRemotePeerId = hsPeerId hs' - , connThisPeerId = hsPeerId hs - , connOptions = def - , connState = cstate - , connSession = session - } + -- TODO make KA interval configurable + let kaInterval = defaultKeepAliveInterval + bracket + (forkIO $ sourceChan kaInterval chan $= conduitPut S.put $$ sinkSocket sock) + (killThread) $ \ _ -> + runWire wire' sock chan $ Connection + { connProtocol = hsProtocol hs + , connCaps = caps + , connTopic = hsInfoHash hs + , connRemotePeerId = hsPeerId hs' + , connThisPeerId = hsPeerId hs + , connOptions = def + , connState = cstate + , connSession = session + } -- | Accept 'Wire' connection using already 'Network.Socket.accept'ed -- socket. For peer listener loop the 'acceptSafe' should be -- cgit v1.2.3