summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/DHT
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-06-21 22:34:40 -0400
committerjoe <joe@jerkface.net>2017-06-21 22:34:40 -0400
commit012d138b1061d967ef3a05dfb7dc819d199b3902 (patch)
tree1f8929792a6d7120983087b17528e0eb9da480f6 /src/Network/BitTorrent/DHT
parent89c45d3ca6b5e5a0bb65c74111f0f2fdff4445af (diff)
Propogated the deletion of MonadKRPC to Network.BitTorrent.DHT.Query.
Diffstat (limited to 'src/Network/BitTorrent/DHT')
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs73
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs102
2 files changed, 109 insertions, 66 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
index 68c67900..254b347c 100644
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -80,6 +80,7 @@ import Text.PrettyPrint as PP hiding ((<>), ($$))
80import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) 80import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
81import Data.Time 81import Data.Time
82import Data.Time.Clock.POSIX 82import Data.Time.Clock.POSIX
83import Data.Hashable (Hashable)
83 84
84import Network.DatagramServer as KRPC hiding (Options, def) 85import Network.DatagramServer as KRPC hiding (Options, def)
85import Network.KRPC.Method as KRPC 86import Network.KRPC.Method as KRPC
@@ -109,13 +110,9 @@ import Control.Monad.Trans.Control
109 110
110nodeHandler :: ( Address ip 111nodeHandler :: ( Address ip
111 , KRPC (Query a) (Response b) 112 , KRPC (Query a) (Response b)
112#ifdef VERSION_bencoding 113 )
113 , KRPC.Envelope (Query a) (Response b) ~ BValue ) 114 => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler
114#else 115nodeHandler insertNode myNodeIdAccordingTo logm method action = handler method $ \ sockAddr qry -> do
115 , KPRC.Envelope (Query a) (Response b) ~ ByteString )
116#endif
117 => QueryMethod KMessageOf -> (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip
118nodeHandler method action = handler method $ \ sockAddr qry -> do
119#ifdef VERSION_bencoding 116#ifdef VERSION_bencoding
120 let remoteId = queringNodeId qry 117 let remoteId = queringNodeId qry
121 read_only = queryIsReadOnly qry 118 read_only = queryIsReadOnly qry
@@ -131,53 +128,55 @@ nodeHandler method action = handler method $ \ sockAddr qry -> do
131 let ni = NodeInfo remoteId naddr () 128 let ni = NodeInfo remoteId naddr ()
132 -- Do not route read-only nodes. (bep 43) 129 -- Do not route read-only nodes. (bep 43)
133 if read_only 130 if read_only
134 then $(logWarnS) "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) 131 then logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni)
135 else insertNode ni Nothing >> return () -- TODO need to block. why? 132 else insertNode ni Nothing >> return () -- TODO need to block. why?
136 Response 133 Response
137 <$> myNodeIdAccordingTo naddr 134 <$> myNodeIdAccordingTo naddr
138 <*> action naddr q 135 <*> action naddr q
139 136
140-- | Default 'Ping' handler. 137-- | Default 'Ping' handler.
141pingH :: Address ip => NodeHandler ip 138pingH :: NodeAddr ip -> Ping -> IO Ping
142#ifdef VERSION_bencoding 139pingH _ Ping = return Ping
143pingH = nodeHandler "ping" $ \ _ Ping -> return Ping 140-- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True }
144#else
145pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True }
146#endif
147 141
148-- | Default 'FindNode' handler. 142-- | Default 'FindNode' handler.
149findNodeH :: Address ip => NodeHandler ip 143findNodeH :: (NodeId KMessageOf -> IO [NodeInfo KMessageOf ip ()]) -> NodeAddr ip -> FindNode ip -> IO (NodeFound ip)
150findNodeH = nodeHandler "find-nodes" $ \ _ (FindNode nid) -> do 144findNodeH getclosest _ (FindNode nid) = NodeFound <$> getclosest nid
151 NodeFound <$> getClosest nid
152 145
153#ifdef VERSION_bencoding
154-- | Default 'GetPeers' handler. 146-- | Default 'GetPeers' handler.
155getPeersH :: Ord ip => Address ip => NodeHandler ip 147getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip)
156getPeersH = nodeHandler "get_peers" $ \ naddr (GetPeers ih) -> do 148getPeersH getPeerList toks naddr (GetPeers ih) = do
157 ps <- getPeerList ih 149 ps <- getPeerList ih
158 tok <- grantToken naddr 150 tok <- grantToken toks naddr
159 return $ GotPeers ps tok 151 return $ GotPeers ps tok
160 152
161-- | Default 'Announce' handler. 153-- | Default 'Announce' handler.
162announceH :: Ord ip => Address ip => NodeHandler ip 154announceH :: ( Ord ip, Hashable ip ) => TVar (PeerStore ip) -> TVar SessionTokens -> NodeAddr ip -> Announce -> IO Announced
163announceH = nodeHandler "announce_peer" $ \ naddr @ NodeAddr {..} (Announce {..}) -> do 155announceH peers toks naddr @ NodeAddr {..} (Announce {..}) = do
164 valid <- checkToken naddr sessionToken 156 valid <- checkToken toks naddr sessionToken
165 unless valid $ do 157 unless valid $ do
166 throwIO $ InvalidParameter "token" 158 throwIO $ InvalidParameter "token"
167 159
168 let annPort = if impliedPort then nodePort else port 160 let annPort = if impliedPort then nodePort else port
169 peerAddr = PeerAddr Nothing nodeHost annPort 161 peerAddr = PeerAddr Nothing nodeHost annPort
170 insertPeer topic announcedName peerAddr 162 insertPeer peers topic announcedName peerAddr
171 return Announced 163 return Announced
172 164
173-- | Includes all default query handlers. 165-- | Includes all default query handlers.
174defaultHandlers :: Ord ip => Address ip => [NodeHandler ip] 166defaultHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT ip [NodeHandler]
175defaultHandlers = [pingH, findNodeH, getPeersH, announceH] 167defaultHandlers logger = do
176#else 168 groknode <- insertNode1
177-- | Includes all default query handlers. 169 toks <- asks sessionTokens
178defaultHandlers :: Ord ip => Address ip => [NodeHandler ip] 170 getclosest <- getClosest1
179defaultHandlers = [pingH, findNodeH] 171 mynid <- myNodeIdAccordingTo1
180#endif 172 peers <- asks contactInfo
173 getpeers <- getPeerList1
174 let handler :: KRPC (Query a) (Response b) => QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler
175 handler = nodeHandler groknode mynid (logt logger)
176 return [ handler "ping" $ pingH
177 , handler "find-nodes" $ findNodeH getclosest
178 , handler "get_peers" $ getPeersH getpeers toks
179 , handler "announce_peer" $ announceH peers toks ]
181 180
182{----------------------------------------------------------------------- 181{-----------------------------------------------------------------------
183-- Basic queries 182-- Basic queries
@@ -324,6 +323,11 @@ logc c = $(logInfoS) "insertNode" . T.pack . (c :) . (':' :)
324-- routing table. 323-- routing table.
325insertNode :: forall ip. Address ip => NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> DHT ip () 324insertNode :: forall ip. Address ip => NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> DHT ip ()
326insertNode info witnessed_ip0 = do 325insertNode info witnessed_ip0 = do
326 f <- insertNode1
327 liftIO $ f info witnessed_ip0
328
329insertNode1 :: forall ip. Address ip => DHT ip (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ())
330insertNode1 = do
327 bc <- optBucketCount <$> asks options 331 bc <- optBucketCount <$> asks options
328 nid <- asks tentativeNodeId 332 nid <- asks tentativeNodeId
329 logm0 <- embed_ (uncurry logc) 333 logm0 <- embed_ (uncurry logc)
@@ -349,7 +353,7 @@ insertNode info witnessed_ip0 = do
349 , grokNode = DHT.insertNode params state 353 , grokNode = DHT.insertNode params state
350 , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO () 354 , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO ()
351 } 355 }
352 liftIO $ DHT.insertNode params state info witnessed_ip0 356 return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0
353 357
354-- | Throws exception if node is not responding. 358-- | Throws exception if node is not responding.
355queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) 359queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b)
@@ -362,7 +366,8 @@ queryNode' addr q = do
362 nid <- myNodeIdAccordingTo addr 366 nid <- myNodeIdAccordingTo addr
363 let read_only = False -- TODO: check for NAT issues. (BEP 43) 367 let read_only = False -- TODO: check for NAT issues. (BEP 43)
364 let KRPC.Method name = KRPC.method :: KRPC.Method (Query a) (Response b) 368 let KRPC.Method name = KRPC.method :: KRPC.Method (Query a) (Response b)
365 (Response remoteId r, witnessed_ip) <- query' name (toSockAddr addr) (Query nid read_only q) 369 mgr <- asks manager
370 (Response remoteId r, witnessed_ip) <- liftIO $ query' mgr name (toSockAddr addr) (Query nid read_only q)
366 -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) 371 -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip)
367 -- <> " by " <> T.pack (show (toSockAddr addr)) 372 -- <> " by " <> T.pack (show (toSockAddr addr))
368 _ <- insertNode (NodeInfo remoteId addr ()) witnessed_ip 373 _ <- insertNode (NodeInfo remoteId addr ()) witnessed_ip
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index 7e87df6c..d8665773 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -31,12 +31,19 @@ module Network.BitTorrent.DHT.Session
31 , options 31 , options
32 , tentativeNodeId 32 , tentativeNodeId
33 , myNodeIdAccordingTo 33 , myNodeIdAccordingTo
34 , myNodeIdAccordingTo1
34 , routingInfo 35 , routingInfo
35 , routableAddress 36 , routableAddress
36 , getTimestamp 37 , getTimestamp
38 , SessionTokens
39 , sessionTokens
40 , contactInfo
41 , PeerStore
42 , manager
37 43
38 -- ** Initialization 44 -- ** Initialization
39 , LogFun 45 , LogFun
46 , logt
40 , NodeHandler 47 , NodeHandler
41 , newNode 48 , newNode
42 , closeNode 49 , closeNode
@@ -54,11 +61,13 @@ module Network.BitTorrent.DHT.Session
54 -- ** Routing table 61 -- ** Routing table
55 , getTable 62 , getTable
56 , getClosest 63 , getClosest
64 , getClosest1
57 65
58#ifdef VERSION_bencoding 66#ifdef VERSION_bencoding
59 -- ** Peer storage 67 -- ** Peer storage
60 , insertPeer 68 , insertPeer
61 , getPeerList 69 , getPeerList
70 , getPeerList1
62 , insertTopic 71 , insertTopic
63 , deleteTopic 72 , deleteTopic
64 , getSwarms 73 , getSwarms
@@ -333,7 +342,7 @@ instance MonadLogger (DHT ip) where
333 liftIO $ logger loc src lvl (toLogStr msg) 342 liftIO $ logger loc src lvl (toLogStr msg)
334 343
335#ifdef VERSION_bencoding 344#ifdef VERSION_bencoding
336type NodeHandler ip = Handler IO KMessageOf BValue 345type NodeHandler = Handler IO KMessageOf BValue
337#else 346#else
338type NodeHandler ip = Handler (DHT ip) Tox.Message ByteString 347type NodeHandler ip = Handler (DHT ip) Tox.Message ByteString
339#endif 348#endif
@@ -368,10 +377,10 @@ locFromCS cs = case getCallStack cs of
368-- 'closeNode' function, otherwise socket or other scarce resources may 377-- 'closeNode' function, otherwise socket or other scarce resources may
369-- leak. 378-- leak.
370newNode :: Address ip 379newNode :: Address ip
371 => [NodeHandler ip] -- ^ handlers to run on accepted queries; 380 => [NodeHandler] -- ^ handlers to run on accepted queries;
372 -> Options -- ^ various dht options; 381 -> Options -- ^ various dht options;
373 -> NodeAddr ip -- ^ node address to bind; 382 -> NodeAddr ip -- ^ node address to bind;
374 -> LogFun -- ^ 383 -> LogFun -- ^ invoked on log messages;
375#ifdef VERSION_bencoding 384#ifdef VERSION_bencoding
376 -> Maybe (NodeId KMessageOf) -- ^ use this NodeId, if not given a new one is generated. 385 -> Maybe (NodeId KMessageOf) -- ^ use this NodeId, if not given a new one is generated.
377#else 386#else
@@ -420,24 +429,23 @@ runDHT node action = runReaderT (unDHT action) node
420-- Tokens 429-- Tokens
421-----------------------------------------------------------------------} 430-----------------------------------------------------------------------}
422 431
423tryUpdateSecret :: DHT ip () 432tryUpdateSecret :: TVar SessionTokens -> IO ()
424tryUpdateSecret = do 433tryUpdateSecret toks = do
425 curTime <- liftIO getCurrentTime 434 curTime <- liftIO getCurrentTime
426 toks <- asks sessionTokens
427 liftIO $ atomically $ modifyTVar' toks (invalidateTokens curTime) 435 liftIO $ atomically $ modifyTVar' toks (invalidateTokens curTime)
428 436
429grantToken :: Hashable a => NodeAddr a -> DHT ip Token 437grantToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> IO Token
430grantToken addr = do 438grantToken sessionTokens addr = do
431 tryUpdateSecret 439 tryUpdateSecret sessionTokens
432 toks <- asks sessionTokens >>= liftIO . readTVarIO 440 toks <- readTVarIO sessionTokens
433 return $ T.lookup addr $ tokenMap toks 441 return $ T.lookup addr $ tokenMap toks
434 442
435-- | Throws 'HandlerError' if the token is invalid or already 443-- | Throws 'HandlerError' if the token is invalid or already
436-- expired. See 'TokenMap' for details. 444-- expired. See 'TokenMap' for details.
437checkToken :: Hashable a => NodeAddr a -> Token -> DHT ip Bool 445checkToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> Token -> IO Bool
438checkToken addr questionableToken = do 446checkToken sessionTokens addr questionableToken = do
439 tryUpdateSecret 447 tryUpdateSecret sessionTokens
440 toks <- asks sessionTokens >>= liftIO . readTVarIO 448 toks <- readTVarIO sessionTokens
441 return $ T.member addr questionableToken (tokenMap toks) 449 return $ T.member addr questionableToken (tokenMap toks)
442 450
443 451
@@ -463,6 +471,14 @@ myNodeIdAccordingTo _ = do
463 (return . myNodeId) 471 (return . myNodeId)
464 info 472 info
465 473
474myNodeIdAccordingTo1 :: DHT ip ( NodeAddr ip -> IO (NodeId KMessageOf) )
475myNodeIdAccordingTo1 = do
476 var <- asks routingInfo
477 tid <- asks tentativeNodeId
478 return $ \ _ -> do
479 info <- atomically $ readTVar var
480 return $ maybe tid myNodeId info
481
466-- | Get current routing table. Normally you don't need to use this 482-- | Get current routing table. Normally you don't need to use this
467-- function, but it can be usefull for debugging and profiling purposes. 483-- function, but it can be usefull for debugging and profiling purposes.
468#ifdef VERSION_bencoding 484#ifdef VERSION_bencoding
@@ -515,38 +531,48 @@ getClosest node = do
515 k <- asks (optK . options) 531 k <- asks (optK . options)
516 kclosest k node <$> getTable 532 kclosest k node <$> getTable
517 533
534getClosest1 :: ( Eq ip
535 , TableKey KMessageOf k
536 ) => DHT ip (k -> IO [NodeInfo KMessageOf ip ()])
537getClosest1 = do
538 k <- asks (optK . options)
539 nobkts <- asks (optBucketCount . options)
540 myid <- asks tentativeNodeId
541 var <- asks routingInfo
542 return $ \node -> do nfo <- atomically $ readTVar var
543 let tbl = maybe (nullTable myid nobkts) R.myBuckets nfo
544 return $ kclosest k node tbl
545
518{----------------------------------------------------------------------- 546{-----------------------------------------------------------------------
519-- Peer storage 547-- Peer storage
520-----------------------------------------------------------------------} 548-----------------------------------------------------------------------}
521 549
522refreshContacts :: DHT ip () 550refreshContacts :: Ord ip => TVar (PeerStore ip) -> IO ()
523refreshContacts = 551refreshContacts var =
524 -- TODO limit dht peer store in size (probably by removing oldest peers) 552 -- TODO limit dht peer store in size (probably by removing oldest peers)
525 return () 553 return ()
526 554
527 555
528-- | Insert peer to peer store. Used to handle announce requests. 556-- | Insert peer to peer store. Used to handle announce requests.
529insertPeer :: Ord ip => InfoHash -> Maybe ByteString -> PeerAddr ip -> DHT ip () 557insertPeer :: Ord ip => TVar (PeerStore ip) -> InfoHash -> Maybe ByteString -> PeerAddr ip -> IO ()
530insertPeer ih name addr = do 558insertPeer var ih name addr = do
531 refreshContacts 559 refreshContacts var
532 var <- asks contactInfo 560 atomically $ modifyTVar' var (P.insertPeer ih name addr)
533 liftIO $ atomically $ modifyTVar' var (P.insertPeer ih name addr)
534 561
535-- | Get peer set for specific swarm. 562-- | Get peer set for specific swarm.
536lookupPeers :: Ord ip => InfoHash -> DHT ip [PeerAddr ip] 563lookupPeers :: Ord ip => TVar (PeerStore ip) -> InfoHash -> IO [PeerAddr ip]
537lookupPeers ih = do 564lookupPeers var ih = do
538 refreshContacts 565 refreshContacts var
539 var <- asks contactInfo
540 tm <- getTimestamp 566 tm <- getTimestamp
541 liftIO $ atomically $ do 567 atomically $ do
542 (ps,store') <- P.freshPeers ih tm <$> readTVar var 568 (ps,store') <- P.freshPeers ih tm <$> readTVar var
543 writeTVar var store' 569 writeTVar var store'
544 return ps 570 return ps
545 571
546getTimestamp :: DHT ip Timestamp 572getTimestamp :: IO Timestamp
547getTimestamp = do 573getTimestamp = do
548 utcTime <- liftIO $ getCurrentTime 574 utcTime <- getCurrentTime
549 $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime))) 575 -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime)))
550 return $ utcTimeToPOSIXSeconds utcTime 576 return $ utcTimeToPOSIXSeconds utcTime
551 577
552 578
@@ -557,11 +583,23 @@ getTimestamp = do
557-- 583--
558getPeerList :: Ord ip => InfoHash -> DHT ip (PeerList ip) 584getPeerList :: Ord ip => InfoHash -> DHT ip (PeerList ip)
559getPeerList ih = do 585getPeerList ih = do
560 ps <- lookupPeers ih 586 var <- asks contactInfo
587 ps <- liftIO $ lookupPeers var ih
561 if L.null ps 588 if L.null ps
562 then Left <$> getClosest ih 589 then Left <$> getClosest ih
563 else return (Right ps) 590 else return (Right ps)
564 591
592getPeerList1 :: Ord ip => DHT ip (InfoHash -> IO (PeerList ip))
593getPeerList1 = do
594 var <- asks contactInfo
595 getclosest <- getClosest1
596 return $ \ih -> do
597 ps <- lookupPeers var ih
598 if L.null ps
599 then Left <$> getclosest ih
600 else return (Right ps)
601
602
565insertTopic :: InfoHash -> PortNumber -> DHT ip () 603insertTopic :: InfoHash -> PortNumber -> DHT ip ()
566insertTopic ih p = do 604insertTopic ih p = do
567 var <- asks announceInfo 605 var <- asks announceInfo