From 3195c0877b443e5ccd4d489f03944fc059d4d7aa Mon Sep 17 00:00:00 2001 From: joe Date: Thu, 29 Jun 2017 10:37:07 -0400 Subject: WIP: Generalizing DHT monad. --- src/Network/BitTorrent/DHT/Session.hs | 114 +++++++++++++++------------------- 1 file changed, 49 insertions(+), 65 deletions(-) (limited to 'src/Network/BitTorrent/DHT/Session.hs') diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index d4794038..f96ba707 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -96,6 +96,7 @@ import Control.Monad.Trans.Control import Control.Monad.Trans.Resource import Data.Typeable import Data.String +import Data.Bits import Data.ByteString import Data.Conduit.Lazy import Data.Default @@ -265,26 +266,17 @@ type AnnounceSet = Set (InfoHash, PortNumber) type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () -- | DHT session keep track state of /this/ node. -data Node ip = Node +data Node raw dht u ip = Node { -- | Session configuration; options :: !Options -- | Pseudo-unique self-assigned session identifier. This value is -- constant during DHT session and (optionally) between sessions. -#ifdef VERSION_bencoding - , tentativeNodeId :: !(NodeId KMessageOf) -#else - , tentativeNodeId :: !(NodeId Tox.Message) -#endif + , tentativeNodeId :: !(NodeId dht) , resources :: !InternalState -#ifdef VERSION_bencoding - , manager :: !(Manager IO BValue KMessageOf) -- ^ RPC manager; - , routingInfo :: !(TVar (Maybe (R.Info KMessageOf ip ()))) -- ^ search table; -#else - , manager :: !(Manager (DHT ip) ByteString Tox.Message) -- ^ RPC manager; - , routingInfo :: !(TVar (Maybe (R.Info Tox.Message ip Bool))) -- ^ search table; -#endif + , manager :: !(Manager raw dht) -- ^ RPC manager; + , routingInfo :: !(TVar (Maybe (R.Info dht ip u))) -- ^ search table; , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. @@ -293,23 +285,23 @@ data Node ip = Node -- | DHT keep track current session and proper resource allocation for -- safe multithreading. -newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) IO a } +newtype DHT raw dht u ip a = DHT { unDHT :: ReaderT (Node raw dht u ip) IO a } deriving ( Functor, Applicative, Monad, MonadIO - , MonadBase IO, MonadReader (Node ip), MonadThrow + , MonadBase IO, MonadReader (Node raw dht u ip), MonadThrow ) #if MIN_VERSION_monad_control(1,0,0) -newtype DHTStM ip a = StM { - unSt :: StM (ReaderT (Node ip) IO) a +newtype DHTStM raw dht u ip a = StM { + unSt :: StM (ReaderT (Node raw dht u ip) IO) a } #endif -instance MonadBaseControl IO (DHT ip) where +instance MonadBaseControl IO (DHT raw dht u ip) where #if MIN_VERSION_monad_control(1,0,0) - type StM (DHT ip) a = DHTStM ip a + type StM (DHT raw dht u ip) a = DHTStM raw dht u ip a #else - newtype StM (DHT ip) a = StM { - unSt :: StM (ReaderT (Node ip) IO) a + newtype StM (DHT raw dht u ip) a = StM { + unSt :: StM (ReaderT (Node raw dht u ip) IO) a } #endif liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> @@ -321,22 +313,22 @@ instance MonadBaseControl IO (DHT ip) where -- | Check is it is possible to run 'queryNode' or handle pending -- query from remote node. -instance MonadActive (DHT ip) where +instance MonadActive (DHT raw dht u ip) where monadActive = getManager >>= liftIO . isActive {-# INLINE monadActive #-} -- | All allocated resources will be closed at 'closeNode'. -instance MonadResource (DHT ip) where +instance MonadResource (DHT raw dht u ip) where liftResourceT m = do s <- asks resources liftIO $ runInternalState m s --- instance MonadKRPC (DHT ip) (DHT ip) BValue KMessageOf where +-- instance MonadKRPC (DHT raw dht u ip) (DHT raw dht u ip) BValue KMessageOf where -getManager :: DHT ip (Manager IO BValue KMessageOf) +getManager :: DHT raw dht u ip (Manager raw dht) getManager = asks manager -instance MonadLogger (DHT ip) where +instance MonadLogger (DHT raw dht u ip) where monadLoggerLog loc src lvl msg = do logger <- asks loggerFun liftIO $ logger loc src lvl (toLogStr msg) @@ -344,7 +336,7 @@ instance MonadLogger (DHT ip) where #ifdef VERSION_bencoding type NodeHandler = Handler IO KMessageOf BValue #else -type NodeHandler ip = Handler (DHT ip) Tox.Message ByteString +type NodeHandler ip = Handler (DHT raw dht u ip) Tox.Message ByteString #endif logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO () @@ -376,17 +368,16 @@ locFromCS cs = case getCallStack cs of -- | Run DHT session. You /must/ properly close session using -- 'closeNode' function, otherwise socket or other scarce resources may -- leak. -newNode :: Address ip +newNode :: ( Address ip + , FiniteBits (NodeId dht) + , Serialize (NodeId dht) + ) => -- [NodeHandler] -- ^ handlers to run on accepted queries; Options -- ^ various dht options; -> NodeAddr ip -- ^ node address to bind; -> LogFun -- ^ invoked on log messages; -#ifdef VERSION_bencoding - -> Maybe (NodeId KMessageOf) -- ^ use this NodeId, if not given a new one is generated. -#else - -> Maybe (NodeId Tox.Message) -- ^ use this NodeId, if not given a new one is generated. -#endif - -> IO (Node ip) -- ^ a new DHT node running at given address. + -> Maybe (NodeId dht) -- ^ use this NodeId, if not given a new one is generated. + -> IO (Node raw dht u ip) -- ^ a new DHT node running at given address. newNode opts naddr logger mbid = do s <- createInternalState runInternalState initNode s @@ -409,11 +400,11 @@ newNode opts naddr logger mbid = do -- | Some resources like listener thread may live for -- some short period of time right after this DHT session closed. -closeNode :: Node ip -> IO () +closeNode :: Node raw dht u ip -> IO () closeNode Node {..} = closeInternalState resources -- | Run DHT operation on the given session. -runDHT :: Node ip -> DHT ip a -> IO a +runDHT :: Node raw dht u ip -> DHT raw dht u ip a -> IO a runDHT node action = runReaderT (unDHT action) node {-# INLINE runDHT #-} @@ -453,24 +444,20 @@ checkToken sessionTokens addr questionableToken = do -----------------------------------------------------------------------} -- | This nodes externally routable address reported by remote peers. -routableAddress :: DHT ip (Maybe SockAddr) +routableAddress :: DHT raw dht u ip (Maybe SockAddr) routableAddress = do info <- asks routingInfo >>= liftIO . atomically . readTVar return $ myAddress <$> info -- | The current NodeId that the given remote node should know us by. -#ifdef VERSION_bencoding -myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId KMessageOf) -#else -myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId Tox.Message) -#endif +myNodeIdAccordingTo :: NodeAddr ip -> DHT raw dht u ip (NodeId dht) myNodeIdAccordingTo _ = do info <- asks routingInfo >>= liftIO . atomically . readTVar maybe (asks tentativeNodeId) (return . myNodeId) info -myNodeIdAccordingTo1 :: DHT ip ( NodeAddr ip -> IO (NodeId KMessageOf) ) +myNodeIdAccordingTo1 :: DHT raw dht u ip ( NodeAddr ip -> IO (NodeId dht) ) myNodeIdAccordingTo1 = do var <- asks routingInfo tid <- asks tentativeNodeId @@ -480,11 +467,7 @@ myNodeIdAccordingTo1 = do -- | Get current routing table. Normally you don't need to use this -- function, but it can be usefull for debugging and profiling purposes. -#ifdef VERSION_bencoding -getTable :: Eq ip => DHT ip (Table KMessageOf ip ()) -#else -getTable :: Eq ip => DHT ip (Table Tox.Message ip Bool) -#endif +getTable :: Eq ip => DHT raw dht u ip (Table dht ip u) getTable = do Node { tentativeNodeId = myId , routingInfo = var @@ -492,18 +475,18 @@ getTable = do let nil = nullTable myId (optBucketCount opts) liftIO (maybe nil R.myBuckets <$> atomically (readTVar var)) -getSwarms :: Ord ip => DHT ip [ (InfoHash, Int, Maybe ByteString) ] +getSwarms :: Ord ip => DHT raw dht u ip [ (InfoHash, Int, Maybe ByteString) ] getSwarms = do store <- asks contactInfo >>= liftIO . atomically . readTVar return $ P.knownSwarms store -savePeerStore :: (Ord ip, Address ip) => DHT ip ByteString +savePeerStore :: (Ord ip, Address ip) => DHT raw dht u ip ByteString savePeerStore = do var <- asks contactInfo peers <- liftIO $ atomically $ readTVar var return $ S.encode peers -mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT ip () +mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw dht u ip () mergeSavedPeers bs = do var <- asks contactInfo case S.decode bs of @@ -511,7 +494,7 @@ mergeSavedPeers bs = do Left _ -> return () -allPeers :: Ord ip => InfoHash -> DHT ip [ PeerAddr ip ] +allPeers :: Ord ip => InfoHash -> DHT raw dht u ip [ PeerAddr ip ] allPeers ih = do store <- asks contactInfo >>= liftIO . atomically . readTVar return $ P.lookup ih store @@ -521,18 +504,20 @@ allPeers ih = do -- -- This operation used for 'find_nodes' query. -- -#ifdef VERSION_bencoding -getClosest :: Eq ip => TableKey KMessageOf k => k -> DHT ip [NodeInfo KMessageOf ip ()] -#else -getClosest :: Eq ip => TableKey Tox.Message k => k -> DHT ip [NodeInfo Tox.Message ip Bool] -#endif +getClosest :: ( Eq ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , TableKey dht k ) => + k -> DHT raw dht u ip [NodeInfo dht ip u] getClosest node = do k <- asks (optK . options) kclosest k node <$> getTable getClosest1 :: ( Eq ip - , TableKey KMessageOf k - ) => DHT ip (k -> IO [NodeInfo KMessageOf ip ()]) + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , TableKey dht k + ) => DHT raw dht u ip (k -> IO [NodeInfo dht ip u]) getClosest1 = do k <- asks (optK . options) nobkts <- asks (optBucketCount . options) @@ -574,13 +559,12 @@ getTimestamp = do -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime))) return $ utcTimeToPOSIXSeconds utcTime - #ifdef VERSION_bencoding -- | Prepare result for 'get_peers' query. -- -- This operation use 'getClosest' as failback so it may block. -- -getPeerList :: Ord ip => InfoHash -> DHT ip (PeerList ip) +getPeerList :: Ord ip => InfoHash -> DHT raw KMessageOf () ip (PeerList ip) getPeerList ih = do var <- asks contactInfo ps <- liftIO $ lookupPeers var ih @@ -588,7 +572,7 @@ getPeerList ih = do then Left <$> getClosest ih else return (Right ps) -getPeerList1 :: Ord ip => DHT ip (InfoHash -> IO (PeerList ip)) +getPeerList1 :: Ord ip => DHT raw KMessageOf () ip (InfoHash -> IO (PeerList ip)) getPeerList1 = do var <- asks contactInfo getclosest <- getClosest1 @@ -599,12 +583,12 @@ getPeerList1 = do else return (Right ps) -insertTopic :: InfoHash -> PortNumber -> DHT ip () +insertTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () insertTopic ih p = do var <- asks announceInfo liftIO $ atomically $ modifyTVar' var (S.insert (ih, p)) -deleteTopic :: InfoHash -> PortNumber -> DHT ip () +deleteTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () deleteTopic ih p = do var <- asks announceInfo liftIO $ atomically $ modifyTVar' var (S.delete (ih, p)) @@ -616,7 +600,7 @@ deleteTopic ih p = do -----------------------------------------------------------------------} -- | Failed queries are ignored. -queryParallel :: [DHT ip a] -> DHT ip [a] +queryParallel :: [DHT raw dht u ip a] -> DHT raw dht u ip [a] queryParallel queries = do -- TODO: use alpha -- alpha <- asks (optAlpha . options) -- cgit v1.2.3