diff options
author | Sam Truzjan <pxqr.sta@gmail.com> | 2014-02-12 21:47:11 +0400 |
---|---|---|
committer | Sam Truzjan <pxqr.sta@gmail.com> | 2014-02-12 21:47:11 +0400 |
commit | 1864fb2106c29b64af0cb80bebd91209b67d5fd3 (patch) | |
tree | 428627ef2b0a447c814d713de1d9e2cb00d6c1a8 /src/Network/BitTorrent/Exchange/Wire.hs | |
parent | 0cd2099b36aa0382ec7363537ac9e2a8ca5831a9 (diff) |
Add broadcast channel for P2P messaging
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Wire.hs')
-rw-r--r-- | src/Network/BitTorrent/Exchange/Wire.hs | 55 |
1 files changed, 37 insertions, 18 deletions
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs index 0414ebe7..6b6abde1 100644 --- a/src/Network/BitTorrent/Exchange/Wire.hs +++ b/src/Network/BitTorrent/Exchange/Wire.hs | |||
@@ -63,6 +63,7 @@ module Network.BitTorrent.Exchange.Wire | |||
63 | ) where | 63 | ) where |
64 | 64 | ||
65 | import Control.Applicative | 65 | import Control.Applicative |
66 | import Control.Concurrent hiding (yield) | ||
66 | import Control.Exception | 67 | import Control.Exception |
67 | import Control.Monad.Reader | 68 | import Control.Monad.Reader |
68 | import Control.Monad.State | 69 | import Control.Monad.State |
@@ -85,6 +86,7 @@ import Network.Socket.ByteString as BS | |||
85 | import Text.PrettyPrint as PP hiding (($$), (<>)) | 86 | import Text.PrettyPrint as PP hiding (($$), (<>)) |
86 | import Text.PrettyPrint.Class | 87 | import Text.PrettyPrint.Class |
87 | import Text.Show.Functions | 88 | import Text.Show.Functions |
89 | import System.Timeout | ||
88 | 90 | ||
89 | import Data.BEncode as BE | 91 | import Data.BEncode as BE |
90 | import Data.Torrent | 92 | import Data.Torrent |
@@ -92,7 +94,7 @@ import Data.Torrent.Bitfield | |||
92 | import Data.Torrent.InfoHash | 94 | import Data.Torrent.InfoHash |
93 | import Data.Torrent.Piece | 95 | import Data.Torrent.Piece |
94 | import Network.BitTorrent.Core | 96 | import Network.BitTorrent.Core |
95 | import Network.BitTorrent.Exchange.Message | 97 | import Network.BitTorrent.Exchange.Message as Msg |
96 | 98 | ||
97 | -- TODO handle port message? | 99 | -- TODO handle port message? |
98 | -- TODO handle limits? | 100 | -- TODO handle limits? |
@@ -600,16 +602,27 @@ trackFlow side = iterM $ do | |||
600 | -- Setup | 602 | -- Setup |
601 | -----------------------------------------------------------------------} | 603 | -----------------------------------------------------------------------} |
602 | 604 | ||
605 | -- System.Timeout.timeout multiplier | ||
606 | seconds :: Int | ||
607 | seconds = 1000000 | ||
608 | |||
609 | sinkChan :: MonadIO m => Chan Message -> Sink Message m () | ||
610 | sinkChan chan = await >>= maybe (return ()) (liftIO . writeChan chan) | ||
611 | |||
612 | sourceChan :: MonadIO m => Int -> Chan Message -> Source m Message | ||
613 | sourceChan interval chan = do | ||
614 | mmsg <- liftIO $ timeout (interval * seconds) $ readChan chan | ||
615 | yield $ fromMaybe Msg.KeepAlive mmsg | ||
616 | |||
603 | -- | Normally you should use 'connectWire' or 'acceptWire'. | 617 | -- | Normally you should use 'connectWire' or 'acceptWire'. |
604 | runWire :: Wire s () -> Socket -> Connection s -> IO () | 618 | runWire :: Wire s () -> Socket -> Chan Message -> Connection s -> IO () |
605 | runWire action sock conn = flip runReaderT conn $ runConnected $ | 619 | runWire action sock chan conn = flip runReaderT conn $ runConnected $ |
606 | sourceSocket sock $= | 620 | sourceSocket sock $= |
607 | conduitGet S.get $= | 621 | conduitGet S.get $= |
608 | trackFlow RemotePeer $= | 622 | trackFlow RemotePeer $= |
609 | action $= | 623 | action $= |
610 | trackFlow ThisPeer $= | 624 | trackFlow ThisPeer $$ |
611 | conduitPut S.put $$ | 625 | sinkChan chan |
612 | sinkSocket sock | ||
613 | 626 | ||
614 | -- | This function will block until a peer send new message. You can | 627 | -- | This function will block until a peer send new message. You can |
615 | -- also use 'await'. | 628 | -- also use 'await'. |
@@ -649,8 +662,9 @@ reconnect = undefined | |||
649 | -- | 662 | -- |
650 | -- This function can throw 'WireFailure' exception. | 663 | -- This function can throw 'WireFailure' exception. |
651 | -- | 664 | -- |
652 | connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Wire s () -> IO () | 665 | connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Chan Message |
653 | connectWire session hs addr extCaps wire = | 666 | -> Wire s () -> IO () |
667 | connectWire session hs addr extCaps chan wire = | ||
654 | bracket (peerSocket Stream addr) close $ \ sock -> do | 668 | bracket (peerSocket Stream addr) close $ \ sock -> do |
655 | hs' <- initiateHandshake sock hs | 669 | hs' <- initiateHandshake sock hs |
656 | 670 | ||
@@ -680,16 +694,21 @@ connectWire session hs addr extCaps wire = | |||
680 | , _connMetadata = Nothing | 694 | , _connMetadata = Nothing |
681 | } | 695 | } |
682 | 696 | ||
683 | runWire wire' sock $ Connection | 697 | -- TODO make KA interval configurable |
684 | { connProtocol = hsProtocol hs | 698 | let kaInterval = defaultKeepAliveInterval |
685 | , connCaps = caps | 699 | bracket |
686 | , connTopic = hsInfoHash hs | 700 | (forkIO $ sourceChan kaInterval chan $= conduitPut S.put $$ sinkSocket sock) |
687 | , connRemotePeerId = hsPeerId hs' | 701 | (killThread) $ \ _ -> |
688 | , connThisPeerId = hsPeerId hs | 702 | runWire wire' sock chan $ Connection |
689 | , connOptions = def | 703 | { connProtocol = hsProtocol hs |
690 | , connState = cstate | 704 | , connCaps = caps |
691 | , connSession = session | 705 | , connTopic = hsInfoHash hs |
692 | } | 706 | , connRemotePeerId = hsPeerId hs' |
707 | , connThisPeerId = hsPeerId hs | ||
708 | , connOptions = def | ||
709 | , connState = cstate | ||
710 | , connSession = session | ||
711 | } | ||
693 | 712 | ||
694 | -- | Accept 'Wire' connection using already 'Network.Socket.accept'ed | 713 | -- | Accept 'Wire' connection using already 'Network.Socket.accept'ed |
695 | -- socket. For peer listener loop the 'acceptSafe' should be | 714 | -- socket. For peer listener loop the 'acceptSafe' should be |