diff options
Diffstat (limited to 'bittorrent/src/Network/BitTorrent/Exchange/Session.hs')
-rw-r--r-- | bittorrent/src/Network/BitTorrent/Exchange/Session.hs | 586 |
1 files changed, 0 insertions, 586 deletions
diff --git a/bittorrent/src/Network/BitTorrent/Exchange/Session.hs b/bittorrent/src/Network/BitTorrent/Exchange/Session.hs deleted file mode 100644 index 38a3c3a6..00000000 --- a/bittorrent/src/Network/BitTorrent/Exchange/Session.hs +++ /dev/null | |||
@@ -1,586 +0,0 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE DeriveDataTypeable #-} | ||
3 | {-# LANGUAGE FlexibleInstances #-} | ||
4 | {-# LANGUAGE StandaloneDeriving #-} | ||
5 | {-# LANGUAGE TemplateHaskell #-} | ||
6 | {-# LANGUAGE TypeFamilies #-} | ||
7 | module Network.BitTorrent.Exchange.Session | ||
8 | ( -- * Session | ||
9 | Session | ||
10 | , Event (..) | ||
11 | , LogFun | ||
12 | , sessionLogger | ||
13 | |||
14 | -- * Construction | ||
15 | , newSession | ||
16 | , closeSession | ||
17 | , withSession | ||
18 | |||
19 | -- * Connection Set | ||
20 | , connect | ||
21 | , connectSink | ||
22 | , establish | ||
23 | |||
24 | -- * Query | ||
25 | , waitMetadata | ||
26 | , takeMetadata | ||
27 | ) where | ||
28 | |||
29 | import Control.Applicative | ||
30 | import Control.Concurrent | ||
31 | import Control.Concurrent.Chan.Split as CS | ||
32 | import Control.Concurrent.STM | ||
33 | import Control.Exception hiding (Handler) | ||
34 | import Control.Lens | ||
35 | import Control.Monad as M | ||
36 | import Control.Monad.Logger | ||
37 | import Control.Monad.Reader | ||
38 | import Data.ByteString as BS | ||
39 | import Data.ByteString.Lazy as BL | ||
40 | import Data.Conduit as C (Sink, awaitForever, (=$=), ($=)) | ||
41 | import qualified Data.Conduit as C | ||
42 | import Data.Conduit.List as C | ||
43 | import Data.Map as M | ||
44 | import Data.Monoid | ||
45 | import Data.Set as S | ||
46 | import Data.Text as T | ||
47 | import Data.Typeable | ||
48 | import Text.PrettyPrint hiding ((<>)) | ||
49 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | ||
50 | import System.Log.FastLogger (LogStr, ToLogStr (..)) | ||
51 | |||
52 | import Data.BEncode as BE | ||
53 | import Data.Torrent as Torrent | ||
54 | import Network.BitTorrent.Internal.Types | ||
55 | import Network.Address | ||
56 | import Network.BitTorrent.Exchange.Bitfield as BF | ||
57 | import Network.BitTorrent.Exchange.Block as Block | ||
58 | import Network.BitTorrent.Exchange.Connection | ||
59 | import Network.BitTorrent.Exchange.Download as D | ||
60 | import Network.BitTorrent.Exchange.Message as Message | ||
61 | import System.Torrent.Storage | ||
62 | |||
63 | #if !MIN_VERSION_iproute(1,2,12) | ||
64 | deriving instance Ord IP | ||
65 | #endif | ||
66 | |||
67 | {----------------------------------------------------------------------- | ||
68 | -- Exceptions | ||
69 | -----------------------------------------------------------------------} | ||
70 | |||
71 | data ExchangeError | ||
72 | = InvalidRequest BlockIx StorageFailure | ||
73 | | CorruptedPiece PieceIx | ||
74 | deriving (Show, Typeable) | ||
75 | |||
76 | instance Exception ExchangeError | ||
77 | |||
78 | packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a | ||
79 | packException f m = try m >>= either (throwIO . f) return | ||
80 | |||
81 | {----------------------------------------------------------------------- | ||
82 | -- Session state | ||
83 | -----------------------------------------------------------------------} | ||
84 | -- TODO unmap storage on zero connections | ||
85 | |||
86 | data Cached a = Cached | ||
87 | { cachedValue :: !a | ||
88 | , cachedData :: BL.ByteString -- keep lazy | ||
89 | } | ||
90 | |||
91 | cache :: BEncode a => a -> Cached a | ||
92 | cache s = Cached s (BE.encode s) | ||
93 | |||
94 | -- | Logger function. | ||
95 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () | ||
96 | |||
97 | --data SessionStatus = Seeder | Leecher | ||
98 | |||
99 | data SessionState | ||
100 | = WaitingMetadata | ||
101 | { metadataDownload :: MVar MetadataDownload | ||
102 | , metadataCompleted :: MVar InfoDict -- ^ used to unblock waiters | ||
103 | , contentRootPath :: FilePath | ||
104 | } | ||
105 | | HavingMetadata | ||
106 | { metadataCache :: Cached InfoDict | ||
107 | , contentDownload :: MVar ContentDownload | ||
108 | , contentStorage :: Storage | ||
109 | } | ||
110 | |||
111 | newSessionState :: FilePath -> Either InfoHash InfoDict -> IO SessionState | ||
112 | newSessionState rootPath (Left ih ) = do | ||
113 | WaitingMetadata <$> newMVar def <*> newEmptyMVar <*> pure rootPath | ||
114 | newSessionState rootPath (Right dict) = do | ||
115 | storage <- openInfoDict ReadWriteEx rootPath dict | ||
116 | download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage)) | ||
117 | (piPieceLength (idPieceInfo dict)) | ||
118 | storage | ||
119 | return $ HavingMetadata (cache dict) download storage | ||
120 | |||
121 | closeSessionState :: SessionState -> IO () | ||
122 | closeSessionState WaitingMetadata {..} = return () | ||
123 | closeSessionState HavingMetadata {..} = close contentStorage | ||
124 | |||
125 | haveMetadata :: InfoDict -> SessionState -> IO SessionState | ||
126 | haveMetadata dict WaitingMetadata {..} = do | ||
127 | storage <- openInfoDict ReadWriteEx contentRootPath dict | ||
128 | download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage)) | ||
129 | (piPieceLength (idPieceInfo dict)) | ||
130 | storage | ||
131 | return HavingMetadata | ||
132 | { metadataCache = cache dict | ||
133 | , contentDownload = download | ||
134 | , contentStorage = storage | ||
135 | } | ||
136 | haveMetadata _ s = return s | ||
137 | |||
138 | {----------------------------------------------------------------------- | ||
139 | -- Session | ||
140 | -----------------------------------------------------------------------} | ||
141 | |||
142 | data Session = Session | ||
143 | { sessionPeerId :: !(PeerId) | ||
144 | , sessionTopic :: !(InfoHash) | ||
145 | , sessionLogger :: !(LogFun) | ||
146 | , sessionEvents :: !(SendPort (Event Session)) | ||
147 | |||
148 | , sessionState :: !(MVar SessionState) | ||
149 | |||
150 | ------------------------------------------------------------------------ | ||
151 | , connectionsPrefs :: !ConnectionPrefs | ||
152 | |||
153 | -- | Connections either waiting for TCP/uTP 'connect' or waiting | ||
154 | -- for BT handshake. | ||
155 | , connectionsPending :: !(TVar (Set (PeerAddr IP))) | ||
156 | |||
157 | -- | Connections successfully handshaked and data transfer can | ||
158 | -- take place. | ||
159 | , connectionsEstablished :: !(TVar (Map (PeerAddr IP) (Connection Session))) | ||
160 | |||
161 | -- | TODO implement choking mechanism | ||
162 | , connectionsUnchoked :: [PeerAddr IP] | ||
163 | |||
164 | -- | Messages written to this channel will be sent to the all | ||
165 | -- connections, including pending connections (but right after | ||
166 | -- handshake). | ||
167 | , connectionsBroadcast :: !(Chan Message) | ||
168 | } | ||
169 | |||
170 | instance EventSource Session where | ||
171 | data Event Session | ||
172 | = ConnectingTo (PeerAddr IP) | ||
173 | | ConnectionEstablished (PeerAddr IP) | ||
174 | | ConnectionAborted | ||
175 | | ConnectionClosed (PeerAddr IP) | ||
176 | | SessionClosed | ||
177 | deriving Show | ||
178 | |||
179 | listen Session {..} = CS.listen sessionEvents | ||
180 | |||
181 | newSession :: LogFun | ||
182 | -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer; | ||
183 | -> FilePath -- ^ root directory for content files; | ||
184 | -> Either InfoHash InfoDict -- ^ torrent info dictionary; | ||
185 | -> IO Session | ||
186 | newSession logFun addr rootPath source = do | ||
187 | let ih = either id idInfoHash source | ||
188 | pid <- maybe genPeerId return (peerId addr) | ||
189 | eventStream <- newSendPort | ||
190 | sState <- newSessionState rootPath source | ||
191 | sStateVar <- newMVar sState | ||
192 | pSetVar <- newTVarIO S.empty | ||
193 | eSetVar <- newTVarIO M.empty | ||
194 | chan <- newChan | ||
195 | return Session | ||
196 | { sessionPeerId = pid | ||
197 | , sessionTopic = ih | ||
198 | , sessionLogger = logFun | ||
199 | , sessionEvents = eventStream | ||
200 | , sessionState = sStateVar | ||
201 | , connectionsPrefs = def | ||
202 | , connectionsPending = pSetVar | ||
203 | , connectionsEstablished = eSetVar | ||
204 | , connectionsUnchoked = [] | ||
205 | , connectionsBroadcast = chan | ||
206 | } | ||
207 | |||
208 | closeSession :: Session -> IO () | ||
209 | closeSession Session {..} = do | ||
210 | s <- readMVar sessionState | ||
211 | closeSessionState s | ||
212 | {- | ||
213 | hSet <- atomically $ do | ||
214 | pSet <- swapTVar connectionsPending S.empty | ||
215 | eSet <- swapTVar connectionsEstablished S.empty | ||
216 | return pSet | ||
217 | mapM_ kill hSet | ||
218 | -} | ||
219 | |||
220 | withSession :: () | ||
221 | withSession = error "withSession" | ||
222 | |||
223 | {----------------------------------------------------------------------- | ||
224 | -- Logging | ||
225 | -----------------------------------------------------------------------} | ||
226 | |||
227 | instance MonadLogger (Connected Session) where | ||
228 | monadLoggerLog loc src lvl msg = do | ||
229 | conn <- ask | ||
230 | ses <- asks connSession | ||
231 | addr <- asks connRemoteAddr | ||
232 | let addrSrc = src <> " @ " <> T.pack (render (pPrint addr)) | ||
233 | liftIO $ sessionLogger ses loc addrSrc lvl (toLogStr msg) | ||
234 | |||
235 | logMessage :: MonadLogger m => Message -> m () | ||
236 | logMessage msg = logDebugN $ T.pack (render (pPrint msg)) | ||
237 | |||
238 | logEvent :: MonadLogger m => Text -> m () | ||
239 | logEvent = logInfoN | ||
240 | |||
241 | {----------------------------------------------------------------------- | ||
242 | -- Connection set | ||
243 | -----------------------------------------------------------------------} | ||
244 | --- Connection status transition: | ||
245 | --- | ||
246 | --- pending -> established -> finished -> closed | ||
247 | --- | \|/ /|\ | ||
248 | --- \-------------------------------------| | ||
249 | --- | ||
250 | --- Purpose of slots: | ||
251 | --- 1) to avoid duplicates | ||
252 | --- 2) connect concurrently | ||
253 | --- | ||
254 | |||
255 | -- | Add connection to the pending set. | ||
256 | pendingConnection :: PeerAddr IP -> Session -> STM Bool | ||
257 | pendingConnection addr Session {..} = do | ||
258 | pSet <- readTVar connectionsPending | ||
259 | eSet <- readTVar connectionsEstablished | ||
260 | if (addr `S.member` pSet) || (addr `M.member` eSet) | ||
261 | then return False | ||
262 | else do | ||
263 | modifyTVar' connectionsPending (S.insert addr) | ||
264 | return True | ||
265 | |||
266 | -- | Pending connection successfully established, add it to the | ||
267 | -- established set. | ||
268 | establishedConnection :: Connected Session () | ||
269 | establishedConnection = do | ||
270 | conn <- ask | ||
271 | addr <- asks connRemoteAddr | ||
272 | Session {..} <- asks connSession | ||
273 | liftIO $ atomically $ do | ||
274 | modifyTVar connectionsPending (S.delete addr) | ||
275 | modifyTVar connectionsEstablished (M.insert addr conn) | ||
276 | |||
277 | -- | Either this or remote peer decided to finish conversation | ||
278 | -- (conversation is alread /established/ connection), remote it from | ||
279 | -- the established set. | ||
280 | finishedConnection :: Connected Session () | ||
281 | finishedConnection = do | ||
282 | Session {..} <- asks connSession | ||
283 | addr <- asks connRemoteAddr | ||
284 | liftIO $ atomically $ do | ||
285 | modifyTVar connectionsEstablished $ M.delete addr | ||
286 | |||
287 | -- | There are no state for this connection, remove it from the all | ||
288 | -- sets. | ||
289 | closedConnection :: PeerAddr IP -> Session -> STM () | ||
290 | closedConnection addr Session {..} = do | ||
291 | modifyTVar connectionsPending $ S.delete addr | ||
292 | modifyTVar connectionsEstablished $ M.delete addr | ||
293 | |||
294 | getConnectionConfig :: Session -> IO (ConnectionConfig Session) | ||
295 | getConnectionConfig s @ Session {..} = do | ||
296 | chan <- dupChan connectionsBroadcast | ||
297 | let sessionLink = SessionLink { | ||
298 | linkTopic = sessionTopic | ||
299 | , linkPeerId = sessionPeerId | ||
300 | , linkMetadataSize = Nothing | ||
301 | , linkOutputChan = Just chan | ||
302 | , linkSession = s | ||
303 | } | ||
304 | return ConnectionConfig | ||
305 | { cfgPrefs = connectionsPrefs | ||
306 | , cfgSession = sessionLink | ||
307 | , cfgWire = mainWire | ||
308 | } | ||
309 | |||
310 | type Finalizer = IO () | ||
311 | type Runner = (ConnectionConfig Session -> IO ()) | ||
312 | |||
313 | runConnection :: Runner -> Finalizer -> PeerAddr IP -> Session -> IO () | ||
314 | runConnection runner finalize addr set @ Session {..} = do | ||
315 | _ <- forkIO (action `finally` cleanup) | ||
316 | return () | ||
317 | where | ||
318 | action = do | ||
319 | notExist <- atomically $ pendingConnection addr set | ||
320 | when notExist $ do | ||
321 | cfg <- getConnectionConfig set | ||
322 | runner cfg | ||
323 | |||
324 | cleanup = do | ||
325 | finalize | ||
326 | -- runStatusUpdates status (SS.resetPending addr) | ||
327 | -- TODO Metata.resetPending addr | ||
328 | atomically $ closedConnection addr set | ||
329 | |||
330 | -- | Establish connection from scratch. If this endpoint is already | ||
331 | -- connected, no new connections is created. This function do not block. | ||
332 | connect :: PeerAddr IP -> Session -> IO () | ||
333 | connect addr = runConnection (connectWire addr) (return ()) addr | ||
334 | |||
335 | -- | Establish connection with already pre-connected endpoint. If this | ||
336 | -- endpoint is already connected, no new connections is created. This | ||
337 | -- function do not block. | ||
338 | -- | ||
339 | -- 'PendingConnection' will be closed automatically, you do not need | ||
340 | -- to call 'closePending'. | ||
341 | establish :: PendingConnection -> Session -> IO () | ||
342 | establish conn = runConnection (acceptWire conn) (closePending conn) | ||
343 | (pendingPeer conn) | ||
344 | |||
345 | -- | Conduit version of 'connect'. | ||
346 | connectSink :: MonadIO m => Session -> Sink [PeerAddr IPv4] m () | ||
347 | connectSink s = C.mapM_ (liftIO . connectBatch) | ||
348 | where | ||
349 | connectBatch = M.mapM_ (\ addr -> connect (IPv4 <$> addr) s) | ||
350 | |||
351 | -- | Why do we need this message? | ||
352 | type BroadcastMessage = ExtendedCaps -> Message | ||
353 | |||
354 | broadcast :: BroadcastMessage -> Session -> IO () | ||
355 | broadcast = error "broadcast" | ||
356 | |||
357 | {----------------------------------------------------------------------- | ||
358 | -- Helpers | ||
359 | -----------------------------------------------------------------------} | ||
360 | |||
361 | waitMVar :: MVar a -> IO () | ||
362 | waitMVar m = withMVar m (const (return ())) | ||
363 | |||
364 | -- This function appear in new GHC "out of box". (moreover it is atomic) | ||
365 | tryReadMVar :: MVar a -> IO (Maybe a) | ||
366 | tryReadMVar m = do | ||
367 | ma <- tryTakeMVar m | ||
368 | maybe (return ()) (putMVar m) ma | ||
369 | return ma | ||
370 | |||
371 | readBlock :: BlockIx -> Storage -> IO (Block BL.ByteString) | ||
372 | readBlock bix @ BlockIx {..} s = do | ||
373 | p <- packException (InvalidRequest bix) $ do readPiece ixPiece s | ||
374 | let chunk = BL.take (fromIntegral ixLength) $ | ||
375 | BL.drop (fromIntegral ixOffset) (pieceData p) | ||
376 | if BL.length chunk == fromIntegral ixLength | ||
377 | then return $ Block ixPiece ixOffset chunk | ||
378 | else throwIO $ InvalidRequest bix (InvalidSize ixLength) | ||
379 | |||
380 | -- | | ||
381 | tryReadMetadataBlock :: PieceIx | ||
382 | -> Connected Session (Maybe (Torrent.Piece BS.ByteString, Int)) | ||
383 | tryReadMetadataBlock pix = do | ||
384 | Session {..} <- asks connSession | ||
385 | s <- liftIO (readMVar sessionState) | ||
386 | case s of | ||
387 | WaitingMetadata {..} -> error "tryReadMetadataBlock" | ||
388 | HavingMetadata {..} -> error "tryReadMetadataBlock" | ||
389 | |||
390 | sendBroadcast :: PeerMessage msg => msg -> Wire Session () | ||
391 | sendBroadcast msg = do | ||
392 | Session {..} <- asks connSession | ||
393 | error "sendBroadcast" | ||
394 | -- liftIO $ msg `broadcast` sessionConnections | ||
395 | |||
396 | waitMetadata :: Session -> IO InfoDict | ||
397 | waitMetadata Session {..} = do | ||
398 | s <- readMVar sessionState | ||
399 | case s of | ||
400 | WaitingMetadata {..} -> readMVar metadataCompleted | ||
401 | HavingMetadata {..} -> return (cachedValue metadataCache) | ||
402 | |||
403 | takeMetadata :: Session -> IO (Maybe InfoDict) | ||
404 | takeMetadata Session {..} = do | ||
405 | s <- readMVar sessionState | ||
406 | case s of | ||
407 | WaitingMetadata {..} -> return Nothing | ||
408 | HavingMetadata {..} -> return (Just (cachedValue metadataCache)) | ||
409 | |||
410 | {----------------------------------------------------------------------- | ||
411 | -- Triggers | ||
412 | -----------------------------------------------------------------------} | ||
413 | |||
414 | -- | Trigger is the reaction of a handler at some event. | ||
415 | type Trigger = Wire Session () | ||
416 | |||
417 | interesting :: Trigger | ||
418 | interesting = do | ||
419 | addr <- asks connRemoteAddr | ||
420 | sendMessage (Interested True) | ||
421 | sendMessage (Choking False) | ||
422 | tryFillRequestQueue | ||
423 | |||
424 | fillRequestQueue :: Trigger | ||
425 | fillRequestQueue = do | ||
426 | maxN <- lift getMaxQueueLength | ||
427 | rbf <- use connBitfield | ||
428 | addr <- asks connRemoteAddr | ||
429 | -- blks <- withStatusUpdates $ do | ||
430 | -- n <- getRequestQueueLength addr | ||
431 | -- scheduleBlocks addr rbf (maxN - n) | ||
432 | -- mapM_ (sendMessage . Request) blks | ||
433 | return () | ||
434 | |||
435 | tryFillRequestQueue :: Trigger | ||
436 | tryFillRequestQueue = do | ||
437 | allowed <- canDownload <$> use connStatus | ||
438 | when allowed $ do | ||
439 | fillRequestQueue | ||
440 | |||
441 | {----------------------------------------------------------------------- | ||
442 | -- Incoming message handling | ||
443 | -----------------------------------------------------------------------} | ||
444 | |||
445 | type Handler msg = msg -> Wire Session () | ||
446 | |||
447 | handleStatus :: Handler StatusUpdate | ||
448 | handleStatus s = do | ||
449 | connStatus %= over remoteStatus (updateStatus s) | ||
450 | case s of | ||
451 | Interested _ -> return () | ||
452 | Choking True -> do | ||
453 | addr <- asks connRemoteAddr | ||
454 | -- withStatusUpdates (SS.resetPending addr) | ||
455 | return () | ||
456 | Choking False -> tryFillRequestQueue | ||
457 | |||
458 | handleAvailable :: Handler Available | ||
459 | handleAvailable msg = do | ||
460 | connBitfield %= case msg of | ||
461 | Have ix -> BF.insert ix | ||
462 | Bitfield bf -> const bf | ||
463 | |||
464 | --thisBf <- getThisBitfield | ||
465 | thisBf <- undefined | ||
466 | case msg of | ||
467 | Have ix | ||
468 | | ix `BF.member` thisBf -> return () | ||
469 | | otherwise -> interesting | ||
470 | Bitfield bf | ||
471 | | bf `BF.isSubsetOf` thisBf -> return () | ||
472 | | otherwise -> interesting | ||
473 | |||
474 | handleTransfer :: Handler Transfer | ||
475 | handleTransfer (Request bix) = do | ||
476 | Session {..} <- asks connSession | ||
477 | s <- liftIO $ readMVar sessionState | ||
478 | case s of | ||
479 | WaitingMetadata {..} -> return () | ||
480 | HavingMetadata {..} -> do | ||
481 | bitfield <- undefined -- getThisBitfield | ||
482 | upload <- canUpload <$> use connStatus | ||
483 | when (upload && ixPiece bix `BF.member` bitfield) $ do | ||
484 | blk <- liftIO $ readBlock bix contentStorage | ||
485 | sendMessage (Message.Piece blk) | ||
486 | |||
487 | handleTransfer (Message.Piece blk) = do | ||
488 | Session {..} <- asks connSession | ||
489 | s <- liftIO $ readMVar sessionState | ||
490 | case s of | ||
491 | WaitingMetadata {..} -> return () -- TODO (?) break connection | ||
492 | HavingMetadata {..} -> do | ||
493 | isSuccess <- undefined -- withStatusUpdates (SS.pushBlock blk storage) | ||
494 | case isSuccess of | ||
495 | Nothing -> liftIO $ throwIO $ userError "block is not requested" | ||
496 | Just isCompleted -> do | ||
497 | when isCompleted $ do | ||
498 | sendBroadcast (Have (blkPiece blk)) | ||
499 | -- maybe send not interested | ||
500 | tryFillRequestQueue | ||
501 | |||
502 | handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) | ||
503 | where | ||
504 | transferResponse bix (Transfer (Message.Piece blk)) = blockIx blk == bix | ||
505 | transferResponse _ _ = False | ||
506 | |||
507 | {----------------------------------------------------------------------- | ||
508 | -- Metadata exchange | ||
509 | -----------------------------------------------------------------------} | ||
510 | -- TODO introduce new metadata exchange specific exceptions | ||
511 | |||
512 | waitForMetadata :: Trigger | ||
513 | waitForMetadata = do | ||
514 | Session {..} <- asks connSession | ||
515 | needFetch <- undefined --liftIO (isEmptyMVar infodict) | ||
516 | when needFetch $ do | ||
517 | canFetch <- allowed ExtMetadata <$> use connExtCaps | ||
518 | if canFetch | ||
519 | then tryRequestMetadataBlock | ||
520 | else undefined -- liftIO (waitMVar infodict) | ||
521 | |||
522 | tryRequestMetadataBlock :: Trigger | ||
523 | tryRequestMetadataBlock = do | ||
524 | mpix <- lift $ undefined --withMetadataUpdates Metadata.scheduleBlock | ||
525 | case mpix of | ||
526 | Nothing -> error "tryRequestMetadataBlock" | ||
527 | Just pix -> sendMessage (MetadataRequest pix) | ||
528 | |||
529 | handleMetadata :: Handler ExtendedMetadata | ||
530 | handleMetadata (MetadataRequest pix) = | ||
531 | lift (tryReadMetadataBlock pix) >>= sendMessage . mkResponse | ||
532 | where | ||
533 | mkResponse Nothing = MetadataReject pix | ||
534 | mkResponse (Just (piece, total)) = MetadataData piece total | ||
535 | |||
536 | handleMetadata (MetadataData {..}) = do | ||
537 | ih <- asks connTopic | ||
538 | mdict <- lift $ undefined --withMetadataUpdates (Metadata.pushBlock piece ih) | ||
539 | case mdict of | ||
540 | Nothing -> tryRequestMetadataBlock -- not completed, need all blocks | ||
541 | Just dict -> do -- complete, wake up payload fetch | ||
542 | Session {..} <- asks connSession | ||
543 | liftIO $ modifyMVar_ sessionState (haveMetadata dict) | ||
544 | |||
545 | handleMetadata (MetadataReject pix) = do | ||
546 | lift $ undefined -- withMetadataUpdates (Metadata.cancelPending pix) | ||
547 | |||
548 | handleMetadata (MetadataUnknown _ ) = do | ||
549 | logInfoN "Unknown metadata message" | ||
550 | |||
551 | {----------------------------------------------------------------------- | ||
552 | -- Main entry point | ||
553 | -----------------------------------------------------------------------} | ||
554 | |||
555 | acceptRehandshake :: ExtendedHandshake -> Trigger | ||
556 | acceptRehandshake ehs = error "acceptRehandshake" | ||
557 | |||
558 | handleExtended :: Handler ExtendedMessage | ||
559 | handleExtended (EHandshake ehs) = acceptRehandshake ehs | ||
560 | handleExtended (EMetadata _ msg) = handleMetadata msg | ||
561 | handleExtended (EUnknown _ _ ) = logWarnN "Unknown extension message" | ||
562 | |||
563 | handleMessage :: Handler Message | ||
564 | handleMessage KeepAlive = return () | ||
565 | handleMessage (Status s) = handleStatus s | ||
566 | handleMessage (Available msg) = handleAvailable msg | ||
567 | handleMessage (Transfer msg) = handleTransfer msg | ||
568 | handleMessage (Port n) = error "handleMessage" | ||
569 | handleMessage (Fast _) = error "handleMessage" | ||
570 | handleMessage (Extended msg) = handleExtended msg | ||
571 | |||
572 | exchange :: Wire Session () | ||
573 | exchange = do | ||
574 | waitForMetadata | ||
575 | bf <- undefined --getThisBitfield | ||
576 | sendMessage (Bitfield bf) | ||
577 | awaitForever handleMessage | ||
578 | |||
579 | mainWire :: Wire Session () | ||
580 | mainWire = do | ||
581 | lift establishedConnection | ||
582 | Session {..} <- asks connSession | ||
583 | -- lift $ resizeBitfield (totalPieces storage) | ||
584 | logEvent "Connection established" | ||
585 | iterM logMessage =$= exchange =$= iterM logMessage | ||
586 | lift finishedConnection | ||