diff options
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/BitTorrent/Exchange/Session.hs | 13 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Wire.hs | 75 |
2 files changed, 47 insertions, 41 deletions
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index 5bfc2a71..885dcb13 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs | |||
@@ -8,7 +8,7 @@ module Network.BitTorrent.Exchange.Session | |||
8 | , Network.BitTorrent.Exchange.Session.insert | 8 | , Network.BitTorrent.Exchange.Session.insert |
9 | ) where | 9 | ) where |
10 | 10 | ||
11 | import Control.Concurrent.STM | 11 | import Control.Concurrent |
12 | import Control.Exception | 12 | import Control.Exception |
13 | import Control.Lens | 13 | import Control.Lens |
14 | import Control.Monad.Reader | 14 | import Control.Monad.Reader |
@@ -37,24 +37,23 @@ data ExchangeError | |||
37 | | CorruptedPiece PieceIx | 37 | | CorruptedPiece PieceIx |
38 | 38 | ||
39 | data Session = Session | 39 | data Session = Session |
40 | { peerId :: PeerId | 40 | { tpeerId :: PeerId |
41 | , bitfield :: Bitfield | 41 | , bitfield :: Bitfield |
42 | , assembler :: Assembler | 42 | , assembler :: Assembler |
43 | , storage :: Storage | 43 | , storage :: Storage |
44 | , unchoked :: [PeerAddr IP] | 44 | , unchoked :: [PeerAddr IP] |
45 | , handler :: Exchange () | 45 | , connections :: MVar (Map (PeerAddr IP) (Connection Session)) |
46 | , connections :: Map (PeerAddr IP) Connection | ||
47 | } | 46 | } |
48 | 47 | ||
48 | |||
49 | newSession :: PeerAddr IP -> Storage -> Bitfield -> IO Session | 49 | newSession :: PeerAddr IP -> Storage -> Bitfield -> IO Session |
50 | newSession addr st bf = do | 50 | newSession addr st bf = do |
51 | return Session | 51 | return Session |
52 | { peerId = undefined | 52 | { tpeerId = undefined |
53 | , bitfield = undefined | 53 | , bitfield = undefined |
54 | , assembler = undefined | 54 | , assembler = undefined |
55 | , storage = undefined | 55 | , storage = undefined |
56 | , unchoked = undefined | 56 | , unchoked = undefined |
57 | , handler = undefined | ||
58 | , connections = undefined | 57 | , connections = undefined |
59 | } | 58 | } |
60 | 59 | ||
@@ -76,7 +75,7 @@ deleteAll = undefined | |||
76 | -- Event loop | 75 | -- Event loop |
77 | -----------------------------------------------------------------------} | 76 | -----------------------------------------------------------------------} |
78 | 77 | ||
79 | type Exchange = StateT Session (ReaderT Connection IO) | 78 | type Exchange = StateT Session (ReaderT (Connection Session) IO) |
80 | 79 | ||
81 | --runExchange :: Exchange () -> [PeerAddr] -> IO () | 80 | --runExchange :: Exchange () -> [PeerAddr] -> IO () |
82 | --runExchange exchange peers = do | 81 | --runExchange exchange peers = do |
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs index 31da3f0c..4bd342ca 100644 --- a/src/Network/BitTorrent/Exchange/Wire.hs +++ b/src/Network/BitTorrent/Exchange/Wire.hs | |||
@@ -67,7 +67,7 @@ import Control.Monad.Reader | |||
67 | import Control.Monad.State | 67 | import Control.Monad.State |
68 | import Control.Lens | 68 | import Control.Lens |
69 | import Data.ByteString as BS | 69 | import Data.ByteString as BS |
70 | import Data.ByteString.Lazy as BSL | 70 | import Data.ByteString.Lazy as BSL |
71 | import Data.Conduit | 71 | import Data.Conduit |
72 | import Data.Conduit.Cereal | 72 | import Data.Conduit.Cereal |
73 | import Data.Conduit.List | 73 | import Data.Conduit.List |
@@ -85,12 +85,13 @@ import Text.PrettyPrint as PP hiding (($$), (<>)) | |||
85 | import Text.PrettyPrint.Class | 85 | import Text.PrettyPrint.Class |
86 | import Text.Show.Functions | 86 | import Text.Show.Functions |
87 | 87 | ||
88 | import Data.BEncode as BE | ||
89 | import Data.Torrent | ||
90 | import Data.Torrent.Bitfield | ||
88 | import Data.Torrent.InfoHash | 91 | import Data.Torrent.InfoHash |
92 | import Data.Torrent.Piece | ||
89 | import Network.BitTorrent.Core | 93 | import Network.BitTorrent.Core |
90 | import Network.BitTorrent.Exchange.Message | 94 | import Network.BitTorrent.Exchange.Message |
91 | import Data.Torrent | ||
92 | import Data.Torrent.Piece | ||
93 | import Data.BEncode as BE | ||
94 | 95 | ||
95 | -- TODO handle port message? | 96 | -- TODO handle port message? |
96 | -- TODO handle limits? | 97 | -- TODO handle limits? |
@@ -446,7 +447,7 @@ data ConnectionState = ConnectionState { | |||
446 | makeLenses ''ConnectionState | 447 | makeLenses ''ConnectionState |
447 | 448 | ||
448 | -- | Connection keep various info about both peers. | 449 | -- | Connection keep various info about both peers. |
449 | data Connection = Connection | 450 | data Connection s = Connection |
450 | { -- | /Both/ peers handshaked with this protocol string. The only | 451 | { -- | /Both/ peers handshaked with this protocol string. The only |
451 | -- value is \"Bittorrent Protocol\" but this can be changed in | 452 | -- value is \"Bittorrent Protocol\" but this can be changed in |
452 | -- future. | 453 | -- future. |
@@ -476,13 +477,16 @@ data Connection = Connection | |||
476 | 477 | ||
477 | -- -- | Max request queue length. | 478 | -- -- | Max request queue length. |
478 | -- , connMaxQueueLen :: !Int | 479 | -- , connMaxQueueLen :: !Int |
480 | |||
481 | -- | Environment data. | ||
482 | , connSession :: !s | ||
479 | } | 483 | } |
480 | 484 | ||
481 | instance Pretty Connection where | 485 | instance Pretty (Connection s) where |
482 | pretty Connection {..} = "Connection" | 486 | pretty Connection {..} = "Connection" |
483 | 487 | ||
484 | -- TODO check extended messages too | 488 | -- TODO check extended messages too |
485 | isAllowed :: Connection -> Message -> Bool | 489 | isAllowed :: Connection s -> Message -> Bool |
486 | isAllowed Connection {..} msg | 490 | isAllowed Connection {..} msg |
487 | | Just ext <- requires msg = ext `allowed` connCaps | 491 | | Just ext <- requires msg = ext `allowed` connCaps |
488 | | otherwise = True | 492 | | otherwise = True |
@@ -523,56 +527,58 @@ initiateHandshake sock hs = do | |||
523 | -----------------------------------------------------------------------} | 527 | -----------------------------------------------------------------------} |
524 | 528 | ||
525 | -- | do not expose this so we can change it without breaking api | 529 | -- | do not expose this so we can change it without breaking api |
526 | newtype Connected m a = Connected { runConnected :: (ReaderT Connection m a) } | 530 | newtype Connected s m a = Connected { runConnected :: (ReaderT (Connection s) m a) } |
527 | deriving (Functor, Applicative, Monad, MonadIO, MonadReader Connection, MonadThrow) | 531 | deriving (Functor, Applicative, Monad |
532 | , MonadIO, MonadReader (Connection s), MonadThrow | ||
533 | ) | ||
528 | 534 | ||
529 | instance (MonadIO m) => MonadState ConnectionState (Connected m) where | 535 | instance MonadIO m => MonadState ConnectionState (Connected s m) where |
530 | get = Connected (asks connState) >>= liftIO . readIORef | 536 | get = Connected (asks connState) >>= liftIO . readIORef |
531 | put x = Connected (asks connState) >>= liftIO . flip writeIORef x | 537 | put x = Connected (asks connState) >>= liftIO . flip writeIORef x |
532 | 538 | ||
533 | instance MonadTrans Connected where | 539 | instance MonadTrans (Connected s) where |
534 | lift = Connected . lift | 540 | lift = Connected . lift |
535 | 541 | ||
536 | -- | A duplex channel connected to a remote peer which keep tracks | 542 | -- | A duplex channel connected to a remote peer which keep tracks |
537 | -- connection parameters. | 543 | -- connection parameters. |
538 | type Wire a = ConduitM Message Message (Connected IO) a | 544 | type Wire s a = ConduitM Message Message (Connected s IO) a |
539 | 545 | ||
540 | {----------------------------------------------------------------------- | 546 | {----------------------------------------------------------------------- |
541 | -- Query | 547 | -- Query |
542 | -----------------------------------------------------------------------} | 548 | -----------------------------------------------------------------------} |
543 | 549 | ||
544 | setExtCaps :: ExtendedCaps -> Wire () | 550 | setExtCaps :: ExtendedCaps -> Wire s () |
545 | setExtCaps x = lift $ connExtCaps .= x | 551 | setExtCaps x = lift $ connExtCaps .= x |
546 | 552 | ||
547 | -- | Get current extended capabilities. Note that this value can | 553 | -- | Get current extended capabilities. Note that this value can |
548 | -- change in current session if either this or remote peer will | 554 | -- change in current session if either this or remote peer will |
549 | -- initiate rehandshaking. | 555 | -- initiate rehandshaking. |
550 | getExtCaps :: Wire ExtendedCaps | 556 | getExtCaps :: Wire s ExtendedCaps |
551 | getExtCaps = lift $ use connExtCaps | 557 | getExtCaps = lift $ use connExtCaps |
552 | 558 | ||
553 | setRemoteEhs :: ExtendedHandshake -> Wire () | 559 | setRemoteEhs :: ExtendedHandshake -> Wire s () |
554 | setRemoteEhs x = lift $ connRemoteEhs .= x | 560 | setRemoteEhs x = lift $ connRemoteEhs .= x |
555 | 561 | ||
556 | getRemoteEhs :: Wire ExtendedHandshake | 562 | getRemoteEhs :: Wire s ExtendedHandshake |
557 | getRemoteEhs = lift $ use connRemoteEhs | 563 | getRemoteEhs = lift $ use connRemoteEhs |
558 | 564 | ||
559 | -- | Get current stats. Note that this value will change with the next | 565 | -- | Get current stats. Note that this value will change with the next |
560 | -- sent or received message. | 566 | -- sent or received message. |
561 | getStats :: Wire ConnectionStats | 567 | getStats :: Wire s ConnectionStats |
562 | getStats = lift $ use connStats | 568 | getStats = lift $ use connStats |
563 | 569 | ||
564 | -- | See the 'Connection' section for more info. | 570 | -- | See the 'Connection' section for more info. |
565 | getConnection :: Wire Connection | 571 | getConnection :: Wire s (Connection s) |
566 | getConnection = lift ask | 572 | getConnection = lift ask |
567 | 573 | ||
568 | {----------------------------------------------------------------------- | 574 | {----------------------------------------------------------------------- |
569 | -- Wrapper | 575 | -- Wrapper |
570 | -----------------------------------------------------------------------} | 576 | -----------------------------------------------------------------------} |
571 | 577 | ||
572 | putStats :: ChannelSide -> Message -> Connected IO () | 578 | putStats :: ChannelSide -> Message -> Connected s IO () |
573 | putStats side msg = connStats %= addStats side (stats msg) | 579 | putStats side msg = connStats %= addStats side (stats msg) |
574 | 580 | ||
575 | validate :: ChannelSide -> Message -> Connected IO () | 581 | validate :: ChannelSide -> Message -> Connected s IO () |
576 | validate side msg = do | 582 | validate side msg = do |
577 | caps <- asks connCaps | 583 | caps <- asks connCaps |
578 | case requires msg of | 584 | case requires msg of |
@@ -581,7 +587,7 @@ validate side msg = do | |||
581 | | ext `allowed` caps -> return () | 587 | | ext `allowed` caps -> return () |
582 | | otherwise -> protocolError $ DisallowedMessage side ext | 588 | | otherwise -> protocolError $ DisallowedMessage side ext |
583 | 589 | ||
584 | trackFlow :: ChannelSide -> Wire () | 590 | trackFlow :: ChannelSide -> Wire s () |
585 | trackFlow side = iterM $ do | 591 | trackFlow side = iterM $ do |
586 | validate side | 592 | validate side |
587 | putStats side | 593 | putStats side |
@@ -591,7 +597,7 @@ trackFlow side = iterM $ do | |||
591 | -----------------------------------------------------------------------} | 597 | -----------------------------------------------------------------------} |
592 | 598 | ||
593 | -- | Normally you should use 'connectWire' or 'acceptWire'. | 599 | -- | Normally you should use 'connectWire' or 'acceptWire'. |
594 | runWire :: Wire () -> Socket -> Connection -> IO () | 600 | runWire :: Wire s () -> Socket -> Connection s -> IO () |
595 | runWire action sock conn = flip runReaderT conn $ runConnected $ | 601 | runWire action sock conn = flip runReaderT conn $ runConnected $ |
596 | sourceSocket sock $= | 602 | sourceSocket sock $= |
597 | conduitGet S.get $= | 603 | conduitGet S.get $= |
@@ -603,20 +609,20 @@ runWire action sock conn = flip runReaderT conn $ runConnected $ | |||
603 | 609 | ||
604 | -- | This function will block until a peer send new message. You can | 610 | -- | This function will block until a peer send new message. You can |
605 | -- also use 'await'. | 611 | -- also use 'await'. |
606 | recvMessage :: Wire Message | 612 | recvMessage :: Wire s Message |
607 | recvMessage = await >>= maybe (monadThrow PeerDisconnected) return | 613 | recvMessage = await >>= maybe (monadThrow PeerDisconnected) return |
608 | 614 | ||
609 | -- | You can also use 'yield'. | 615 | -- | You can also use 'yield'. |
610 | sendMessage :: PeerMessage msg => msg -> Wire () | 616 | sendMessage :: PeerMessage msg => msg -> Wire s () |
611 | sendMessage msg = do | 617 | sendMessage msg = do |
612 | ecaps <- use connExtCaps | 618 | ecaps <- use connExtCaps |
613 | yield $ envelop ecaps msg | 619 | yield $ envelop ecaps msg |
614 | 620 | ||
615 | -- | Forcefully terminate wire session and close socket. | 621 | -- | Forcefully terminate wire session and close socket. |
616 | disconnectPeer :: Wire a | 622 | disconnectPeer :: Wire s a |
617 | disconnectPeer = monadThrow DisconnectPeer | 623 | disconnectPeer = monadThrow DisconnectPeer |
618 | 624 | ||
619 | extendedHandshake :: ExtendedCaps -> Wire () | 625 | extendedHandshake :: ExtendedCaps -> Wire s () |
620 | extendedHandshake caps = do | 626 | extendedHandshake caps = do |
621 | -- TODO add other params to the handshake | 627 | -- TODO add other params to the handshake |
622 | sendMessage $ nullExtendedHandshake caps | 628 | sendMessage $ nullExtendedHandshake caps |
@@ -627,10 +633,10 @@ extendedHandshake caps = do | |||
627 | setRemoteEhs remoteEhs | 633 | setRemoteEhs remoteEhs |
628 | _ -> protocolError HandshakeRefused | 634 | _ -> protocolError HandshakeRefused |
629 | 635 | ||
630 | rehandshake :: ExtendedCaps -> Wire () | 636 | rehandshake :: ExtendedCaps -> Wire s () |
631 | rehandshake caps = undefined | 637 | rehandshake caps = undefined |
632 | 638 | ||
633 | reconnect :: Wire () | 639 | reconnect :: Wire s () |
634 | reconnect = undefined | 640 | reconnect = undefined |
635 | 641 | ||
636 | -- | Initiate 'Wire' connection and handshake with a peer. This function will | 642 | -- | Initiate 'Wire' connection and handshake with a peer. This function will |
@@ -639,8 +645,8 @@ reconnect = undefined | |||
639 | -- | 645 | -- |
640 | -- This function can throw 'WireFailure' exception. | 646 | -- This function can throw 'WireFailure' exception. |
641 | -- | 647 | -- |
642 | connectWire :: Handshake -> PeerAddr IP -> ExtendedCaps -> Wire () -> IO () | 648 | connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Wire s () -> IO () |
643 | connectWire hs addr extCaps wire = | 649 | connectWire session hs addr extCaps wire = |
644 | bracket (peerSocket Stream addr) close $ \ sock -> do | 650 | bracket (peerSocket Stream addr) close $ \ sock -> do |
645 | hs' <- initiateHandshake sock hs | 651 | hs' <- initiateHandshake sock hs |
646 | 652 | ||
@@ -678,6 +684,7 @@ connectWire hs addr extCaps wire = | |||
678 | , connThisPeerId = hsPeerId hs | 684 | , connThisPeerId = hsPeerId hs |
679 | , connOptions = def | 685 | , connOptions = def |
680 | , connState = cstate | 686 | , connState = cstate |
687 | , connSession = session | ||
681 | } | 688 | } |
682 | 689 | ||
683 | -- | Accept 'Wire' connection using already 'Network.Socket.accept'ed | 690 | -- | Accept 'Wire' connection using already 'Network.Socket.accept'ed |
@@ -686,7 +693,7 @@ connectWire hs addr extCaps wire = | |||
686 | -- | 693 | -- |
687 | -- This function can throw 'WireFailure' exception. | 694 | -- This function can throw 'WireFailure' exception. |
688 | -- | 695 | -- |
689 | acceptWire :: Socket -> PeerAddr IP -> Wire () -> IO () | 696 | acceptWire :: Socket -> PeerAddr IP -> Wire s () -> IO () |
690 | acceptWire sock peerAddr wire = do | 697 | acceptWire sock peerAddr wire = do |
691 | bracket (return sock) close $ \ _ -> do | 698 | bracket (return sock) close $ \ _ -> do |
692 | error "acceptWire: not implemented" | 699 | error "acceptWire: not implemented" |
@@ -696,7 +703,7 @@ acceptWire sock peerAddr wire = do | |||
696 | -----------------------------------------------------------------------} | 703 | -----------------------------------------------------------------------} |
697 | -- TODO introduce new metadata exchange specific exceptions | 704 | -- TODO introduce new metadata exchange specific exceptions |
698 | 705 | ||
699 | fetchMetadata :: Wire [BS.ByteString] | 706 | fetchMetadata :: Wire s [BS.ByteString] |
700 | fetchMetadata = loop 0 | 707 | fetchMetadata = loop 0 |
701 | where | 708 | where |
702 | recvData = recvMessage >>= inspect | 709 | recvData = recvMessage >>= inspect |
@@ -721,7 +728,7 @@ fetchMetadata = loop 0 | |||
721 | then pure [pieceData piece] | 728 | then pure [pieceData piece] |
722 | else (pieceData piece :) <$> loop (succ i) | 729 | else (pieceData piece :) <$> loop (succ i) |
723 | 730 | ||
724 | getMetadata :: Wire InfoDict | 731 | getMetadata :: Wire s InfoDict |
725 | getMetadata = do | 732 | getMetadata = do |
726 | chunks <- fetchMetadata | 733 | chunks <- fetchMetadata |
727 | Connection {..} <- getConnection | 734 | Connection {..} <- getConnection |