summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Truzjan <pxqr.sta@gmail.com>2014-02-12 21:47:11 +0400
committerSam Truzjan <pxqr.sta@gmail.com>2014-02-12 21:47:11 +0400
commit1864fb2106c29b64af0cb80bebd91209b67d5fd3 (patch)
tree428627ef2b0a447c814d713de1d9e2cb00d6c1a8
parent0cd2099b36aa0382ec7363537ac9e2a8ca5831a9 (diff)
Add broadcast channel for P2P messaging
-rw-r--r--examples/MkTorrent.hs3
-rw-r--r--src/Network/BitTorrent/Exchange/Session.hs6
-rw-r--r--src/Network/BitTorrent/Exchange/Wire.hs55
3 files changed, 44 insertions, 20 deletions
diff --git a/examples/MkTorrent.hs b/examples/MkTorrent.hs
index e9eb7f1a..87a7d45e 100644
--- a/examples/MkTorrent.hs
+++ b/examples/MkTorrent.hs
@@ -361,7 +361,8 @@ exchangeTorrent ih addr = do
361 pid <- genPeerId 361 pid <- genPeerId
362 var <- newEmptyMVar 362 var <- newEmptyMVar
363 let hs = Handshake def (toCaps [ExtExtended]) ih pid 363 let hs = Handshake def (toCaps [ExtExtended]) ih pid
364 connectWire () hs addr (toCaps [ExtMetadata]) $ do 364 chan <- newChan
365 connectWire () hs addr (toCaps [ExtMetadata]) chan $ do
365 infodict <- getMetadata 366 infodict <- getMetadata
366 liftIO $ putMVar var infodict 367 liftIO $ putMVar var infodict
367 takeMVar var 368 takeMVar var
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs
index dfb0355b..695ac18a 100644
--- a/src/Network/BitTorrent/Exchange/Session.hs
+++ b/src/Network/BitTorrent/Exchange/Session.hs
@@ -42,6 +42,7 @@ data Session = Session
42 , storage :: Storage 42 , storage :: Storage
43 , unchoked :: [PeerAddr IP] 43 , unchoked :: [PeerAddr IP]
44 , connections :: MVar (Map (PeerAddr IP) (Connection Session)) 44 , connections :: MVar (Map (PeerAddr IP) (Connection Session))
45 , broadcast :: Chan Message
45 } 46 }
46 47
47newSession :: PeerAddr (Maybe IP) -- ^ /external/ address of this peer; 48newSession :: PeerAddr (Maybe IP) -- ^ /external/ address of this peer;
@@ -51,6 +52,7 @@ newSession :: PeerAddr (Maybe IP) -- ^ /external/ address of this peer;
51newSession addr rootPath dict = do 52newSession addr rootPath dict = do
52 connVar <- newMVar M.empty 53 connVar <- newMVar M.empty
53 store <- openInfoDict ReadWriteEx rootPath dict 54 store <- openInfoDict ReadWriteEx rootPath dict
55 chan <- newChan
54 return Session 56 return Session
55 { tpeerId = fromMaybe (error "newSession: impossible") (peerId addr) 57 { tpeerId = fromMaybe (error "newSession: impossible") (peerId addr)
56 , infohash = idInfoHash dict 58 , infohash = idInfoHash dict
@@ -59,6 +61,7 @@ newSession addr rootPath dict = do
59 , storage = store 61 , storage = store
60 , unchoked = [] 62 , unchoked = []
61 , connections = connVar 63 , connections = connVar
64 , broadcast = chan
62 } 65 }
63 66
64closeSession :: Session -> IO () 67closeSession :: Session -> IO ()
@@ -72,7 +75,8 @@ insert addr ses @ Session {..} = do
72 let caps = def 75 let caps = def
73 let ecaps = def 76 let ecaps = def
74 let hs = Handshake def caps infohash tpeerId 77 let hs = Handshake def caps infohash tpeerId
75 connectWire ses hs addr ecaps $ do 78 chan <- dupChan broadcast
79 connectWire ses hs addr ecaps chan $ do
76 conn <- getConnection 80 conn <- getConnection
77-- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn 81-- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn
78 exchange 82 exchange
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs
index 0414ebe7..6b6abde1 100644
--- a/src/Network/BitTorrent/Exchange/Wire.hs
+++ b/src/Network/BitTorrent/Exchange/Wire.hs
@@ -63,6 +63,7 @@ module Network.BitTorrent.Exchange.Wire
63 ) where 63 ) where
64 64
65import Control.Applicative 65import Control.Applicative
66import Control.Concurrent hiding (yield)
66import Control.Exception 67import Control.Exception
67import Control.Monad.Reader 68import Control.Monad.Reader
68import Control.Monad.State 69import Control.Monad.State
@@ -85,6 +86,7 @@ import Network.Socket.ByteString as BS
85import Text.PrettyPrint as PP hiding (($$), (<>)) 86import Text.PrettyPrint as PP hiding (($$), (<>))
86import Text.PrettyPrint.Class 87import Text.PrettyPrint.Class
87import Text.Show.Functions 88import Text.Show.Functions
89import System.Timeout
88 90
89import Data.BEncode as BE 91import Data.BEncode as BE
90import Data.Torrent 92import Data.Torrent
@@ -92,7 +94,7 @@ import Data.Torrent.Bitfield
92import Data.Torrent.InfoHash 94import Data.Torrent.InfoHash
93import Data.Torrent.Piece 95import Data.Torrent.Piece
94import Network.BitTorrent.Core 96import Network.BitTorrent.Core
95import Network.BitTorrent.Exchange.Message 97import Network.BitTorrent.Exchange.Message as Msg
96 98
97-- TODO handle port message? 99-- TODO handle port message?
98-- TODO handle limits? 100-- TODO handle limits?
@@ -600,16 +602,27 @@ trackFlow side = iterM $ do
600-- Setup 602-- Setup
601-----------------------------------------------------------------------} 603-----------------------------------------------------------------------}
602 604
605-- System.Timeout.timeout multiplier
606seconds :: Int
607seconds = 1000000
608
609sinkChan :: MonadIO m => Chan Message -> Sink Message m ()
610sinkChan chan = await >>= maybe (return ()) (liftIO . writeChan chan)
611
612sourceChan :: MonadIO m => Int -> Chan Message -> Source m Message
613sourceChan interval chan = do
614 mmsg <- liftIO $ timeout (interval * seconds) $ readChan chan
615 yield $ fromMaybe Msg.KeepAlive mmsg
616
603-- | Normally you should use 'connectWire' or 'acceptWire'. 617-- | Normally you should use 'connectWire' or 'acceptWire'.
604runWire :: Wire s () -> Socket -> Connection s -> IO () 618runWire :: Wire s () -> Socket -> Chan Message -> Connection s -> IO ()
605runWire action sock conn = flip runReaderT conn $ runConnected $ 619runWire action sock chan conn = flip runReaderT conn $ runConnected $
606 sourceSocket sock $= 620 sourceSocket sock $=
607 conduitGet S.get $= 621 conduitGet S.get $=
608 trackFlow RemotePeer $= 622 trackFlow RemotePeer $=
609 action $= 623 action $=
610 trackFlow ThisPeer $= 624 trackFlow ThisPeer $$
611 conduitPut S.put $$ 625 sinkChan chan
612 sinkSocket sock
613 626
614-- | This function will block until a peer send new message. You can 627-- | This function will block until a peer send new message. You can
615-- also use 'await'. 628-- also use 'await'.
@@ -649,8 +662,9 @@ reconnect = undefined
649-- 662--
650-- This function can throw 'WireFailure' exception. 663-- This function can throw 'WireFailure' exception.
651-- 664--
652connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Wire s () -> IO () 665connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Chan Message
653connectWire session hs addr extCaps wire = 666 -> Wire s () -> IO ()
667connectWire session hs addr extCaps chan wire =
654 bracket (peerSocket Stream addr) close $ \ sock -> do 668 bracket (peerSocket Stream addr) close $ \ sock -> do
655 hs' <- initiateHandshake sock hs 669 hs' <- initiateHandshake sock hs
656 670
@@ -680,16 +694,21 @@ connectWire session hs addr extCaps wire =
680 , _connMetadata = Nothing 694 , _connMetadata = Nothing
681 } 695 }
682 696
683 runWire wire' sock $ Connection 697 -- TODO make KA interval configurable
684 { connProtocol = hsProtocol hs 698 let kaInterval = defaultKeepAliveInterval
685 , connCaps = caps 699 bracket
686 , connTopic = hsInfoHash hs 700 (forkIO $ sourceChan kaInterval chan $= conduitPut S.put $$ sinkSocket sock)
687 , connRemotePeerId = hsPeerId hs' 701 (killThread) $ \ _ ->
688 , connThisPeerId = hsPeerId hs 702 runWire wire' sock chan $ Connection
689 , connOptions = def 703 { connProtocol = hsProtocol hs
690 , connState = cstate 704 , connCaps = caps
691 , connSession = session 705 , connTopic = hsInfoHash hs
692 } 706 , connRemotePeerId = hsPeerId hs'
707 , connThisPeerId = hsPeerId hs
708 , connOptions = def
709 , connState = cstate
710 , connSession = session
711 }
693 712
694-- | Accept 'Wire' connection using already 'Network.Socket.accept'ed 713-- | Accept 'Wire' connection using already 'Network.Socket.accept'ed
695-- socket. For peer listener loop the 'acceptSafe' should be 714-- socket. For peer listener loop the 'acceptSafe' should be