diff options
author | Sam Truzjan <pxqr.sta@gmail.com> | 2014-02-15 04:18:05 +0400 |
---|---|---|
committer | Sam Truzjan <pxqr.sta@gmail.com> | 2014-02-15 04:18:05 +0400 |
commit | 0fa6a0ee5eb1fbf648d3864626430efcbdb4aaae (patch) | |
tree | 7d2c6b8db43943974772069efb22480db8186bb1 /src/Network | |
parent | daf978ddd1f0a07ce4711fa97f51d0ec02478f73 (diff) |
Move metadata exchange from Wire to Session
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/BitTorrent/Exchange/Session.hs | 154 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Session/Metadata.hs | 93 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Wire.hs | 54 |
3 files changed, 218 insertions, 83 deletions
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index 1e72ba96..f10f601e 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs | |||
@@ -12,7 +12,7 @@ module Network.BitTorrent.Exchange.Session | |||
12 | 12 | ||
13 | import Control.Applicative | 13 | import Control.Applicative |
14 | import Control.Concurrent | 14 | import Control.Concurrent |
15 | import Control.Exception | 15 | import Control.Exception hiding (Handler) |
16 | import Control.Lens | 16 | import Control.Lens |
17 | import Control.Monad.Logger | 17 | import Control.Monad.Logger |
18 | import Control.Monad.Reader | 18 | import Control.Monad.Reader |
@@ -20,6 +20,7 @@ import Control.Monad.State | |||
20 | import Data.ByteString as BS | 20 | import Data.ByteString as BS |
21 | import Data.ByteString.Lazy as BL | 21 | import Data.ByteString.Lazy as BL |
22 | import Data.Conduit | 22 | import Data.Conduit |
23 | import Data.Conduit.List as CL (iterM) | ||
23 | import Data.Function | 24 | import Data.Function |
24 | import Data.IORef | 25 | import Data.IORef |
25 | import Data.List as L | 26 | import Data.List as L |
@@ -34,16 +35,18 @@ import Text.PrettyPrint hiding ((<>)) | |||
34 | import Text.PrettyPrint.Class | 35 | import Text.PrettyPrint.Class |
35 | import System.Log.FastLogger (LogStr, ToLogStr (..)) | 36 | import System.Log.FastLogger (LogStr, ToLogStr (..)) |
36 | 37 | ||
38 | import Data.BEncode as BE | ||
37 | import Data.Torrent (InfoDict (..)) | 39 | import Data.Torrent (InfoDict (..)) |
38 | import Data.Torrent.Bitfield as BF | 40 | import Data.Torrent.Bitfield as BF |
39 | import Data.Torrent.InfoHash | 41 | import Data.Torrent.InfoHash |
40 | import Data.Torrent.Piece (pieceData, piPieceLength) | 42 | import Data.Torrent.Piece |
41 | import qualified Data.Torrent.Piece as Torrent (Piece (Piece)) | 43 | import qualified Data.Torrent.Piece as Torrent (Piece (Piece)) |
42 | import Network.BitTorrent.Core | 44 | import Network.BitTorrent.Core |
43 | import Network.BitTorrent.Exchange.Assembler | 45 | import Network.BitTorrent.Exchange.Assembler |
44 | import Network.BitTorrent.Exchange.Block as Block | 46 | import Network.BitTorrent.Exchange.Block as Block |
45 | import Network.BitTorrent.Exchange.Message | 47 | import Network.BitTorrent.Exchange.Message as Message |
46 | import Network.BitTorrent.Exchange.Session.Status as SS | 48 | import Network.BitTorrent.Exchange.Session.Metadata as Metadata |
49 | import Network.BitTorrent.Exchange.Session.Status as SS | ||
47 | import Network.BitTorrent.Exchange.Status | 50 | import Network.BitTorrent.Exchange.Status |
48 | import Network.BitTorrent.Exchange.Wire | 51 | import Network.BitTorrent.Exchange.Wire |
49 | import System.Torrent.Storage | 52 | import System.Torrent.Storage |
@@ -66,6 +69,14 @@ packException f m = try m >>= either (throwIO . f) return | |||
66 | -- Session | 69 | -- Session |
67 | -----------------------------------------------------------------------} | 70 | -----------------------------------------------------------------------} |
68 | 71 | ||
72 | data Cached a = Cached | ||
73 | { cachedValue :: !a | ||
74 | , cachedData :: BL.ByteString -- keep lazy | ||
75 | } | ||
76 | |||
77 | cache :: BEncode a => a -> Cached a | ||
78 | cache s = Cached s (BE.encode s) | ||
79 | |||
69 | data ConnectionEntry = ConnectionEntry | 80 | data ConnectionEntry = ConnectionEntry |
70 | { initiatedBy :: !ChannelSide | 81 | { initiatedBy :: !ChannelSide |
71 | , connection :: !(Connection Session) | 82 | , connection :: !(Connection Session) |
@@ -74,12 +85,14 @@ data ConnectionEntry = ConnectionEntry | |||
74 | data Session = Session | 85 | data Session = Session |
75 | { tpeerId :: PeerId | 86 | { tpeerId :: PeerId |
76 | , infohash :: InfoHash | 87 | , infohash :: InfoHash |
88 | , metadata :: MVar Metadata.Status | ||
77 | , storage :: Storage | 89 | , storage :: Storage |
78 | , status :: MVar SessionStatus | 90 | , status :: MVar SessionStatus |
79 | , unchoked :: [PeerAddr IP] | 91 | , unchoked :: [PeerAddr IP] |
80 | , connections :: MVar (Map (PeerAddr IP) ConnectionEntry) | 92 | , connections :: MVar (Map (PeerAddr IP) ConnectionEntry) |
81 | , broadcast :: Chan Message | 93 | , broadcast :: Chan Message |
82 | , logger :: LogFun | 94 | , logger :: LogFun |
95 | , infodict :: MVar (Cached InfoDict) | ||
83 | } | 96 | } |
84 | 97 | ||
85 | -- | Logger function. | 98 | -- | Logger function. |
@@ -108,7 +121,13 @@ newSession logFun addr rootPath dict = do | |||
108 | } | 121 | } |
109 | 122 | ||
110 | closeSession :: Session -> IO () | 123 | closeSession :: Session -> IO () |
111 | closeSession = undefined | 124 | closeSession ses = do |
125 | deleteAll ses | ||
126 | undefined | ||
127 | |||
128 | {----------------------------------------------------------------------- | ||
129 | -- Logging | ||
130 | -----------------------------------------------------------------------} | ||
112 | 131 | ||
113 | instance MonadLogger (Connected Session) where | 132 | instance MonadLogger (Connected Session) where |
114 | monadLoggerLog loc src lvl msg = do | 133 | monadLoggerLog loc src lvl msg = do |
@@ -118,11 +137,11 @@ instance MonadLogger (Connected Session) where | |||
118 | let addrSrc = src <> " @ " <> T.pack (render (pretty addr)) | 137 | let addrSrc = src <> " @ " <> T.pack (render (pretty addr)) |
119 | liftIO $ logger ses loc addrSrc lvl (toLogStr msg) | 138 | liftIO $ logger ses loc addrSrc lvl (toLogStr msg) |
120 | 139 | ||
121 | logMessage :: Message -> Wire Session () | 140 | logMessage :: MonadLogger m => Message -> m () |
122 | logMessage msg = logDebugN $ T.pack (render (pretty msg)) | 141 | logMessage msg = logDebugN $ T.pack (render (pretty msg)) |
123 | 142 | ||
124 | logEvent :: Text -> Wire Session () | 143 | logEvent :: MonadLogger m => Text -> m () |
125 | logEvent = logInfoN | 144 | logEvent = logInfoN |
126 | 145 | ||
127 | {----------------------------------------------------------------------- | 146 | {----------------------------------------------------------------------- |
128 | -- Connections | 147 | -- Connections |
@@ -132,11 +151,12 @@ logEvent = logInfoN | |||
132 | insert :: PeerAddr IP | 151 | insert :: PeerAddr IP |
133 | -> {- Maybe Socket | 152 | -> {- Maybe Socket |
134 | -> -} Session -> IO () | 153 | -> -} Session -> IO () |
135 | insert addr ses @ Session {..} = do | 154 | insert addr ses @ Session {..} = void $ forkIO (action `finally` cleanup) |
136 | forkIO $ do | ||
137 | action `finally` runStatusUpdates status (resetPending addr) | ||
138 | return () | ||
139 | where | 155 | where |
156 | cleanup = do | ||
157 | runStatusUpdates status (SS.resetPending addr) | ||
158 | -- TODO Metata.resetPending addr | ||
159 | |||
140 | action = do | 160 | action = do |
141 | let caps = def | 161 | let caps = def |
142 | let ecaps = def | 162 | let ecaps = def |
@@ -147,7 +167,7 @@ insert addr ses @ Session {..} = do | |||
147 | -- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn | 167 | -- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn |
148 | lift $ resizeBitfield (totalPieces storage) | 168 | lift $ resizeBitfield (totalPieces storage) |
149 | logEvent "Connection established" | 169 | logEvent "Connection established" |
150 | exchange | 170 | iterM logMessage =$= exchange =$= iterM logMessage |
151 | -- liftIO $ modifyMVar_ connections $ pure . M.delete addr | 171 | -- liftIO $ modifyMVar_ connections $ pure . M.delete addr |
152 | 172 | ||
153 | delete :: PeerAddr IP -> Session -> IO () | 173 | delete :: PeerAddr IP -> Session -> IO () |
@@ -160,11 +180,26 @@ deleteAll = undefined | |||
160 | -- Helpers | 180 | -- Helpers |
161 | -----------------------------------------------------------------------} | 181 | -----------------------------------------------------------------------} |
162 | 182 | ||
183 | waitMVar :: MVar a -> IO () | ||
184 | waitMVar m = withMVar m (const (return ())) | ||
185 | |||
186 | -- This function appear in new GHC "out of box". (moreover it is atomic) | ||
187 | tryReadMVar :: MVar a -> IO (Maybe a) | ||
188 | tryReadMVar m = do | ||
189 | ma <- tryTakeMVar m | ||
190 | maybe (return ()) (putMVar m) ma | ||
191 | return ma | ||
192 | |||
163 | withStatusUpdates :: StatusUpdates a -> Wire Session a | 193 | withStatusUpdates :: StatusUpdates a -> Wire Session a |
164 | withStatusUpdates m = do | 194 | withStatusUpdates m = do |
165 | Session {..} <- asks connSession | 195 | Session {..} <- asks connSession |
166 | liftIO $ runStatusUpdates status m | 196 | liftIO $ runStatusUpdates status m |
167 | 197 | ||
198 | withMetadataUpdates :: Updates a -> Connected Session a | ||
199 | withMetadataUpdates m = do | ||
200 | Session {..} <- asks connSession | ||
201 | liftIO $ runUpdates metadata m | ||
202 | |||
168 | getThisBitfield :: Wire Session Bitfield | 203 | getThisBitfield :: Wire Session Bitfield |
169 | getThisBitfield = do | 204 | getThisBitfield = do |
170 | ses <- asks connSession | 205 | ses <- asks connSession |
@@ -179,6 +214,16 @@ readBlock bix @ BlockIx {..} s = do | |||
179 | then return $ Block ixPiece ixOffset chunk | 214 | then return $ Block ixPiece ixOffset chunk |
180 | else throwIO $ InvalidRequest bix (InvalidSize ixLength) | 215 | else throwIO $ InvalidRequest bix (InvalidSize ixLength) |
181 | 216 | ||
217 | -- | | ||
218 | tryReadMetadataBlock :: PieceIx | ||
219 | -> Connected Session (Maybe (Torrent.Piece BS.ByteString, Int)) | ||
220 | tryReadMetadataBlock pix = do | ||
221 | Session {..} <- asks connSession | ||
222 | mcached <- liftIO (tryReadMVar infodict) | ||
223 | case mcached of | ||
224 | Nothing -> undefined | ||
225 | Just (Cached {..}) -> undefined | ||
226 | |||
182 | sendBroadcast :: PeerMessage msg => msg -> Wire Session () | 227 | sendBroadcast :: PeerMessage msg => msg -> Wire Session () |
183 | sendBroadcast msg = do | 228 | sendBroadcast msg = do |
184 | Session {..} <- asks connSession | 229 | Session {..} <- asks connSession |
@@ -208,9 +253,7 @@ tryFillRequestQueue = do | |||
208 | interesting :: Wire Session () | 253 | interesting :: Wire Session () |
209 | interesting = do | 254 | interesting = do |
210 | addr <- asks connRemoteAddr | 255 | addr <- asks connRemoteAddr |
211 | logMessage (Status (Interested True)) | ||
212 | sendMessage (Interested True) | 256 | sendMessage (Interested True) |
213 | logMessage (Status (Choking False)) | ||
214 | sendMessage (Choking False) | 257 | sendMessage (Choking False) |
215 | tryFillRequestQueue | 258 | tryFillRequestQueue |
216 | 259 | ||
@@ -218,17 +261,19 @@ interesting = do | |||
218 | -- Incoming message handling | 261 | -- Incoming message handling |
219 | -----------------------------------------------------------------------} | 262 | -----------------------------------------------------------------------} |
220 | 263 | ||
221 | handleStatus :: StatusUpdate -> Wire Session () | 264 | type Handler msg = msg -> Wire Session () |
265 | |||
266 | handleStatus :: Handler StatusUpdate | ||
222 | handleStatus s = do | 267 | handleStatus s = do |
223 | connStatus %= over remoteStatus (updateStatus s) | 268 | connStatus %= over remoteStatus (updateStatus s) |
224 | case s of | 269 | case s of |
225 | Interested _ -> return () | 270 | Interested _ -> return () |
226 | Choking True -> do | 271 | Choking True -> do |
227 | addr <- asks connRemoteAddr | 272 | addr <- asks connRemoteAddr |
228 | withStatusUpdates (resetPending addr) | 273 | withStatusUpdates (SS.resetPending addr) |
229 | Choking False -> tryFillRequestQueue | 274 | Choking False -> tryFillRequestQueue |
230 | 275 | ||
231 | handleAvailable :: Available -> Wire Session () | 276 | handleAvailable :: Handler Available |
232 | handleAvailable msg = do | 277 | handleAvailable msg = do |
233 | connBitfield %= case msg of | 278 | connBitfield %= case msg of |
234 | Have ix -> BF.insert ix | 279 | Have ix -> BF.insert ix |
@@ -243,18 +288,18 @@ handleAvailable msg = do | |||
243 | | bf `BF.isSubsetOf` thisBf -> return () | 288 | | bf `BF.isSubsetOf` thisBf -> return () |
244 | | otherwise -> interesting | 289 | | otherwise -> interesting |
245 | 290 | ||
246 | handleTransfer :: Transfer -> Wire Session () | 291 | handleTransfer :: Handler Transfer |
247 | handleTransfer (Request bix) = do | 292 | handleTransfer (Request bix) = do |
248 | Session {..} <- asks connSession | 293 | Session {..} <- asks connSession |
249 | bitfield <- getThisBitfield | 294 | bitfield <- getThisBitfield |
250 | upload <- canUpload <$> use connStatus | 295 | upload <- canUpload <$> use connStatus |
251 | when (upload && ixPiece bix `BF.member` bitfield) $ do | 296 | when (upload && ixPiece bix `BF.member` bitfield) $ do |
252 | blk <- liftIO $ readBlock bix storage | 297 | blk <- liftIO $ readBlock bix storage |
253 | sendMessage (Piece blk) | 298 | sendMessage (Message.Piece blk) |
254 | 299 | ||
255 | handleTransfer (Piece blk) = do | 300 | handleTransfer (Message.Piece blk) = do |
256 | Session {..} <- asks connSession | 301 | Session {..} <- asks connSession |
257 | isSuccess <- withStatusUpdates (pushBlock blk storage) | 302 | isSuccess <- withStatusUpdates (SS.pushBlock blk storage) |
258 | case isSuccess of | 303 | case isSuccess of |
259 | Nothing -> liftIO $ throwIO $ userError "block is not requested" | 304 | Nothing -> liftIO $ throwIO $ userError "block is not requested" |
260 | Just isCompleted -> do | 305 | Just isCompleted -> do |
@@ -265,29 +310,78 @@ handleTransfer (Piece blk) = do | |||
265 | 310 | ||
266 | handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) | 311 | handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) |
267 | where | 312 | where |
268 | transferResponse bix (Transfer (Piece blk)) = blockIx blk == bix | 313 | transferResponse bix (Transfer (Message.Piece blk)) = blockIx blk == bix |
269 | transferResponse _ _ = False | 314 | transferResponse _ _ = False |
315 | |||
316 | {----------------------------------------------------------------------- | ||
317 | -- Metadata exchange | ||
318 | -----------------------------------------------------------------------} | ||
319 | -- TODO introduce new metadata exchange specific exceptions | ||
320 | |||
321 | tryRequestMetadataBlock :: Wire Session () | ||
322 | tryRequestMetadataBlock = do | ||
323 | addr <- asks connRemoteAddr | ||
324 | mpix <- lift $ withMetadataUpdates (Metadata.scheduleBlock addr) | ||
325 | case mpix of | ||
326 | Nothing -> undefined | ||
327 | Just pix -> sendMessage (MetadataRequest pix) | ||
328 | |||
329 | handleMetadata :: Handler ExtendedMetadata | ||
330 | handleMetadata (MetadataRequest pix) = | ||
331 | lift (tryReadMetadataBlock pix) >>= sendMessage . mkResponse | ||
332 | where | ||
333 | mkResponse Nothing = MetadataReject pix | ||
334 | mkResponse (Just (piece, total)) = MetadataData piece total | ||
335 | |||
336 | handleMetadata (MetadataData {..}) = do | ||
337 | addr <- asks connRemoteAddr | ||
338 | ih <- asks connTopic | ||
339 | lift $ withMetadataUpdates (Metadata.pushBlock addr piece ih) | ||
340 | tryRequestMetadataBlock | ||
341 | |||
342 | handleMetadata (MetadataReject pix) = do | ||
343 | lift $ withMetadataUpdates (Metadata.cancelPending pix) | ||
344 | |||
345 | handleMetadata (MetadataUnknown _ ) = do | ||
346 | logInfoN "Unknown metadata message" | ||
347 | |||
348 | waitForMetadata :: Wire Session () | ||
349 | waitForMetadata = do | ||
350 | Session {..} <- asks connSession | ||
351 | needFetch <- liftIO (isEmptyMVar infodict) | ||
352 | when needFetch $ do | ||
353 | canFetch <- allowed ExtMetadata <$> use connExtCaps | ||
354 | if canFetch | ||
355 | then tryRequestMetadataBlock | ||
356 | else liftIO (waitMVar infodict) | ||
270 | 357 | ||
271 | {----------------------------------------------------------------------- | 358 | {----------------------------------------------------------------------- |
272 | -- Event loop | 359 | -- Event loop |
273 | -----------------------------------------------------------------------} | 360 | -----------------------------------------------------------------------} |
274 | 361 | ||
275 | handleMessage :: Message -> Wire Session () | 362 | acceptRehandshake :: ExtendedHandshake -> Wire s () |
363 | acceptRehandshake ehs = undefined | ||
364 | |||
365 | handleExtended :: Handler ExtendedMessage | ||
366 | handleExtended (EHandshake ehs) = acceptRehandshake ehs | ||
367 | handleExtended (EMetadata _ msg) = handleMetadata msg | ||
368 | handleExtended (EUnknown _ _ ) = logWarnN "Unknown extension message" | ||
369 | |||
370 | handleMessage :: Handler Message | ||
276 | handleMessage KeepAlive = return () | 371 | handleMessage KeepAlive = return () |
277 | handleMessage (Status s) = handleStatus s | 372 | handleMessage (Status s) = handleStatus s |
278 | handleMessage (Available msg) = handleAvailable msg | 373 | handleMessage (Available msg) = handleAvailable msg |
279 | handleMessage (Transfer msg) = handleTransfer msg | 374 | handleMessage (Transfer msg) = handleTransfer msg |
280 | handleMessage (Port n) = undefined | 375 | handleMessage (Port n) = undefined |
281 | handleMessage (Fast _) = undefined | 376 | handleMessage (Fast _) = undefined |
282 | handleMessage (Extended _) = undefined | 377 | handleMessage (Extended msg) = handleExtended msg |
283 | 378 | ||
284 | exchange :: Wire Session () | 379 | exchange :: Wire Session () |
285 | exchange = do | 380 | exchange = do |
381 | waitForMetadata | ||
286 | bf <- getThisBitfield | 382 | bf <- getThisBitfield |
287 | sendMessage (Bitfield bf) | 383 | sendMessage (Bitfield bf) |
288 | awaitForever $ \ msg -> do | 384 | awaitForever handleMessage |
289 | logMessage msg | ||
290 | handleMessage msg | ||
291 | 385 | ||
292 | data Event = NewMessage (PeerAddr IP) Message | 386 | data Event = NewMessage (PeerAddr IP) Message |
293 | | Timeout -- for scheduling | 387 | | Timeout -- for scheduling |
diff --git a/src/Network/BitTorrent/Exchange/Session/Metadata.hs b/src/Network/BitTorrent/Exchange/Session/Metadata.hs new file mode 100644 index 00000000..7e14f493 --- /dev/null +++ b/src/Network/BitTorrent/Exchange/Session/Metadata.hs | |||
@@ -0,0 +1,93 @@ | |||
1 | {-# LANGUAGE TemplateHaskell #-} | ||
2 | module Network.BitTorrent.Exchange.Session.Metadata | ||
3 | ( -- * Metadata transfer state | ||
4 | Status | ||
5 | , nullStatus | ||
6 | |||
7 | -- * Metadata updates | ||
8 | , Updates | ||
9 | , runUpdates | ||
10 | |||
11 | -- * Metadata piece control | ||
12 | , scheduleBlock | ||
13 | , resetPending | ||
14 | , cancelPending | ||
15 | , pushBlock | ||
16 | ) where | ||
17 | |||
18 | import Control.Concurrent | ||
19 | import Control.Lens | ||
20 | import Control.Monad.State | ||
21 | import Data.ByteString as BS | ||
22 | import Data.ByteString.Lazy as BL | ||
23 | import Data.List as L | ||
24 | |||
25 | import Data.BEncode as BE | ||
26 | import Data.Torrent | ||
27 | import Data.Torrent.InfoHash | ||
28 | import Data.Torrent.Piece as Torrent | ||
29 | import Network.BitTorrent.Core | ||
30 | import Network.BitTorrent.Exchange.Block as Block | ||
31 | import Network.BitTorrent.Exchange.Message as Message hiding (Status) | ||
32 | |||
33 | |||
34 | data Status = Status | ||
35 | { _pending :: [(PeerAddr IP, PieceIx)] | ||
36 | , _bucket :: Bucket | ||
37 | } | ||
38 | |||
39 | makeLenses ''Status | ||
40 | |||
41 | nullStatus :: PieceSize -> Status | ||
42 | nullStatus ps = Status [] (Block.empty ps) | ||
43 | |||
44 | type Updates a = State Status a | ||
45 | |||
46 | runUpdates :: MVar Status -> Updates a -> IO a | ||
47 | runUpdates v m = undefined | ||
48 | |||
49 | scheduleBlock :: PeerAddr IP -> Updates (Maybe PieceIx) | ||
50 | scheduleBlock addr = do | ||
51 | bkt <- use bucket | ||
52 | case spans metadataPieceSize bkt of | ||
53 | [] -> return Nothing | ||
54 | ((off, _ ) : _) -> do | ||
55 | let pix = undefined | ||
56 | pending %= ((addr, pix) :) | ||
57 | return (Just pix) | ||
58 | |||
59 | cancelPending :: PieceIx -> Updates () | ||
60 | cancelPending pix = pending %= L.filter ((pix ==) . snd) | ||
61 | |||
62 | resetPending :: PeerAddr IP -> Updates () | ||
63 | resetPending addr = pending %= L.filter ((addr ==) . fst) | ||
64 | |||
65 | parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict | ||
66 | parseInfoDict chunk topic = | ||
67 | case BE.decode chunk of | ||
68 | Right (infodict @ InfoDict {..}) | ||
69 | | topic == idInfoHash -> return infodict | ||
70 | | otherwise -> Left "broken infodict" | ||
71 | Left err -> Left $ "unable to parse infodict " ++ err | ||
72 | |||
73 | -- todo use incremental parsing to avoid BS.concat call | ||
74 | pushBlock :: PeerAddr IP -> Torrent.Piece BS.ByteString -> InfoHash | ||
75 | -> Updates (Maybe InfoDict) | ||
76 | pushBlock addr Torrent.Piece {..} topic = do | ||
77 | p <- use pending | ||
78 | when ((addr, pieceIndex) `L.notElem` p) $ error "not requested" | ||
79 | cancelPending pieceIndex | ||
80 | |||
81 | bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData | ||
82 | b <- use bucket | ||
83 | case toPiece b of | ||
84 | Nothing -> return Nothing | ||
85 | Just chunks -> | ||
86 | case parseInfoDict (BL.toStrict chunks) topic of | ||
87 | Right x -> do | ||
88 | pending .= [] | ||
89 | return (Just x) | ||
90 | Left e -> do | ||
91 | pending .= [] | ||
92 | bucket .= Block.empty (Block.size b) | ||
93 | return Nothing | ||
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs index 4aebdd24..4224a25d 100644 --- a/src/Network/BitTorrent/Exchange/Wire.hs +++ b/src/Network/BitTorrent/Exchange/Wire.hs | |||
@@ -17,6 +17,7 @@ module Network.BitTorrent.Exchange.Wire | |||
17 | ( -- * Wire | 17 | ( -- * Wire |
18 | Connected | 18 | Connected |
19 | , Wire | 19 | , Wire |
20 | , ChannelSide (..) | ||
20 | 21 | ||
21 | -- * Connection | 22 | -- * Connection |
22 | , Connection | 23 | , Connection |
@@ -54,11 +55,7 @@ module Network.BitTorrent.Exchange.Wire | |||
54 | , filterQueue | 55 | , filterQueue |
55 | , getMaxQueueLength | 56 | , getMaxQueueLength |
56 | 57 | ||
57 | -- * Query | ||
58 | , getMetadata | ||
59 | |||
60 | -- * Exceptions | 58 | -- * Exceptions |
61 | , ChannelSide (..) | ||
62 | , ProtocolError (..) | 59 | , ProtocolError (..) |
63 | , WireFailure (..) | 60 | , WireFailure (..) |
64 | , peerPenalty | 61 | , peerPenalty |
@@ -448,11 +445,6 @@ instance Default Options where | |||
448 | -- Connection | 445 | -- Connection |
449 | -----------------------------------------------------------------------} | 446 | -----------------------------------------------------------------------} |
450 | 447 | ||
451 | data Cached a = Cached { unCache :: a, cached :: BS.ByteString } | ||
452 | |||
453 | cache :: (BEncode a) => a -> Cached a | ||
454 | cache s = Cached s (BSL.toStrict $ BE.encode s) | ||
455 | |||
456 | data ConnectionState = ConnectionState { | 448 | data ConnectionState = ConnectionState { |
457 | -- | If @not (allowed ExtExtended connCaps)@ then this set is always | 449 | -- | If @not (allowed ExtExtended connCaps)@ then this set is always |
458 | -- empty. Otherwise it has the BEP10 extension protocol mandated mapping of | 450 | -- empty. Otherwise it has the BEP10 extension protocol mandated mapping of |
@@ -477,9 +469,6 @@ data ConnectionState = ConnectionState { | |||
477 | 469 | ||
478 | -- | Bitfield of remote endpoint. | 470 | -- | Bitfield of remote endpoint. |
479 | , _connBitfield :: !Bitfield | 471 | , _connBitfield :: !Bitfield |
480 | |||
481 | -- | Infodict associated with this Connection's connTopic. | ||
482 | , _connMetadata :: Maybe (Cached InfoDict) | ||
483 | } | 472 | } |
484 | 473 | ||
485 | makeLenses ''ConnectionState | 474 | makeLenses ''ConnectionState |
@@ -722,7 +711,6 @@ connectWire session hs addr extCaps chan wire = do | |||
722 | } | 711 | } |
723 | , _connStatus = def | 712 | , _connStatus = def |
724 | , _connBitfield = BF.haveNone 0 | 713 | , _connBitfield = BF.haveNone 0 |
725 | , _connMetadata = Nothing | ||
726 | } | 714 | } |
727 | 715 | ||
728 | -- TODO make KA interval configurable | 716 | -- TODO make KA interval configurable |
@@ -757,43 +745,3 @@ acceptWire sock peerAddr wire = do | |||
757 | -- | Used when size of bitfield becomes known. | 745 | -- | Used when size of bitfield becomes known. |
758 | resizeBitfield :: Int -> Connected s () | 746 | resizeBitfield :: Int -> Connected s () |
759 | resizeBitfield n = connBitfield %= adjustSize n | 747 | resizeBitfield n = connBitfield %= adjustSize n |
760 | |||
761 | {----------------------------------------------------------------------- | ||
762 | -- Metadata exchange | ||
763 | -----------------------------------------------------------------------} | ||
764 | -- TODO introduce new metadata exchange specific exceptions | ||
765 | |||
766 | fetchMetadata :: Wire s [BS.ByteString] | ||
767 | fetchMetadata = loop 0 | ||
768 | where | ||
769 | recvData = recvMessage >>= inspect | ||
770 | where | ||
771 | inspect (Extended (EMetadata _ meta)) = | ||
772 | case meta of | ||
773 | MetadataRequest pix -> do | ||
774 | sendMessage (MetadataReject pix) | ||
775 | recvData | ||
776 | MetadataData {..} -> return (piece, totalSize) | ||
777 | MetadataReject _ -> disconnectPeer | ||
778 | MetadataUnknown _ -> recvData | ||
779 | inspect _ = recvData | ||
780 | |||
781 | loop i = do | ||
782 | sendMessage (MetadataRequest i) | ||
783 | (piece, totalSize) <- recvData | ||
784 | unless (pieceIndex piece == i) $ do | ||
785 | disconnectPeer | ||
786 | |||
787 | if piece `isLastPiece` totalSize | ||
788 | then pure [pieceData piece] | ||
789 | else (pieceData piece :) <$> loop (succ i) | ||
790 | |||
791 | getMetadata :: Wire s InfoDict | ||
792 | getMetadata = do | ||
793 | chunks <- fetchMetadata | ||
794 | Connection {..} <- ask | ||
795 | case BE.decode (BS.concat chunks) of | ||
796 | Right (infodict @ InfoDict {..}) | ||
797 | | connTopic == idInfoHash -> return infodict | ||
798 | | otherwise -> error "broken infodict" | ||
799 | Left err -> error $ "unable to parse infodict" ++ err | ||