diff options
Diffstat (limited to 'src/Network/BitTorrent/Exchange.hs')
-rw-r--r-- | src/Network/BitTorrent/Exchange.hs | 118 |
1 files changed, 64 insertions, 54 deletions
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index be9a455b..2d0393c0 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs | |||
@@ -61,28 +61,73 @@ runPeerWire :: Socket -> PeerWire () -> IO () | |||
61 | runPeerWire sock p2p = | 61 | runPeerWire sock p2p = |
62 | sourceSocket sock $= | 62 | sourceSocket sock $= |
63 | conduitGet S.get $= | 63 | conduitGet S.get $= |
64 | forever p2p $= | 64 | p2p $= |
65 | conduitPut S.put $$ | 65 | conduitPut S.put $$ |
66 | sinkSocket sock | 66 | sinkSocket sock |
67 | 67 | ||
68 | awaitMessage :: P2P Message | 68 | awaitMessage :: P2P Message |
69 | awaitMessage = P2P (ReaderT go) | 69 | awaitMessage = P2P (ReaderT (const go)) |
70 | where | 70 | where |
71 | go _ = do | 71 | go = await >>= maybe disconnect return |
72 | liftIO $ putStrLn "trying recv:" | ||
73 | mmsg <- await | ||
74 | case mmsg of | ||
75 | Nothing -> monadThrow SessionException | ||
76 | Just msg -> do | ||
77 | -- liftIO $ updateIncoming se | ||
78 | liftIO $ print ("recv:" <+> ppMessage msg) | ||
79 | return msg | ||
80 | 72 | ||
81 | yieldMessage :: Message -> P2P () | 73 | yieldMessage :: Message -> P2P () |
82 | yieldMessage msg = P2P $ ReaderT $ \se -> do | 74 | yieldMessage msg = P2P $ ReaderT $ \se -> C.yield msg |
83 | C.yield msg | 75 | |
84 | liftIO $ print $ "sent:" <+> ppMessage msg | 76 | {----------------------------------------------------------------------- |
85 | liftIO $ updateOutcoming se | 77 | P2P monad |
78 | -----------------------------------------------------------------------} | ||
79 | |||
80 | -- | | ||
81 | -- Exceptions: | ||
82 | -- | ||
83 | -- * SessionException: is visible only within one peer | ||
84 | -- session. Use this exception to terminate P2P session, but not | ||
85 | -- the swarm session. | ||
86 | -- | ||
87 | newtype P2P a = P2P { | ||
88 | unP2P :: ReaderT PeerSession PeerWire a | ||
89 | } deriving ( Functor, Applicative, Monad | ||
90 | , MonadIO, MonadThrow, MonadActive | ||
91 | , MonadReader PeerSession | ||
92 | ) | ||
93 | -- TODO instance for MonadFork | ||
94 | |||
95 | runSession :: SwarmSession -> PeerAddr -> P2P () -> IO () | ||
96 | runSession se addr p2p = | ||
97 | handle isIOException $ | ||
98 | withPeerSession se addr $ \(sock, pses) -> do | ||
99 | runPeerWire sock (runReaderT (unP2P p2p) pses) | ||
100 | where | ||
101 | isIOException :: IOException -> IO () | ||
102 | isIOException _ = return () | ||
103 | |||
104 | -- | Run P2P session in the current thread. Normally you don't need this | ||
105 | -- function in client application. | ||
106 | runP2P :: SwarmSession -> PeerAddr -> P2P () -> IO () | ||
107 | runP2P se addr p2p = waitVacancy se $ runSession se addr p2p | ||
108 | |||
109 | -- | Run P2P session in forked thread. Might be used in listener or | ||
110 | -- some other loop. Note that this function may block while waiting | ||
111 | -- for a vacant place: use forkIO and runP2P instead. | ||
112 | spawnP2P :: SwarmSession -> PeerAddr -> P2P () -> IO ThreadId | ||
113 | spawnP2P se addr p2p = do | ||
114 | enterSwarm se | ||
115 | forkIO $ do | ||
116 | runSession se addr p2p `finally` leaveSwarm se | ||
117 | |||
118 | {----------------------------------------------------------------------- | ||
119 | Exceptions | ||
120 | -----------------------------------------------------------------------} | ||
121 | |||
122 | disconnect :: MonadThrow m => m a | ||
123 | disconnect = monadThrow PeerDisconnected | ||
124 | |||
125 | protocolError :: MonadThrow m => Doc -> m a | ||
126 | protocolError = monadThrow . ProtocolError | ||
127 | |||
128 | {----------------------------------------------------------------------- | ||
129 | Helpers | ||
130 | -----------------------------------------------------------------------} | ||
86 | 131 | ||
87 | peerWant :: P2P Bitfield | 132 | peerWant :: P2P Bitfield |
88 | peerWant = BF.difference <$> getClientBF <*> use bitfield | 133 | peerWant = BF.difference <$> getClientBF <*> use bitfield |
@@ -116,7 +161,7 @@ requireExtension :: Extension -> P2P () | |||
116 | requireExtension required = do | 161 | requireExtension required = do |
117 | enabled <- asks enabledExtensions | 162 | enabled <- asks enabledExtensions |
118 | unless (required `elem` enabled) $ | 163 | unless (required `elem` enabled) $ |
119 | sessionError $ ppExtension required <+> "not enabled" | 164 | protocolError $ ppExtension required <+> "not enabled" |
120 | 165 | ||
121 | -- haveMessage bf = do | 166 | -- haveMessage bf = do |
122 | -- cbf <- undefined -- liftIO $ readIORef $ clientBitfield swarmSession | 167 | -- cbf <- undefined -- liftIO $ readIORef $ clientBitfield swarmSession |
@@ -124,6 +169,9 @@ requireExtension required = do | |||
124 | -- then nextEvent se | 169 | -- then nextEvent se |
125 | -- else undefined -- return $ Available diff | 170 | -- else undefined -- return $ Available diff |
126 | 171 | ||
172 | {----------------------------------------------------------------------- | ||
173 | Exchange | ||
174 | -----------------------------------------------------------------------} | ||
127 | 175 | ||
128 | -- | | 176 | -- | |
129 | -- +----------+---------+ | 177 | -- +----------+---------+ |
@@ -266,41 +314,3 @@ yieldEvent (Fragment blk) = do | |||
266 | 314 | ||
267 | checkPiece :: PieceLIx -> {-ByteString -> -} P2P Bool | 315 | checkPiece :: PieceLIx -> {-ByteString -> -} P2P Bool |
268 | checkPiece = undefined | 316 | checkPiece = undefined |
269 | |||
270 | {----------------------------------------------------------------------- | ||
271 | P2P monad | ||
272 | -----------------------------------------------------------------------} | ||
273 | |||
274 | -- | | ||
275 | -- Exceptions: | ||
276 | -- | ||
277 | -- * SessionException: is visible only within one peer | ||
278 | -- session. Use this exception to terminate P2P session, but not | ||
279 | -- the swarm session. | ||
280 | -- | ||
281 | newtype P2P a = P2P { | ||
282 | unP2P :: ReaderT PeerSession PeerWire a | ||
283 | } deriving ( Functor, Applicative, Monad | ||
284 | , MonadIO, MonadThrow, MonadActive | ||
285 | , MonadReader PeerSession | ||
286 | ) | ||
287 | -- TODO instance for MonadFork | ||
288 | |||
289 | runSession :: SwarmSession -> PeerAddr -> P2P () -> IO () | ||
290 | runSession se addr p2p = | ||
291 | withPeerSession se addr $ \(sock, pses) -> do | ||
292 | runPeerWire sock (runReaderT (unP2P p2p) pses) | ||
293 | |||
294 | -- | Run P2P session in the current thread. Normally you don't need this | ||
295 | -- function in client application. | ||
296 | runP2P :: SwarmSession -> PeerAddr -> P2P () -> IO () | ||
297 | runP2P se addr p2p = waitVacancy se $ runSession se addr p2p | ||
298 | |||
299 | -- | Run P2P session in forked thread. Might be used in listener or | ||
300 | -- some other loop. Note that this function may block while waiting | ||
301 | -- for a vacant place: use forkIO and runP2P instead. | ||
302 | spawnP2P :: SwarmSession -> PeerAddr -> P2P () -> IO ThreadId | ||
303 | spawnP2P se addr p2p = do | ||
304 | enterSwarm se | ||
305 | forkIO $ do | ||
306 | runSession se addr p2p `finally` leaveSwarm se | ||