summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/DHT/Session.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-06-29 10:37:07 -0400
committerjoe <joe@jerkface.net>2017-06-29 13:00:16 -0400
commit3195c0877b443e5ccd4d489f03944fc059d4d7aa (patch)
tree2a05c35a9b43d8f0725c52fc860b30ae191f3871 /src/Network/BitTorrent/DHT/Session.hs
parent05e70386c2248d87a61a8e8267e0211597f2fa88 (diff)
WIP: Generalizing DHT monad.
Diffstat (limited to 'src/Network/BitTorrent/DHT/Session.hs')
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs114
1 files changed, 49 insertions, 65 deletions
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index d4794038..f96ba707 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -96,6 +96,7 @@ import Control.Monad.Trans.Control
96import Control.Monad.Trans.Resource 96import Control.Monad.Trans.Resource
97import Data.Typeable 97import Data.Typeable
98import Data.String 98import Data.String
99import Data.Bits
99import Data.ByteString 100import Data.ByteString
100import Data.Conduit.Lazy 101import Data.Conduit.Lazy
101import Data.Default 102import Data.Default
@@ -265,26 +266,17 @@ type AnnounceSet = Set (InfoHash, PortNumber)
265type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () 266type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO ()
266 267
267-- | DHT session keep track state of /this/ node. 268-- | DHT session keep track state of /this/ node.
268data Node ip = Node 269data Node raw dht u ip = Node
269 { -- | Session configuration; 270 { -- | Session configuration;
270 options :: !Options 271 options :: !Options
271 272
272 -- | Pseudo-unique self-assigned session identifier. This value is 273 -- | Pseudo-unique self-assigned session identifier. This value is
273 -- constant during DHT session and (optionally) between sessions. 274 -- constant during DHT session and (optionally) between sessions.
274#ifdef VERSION_bencoding 275 , tentativeNodeId :: !(NodeId dht)
275 , tentativeNodeId :: !(NodeId KMessageOf)
276#else
277 , tentativeNodeId :: !(NodeId Tox.Message)
278#endif
279 276
280 , resources :: !InternalState 277 , resources :: !InternalState
281#ifdef VERSION_bencoding 278 , manager :: !(Manager raw dht) -- ^ RPC manager;
282 , manager :: !(Manager IO BValue KMessageOf) -- ^ RPC manager; 279 , routingInfo :: !(TVar (Maybe (R.Info dht ip u))) -- ^ search table;
283 , routingInfo :: !(TVar (Maybe (R.Info KMessageOf ip ()))) -- ^ search table;
284#else
285 , manager :: !(Manager (DHT ip) ByteString Tox.Message) -- ^ RPC manager;
286 , routingInfo :: !(TVar (Maybe (R.Info Tox.Message ip Bool))) -- ^ search table;
287#endif
288 , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; 280 , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes;
289 , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; 281 , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node;
290 , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. 282 , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs.
@@ -293,23 +285,23 @@ data Node ip = Node
293 285
294-- | DHT keep track current session and proper resource allocation for 286-- | DHT keep track current session and proper resource allocation for
295-- safe multithreading. 287-- safe multithreading.
296newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) IO a } 288newtype DHT raw dht u ip a = DHT { unDHT :: ReaderT (Node raw dht u ip) IO a }
297 deriving ( Functor, Applicative, Monad, MonadIO 289 deriving ( Functor, Applicative, Monad, MonadIO
298 , MonadBase IO, MonadReader (Node ip), MonadThrow 290 , MonadBase IO, MonadReader (Node raw dht u ip), MonadThrow
299 ) 291 )
300 292
301#if MIN_VERSION_monad_control(1,0,0) 293#if MIN_VERSION_monad_control(1,0,0)
302newtype DHTStM ip a = StM { 294newtype DHTStM raw dht u ip a = StM {
303 unSt :: StM (ReaderT (Node ip) IO) a 295 unSt :: StM (ReaderT (Node raw dht u ip) IO) a
304 } 296 }
305#endif 297#endif
306 298
307instance MonadBaseControl IO (DHT ip) where 299instance MonadBaseControl IO (DHT raw dht u ip) where
308#if MIN_VERSION_monad_control(1,0,0) 300#if MIN_VERSION_monad_control(1,0,0)
309 type StM (DHT ip) a = DHTStM ip a 301 type StM (DHT raw dht u ip) a = DHTStM raw dht u ip a
310#else 302#else
311 newtype StM (DHT ip) a = StM { 303 newtype StM (DHT raw dht u ip) a = StM {
312 unSt :: StM (ReaderT (Node ip) IO) a 304 unSt :: StM (ReaderT (Node raw dht u ip) IO) a
313 } 305 }
314#endif 306#endif
315 liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> 307 liftBaseWith cc = DHT $ liftBaseWith $ \ cc' ->
@@ -321,22 +313,22 @@ instance MonadBaseControl IO (DHT ip) where
321 313
322-- | Check is it is possible to run 'queryNode' or handle pending 314-- | Check is it is possible to run 'queryNode' or handle pending
323-- query from remote node. 315-- query from remote node.
324instance MonadActive (DHT ip) where 316instance MonadActive (DHT raw dht u ip) where
325 monadActive = getManager >>= liftIO . isActive 317 monadActive = getManager >>= liftIO . isActive
326 {-# INLINE monadActive #-} 318 {-# INLINE monadActive #-}
327 319
328-- | All allocated resources will be closed at 'closeNode'. 320-- | All allocated resources will be closed at 'closeNode'.
329instance MonadResource (DHT ip) where 321instance MonadResource (DHT raw dht u ip) where
330 liftResourceT m = do 322 liftResourceT m = do
331 s <- asks resources 323 s <- asks resources
332 liftIO $ runInternalState m s 324 liftIO $ runInternalState m s
333 325
334-- instance MonadKRPC (DHT ip) (DHT ip) BValue KMessageOf where 326-- instance MonadKRPC (DHT raw dht u ip) (DHT raw dht u ip) BValue KMessageOf where
335 327
336getManager :: DHT ip (Manager IO BValue KMessageOf) 328getManager :: DHT raw dht u ip (Manager raw dht)
337getManager = asks manager 329getManager = asks manager
338 330
339instance MonadLogger (DHT ip) where 331instance MonadLogger (DHT raw dht u ip) where
340 monadLoggerLog loc src lvl msg = do 332 monadLoggerLog loc src lvl msg = do
341 logger <- asks loggerFun 333 logger <- asks loggerFun
342 liftIO $ logger loc src lvl (toLogStr msg) 334 liftIO $ logger loc src lvl (toLogStr msg)
@@ -344,7 +336,7 @@ instance MonadLogger (DHT ip) where
344#ifdef VERSION_bencoding 336#ifdef VERSION_bencoding
345type NodeHandler = Handler IO KMessageOf BValue 337type NodeHandler = Handler IO KMessageOf BValue
346#else 338#else
347type NodeHandler ip = Handler (DHT ip) Tox.Message ByteString 339type NodeHandler ip = Handler (DHT raw dht u ip) Tox.Message ByteString
348#endif 340#endif
349 341
350logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO () 342logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO ()
@@ -376,17 +368,16 @@ locFromCS cs = case getCallStack cs of
376-- | Run DHT session. You /must/ properly close session using 368-- | Run DHT session. You /must/ properly close session using
377-- 'closeNode' function, otherwise socket or other scarce resources may 369-- 'closeNode' function, otherwise socket or other scarce resources may
378-- leak. 370-- leak.
379newNode :: Address ip 371newNode :: ( Address ip
372 , FiniteBits (NodeId dht)
373 , Serialize (NodeId dht)
374 )
380 => -- [NodeHandler] -- ^ handlers to run on accepted queries; 375 => -- [NodeHandler] -- ^ handlers to run on accepted queries;
381 Options -- ^ various dht options; 376 Options -- ^ various dht options;
382 -> NodeAddr ip -- ^ node address to bind; 377 -> NodeAddr ip -- ^ node address to bind;
383 -> LogFun -- ^ invoked on log messages; 378 -> LogFun -- ^ invoked on log messages;
384#ifdef VERSION_bencoding 379 -> Maybe (NodeId dht) -- ^ use this NodeId, if not given a new one is generated.
385 -> Maybe (NodeId KMessageOf) -- ^ use this NodeId, if not given a new one is generated. 380 -> IO (Node raw dht u ip) -- ^ a new DHT node running at given address.
386#else
387 -> Maybe (NodeId Tox.Message) -- ^ use this NodeId, if not given a new one is generated.
388#endif
389 -> IO (Node ip) -- ^ a new DHT node running at given address.
390newNode opts naddr logger mbid = do 381newNode opts naddr logger mbid = do
391 s <- createInternalState 382 s <- createInternalState
392 runInternalState initNode s 383 runInternalState initNode s
@@ -409,11 +400,11 @@ newNode opts naddr logger mbid = do
409 400
410-- | Some resources like listener thread may live for 401-- | Some resources like listener thread may live for
411-- some short period of time right after this DHT session closed. 402-- some short period of time right after this DHT session closed.
412closeNode :: Node ip -> IO () 403closeNode :: Node raw dht u ip -> IO ()
413closeNode Node {..} = closeInternalState resources 404closeNode Node {..} = closeInternalState resources
414 405
415-- | Run DHT operation on the given session. 406-- | Run DHT operation on the given session.
416runDHT :: Node ip -> DHT ip a -> IO a 407runDHT :: Node raw dht u ip -> DHT raw dht u ip a -> IO a
417runDHT node action = runReaderT (unDHT action) node 408runDHT node action = runReaderT (unDHT action) node
418{-# INLINE runDHT #-} 409{-# INLINE runDHT #-}
419 410
@@ -453,24 +444,20 @@ checkToken sessionTokens addr questionableToken = do
453-----------------------------------------------------------------------} 444-----------------------------------------------------------------------}
454 445
455-- | This nodes externally routable address reported by remote peers. 446-- | This nodes externally routable address reported by remote peers.
456routableAddress :: DHT ip (Maybe SockAddr) 447routableAddress :: DHT raw dht u ip (Maybe SockAddr)
457routableAddress = do 448routableAddress = do
458 info <- asks routingInfo >>= liftIO . atomically . readTVar 449 info <- asks routingInfo >>= liftIO . atomically . readTVar
459 return $ myAddress <$> info 450 return $ myAddress <$> info
460 451
461-- | The current NodeId that the given remote node should know us by. 452-- | The current NodeId that the given remote node should know us by.
462#ifdef VERSION_bencoding 453myNodeIdAccordingTo :: NodeAddr ip -> DHT raw dht u ip (NodeId dht)
463myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId KMessageOf)
464#else
465myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId Tox.Message)
466#endif
467myNodeIdAccordingTo _ = do 454myNodeIdAccordingTo _ = do
468 info <- asks routingInfo >>= liftIO . atomically . readTVar 455 info <- asks routingInfo >>= liftIO . atomically . readTVar
469 maybe (asks tentativeNodeId) 456 maybe (asks tentativeNodeId)
470 (return . myNodeId) 457 (return . myNodeId)
471 info 458 info
472 459
473myNodeIdAccordingTo1 :: DHT ip ( NodeAddr ip -> IO (NodeId KMessageOf) ) 460myNodeIdAccordingTo1 :: DHT raw dht u ip ( NodeAddr ip -> IO (NodeId dht) )
474myNodeIdAccordingTo1 = do 461myNodeIdAccordingTo1 = do
475 var <- asks routingInfo 462 var <- asks routingInfo
476 tid <- asks tentativeNodeId 463 tid <- asks tentativeNodeId
@@ -480,11 +467,7 @@ myNodeIdAccordingTo1 = do
480 467
481-- | Get current routing table. Normally you don't need to use this 468-- | Get current routing table. Normally you don't need to use this
482-- function, but it can be usefull for debugging and profiling purposes. 469-- function, but it can be usefull for debugging and profiling purposes.
483#ifdef VERSION_bencoding 470getTable :: Eq ip => DHT raw dht u ip (Table dht ip u)
484getTable :: Eq ip => DHT ip (Table KMessageOf ip ())
485#else
486getTable :: Eq ip => DHT ip (Table Tox.Message ip Bool)
487#endif
488getTable = do 471getTable = do
489 Node { tentativeNodeId = myId 472 Node { tentativeNodeId = myId
490 , routingInfo = var 473 , routingInfo = var
@@ -492,18 +475,18 @@ getTable = do
492 let nil = nullTable myId (optBucketCount opts) 475 let nil = nullTable myId (optBucketCount opts)
493 liftIO (maybe nil R.myBuckets <$> atomically (readTVar var)) 476 liftIO (maybe nil R.myBuckets <$> atomically (readTVar var))
494 477
495getSwarms :: Ord ip => DHT ip [ (InfoHash, Int, Maybe ByteString) ] 478getSwarms :: Ord ip => DHT raw dht u ip [ (InfoHash, Int, Maybe ByteString) ]
496getSwarms = do 479getSwarms = do
497 store <- asks contactInfo >>= liftIO . atomically . readTVar 480 store <- asks contactInfo >>= liftIO . atomically . readTVar
498 return $ P.knownSwarms store 481 return $ P.knownSwarms store
499 482
500savePeerStore :: (Ord ip, Address ip) => DHT ip ByteString 483savePeerStore :: (Ord ip, Address ip) => DHT raw dht u ip ByteString
501savePeerStore = do 484savePeerStore = do
502 var <- asks contactInfo 485 var <- asks contactInfo
503 peers <- liftIO $ atomically $ readTVar var 486 peers <- liftIO $ atomically $ readTVar var
504 return $ S.encode peers 487 return $ S.encode peers
505 488
506mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT ip () 489mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw dht u ip ()
507mergeSavedPeers bs = do 490mergeSavedPeers bs = do
508 var <- asks contactInfo 491 var <- asks contactInfo
509 case S.decode bs of 492 case S.decode bs of
@@ -511,7 +494,7 @@ mergeSavedPeers bs = do
511 Left _ -> return () 494 Left _ -> return ()
512 495
513 496
514allPeers :: Ord ip => InfoHash -> DHT ip [ PeerAddr ip ] 497allPeers :: Ord ip => InfoHash -> DHT raw dht u ip [ PeerAddr ip ]
515allPeers ih = do 498allPeers ih = do
516 store <- asks contactInfo >>= liftIO . atomically . readTVar 499 store <- asks contactInfo >>= liftIO . atomically . readTVar
517 return $ P.lookup ih store 500 return $ P.lookup ih store
@@ -521,18 +504,20 @@ allPeers ih = do
521-- 504--
522-- This operation used for 'find_nodes' query. 505-- This operation used for 'find_nodes' query.
523-- 506--
524#ifdef VERSION_bencoding 507getClosest :: ( Eq ip
525getClosest :: Eq ip => TableKey KMessageOf k => k -> DHT ip [NodeInfo KMessageOf ip ()] 508 , Ord (NodeId dht)
526#else 509 , FiniteBits (NodeId dht)
527getClosest :: Eq ip => TableKey Tox.Message k => k -> DHT ip [NodeInfo Tox.Message ip Bool] 510 , TableKey dht k ) =>
528#endif 511 k -> DHT raw dht u ip [NodeInfo dht ip u]
529getClosest node = do 512getClosest node = do
530 k <- asks (optK . options) 513 k <- asks (optK . options)
531 kclosest k node <$> getTable 514 kclosest k node <$> getTable
532 515
533getClosest1 :: ( Eq ip 516getClosest1 :: ( Eq ip
534 , TableKey KMessageOf k 517 , Ord (NodeId dht)
535 ) => DHT ip (k -> IO [NodeInfo KMessageOf ip ()]) 518 , FiniteBits (NodeId dht)
519 , TableKey dht k
520 ) => DHT raw dht u ip (k -> IO [NodeInfo dht ip u])
536getClosest1 = do 521getClosest1 = do
537 k <- asks (optK . options) 522 k <- asks (optK . options)
538 nobkts <- asks (optBucketCount . options) 523 nobkts <- asks (optBucketCount . options)
@@ -574,13 +559,12 @@ getTimestamp = do
574 -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime))) 559 -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime)))
575 return $ utcTimeToPOSIXSeconds utcTime 560 return $ utcTimeToPOSIXSeconds utcTime
576 561
577
578#ifdef VERSION_bencoding 562#ifdef VERSION_bencoding
579-- | Prepare result for 'get_peers' query. 563-- | Prepare result for 'get_peers' query.
580-- 564--
581-- This operation use 'getClosest' as failback so it may block. 565-- This operation use 'getClosest' as failback so it may block.
582-- 566--
583getPeerList :: Ord ip => InfoHash -> DHT ip (PeerList ip) 567getPeerList :: Ord ip => InfoHash -> DHT raw KMessageOf () ip (PeerList ip)
584getPeerList ih = do 568getPeerList ih = do
585 var <- asks contactInfo 569 var <- asks contactInfo
586 ps <- liftIO $ lookupPeers var ih 570 ps <- liftIO $ lookupPeers var ih
@@ -588,7 +572,7 @@ getPeerList ih = do
588 then Left <$> getClosest ih 572 then Left <$> getClosest ih
589 else return (Right ps) 573 else return (Right ps)
590 574
591getPeerList1 :: Ord ip => DHT ip (InfoHash -> IO (PeerList ip)) 575getPeerList1 :: Ord ip => DHT raw KMessageOf () ip (InfoHash -> IO (PeerList ip))
592getPeerList1 = do 576getPeerList1 = do
593 var <- asks contactInfo 577 var <- asks contactInfo
594 getclosest <- getClosest1 578 getclosest <- getClosest1
@@ -599,12 +583,12 @@ getPeerList1 = do
599 else return (Right ps) 583 else return (Right ps)
600 584
601 585
602insertTopic :: InfoHash -> PortNumber -> DHT ip () 586insertTopic :: InfoHash -> PortNumber -> DHT raw dht u ip ()
603insertTopic ih p = do 587insertTopic ih p = do
604 var <- asks announceInfo 588 var <- asks announceInfo
605 liftIO $ atomically $ modifyTVar' var (S.insert (ih, p)) 589 liftIO $ atomically $ modifyTVar' var (S.insert (ih, p))
606 590
607deleteTopic :: InfoHash -> PortNumber -> DHT ip () 591deleteTopic :: InfoHash -> PortNumber -> DHT raw dht u ip ()
608deleteTopic ih p = do 592deleteTopic ih p = do
609 var <- asks announceInfo 593 var <- asks announceInfo
610 liftIO $ atomically $ modifyTVar' var (S.delete (ih, p)) 594 liftIO $ atomically $ modifyTVar' var (S.delete (ih, p))
@@ -616,7 +600,7 @@ deleteTopic ih p = do
616-----------------------------------------------------------------------} 600-----------------------------------------------------------------------}
617 601
618-- | Failed queries are ignored. 602-- | Failed queries are ignored.
619queryParallel :: [DHT ip a] -> DHT ip [a] 603queryParallel :: [DHT raw dht u ip a] -> DHT raw dht u ip [a]
620queryParallel queries = do 604queryParallel queries = do
621 -- TODO: use alpha 605 -- TODO: use alpha
622 -- alpha <- asks (optAlpha . options) 606 -- alpha <- asks (optAlpha . options)