diff options
author | Joe Crayne <joe@jerkface.net> | 2019-12-21 12:30:14 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 23:27:51 -0500 |
commit | 5b488a95be8adbf685569a98a9ada6f70d71cf81 (patch) | |
tree | 9a9bce058edc01f7c569685ae23a504bc000f000 | |
parent | ca7f03b9b35ca1d47c51ed6b63c8d08a2b27b230 (diff) |
WIP: tracker protocol
-rw-r--r-- | dht/src/Network/BitTorrent/Tracker/Transport.hs | 79 | ||||
-rw-r--r-- | dht/src/Network/QueryResponse.hs | 39 |
2 files changed, 75 insertions, 43 deletions
diff --git a/dht/src/Network/BitTorrent/Tracker/Transport.hs b/dht/src/Network/BitTorrent/Tracker/Transport.hs index 60cb832d..5d225a7e 100644 --- a/dht/src/Network/BitTorrent/Tracker/Transport.hs +++ b/dht/src/Network/BitTorrent/Tracker/Transport.hs | |||
@@ -1,5 +1,10 @@ | |||
1 | {-# LANGUAGE TupleSections #-} | 1 | {-# LANGUAGE TupleSections #-} |
2 | module Network.BitTorrent.Tracker.Transport where | 2 | module Network.BitTorrent.Tracker.Transport |
3 | ( implementTracker | ||
4 | , Callbacks(..) | ||
5 | , parseTracker | ||
6 | , encodeTracker | ||
7 | ) where | ||
3 | 8 | ||
4 | import Control.Concurrent.STM | 9 | import Control.Concurrent.STM |
5 | import Crypto.Random | 10 | import Crypto.Random |
@@ -7,7 +12,7 @@ import Data.ByteString (ByteString) | |||
7 | import Data.Functor.Contravariant | 12 | import Data.Functor.Contravariant |
8 | import Data.IntMap.Strict (IntMap) | 13 | import Data.IntMap.Strict (IntMap) |
9 | import qualified Data.Serialize as S | 14 | import qualified Data.Serialize as S |
10 | import Network.Socket | 15 | import Network.Socket (SockAddr) |
11 | 16 | ||
12 | import Data.QueryResponse.Table | 17 | import Data.QueryResponse.Table |
13 | import Data.TableMethods | 18 | import Data.TableMethods |
@@ -15,9 +20,11 @@ import Network.Address | |||
15 | import Network.BitTorrent.Tracker.Message | 20 | import Network.BitTorrent.Tracker.Message |
16 | import Network.QueryResponse as QR | 21 | import Network.QueryResponse as QR |
17 | 22 | ||
23 | -- | For use with 'layerTransport'. | ||
18 | parseTracker :: ByteString -> SockAddr -> Either String (Transaction Request, SockAddr) | 24 | parseTracker :: ByteString -> SockAddr -> Either String (Transaction Request, SockAddr) |
19 | parseTracker bs saddr = fmap (, saddr) $ S.decode bs | 25 | parseTracker bs saddr = fmap (, saddr) $ S.decode bs |
20 | 26 | ||
27 | -- | For use with 'layerTransport'. | ||
21 | encodeTracker :: Transaction Response -> SockAddr -> (ByteString, SockAddr) | 28 | encodeTracker :: Transaction Response -> SockAddr -> (ByteString, SockAddr) |
22 | encodeTracker resp saddr = (S.encode resp,saddr) | 29 | encodeTracker resp saddr = (S.encode resp,saddr) |
23 | 30 | ||
@@ -25,12 +32,28 @@ type Handler = MethodHandlerA String TransactionId SockAddr (Transaction Request | |||
25 | 32 | ||
26 | type Client = QR.ClientA String MessageId TransactionId SockAddr (Transaction Request) (Transaction Response) | 33 | type Client = QR.ClientA String MessageId TransactionId SockAddr (Transaction Request) (Transaction Response) |
27 | 34 | ||
28 | handlers :: MessageId -> Maybe Handler | 35 | |
29 | handlers ConnectId = Nothing | 36 | -- | Handlers for inbound tracker requests. It is safe to throw 'DropQuery' |
30 | handlers AnnounceId = Nothing | 37 | -- from these, which should be done if the ConnectionId for an address does not |
31 | handlers ScrapeId = Nothing | 38 | -- match the one issued by the 'callOnConnect' method. |
32 | handlers ErrorId = Nothing | 39 | data Callbacks = Callbacks |
33 | handlers _ = Nothing | 40 | { callOnConnect :: SockAddr -> IO ConnectionId |
41 | , callOnAnnounce :: SockAddr -> ConnectionId -> AnnounceQuery -> IO AnnounceInfo | ||
42 | , callOnScrape :: SockAddr -> ConnectionId -> ScrapeQuery -> IO [ScrapeEntry] | ||
43 | } | ||
44 | |||
45 | handlers :: Callbacks -> MessageId -> Maybe Handler | ||
46 | handlers c ConnectId = Just $ MethodHandler (const $ Right ()) | ||
47 | (\qid from to conid -> TransactionR qid $ Connected conid) | ||
48 | (\from () -> callOnConnect c from) | ||
49 | handlers c AnnounceId = Just $ MethodHandler (\TransactionQ{connIdQ=cid,request=Announce qry} -> Right (cid,qry)) | ||
50 | (\qid from to info -> TransactionR qid $ Announced info) | ||
51 | (uncurry . callOnAnnounce c) | ||
52 | handlers c ScrapeId = Just $ MethodHandler (\TransactionQ{connIdQ=cid,request=Scrape qry} -> Right (cid,qry)) | ||
53 | (\qid from to entries -> TransactionR qid $ Scraped entries) | ||
54 | (uncurry . callOnScrape c) | ||
55 | handlers c ErrorId = Nothing | ||
56 | handlers c _ = Nothing | ||
34 | 57 | ||
35 | methodOfRequest :: Request -> MessageId | 58 | methodOfRequest :: Request -> MessageId |
36 | methodOfRequest Connect {} = ConnectId | 59 | methodOfRequest Connect {} = ConnectId |
@@ -41,36 +64,34 @@ classify :: Transaction Request -> MessageClass String MessageId TransactionId S | |||
41 | classify qry = IsQuery (methodOfRequest $ request qry) (transIdQ qry) | 64 | classify qry = IsQuery (methodOfRequest $ request qry) (transIdQ qry) |
42 | 65 | ||
43 | dispatch :: DRG g => | 66 | dispatch :: DRG g => |
44 | DispatchMethodsA | 67 | Callbacks |
45 | (g, IntMap (Maybe (Transaction Response) -> IO ())) | 68 | -> DispatchMethodsA |
69 | (g, IntMap (Maybe (Transaction Request) -> IO ())) | ||
46 | String | 70 | String |
47 | MessageId | 71 | MessageId |
48 | TransactionId | 72 | TransactionId |
49 | SockAddr | 73 | SockAddr |
50 | (Transaction Request) | 74 | (Transaction Request) |
51 | (Transaction Response) | 75 | (Transaction Response) |
52 | dispatch = DispatchMethods | 76 | dispatch c = DispatchMethods |
53 | { classifyInbound = classify | 77 | { classifyInbound = classify |
54 | , lookupHandler = handlers | 78 | , lookupHandler = handlers c |
55 | , tableMethods = transactionMethods (contramap fromEnum intMapMethods) genTransactionId | 79 | , tableMethods = transactionMethods (contramap fromEnum intMapMethods) genTransactionId |
56 | } | 80 | } |
57 | 81 | ||
58 | newClient :: ErrorReporter SockAddr (Transaction Request) MessageId TransactionId String | 82 | implementTracker :: ErrorReporter SockAddr (Transaction Request) MessageId TransactionId String |
59 | -> TransportA String SockAddr (Transaction Request) (Transaction Response) | 83 | -> TransportA String SockAddr (Transaction Request) (Transaction Response) |
60 | -> IO (ClientA String | 84 | -> Callbacks |
61 | MessageId | 85 | -> IO (TransportA String SockAddr (Transaction Request) (Transaction Response)) |
62 | TransactionId | 86 | implementTracker err net c = do |
63 | SockAddr | ||
64 | (Transaction Request) | ||
65 | (Transaction Response)) | ||
66 | newClient err net = do | ||
67 | drg <- drgNew | 87 | drg <- drgNew |
68 | state <- newTVarIO (drg, mempty) | 88 | state <- newTVarIO (drg, mempty) |
69 | return QR.Client | 89 | let client = QR.Client |
70 | { clientNet = net | 90 | { clientNet = net |
71 | , clientDispatcher = dispatch | 91 | , clientDispatcher = dispatch c |
72 | , clientErrorReporter = err | 92 | , clientErrorReporter = err |
73 | , clientPending = state | 93 | , clientPending = state |
74 | , clientAddress = const $ return localhost4 | 94 | , clientAddress = const $ return localhost4 |
75 | , clientResponseId = return | 95 | , clientResponseId = return |
76 | } | 96 | } |
97 | return $ addHandler (\err -> return ()) (handleMessage client) net | ||
diff --git a/dht/src/Network/QueryResponse.hs b/dht/src/Network/QueryResponse.hs index 877c7ab6..20e7ecf0 100644 --- a/dht/src/Network/QueryResponse.hs +++ b/dht/src/Network/QueryResponse.hs | |||
@@ -165,7 +165,7 @@ partitionTransport parse encodex tr = | |||
165 | -- Note: If you add a handler to one of the branches before applying a | 165 | -- Note: If you add a handler to one of the branches before applying a |
166 | -- 'mergeTransports' combinator, then this handler may not block or return | 166 | -- 'mergeTransports' combinator, then this handler may not block or return |
167 | -- Nothing. | 167 | -- Nothing. |
168 | addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x | 168 | addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> TransportA err addr x y -> TransportA err addr x y |
169 | addHandler onParseError f tr = tr | 169 | addHandler onParseError f tr = tr |
170 | { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case | 170 | { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case |
171 | Arrival addr x -> f addr x >>= maybe (join $ atomically eat) (kont . Arrival addr . ($ x)) | 171 | Arrival addr x -> f addr x >>= maybe (join $ atomically eat) (kont . Arrival addr . ($ x)) |
@@ -283,6 +283,8 @@ data MethodHandlerA err tid addr x y = forall a b. MethodHandler | |||
283 | , methodSerialize :: tid -> addr -> addr -> b -> y | 283 | , methodSerialize :: tid -> addr -> addr -> b -> y |
284 | -- | Fully typed action to perform upon the query. The remote origin | 284 | -- | Fully typed action to perform upon the query. The remote origin |
285 | -- address of the query is provided to the handler. | 285 | -- address of the query is provided to the handler. |
286 | -- | ||
287 | -- TODO: Allow queries to be ignored? | ||
286 | , methodAction :: addr -> a -> IO b | 288 | , methodAction :: addr -> a -> IO b |
287 | } | 289 | } |
288 | -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary. | 290 | -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary. |
@@ -301,22 +303,22 @@ data MethodHandlerA err tid addr x y = forall a b. MethodHandler | |||
301 | -- | 303 | -- |
302 | -- The type variable /d/ is used to represent the current state of the | 304 | -- The type variable /d/ is used to represent the current state of the |
303 | -- transaction generator and the table of pending transactions. | 305 | -- transaction generator and the table of pending transactions. |
304 | data TransactionMethods d qid addr y = TransactionMethods | 306 | data TransactionMethods d qid addr x = TransactionMethods |
305 | { | 307 | { |
306 | -- | Before a query is sent, this function stores an 'MVar' to which the | 308 | -- | Before a query is sent, this function stores an 'MVar' to which the |
307 | -- response will be written too. The returned /qid/ is a transaction id | 309 | -- response will be written too. The returned /qid/ is a transaction id |
308 | -- that can be used to forget the 'MVar' if the remote peer is not | 310 | -- that can be used to forget the 'MVar' if the remote peer is not |
309 | -- responding. | 311 | -- responding. |
310 | dispatchRegister :: POSIXTime -- time of expiry | 312 | dispatchRegister :: POSIXTime -- time of expiry |
311 | -> (Maybe y -> IO ()) -- callback upon response (or timeout) | 313 | -> (Maybe x -> IO ()) -- callback upon response (or timeout) |
312 | -> addr | 314 | -> addr |
313 | -> d | 315 | -> d |
314 | -> STM (qid, d) | 316 | -> STM (qid, d) |
315 | -- | This method is invoked when an incoming packet /y/ indicates it is | 317 | -- | This method is invoked when an incoming packet /x/ indicates it is |
316 | -- a response to the transaction with id /qid/. The returned IO action | 318 | -- a response to the transaction with id /qid/. The returned IO action |
317 | -- will write the packet to the correct 'MVar' thus completing the | 319 | -- will write the packet to the correct 'MVar' thus completing the |
318 | -- dispatch. | 320 | -- dispatch. |
319 | , dispatchResponse :: qid -> y -> d -> STM (d, IO ()) | 321 | , dispatchResponse :: qid -> x -> d -> STM (d, IO ()) |
320 | -- | When a timeout interval elapses, this method is called to remove the | 322 | -- | When a timeout interval elapses, this method is called to remove the |
321 | -- transaction from the table. | 323 | -- transaction from the table. |
322 | , dispatchCancel :: qid -> d -> STM d | 324 | , dispatchCancel :: qid -> d -> STM d |
@@ -332,7 +334,7 @@ data DispatchMethodsA tbl err meth tid addr x y = DispatchMethods | |||
332 | -- | Lookup the handler for a inbound query. | 334 | -- | Lookup the handler for a inbound query. |
333 | , lookupHandler :: meth -> Maybe (MethodHandlerA err tid addr x y) | 335 | , lookupHandler :: meth -> Maybe (MethodHandlerA err tid addr x y) |
334 | -- | Methods for handling incoming responses. | 336 | -- | Methods for handling incoming responses. |
335 | , tableMethods :: TransactionMethods tbl tid addr y | 337 | , tableMethods :: TransactionMethods tbl tid addr x |
336 | } | 338 | } |
337 | 339 | ||
338 | -- | All inputs required to implement a query\/response client. | 340 | -- | All inputs required to implement a query\/response client. |
@@ -466,17 +468,26 @@ contramapAddr f (MethodHandler p s a) | |||
466 | contramapAddr f (NoReply p a) | 468 | contramapAddr f (NoReply p a) |
467 | = NoReply p (\addr arg -> a (f addr) arg) | 469 | = NoReply p (\addr arg -> a (f addr) arg) |
468 | 470 | ||
471 | -- | Query handlers can throw this to ignore a query instead of responding to | ||
472 | -- it. | ||
473 | data DropQuery = DropQuery | ||
474 | deriving Show | ||
475 | |||
476 | instance Exception DropQuery | ||
477 | |||
469 | -- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the | 478 | -- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the |
470 | -- parse is successful, the returned IO action will construct our reply if | 479 | -- parse is successful, the returned IO action will construct our reply if |
471 | -- there is one. Otherwise, a parse err is returned. | 480 | -- there is one. Otherwise, a parse err is returned. |
472 | dispatchQuery :: MethodHandler err tid addr x -- ^ Handler to invoke. | 481 | dispatchQuery :: MethodHandlerA err tid addr x y -- ^ Handler to invoke. |
473 | -> tid -- ^ The transaction id for this query\/response session. | 482 | -> tid -- ^ The transaction id for this query\/response session. |
474 | -> addr -- ^ Our own address, to which the query was sent. | 483 | -> addr -- ^ Our own address, to which the query was sent. |
475 | -> x -- ^ The query packet. | 484 | -> x -- ^ The query packet. |
476 | -> addr -- ^ The origin address of the query. | 485 | -> addr -- ^ The origin address of the query. |
477 | -> Either err (IO (Maybe x)) | 486 | -> Either err (IO (Maybe y)) |
478 | dispatchQuery (MethodHandler unwrapQ wrapR f) tid self x addr = | 487 | dispatchQuery (MethodHandler unwrapQ wrapR f) tid self x addr = |
479 | fmap (\a -> Just . wrapR tid self addr <$> f addr a) $ unwrapQ x | 488 | fmap (\a -> catch (Just . wrapR tid self addr <$> f addr a) |
489 | (\DropQuery -> return Nothing)) | ||
490 | $ unwrapQ x | ||
480 | dispatchQuery (NoReply unwrapQ f) tid self x addr = | 491 | dispatchQuery (NoReply unwrapQ f) tid self x addr = |
481 | fmap (\a -> f addr a >> return Nothing) $ unwrapQ x | 492 | fmap (\a -> f addr a >> return Nothing) $ unwrapQ x |
482 | 493 | ||
@@ -515,7 +526,7 @@ transactionMethods methods generate = transactionMethods' id id methods generate | |||
515 | -- in a forked thread that loops until 'awaitMessage' returns 'Nothing' or | 526 | -- in a forked thread that loops until 'awaitMessage' returns 'Nothing' or |
516 | -- throws an exception. | 527 | -- throws an exception. |
517 | handleMessage :: | 528 | handleMessage :: |
518 | Client err meth tid addr x | 529 | ClientA err meth tid addr x y |
519 | -> addr | 530 | -> addr |
520 | -> x | 531 | -> x |
521 | -> IO (Maybe (x -> x)) | 532 | -> IO (Maybe (x -> x)) |