summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/MkTorrent.hs2
-rw-r--r--src/Network/BitTorrent/Exchange/Session.hs13
-rw-r--r--src/Network/BitTorrent/Exchange/Wire.hs75
3 files changed, 48 insertions, 42 deletions
diff --git a/examples/MkTorrent.hs b/examples/MkTorrent.hs
index 93ac639b..e9eb7f1a 100644
--- a/examples/MkTorrent.hs
+++ b/examples/MkTorrent.hs
@@ -361,7 +361,7 @@ exchangeTorrent ih addr = do
361 pid <- genPeerId 361 pid <- genPeerId
362 var <- newEmptyMVar 362 var <- newEmptyMVar
363 let hs = Handshake def (toCaps [ExtExtended]) ih pid 363 let hs = Handshake def (toCaps [ExtExtended]) ih pid
364 connectWire hs addr (toCaps [ExtMetadata]) $ do 364 connectWire () hs addr (toCaps [ExtMetadata]) $ do
365 infodict <- getMetadata 365 infodict <- getMetadata
366 liftIO $ putMVar var infodict 366 liftIO $ putMVar var infodict
367 takeMVar var 367 takeMVar var
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs
index 5bfc2a71..885dcb13 100644
--- a/src/Network/BitTorrent/Exchange/Session.hs
+++ b/src/Network/BitTorrent/Exchange/Session.hs
@@ -8,7 +8,7 @@ module Network.BitTorrent.Exchange.Session
8 , Network.BitTorrent.Exchange.Session.insert 8 , Network.BitTorrent.Exchange.Session.insert
9 ) where 9 ) where
10 10
11import Control.Concurrent.STM 11import Control.Concurrent
12import Control.Exception 12import Control.Exception
13import Control.Lens 13import Control.Lens
14import Control.Monad.Reader 14import Control.Monad.Reader
@@ -37,24 +37,23 @@ data ExchangeError
37 | CorruptedPiece PieceIx 37 | CorruptedPiece PieceIx
38 38
39data Session = Session 39data Session = Session
40 { peerId :: PeerId 40 { tpeerId :: PeerId
41 , bitfield :: Bitfield 41 , bitfield :: Bitfield
42 , assembler :: Assembler 42 , assembler :: Assembler
43 , storage :: Storage 43 , storage :: Storage
44 , unchoked :: [PeerAddr IP] 44 , unchoked :: [PeerAddr IP]
45 , handler :: Exchange () 45 , connections :: MVar (Map (PeerAddr IP) (Connection Session))
46 , connections :: Map (PeerAddr IP) Connection
47 } 46 }
48 47
48
49newSession :: PeerAddr IP -> Storage -> Bitfield -> IO Session 49newSession :: PeerAddr IP -> Storage -> Bitfield -> IO Session
50newSession addr st bf = do 50newSession addr st bf = do
51 return Session 51 return Session
52 { peerId = undefined 52 { tpeerId = undefined
53 , bitfield = undefined 53 , bitfield = undefined
54 , assembler = undefined 54 , assembler = undefined
55 , storage = undefined 55 , storage = undefined
56 , unchoked = undefined 56 , unchoked = undefined
57 , handler = undefined
58 , connections = undefined 57 , connections = undefined
59 } 58 }
60 59
@@ -76,7 +75,7 @@ deleteAll = undefined
76-- Event loop 75-- Event loop
77-----------------------------------------------------------------------} 76-----------------------------------------------------------------------}
78 77
79type Exchange = StateT Session (ReaderT Connection IO) 78type Exchange = StateT Session (ReaderT (Connection Session) IO)
80 79
81--runExchange :: Exchange () -> [PeerAddr] -> IO () 80--runExchange :: Exchange () -> [PeerAddr] -> IO ()
82--runExchange exchange peers = do 81--runExchange exchange peers = do
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs
index 31da3f0c..4bd342ca 100644
--- a/src/Network/BitTorrent/Exchange/Wire.hs
+++ b/src/Network/BitTorrent/Exchange/Wire.hs
@@ -67,7 +67,7 @@ import Control.Monad.Reader
67import Control.Monad.State 67import Control.Monad.State
68import Control.Lens 68import Control.Lens
69import Data.ByteString as BS 69import Data.ByteString as BS
70import Data.ByteString.Lazy as BSL 70import Data.ByteString.Lazy as BSL
71import Data.Conduit 71import Data.Conduit
72import Data.Conduit.Cereal 72import Data.Conduit.Cereal
73import Data.Conduit.List 73import Data.Conduit.List
@@ -85,12 +85,13 @@ import Text.PrettyPrint as PP hiding (($$), (<>))
85import Text.PrettyPrint.Class 85import Text.PrettyPrint.Class
86import Text.Show.Functions 86import Text.Show.Functions
87 87
88import Data.BEncode as BE
89import Data.Torrent
90import Data.Torrent.Bitfield
88import Data.Torrent.InfoHash 91import Data.Torrent.InfoHash
92import Data.Torrent.Piece
89import Network.BitTorrent.Core 93import Network.BitTorrent.Core
90import Network.BitTorrent.Exchange.Message 94import Network.BitTorrent.Exchange.Message
91import Data.Torrent
92import Data.Torrent.Piece
93import Data.BEncode as BE
94 95
95-- TODO handle port message? 96-- TODO handle port message?
96-- TODO handle limits? 97-- TODO handle limits?
@@ -446,7 +447,7 @@ data ConnectionState = ConnectionState {
446makeLenses ''ConnectionState 447makeLenses ''ConnectionState
447 448
448-- | Connection keep various info about both peers. 449-- | Connection keep various info about both peers.
449data Connection = Connection 450data Connection s = Connection
450 { -- | /Both/ peers handshaked with this protocol string. The only 451 { -- | /Both/ peers handshaked with this protocol string. The only
451 -- value is \"Bittorrent Protocol\" but this can be changed in 452 -- value is \"Bittorrent Protocol\" but this can be changed in
452 -- future. 453 -- future.
@@ -476,13 +477,16 @@ data Connection = Connection
476 477
477-- -- | Max request queue length. 478-- -- | Max request queue length.
478-- , connMaxQueueLen :: !Int 479-- , connMaxQueueLen :: !Int
480
481 -- | Environment data.
482 , connSession :: !s
479 } 483 }
480 484
481instance Pretty Connection where 485instance Pretty (Connection s) where
482 pretty Connection {..} = "Connection" 486 pretty Connection {..} = "Connection"
483 487
484-- TODO check extended messages too 488-- TODO check extended messages too
485isAllowed :: Connection -> Message -> Bool 489isAllowed :: Connection s -> Message -> Bool
486isAllowed Connection {..} msg 490isAllowed Connection {..} msg
487 | Just ext <- requires msg = ext `allowed` connCaps 491 | Just ext <- requires msg = ext `allowed` connCaps
488 | otherwise = True 492 | otherwise = True
@@ -523,56 +527,58 @@ initiateHandshake sock hs = do
523-----------------------------------------------------------------------} 527-----------------------------------------------------------------------}
524 528
525-- | do not expose this so we can change it without breaking api 529-- | do not expose this so we can change it without breaking api
526newtype Connected m a = Connected { runConnected :: (ReaderT Connection m a) } 530newtype Connected s m a = Connected { runConnected :: (ReaderT (Connection s) m a) }
527 deriving (Functor, Applicative, Monad, MonadIO, MonadReader Connection, MonadThrow) 531 deriving (Functor, Applicative, Monad
532 , MonadIO, MonadReader (Connection s), MonadThrow
533 )
528 534
529instance (MonadIO m) => MonadState ConnectionState (Connected m) where 535instance MonadIO m => MonadState ConnectionState (Connected s m) where
530 get = Connected (asks connState) >>= liftIO . readIORef 536 get = Connected (asks connState) >>= liftIO . readIORef
531 put x = Connected (asks connState) >>= liftIO . flip writeIORef x 537 put x = Connected (asks connState) >>= liftIO . flip writeIORef x
532 538
533instance MonadTrans Connected where 539instance MonadTrans (Connected s) where
534 lift = Connected . lift 540 lift = Connected . lift
535 541
536-- | A duplex channel connected to a remote peer which keep tracks 542-- | A duplex channel connected to a remote peer which keep tracks
537-- connection parameters. 543-- connection parameters.
538type Wire a = ConduitM Message Message (Connected IO) a 544type Wire s a = ConduitM Message Message (Connected s IO) a
539 545
540{----------------------------------------------------------------------- 546{-----------------------------------------------------------------------
541-- Query 547-- Query
542-----------------------------------------------------------------------} 548-----------------------------------------------------------------------}
543 549
544setExtCaps :: ExtendedCaps -> Wire () 550setExtCaps :: ExtendedCaps -> Wire s ()
545setExtCaps x = lift $ connExtCaps .= x 551setExtCaps x = lift $ connExtCaps .= x
546 552
547-- | Get current extended capabilities. Note that this value can 553-- | Get current extended capabilities. Note that this value can
548-- change in current session if either this or remote peer will 554-- change in current session if either this or remote peer will
549-- initiate rehandshaking. 555-- initiate rehandshaking.
550getExtCaps :: Wire ExtendedCaps 556getExtCaps :: Wire s ExtendedCaps
551getExtCaps = lift $ use connExtCaps 557getExtCaps = lift $ use connExtCaps
552 558
553setRemoteEhs :: ExtendedHandshake -> Wire () 559setRemoteEhs :: ExtendedHandshake -> Wire s ()
554setRemoteEhs x = lift $ connRemoteEhs .= x 560setRemoteEhs x = lift $ connRemoteEhs .= x
555 561
556getRemoteEhs :: Wire ExtendedHandshake 562getRemoteEhs :: Wire s ExtendedHandshake
557getRemoteEhs = lift $ use connRemoteEhs 563getRemoteEhs = lift $ use connRemoteEhs
558 564
559-- | Get current stats. Note that this value will change with the next 565-- | Get current stats. Note that this value will change with the next
560-- sent or received message. 566-- sent or received message.
561getStats :: Wire ConnectionStats 567getStats :: Wire s ConnectionStats
562getStats = lift $ use connStats 568getStats = lift $ use connStats
563 569
564-- | See the 'Connection' section for more info. 570-- | See the 'Connection' section for more info.
565getConnection :: Wire Connection 571getConnection :: Wire s (Connection s)
566getConnection = lift ask 572getConnection = lift ask
567 573
568{----------------------------------------------------------------------- 574{-----------------------------------------------------------------------
569-- Wrapper 575-- Wrapper
570-----------------------------------------------------------------------} 576-----------------------------------------------------------------------}
571 577
572putStats :: ChannelSide -> Message -> Connected IO () 578putStats :: ChannelSide -> Message -> Connected s IO ()
573putStats side msg = connStats %= addStats side (stats msg) 579putStats side msg = connStats %= addStats side (stats msg)
574 580
575validate :: ChannelSide -> Message -> Connected IO () 581validate :: ChannelSide -> Message -> Connected s IO ()
576validate side msg = do 582validate side msg = do
577 caps <- asks connCaps 583 caps <- asks connCaps
578 case requires msg of 584 case requires msg of
@@ -581,7 +587,7 @@ validate side msg = do
581 | ext `allowed` caps -> return () 587 | ext `allowed` caps -> return ()
582 | otherwise -> protocolError $ DisallowedMessage side ext 588 | otherwise -> protocolError $ DisallowedMessage side ext
583 589
584trackFlow :: ChannelSide -> Wire () 590trackFlow :: ChannelSide -> Wire s ()
585trackFlow side = iterM $ do 591trackFlow side = iterM $ do
586 validate side 592 validate side
587 putStats side 593 putStats side
@@ -591,7 +597,7 @@ trackFlow side = iterM $ do
591-----------------------------------------------------------------------} 597-----------------------------------------------------------------------}
592 598
593-- | Normally you should use 'connectWire' or 'acceptWire'. 599-- | Normally you should use 'connectWire' or 'acceptWire'.
594runWire :: Wire () -> Socket -> Connection -> IO () 600runWire :: Wire s () -> Socket -> Connection s -> IO ()
595runWire action sock conn = flip runReaderT conn $ runConnected $ 601runWire action sock conn = flip runReaderT conn $ runConnected $
596 sourceSocket sock $= 602 sourceSocket sock $=
597 conduitGet S.get $= 603 conduitGet S.get $=
@@ -603,20 +609,20 @@ runWire action sock conn = flip runReaderT conn $ runConnected $
603 609
604-- | This function will block until a peer send new message. You can 610-- | This function will block until a peer send new message. You can
605-- also use 'await'. 611-- also use 'await'.
606recvMessage :: Wire Message 612recvMessage :: Wire s Message
607recvMessage = await >>= maybe (monadThrow PeerDisconnected) return 613recvMessage = await >>= maybe (monadThrow PeerDisconnected) return
608 614
609-- | You can also use 'yield'. 615-- | You can also use 'yield'.
610sendMessage :: PeerMessage msg => msg -> Wire () 616sendMessage :: PeerMessage msg => msg -> Wire s ()
611sendMessage msg = do 617sendMessage msg = do
612 ecaps <- use connExtCaps 618 ecaps <- use connExtCaps
613 yield $ envelop ecaps msg 619 yield $ envelop ecaps msg
614 620
615-- | Forcefully terminate wire session and close socket. 621-- | Forcefully terminate wire session and close socket.
616disconnectPeer :: Wire a 622disconnectPeer :: Wire s a
617disconnectPeer = monadThrow DisconnectPeer 623disconnectPeer = monadThrow DisconnectPeer
618 624
619extendedHandshake :: ExtendedCaps -> Wire () 625extendedHandshake :: ExtendedCaps -> Wire s ()
620extendedHandshake caps = do 626extendedHandshake caps = do
621 -- TODO add other params to the handshake 627 -- TODO add other params to the handshake
622 sendMessage $ nullExtendedHandshake caps 628 sendMessage $ nullExtendedHandshake caps
@@ -627,10 +633,10 @@ extendedHandshake caps = do
627 setRemoteEhs remoteEhs 633 setRemoteEhs remoteEhs
628 _ -> protocolError HandshakeRefused 634 _ -> protocolError HandshakeRefused
629 635
630rehandshake :: ExtendedCaps -> Wire () 636rehandshake :: ExtendedCaps -> Wire s ()
631rehandshake caps = undefined 637rehandshake caps = undefined
632 638
633reconnect :: Wire () 639reconnect :: Wire s ()
634reconnect = undefined 640reconnect = undefined
635 641
636-- | Initiate 'Wire' connection and handshake with a peer. This function will 642-- | Initiate 'Wire' connection and handshake with a peer. This function will
@@ -639,8 +645,8 @@ reconnect = undefined
639-- 645--
640-- This function can throw 'WireFailure' exception. 646-- This function can throw 'WireFailure' exception.
641-- 647--
642connectWire :: Handshake -> PeerAddr IP -> ExtendedCaps -> Wire () -> IO () 648connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Wire s () -> IO ()
643connectWire hs addr extCaps wire = 649connectWire session hs addr extCaps wire =
644 bracket (peerSocket Stream addr) close $ \ sock -> do 650 bracket (peerSocket Stream addr) close $ \ sock -> do
645 hs' <- initiateHandshake sock hs 651 hs' <- initiateHandshake sock hs
646 652
@@ -678,6 +684,7 @@ connectWire hs addr extCaps wire =
678 , connThisPeerId = hsPeerId hs 684 , connThisPeerId = hsPeerId hs
679 , connOptions = def 685 , connOptions = def
680 , connState = cstate 686 , connState = cstate
687 , connSession = session
681 } 688 }
682 689
683-- | Accept 'Wire' connection using already 'Network.Socket.accept'ed 690-- | Accept 'Wire' connection using already 'Network.Socket.accept'ed
@@ -686,7 +693,7 @@ connectWire hs addr extCaps wire =
686-- 693--
687-- This function can throw 'WireFailure' exception. 694-- This function can throw 'WireFailure' exception.
688-- 695--
689acceptWire :: Socket -> PeerAddr IP -> Wire () -> IO () 696acceptWire :: Socket -> PeerAddr IP -> Wire s () -> IO ()
690acceptWire sock peerAddr wire = do 697acceptWire sock peerAddr wire = do
691 bracket (return sock) close $ \ _ -> do 698 bracket (return sock) close $ \ _ -> do
692 error "acceptWire: not implemented" 699 error "acceptWire: not implemented"
@@ -696,7 +703,7 @@ acceptWire sock peerAddr wire = do
696-----------------------------------------------------------------------} 703-----------------------------------------------------------------------}
697-- TODO introduce new metadata exchange specific exceptions 704-- TODO introduce new metadata exchange specific exceptions
698 705
699fetchMetadata :: Wire [BS.ByteString] 706fetchMetadata :: Wire s [BS.ByteString]
700fetchMetadata = loop 0 707fetchMetadata = loop 0
701 where 708 where
702 recvData = recvMessage >>= inspect 709 recvData = recvMessage >>= inspect
@@ -721,7 +728,7 @@ fetchMetadata = loop 0
721 then pure [pieceData piece] 728 then pure [pieceData piece]
722 else (pieceData piece :) <$> loop (succ i) 729 else (pieceData piece :) <$> loop (succ i)
723 730
724getMetadata :: Wire InfoDict 731getMetadata :: Wire s InfoDict
725getMetadata = do 732getMetadata = do
726 chunks <- fetchMetadata 733 chunks <- fetchMetadata
727 Connection {..} <- getConnection 734 Connection {..} <- getConnection