diff options
author | Sam Truzjan <pxqr.sta@gmail.com> | 2014-02-12 21:47:11 +0400 |
---|---|---|
committer | Sam Truzjan <pxqr.sta@gmail.com> | 2014-02-12 21:47:11 +0400 |
commit | 1864fb2106c29b64af0cb80bebd91209b67d5fd3 (patch) | |
tree | 428627ef2b0a447c814d713de1d9e2cb00d6c1a8 /src/Network | |
parent | 0cd2099b36aa0382ec7363537ac9e2a8ca5831a9 (diff) |
Add broadcast channel for P2P messaging
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/BitTorrent/Exchange/Session.hs | 6 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Wire.hs | 55 |
2 files changed, 42 insertions, 19 deletions
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index dfb0355b..695ac18a 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs | |||
@@ -42,6 +42,7 @@ data Session = Session | |||
42 | , storage :: Storage | 42 | , storage :: Storage |
43 | , unchoked :: [PeerAddr IP] | 43 | , unchoked :: [PeerAddr IP] |
44 | , connections :: MVar (Map (PeerAddr IP) (Connection Session)) | 44 | , connections :: MVar (Map (PeerAddr IP) (Connection Session)) |
45 | , broadcast :: Chan Message | ||
45 | } | 46 | } |
46 | 47 | ||
47 | newSession :: PeerAddr (Maybe IP) -- ^ /external/ address of this peer; | 48 | newSession :: PeerAddr (Maybe IP) -- ^ /external/ address of this peer; |
@@ -51,6 +52,7 @@ newSession :: PeerAddr (Maybe IP) -- ^ /external/ address of this peer; | |||
51 | newSession addr rootPath dict = do | 52 | newSession addr rootPath dict = do |
52 | connVar <- newMVar M.empty | 53 | connVar <- newMVar M.empty |
53 | store <- openInfoDict ReadWriteEx rootPath dict | 54 | store <- openInfoDict ReadWriteEx rootPath dict |
55 | chan <- newChan | ||
54 | return Session | 56 | return Session |
55 | { tpeerId = fromMaybe (error "newSession: impossible") (peerId addr) | 57 | { tpeerId = fromMaybe (error "newSession: impossible") (peerId addr) |
56 | , infohash = idInfoHash dict | 58 | , infohash = idInfoHash dict |
@@ -59,6 +61,7 @@ newSession addr rootPath dict = do | |||
59 | , storage = store | 61 | , storage = store |
60 | , unchoked = [] | 62 | , unchoked = [] |
61 | , connections = connVar | 63 | , connections = connVar |
64 | , broadcast = chan | ||
62 | } | 65 | } |
63 | 66 | ||
64 | closeSession :: Session -> IO () | 67 | closeSession :: Session -> IO () |
@@ -72,7 +75,8 @@ insert addr ses @ Session {..} = do | |||
72 | let caps = def | 75 | let caps = def |
73 | let ecaps = def | 76 | let ecaps = def |
74 | let hs = Handshake def caps infohash tpeerId | 77 | let hs = Handshake def caps infohash tpeerId |
75 | connectWire ses hs addr ecaps $ do | 78 | chan <- dupChan broadcast |
79 | connectWire ses hs addr ecaps chan $ do | ||
76 | conn <- getConnection | 80 | conn <- getConnection |
77 | -- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn | 81 | -- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn |
78 | exchange | 82 | exchange |
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs index 0414ebe7..6b6abde1 100644 --- a/src/Network/BitTorrent/Exchange/Wire.hs +++ b/src/Network/BitTorrent/Exchange/Wire.hs | |||
@@ -63,6 +63,7 @@ module Network.BitTorrent.Exchange.Wire | |||
63 | ) where | 63 | ) where |
64 | 64 | ||
65 | import Control.Applicative | 65 | import Control.Applicative |
66 | import Control.Concurrent hiding (yield) | ||
66 | import Control.Exception | 67 | import Control.Exception |
67 | import Control.Monad.Reader | 68 | import Control.Monad.Reader |
68 | import Control.Monad.State | 69 | import Control.Monad.State |
@@ -85,6 +86,7 @@ import Network.Socket.ByteString as BS | |||
85 | import Text.PrettyPrint as PP hiding (($$), (<>)) | 86 | import Text.PrettyPrint as PP hiding (($$), (<>)) |
86 | import Text.PrettyPrint.Class | 87 | import Text.PrettyPrint.Class |
87 | import Text.Show.Functions | 88 | import Text.Show.Functions |
89 | import System.Timeout | ||
88 | 90 | ||
89 | import Data.BEncode as BE | 91 | import Data.BEncode as BE |
90 | import Data.Torrent | 92 | import Data.Torrent |
@@ -92,7 +94,7 @@ import Data.Torrent.Bitfield | |||
92 | import Data.Torrent.InfoHash | 94 | import Data.Torrent.InfoHash |
93 | import Data.Torrent.Piece | 95 | import Data.Torrent.Piece |
94 | import Network.BitTorrent.Core | 96 | import Network.BitTorrent.Core |
95 | import Network.BitTorrent.Exchange.Message | 97 | import Network.BitTorrent.Exchange.Message as Msg |
96 | 98 | ||
97 | -- TODO handle port message? | 99 | -- TODO handle port message? |
98 | -- TODO handle limits? | 100 | -- TODO handle limits? |
@@ -600,16 +602,27 @@ trackFlow side = iterM $ do | |||
600 | -- Setup | 602 | -- Setup |
601 | -----------------------------------------------------------------------} | 603 | -----------------------------------------------------------------------} |
602 | 604 | ||
605 | -- System.Timeout.timeout multiplier | ||
606 | seconds :: Int | ||
607 | seconds = 1000000 | ||
608 | |||
609 | sinkChan :: MonadIO m => Chan Message -> Sink Message m () | ||
610 | sinkChan chan = await >>= maybe (return ()) (liftIO . writeChan chan) | ||
611 | |||
612 | sourceChan :: MonadIO m => Int -> Chan Message -> Source m Message | ||
613 | sourceChan interval chan = do | ||
614 | mmsg <- liftIO $ timeout (interval * seconds) $ readChan chan | ||
615 | yield $ fromMaybe Msg.KeepAlive mmsg | ||
616 | |||
603 | -- | Normally you should use 'connectWire' or 'acceptWire'. | 617 | -- | Normally you should use 'connectWire' or 'acceptWire'. |
604 | runWire :: Wire s () -> Socket -> Connection s -> IO () | 618 | runWire :: Wire s () -> Socket -> Chan Message -> Connection s -> IO () |
605 | runWire action sock conn = flip runReaderT conn $ runConnected $ | 619 | runWire action sock chan conn = flip runReaderT conn $ runConnected $ |
606 | sourceSocket sock $= | 620 | sourceSocket sock $= |
607 | conduitGet S.get $= | 621 | conduitGet S.get $= |
608 | trackFlow RemotePeer $= | 622 | trackFlow RemotePeer $= |
609 | action $= | 623 | action $= |
610 | trackFlow ThisPeer $= | 624 | trackFlow ThisPeer $$ |
611 | conduitPut S.put $$ | 625 | sinkChan chan |
612 | sinkSocket sock | ||
613 | 626 | ||
614 | -- | This function will block until a peer send new message. You can | 627 | -- | This function will block until a peer send new message. You can |
615 | -- also use 'await'. | 628 | -- also use 'await'. |
@@ -649,8 +662,9 @@ reconnect = undefined | |||
649 | -- | 662 | -- |
650 | -- This function can throw 'WireFailure' exception. | 663 | -- This function can throw 'WireFailure' exception. |
651 | -- | 664 | -- |
652 | connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Wire s () -> IO () | 665 | connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Chan Message |
653 | connectWire session hs addr extCaps wire = | 666 | -> Wire s () -> IO () |
667 | connectWire session hs addr extCaps chan wire = | ||
654 | bracket (peerSocket Stream addr) close $ \ sock -> do | 668 | bracket (peerSocket Stream addr) close $ \ sock -> do |
655 | hs' <- initiateHandshake sock hs | 669 | hs' <- initiateHandshake sock hs |
656 | 670 | ||
@@ -680,16 +694,21 @@ connectWire session hs addr extCaps wire = | |||
680 | , _connMetadata = Nothing | 694 | , _connMetadata = Nothing |
681 | } | 695 | } |
682 | 696 | ||
683 | runWire wire' sock $ Connection | 697 | -- TODO make KA interval configurable |
684 | { connProtocol = hsProtocol hs | 698 | let kaInterval = defaultKeepAliveInterval |
685 | , connCaps = caps | 699 | bracket |
686 | , connTopic = hsInfoHash hs | 700 | (forkIO $ sourceChan kaInterval chan $= conduitPut S.put $$ sinkSocket sock) |
687 | , connRemotePeerId = hsPeerId hs' | 701 | (killThread) $ \ _ -> |
688 | , connThisPeerId = hsPeerId hs | 702 | runWire wire' sock chan $ Connection |
689 | , connOptions = def | 703 | { connProtocol = hsProtocol hs |
690 | , connState = cstate | 704 | , connCaps = caps |
691 | , connSession = session | 705 | , connTopic = hsInfoHash hs |
692 | } | 706 | , connRemotePeerId = hsPeerId hs' |
707 | , connThisPeerId = hsPeerId hs | ||
708 | , connOptions = def | ||
709 | , connState = cstate | ||
710 | , connSession = session | ||
711 | } | ||
693 | 712 | ||
694 | -- | Accept 'Wire' connection using already 'Network.Socket.accept'ed | 713 | -- | Accept 'Wire' connection using already 'Network.Socket.accept'ed |
695 | -- socket. For peer listener loop the 'acceptSafe' should be | 714 | -- socket. For peer listener loop the 'acceptSafe' should be |