summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Exchange/Wire.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Wire.hs')
-rw-r--r--src/Network/BitTorrent/Exchange/Wire.hs55
1 files changed, 37 insertions, 18 deletions
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs
index 0414ebe7..6b6abde1 100644
--- a/src/Network/BitTorrent/Exchange/Wire.hs
+++ b/src/Network/BitTorrent/Exchange/Wire.hs
@@ -63,6 +63,7 @@ module Network.BitTorrent.Exchange.Wire
63 ) where 63 ) where
64 64
65import Control.Applicative 65import Control.Applicative
66import Control.Concurrent hiding (yield)
66import Control.Exception 67import Control.Exception
67import Control.Monad.Reader 68import Control.Monad.Reader
68import Control.Monad.State 69import Control.Monad.State
@@ -85,6 +86,7 @@ import Network.Socket.ByteString as BS
85import Text.PrettyPrint as PP hiding (($$), (<>)) 86import Text.PrettyPrint as PP hiding (($$), (<>))
86import Text.PrettyPrint.Class 87import Text.PrettyPrint.Class
87import Text.Show.Functions 88import Text.Show.Functions
89import System.Timeout
88 90
89import Data.BEncode as BE 91import Data.BEncode as BE
90import Data.Torrent 92import Data.Torrent
@@ -92,7 +94,7 @@ import Data.Torrent.Bitfield
92import Data.Torrent.InfoHash 94import Data.Torrent.InfoHash
93import Data.Torrent.Piece 95import Data.Torrent.Piece
94import Network.BitTorrent.Core 96import Network.BitTorrent.Core
95import Network.BitTorrent.Exchange.Message 97import Network.BitTorrent.Exchange.Message as Msg
96 98
97-- TODO handle port message? 99-- TODO handle port message?
98-- TODO handle limits? 100-- TODO handle limits?
@@ -600,16 +602,27 @@ trackFlow side = iterM $ do
600-- Setup 602-- Setup
601-----------------------------------------------------------------------} 603-----------------------------------------------------------------------}
602 604
605-- System.Timeout.timeout multiplier
606seconds :: Int
607seconds = 1000000
608
609sinkChan :: MonadIO m => Chan Message -> Sink Message m ()
610sinkChan chan = await >>= maybe (return ()) (liftIO . writeChan chan)
611
612sourceChan :: MonadIO m => Int -> Chan Message -> Source m Message
613sourceChan interval chan = do
614 mmsg <- liftIO $ timeout (interval * seconds) $ readChan chan
615 yield $ fromMaybe Msg.KeepAlive mmsg
616
603-- | Normally you should use 'connectWire' or 'acceptWire'. 617-- | Normally you should use 'connectWire' or 'acceptWire'.
604runWire :: Wire s () -> Socket -> Connection s -> IO () 618runWire :: Wire s () -> Socket -> Chan Message -> Connection s -> IO ()
605runWire action sock conn = flip runReaderT conn $ runConnected $ 619runWire action sock chan conn = flip runReaderT conn $ runConnected $
606 sourceSocket sock $= 620 sourceSocket sock $=
607 conduitGet S.get $= 621 conduitGet S.get $=
608 trackFlow RemotePeer $= 622 trackFlow RemotePeer $=
609 action $= 623 action $=
610 trackFlow ThisPeer $= 624 trackFlow ThisPeer $$
611 conduitPut S.put $$ 625 sinkChan chan
612 sinkSocket sock
613 626
614-- | This function will block until a peer send new message. You can 627-- | This function will block until a peer send new message. You can
615-- also use 'await'. 628-- also use 'await'.
@@ -649,8 +662,9 @@ reconnect = undefined
649-- 662--
650-- This function can throw 'WireFailure' exception. 663-- This function can throw 'WireFailure' exception.
651-- 664--
652connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Wire s () -> IO () 665connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Chan Message
653connectWire session hs addr extCaps wire = 666 -> Wire s () -> IO ()
667connectWire session hs addr extCaps chan wire =
654 bracket (peerSocket Stream addr) close $ \ sock -> do 668 bracket (peerSocket Stream addr) close $ \ sock -> do
655 hs' <- initiateHandshake sock hs 669 hs' <- initiateHandshake sock hs
656 670
@@ -680,16 +694,21 @@ connectWire session hs addr extCaps wire =
680 , _connMetadata = Nothing 694 , _connMetadata = Nothing
681 } 695 }
682 696
683 runWire wire' sock $ Connection 697 -- TODO make KA interval configurable
684 { connProtocol = hsProtocol hs 698 let kaInterval = defaultKeepAliveInterval
685 , connCaps = caps 699 bracket
686 , connTopic = hsInfoHash hs 700 (forkIO $ sourceChan kaInterval chan $= conduitPut S.put $$ sinkSocket sock)
687 , connRemotePeerId = hsPeerId hs' 701 (killThread) $ \ _ ->
688 , connThisPeerId = hsPeerId hs 702 runWire wire' sock chan $ Connection
689 , connOptions = def 703 { connProtocol = hsProtocol hs
690 , connState = cstate 704 , connCaps = caps
691 , connSession = session 705 , connTopic = hsInfoHash hs
692 } 706 , connRemotePeerId = hsPeerId hs'
707 , connThisPeerId = hsPeerId hs
708 , connOptions = def
709 , connState = cstate
710 , connSession = session
711 }
693 712
694-- | Accept 'Wire' connection using already 'Network.Socket.accept'ed 713-- | Accept 'Wire' connection using already 'Network.Socket.accept'ed
695-- socket. For peer listener loop the 'acceptSafe' should be 714-- socket. For peer listener loop the 'acceptSafe' should be