From 8b005c4eb0f58db974c342efe0821240f39a6331 Mon Sep 17 00:00:00 2001 From: Sam T Date: Wed, 12 Jun 2013 06:34:19 +0400 Subject: + Rename to await and yield. --- src/Network/BitTorrent/Exchange.hs | 124 ++++++++++++++++++++-------- src/Network/BitTorrent/Exchange/Protocol.hs | 4 + src/Network/BitTorrent/Internal.hs | 20 +++-- src/Network/BitTorrent/Peer.hs | 4 + src/Network/BitTorrent/Tracker.hs | 4 +- 5 files changed, 113 insertions(+), 43 deletions(-) (limited to 'src/Network/BitTorrent') diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index 65ec0eb7..de13d4ce 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs @@ -18,7 +18,7 @@ module Network.BitTorrent.Exchange , Event(..) , P2P, withPeer - , awaitEvent, signalEvent + , awaitEvent, yieldEvent ) where import Control.Applicative @@ -70,29 +70,31 @@ runConduit sock p2p = conduitPut S.put $$ sinkSocket sock -waitMessage :: P2P Message -waitMessage = P2P (ReaderT go) +awaitMessage :: P2P Message +awaitMessage = P2P (ReaderT go) where go se = do + liftIO $ putStrLn "trying recv:" mmsg <- await case mmsg of Nothing -> go se Just msg -> do - liftIO $ updateIncoming se - liftIO $ print msg +-- liftIO $ updateIncoming se + liftIO $ print ("recv:" <+> ppMessage msg) return msg -signalMessage :: Message -> P2P () -signalMessage msg = P2P $ ReaderT $ \se -> do +yieldMessage :: Message -> P2P () +yieldMessage msg = P2P $ ReaderT $ \se -> do C.yield msg + liftIO $ print $ "sent:" <+> ppMessage msg liftIO $ updateOutcoming se peerWant :: P2P Bitfield -peerWant = BF.difference <$> getPeerBF <*> use bitfield +peerWant = BF.difference <$> getClientBF <*> use bitfield clientWant :: P2P Bitfield -clientWant = BF.difference <$> use bitfield <*> getPeerBF +clientWant = BF.difference <$> use bitfield <*> getClientBF peerOffer :: P2P Bitfield peerOffer = do @@ -104,13 +106,17 @@ clientOffer = do sessionStatus <- use status if canUpload sessionStatus then peerWant else emptyBF -revise :: P2P () +revise :: P2P Bitfield revise = do - peerInteresting <- (not . BF.null) <$> clientWant + want <- clientWant + let peerInteresting = not (BF.null want) clientInterested <- use (status.clientStatus.interested) - when (clientInterested /= peerInteresting) $ - signalMessage $ if peerInteresting then Interested else NotInterested + when (clientInterested /= peerInteresting) $ do + yieldMessage $ if peerInteresting then Interested else NotInterested + status.clientStatus.interested .= peerInteresting + + return want requireExtension :: Extension -> P2P () requireExtension required = do @@ -118,9 +124,6 @@ requireExtension required = do unless (required `elem` enabled) $ sessionError $ ppExtension required <+> "not enabled" -peerHave :: P2P Event -peerHave = undefined - -- haveMessage bf = do -- cbf <- undefined -- liftIO $ readIORef $ clientBitfield swarmSession -- if undefined -- ix `member` bf @@ -129,12 +132,21 @@ peerHave = undefined -- | +-- +----------+---------+ +-- | Leacher | Seeder | +-- |----------+---------+ +-- | Available| | +-- | Want | Want | +-- | Fragment | | +-- +----------+---------+ +-- +-- -- properties: -- -- forall (Fragment block). isPiece block == True -- awaitEvent :: P2P Event -awaitEvent = waitMessage >>= go +awaitEvent = awaitMessage >>= go where go KeepAlive = awaitEvent go Choke = do @@ -142,8 +154,11 @@ awaitEvent = waitMessage >>= go awaitEvent go Unchoke = do - status.clientStatus.choking .= False - awaitEvent + status.peerStatus.choking .= False + offer <- peerOffer + if BF.null offer + then awaitEvent + else return (Available offer) go Interested = do status.peerStatus.interested .= True @@ -153,21 +168,45 @@ awaitEvent = waitMessage >>= go status.peerStatus.interested .= False awaitEvent --- go (Have ix) = peerHave =<< singletonBF ix --- go (Bitfield bf) = peerHave =<< adjustBF bf + go (Have ix) = do + new <- singletonBF ix + bitfield %= BF.union new + revise + + offer <- peerOffer + if not (BF.null offer) + then return (Available offer) + else awaitEvent + + go (Bitfield bf) = do + new <- adjustBF bf + bitfield .= new + revise + + offer <- peerOffer + if not (BF.null offer) + then return (Available offer) + else awaitEvent + go (Request bix) = do - bf <- use bitfield + bf <- clientOffer if ixPiece bix `BF.member` bf then return (Want bix) else do - signalMessage (RejectRequest bix) +-- check if extension is enabled +-- yieldMessage (RejectRequest bix) awaitEvent - go (Piece blk) = undefined + go (Piece blk) = do + -- this protect us from malicious peers and duplication + wanted <- clientWant + if blkPiece blk `BF.member` wanted + then return (Fragment blk) + else awaitEvent {- - go msg @ (Port _) - = checkExtension msg ExtDHT $ do + go (Port _) = do + requireExtension ExtDHT undefined go HaveAll = do @@ -189,24 +228,39 @@ awaitEvent = waitMessage >>= go then Available <$> singletonBF ix else awaitEvent - go msg @ (RejectRequest ix) - = checkExtension msg ExtFast $ do - undefined + go (RejectRequest ix) = do + requireExtension ExtFast + awaitMessage - go msg @ (AllowedFast pix) - = checkExtension msg ExtFast $ do - undefined + go (AllowedFast pix) = + requireExtension ExtFast + awaitMessage -} -signalEvent :: Event -> P2P () -signalEvent (Available bf) = undefined -signalEvent _ = undefined + +-- | +-- @ +-- +----------+---------+ +-- | Leacher | Seeder | +-- |----------+---------+ +-- | Available| | +-- | Want |Fragment | +-- | Fragment | | +-- +----------+---------+ +-- @ +-- +yieldEvent :: Event -> P2P () +yieldEvent (Available bf) = undefined +yieldEvent _ = undefined --flushBroadcast :: P2P () --flushBroadcast = nextBroadcast >>= maybe (return ()) go -- where -- go = undefined +checkPiece :: PieceLIx -> {-ByteString -> -} P2P Bool +checkPiece = undefined + {----------------------------------------------------------------------- P2P monad -----------------------------------------------------------------------} diff --git a/src/Network/BitTorrent/Exchange/Protocol.hs b/src/Network/BitTorrent/Exchange/Protocol.hs index 718e339d..6b97d8d1 100644 --- a/src/Network/BitTorrent/Exchange/Protocol.hs +++ b/src/Network/BitTorrent/Exchange/Protocol.hs @@ -164,14 +164,18 @@ defaultHandshake = Handshake defaultBTProtocol defaultReserved -- | Handshaking with a peer specified by the second argument. handshake :: Socket -> Handshake -> IO Handshake handshake sock hs = do + putStrLn "send handshake" sendAll sock (S.encode hs) + putStrLn "recv handshake size" header <- recv sock 1 when (B.length header == 0) $ throw $ userError "Unable to receive handshake." let protocolLen = B.head header let restLen = handshakeSize protocolLen - 1 + + putStrLn "recv handshake body" body <- recv sock restLen let resp = B.cons protocolLen body diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs index 2f538652..3d07a82f 100644 --- a/src/Network/BitTorrent/Internal.hs +++ b/src/Network/BitTorrent/Internal.hs @@ -34,8 +34,8 @@ module Network.BitTorrent.Internal ) , SessionState , bitfield, status - , emptyBF, fullBF, singletonBF - , getPieceCount, getPeerBF + , emptyBF, fullBF, singletonBF, adjustBF + , getPieceCount, getClientBF , sessionError, withPeerSession -- * Timeouts @@ -250,10 +250,15 @@ withPeerSession ss @ SwarmSession {..} addr let caps = encodeExts $ allowedExtensions $ clientSession let pid = clientPeerID $ clientSession let chs = Handshake defaultBTProtocol caps (tInfoHash torrentMeta) pid - + putStrLn "trying to connect" sock <- connectToPeer addr + + putStrLn "trying to handshake" phs <- handshake sock chs `onException` close sock + cbf <- readTVarIO clientBitfield + sendAll sock (encode (Bitfield cbf)) + let enabled = decodeExts (enabledCaps caps (handshakeCaps phs)) ps <- PeerSession addr ss enabled <$> registerTimeout (eventManager clientSession) @@ -281,8 +286,11 @@ fullBF = liftM haveAll getPieceCount singletonBF :: (MonadReader PeerSession m) => PieceIx -> m Bitfield singletonBF ix = liftM (BF.singleton ix) getPieceCount -getPeerBF :: (MonadIO m, MonadReader PeerSession m) => m Bitfield -getPeerBF = asks swarmSession >>= liftIO . readTVarIO . clientBitfield +adjustBF :: (MonadReader PeerSession m) => Bitfield -> m Bitfield +adjustBF bf = (`adjustSize` bf) `liftM` getPieceCount + +getClientBF :: (MonadIO m, MonadReader PeerSession m) => m Bitfield +getClientBF = asks swarmSession >>= liftIO . readTVarIO . clientBitfield --data Signal = --nextBroadcast :: P2P (Maybe Signal) @@ -317,7 +325,7 @@ updateOutcoming PeerSession {..} = sendKA :: Socket -> IO () sendKA sock {- SwarmSession {..} -} = do print "I'm sending keep alive." - sendAll sock (encode BT.KeepAlive) +-- sendAll sock (encode BT.KeepAlive) -- let mgr = eventManager clientSession -- updateTimeout mgr print "Done.." diff --git a/src/Network/BitTorrent/Peer.hs b/src/Network/BitTorrent/Peer.hs index 9aa924d3..6e5db0e0 100644 --- a/src/Network/BitTorrent/Peer.hs +++ b/src/Network/BitTorrent/Peer.hs @@ -531,8 +531,12 @@ peerSockAddr = SockAddrInet <$> (g . peerPort) <*> (htonl . peerIP) -- | Tries to connect to peer using reasonable default parameters. connectToPeer :: PeerAddr -> IO Socket connectToPeer p = do + putStrLn "socket" sock <- socket AF_INET Stream Network.Socket.defaultProtocol + + putStrLn "connect" connect sock (peerSockAddr p) + putStrLn "connected" return sock -- | Pretty print peer address in human readable form. diff --git a/src/Network/BitTorrent/Tracker.hs b/src/Network/BitTorrent/Tracker.hs index 275b5422..cb776431 100644 --- a/src/Network/BitTorrent/Tracker.hs +++ b/src/Network/BitTorrent/Tracker.hs @@ -94,8 +94,8 @@ genericReq ses pr = TRequest { } --- | The first request to the tracker that should be created is 'startedReq'. --- It includes necessary 'Started' event field. +-- | The first request to the tracker that should be created is +-- 'startedReq'. It includes necessary 'Started' event field. -- startedReq :: TConnection -> Progress -> TRequest startedReq ses pr = (genericReq ses pr) { -- cgit v1.2.3