From 3195c0877b443e5ccd4d489f03944fc059d4d7aa Mon Sep 17 00:00:00 2001 From: joe Date: Thu, 29 Jun 2017 10:37:07 -0400 Subject: WIP: Generalizing DHT monad. --- src/Network/BitTorrent/DHT.hs | 42 ++-- src/Network/BitTorrent/DHT/Query.hs | 391 ++++++++++++++++++++++++++++------ src/Network/BitTorrent/DHT/Search.hs | 52 ++--- src/Network/BitTorrent/DHT/Session.hs | 114 +++++----- 4 files changed, 425 insertions(+), 174 deletions(-) (limited to 'src/Network/BitTorrent') diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index d9328cea..8bc423a3 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs @@ -15,9 +15,11 @@ -- -- {-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE TypeOperators #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE CPP #-} module Network.BitTorrent.DHT ( -- * Distributed Hash Table DHT @@ -37,7 +39,7 @@ module Network.BitTorrent.DHT , snapshot -- * Operations - , Network.BitTorrent.DHT.lookup + -- , Network.BitTorrent.DHT.lookup , Network.BitTorrent.DHT.insert , Network.BitTorrent.DHT.delete @@ -50,7 +52,7 @@ module Network.BitTorrent.DHT , closeNode -- ** Monad - , MonadDHT (..) + -- , MonadDHT (..) , runDHT ) where @@ -81,11 +83,13 @@ import qualified Network.DatagramServer as KRPC (listen, Protocol(..)) -- DHT types -----------------------------------------------------------------------} +#if 0 class MonadDHT m where - liftDHT :: DHT IPv4 a -> m a + liftDHT :: DHT raw dht u IPv4 a -> m a -instance MonadDHT (DHT IPv4) where +instance MonadDHT (DHT raw dht u IPv4) where liftDHT = id +#endif -- | Convenience method. Pass this to 'dht' to enable full logging. fullLogging :: LogSource -> LogLevel -> Bool @@ -96,7 +100,7 @@ dht :: (Ord ip, Address ip) => Options -- ^ normally you need to use 'Data.Default.def'; -> NodeAddr ip -- ^ address to bind this node; -> (LogSource -> LogLevel -> Bool) -- ^ use 'fullLogging' as a noisy default - -> DHT ip a -- ^ actions to run: 'bootstrap', 'lookup', etc; + -> DHT raw dht u ip a -- ^ actions to run: 'bootstrap', 'lookup', etc; -> IO a -- ^ result. dht opts addr logfilter action = do runStderrLoggingT $ filterLogger logfilter $ LoggingT $ \ logger -> do @@ -175,7 +179,7 @@ resolveHostName NodeAddr {..} = do -- -- This operation do block, use -- 'Control.Concurrent.Async.Lifted.async' if needed. -bootstrap :: forall ip. Address ip => Maybe BS.ByteString -> [NodeAddr ip] -> DHT ip () +bootstrap :: forall raw dht u ip. Address ip => Maybe BS.ByteString -> [NodeAddr ip] -> DHT raw dht u ip () bootstrap mbs startNodes = do restored <- case decode <$> mbs of @@ -187,8 +191,8 @@ bootstrap mbs startNodes = do $(logInfoS) "bootstrap" "Start node bootstrapping" let searchAll aliveNodes = do nid <- myNodeIdAccordingTo (error "FIXME") - nss <- C.sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume - return ( nss :: [[NodeInfo KMessageOf ip ()]] ) + ns <- bgsearch ioFindNodes nid + return ( ns :: [NodeInfo KMessageOf ip ()] ) input_nodes <- (restored ++) . T.toList <$> getTable -- Step 1: Use iterative searches to flesh out the table.. do let knowns = map (map $ nodeAddr . fst) input_nodes @@ -200,10 +204,10 @@ bootstrap mbs startNodes = do -- If our cached nodes are alive and our IP address did not change, it's possible -- we are already bootsrapped, so no need to do any searches. when (not b) $ do - nss <- searchAll $ take 2 alive_knowns + ns <- searchAll $ take 2 alive_knowns -- We only use the supplied bootstrap nodes when we don't know of any -- others to try. - when (null nss) $ do + when (null ns) $ do -- TODO filter duplicated in startNodes list -- TODO retransmissions for startNodes (aliveNodes,_) <- unzip <$> queryParallel (pingQ <$> startNodes) @@ -243,7 +247,7 @@ bootstrap mbs startNodes = do -- -- This operation do not block. -- -isBootstrapped :: Eq ip => DHT ip Bool +isBootstrapped :: Eq ip => DHT raw dht u ip Bool isBootstrapped = T.full <$> getTable {----------------------------------------------------------------------- @@ -254,7 +258,11 @@ isBootstrapped = T.full <$> getTable -- -- This is blocking operation, use -- 'Control.Concurrent.Async.Lifted.async' if needed. -snapshot :: Address ip => DHT ip BS.ByteString +snapshot :: ( Address ip + , Ord (NodeId dht) + , Serialize u + , Serialize (NodeId dht) + ) => DHT raw dht u ip BS.ByteString snapshot = do tbl <- getTable return $ encode tbl @@ -263,15 +271,19 @@ snapshot = do -- Operations -----------------------------------------------------------------------} +#if 0 + -- | Get list of peers which downloading this torrent. -- -- This operation is incremental and do block. -- -lookup :: Address ip => InfoHash -> DHT ip `C.Source` [PeerAddr ip] +lookup :: Address ip => InfoHash -> DHT raw dht u ip `C.Source` [PeerAddr ip] lookup topic = do -- TODO retry getClosest if bucket is empty closest <- lift $ getClosest topic C.sourceList [closest] $= search topic (getPeersQ topic) +#endif + -- TODO do not republish if the topic is already in announceSet -- | Announce that /this/ peer may have some pieces of the specified @@ -281,7 +293,7 @@ lookup topic = do -- TODO retry getClosest if bucket is empty -- This operation is synchronous and do block, use -- 'Control.Concurrent.Async.Lifted.async' if needed. -- -insert :: Address ip => InfoHash -> PortNumber -> DHT ip () +insert :: Address ip => InfoHash -> PortNumber -> DHT raw dht u ip () insert ih p = do publish ih p insertTopic ih p @@ -290,6 +302,6 @@ insert ih p = do -- -- This operation is atomic and may block for a while. -- -delete :: InfoHash -> PortNumber -> DHT ip () +delete :: InfoHash -> PortNumber -> DHT raw dht u ip () delete = deleteTopic {-# INLINE delete #-} diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 254b347c..e5d9bd5f 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs @@ -41,11 +41,13 @@ module Network.BitTorrent.DHT.Query -- concatenate its responses, optionally yielding result and -- continue to the next iteration. , Search - , search + -- , search , publish , ioFindNode + , ioFindNodes , ioGetPeers , isearch + , bgsearch -- ** Routing table , insertNode @@ -57,6 +59,8 @@ module Network.BitTorrent.DHT.Query , (<@>) ) where +import Data.Bits +import Data.Default #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument hiding (yield) #else @@ -102,30 +106,43 @@ import Network.DatagramServer.Tox #endif import Network.Address hiding (NodeId) import Network.DatagramServer.Types as RPC hiding (Query,Response) +import Network.DHT.Types import Control.Monad.Trans.Control +import Data.Typeable +import Data.Serialize +import System.IO.Unsafe (unsafeInterleaveIO) +import Data.String {----------------------------------------------------------------------- -- Handlers -----------------------------------------------------------------------} +{- nodeHandler :: ( Address ip - , KRPC (Query a) (Response b) + , KRPC (Query KMessageOf a) (Response KMessageOf b) ) => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler +-} +nodeHandler :: + (Address addr, WireFormat raw msg, Pretty (NodeInfo dht addr u), + Default u, + IsString t, Functor msg, + SerializableTo raw (Response dht r), + SerializableTo raw (Query dht q)) => + (NodeInfo dht addr u -> Maybe ReflectedIP -> IO ()) + -> (NodeAddr addr -> IO (NodeId dht)) + -> (Char -> t -> Text -> IO ()) + -> QueryMethod msg + -> (NodeAddr addr -> q -> IO r) + -> Handler IO msg raw nodeHandler insertNode myNodeIdAccordingTo logm method action = handler method $ \ sockAddr qry -> do -#ifdef VERSION_bencoding let remoteId = queringNodeId qry read_only = queryIsReadOnly qry q = queryParams qry -#else - let remoteId = msgClient qry - read_only = False - q = msgPayload qry -#endif case fromSockAddr sockAddr of Nothing -> throwIO BadAddress Just naddr -> do - let ni = NodeInfo remoteId naddr () + let ni = NodeInfo remoteId naddr def -- Do not route read-only nodes. (bep 43) if read_only then logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) @@ -135,13 +152,13 @@ nodeHandler insertNode myNodeIdAccordingTo logm method action = handler method $ <*> action naddr q -- | Default 'Ping' handler. -pingH :: NodeAddr ip -> Ping -> IO Ping -pingH _ Ping = return Ping +pingH :: DHT.Kademlia dht => Proxy dht -> NodeAddr ip -> Ping dht -> IO (Ping dht) +pingH dht _ _ = return (DHT.pongMessage dht) -- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True } -- | Default 'FindNode' handler. -findNodeH :: (NodeId KMessageOf -> IO [NodeInfo KMessageOf ip ()]) -> NodeAddr ip -> FindNode ip -> IO (NodeFound ip) -findNodeH getclosest _ (FindNode nid) = NodeFound <$> getclosest nid +findNodeH :: Kademlia dht => (NodeId dht -> IO [NodeInfo dht ip u]) -> NodeAddr ip -> FindNode dht ip -> IO (NodeFound dht ip) +findNodeH getclosest _ msg = foundNodesMessage . L.map (fmap (const ())) <$> getclosest (findWho msg) -- | Default 'GetPeers' handler. getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip) @@ -162,51 +179,100 @@ announceH peers toks naddr @ NodeAddr {..} (Announce {..}) = do insertPeer peers topic announcedName peerAddr return Announced +-- | Includes all Kademlia-related handlers. +kademliaHandlers :: forall raw dht u ip. (Eq ip, Ord ip, Address ip + , Ord (TransactionID dht) + , Ord (NodeId dht) + , Show u + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + , Show (QueryMethod dht) + , Show (NodeId dht) + , FiniteBits (NodeId dht) + , Default u + , Serialize (TransactionID dht) + , WireFormat raw dht + , Kademlia dht + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , Functor dht + , Pretty (NodeInfo dht ip u) + , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) + , SerializableTo raw (Response dht (NodeFound dht ip)) + , SerializableTo raw (Query dht (FindNode dht ip)) + ) => LogFun -> DHT raw dht u ip [Handler IO dht raw] +-- kademliaHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler] +kademliaHandlers logger = do + groknode <- insertNode1 + mynid <- myNodeIdAccordingTo1 + let handler :: ( KRPC (Query dht a) (Response dht b) + , SerializableTo raw (Response dht b) + , SerializableTo raw (Query dht a) + ) => QueryMethod dht -> (NodeAddr ip -> a -> IO b) -> Handler IO dht raw + handler = nodeHandler groknode mynid (logt logger) + dht = Proxy :: Proxy dht + getclosest <- getClosest1 + return [ handler (namePing dht) $ pingH dht + , handler (nameFindNodes dht) $ findNodeH getclosest + ] + + -- | Includes all default query handlers. -defaultHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT ip [NodeHandler] +defaultHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler] defaultHandlers logger = do groknode <- insertNode1 - toks <- asks sessionTokens - getclosest <- getClosest1 mynid <- myNodeIdAccordingTo1 + let handler :: KRPC (Query KMessageOf a) (Response KMessageOf b) => QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler + handler = nodeHandler groknode mynid (logt logger) + toks <- asks sessionTokens peers <- asks contactInfo getpeers <- getPeerList1 - let handler :: KRPC (Query a) (Response b) => QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler - handler = nodeHandler groknode mynid (logt logger) - return [ handler "ping" $ pingH - , handler "find-nodes" $ findNodeH getclosest - , handler "get_peers" $ getPeersH getpeers toks - , handler "announce_peer" $ announceH peers toks ] + hs <- kademliaHandlers logger + return $ hs ++ [ handler "get_peers" $ getPeersH getpeers toks + , handler "announce_peer" $ announceH peers toks ] {----------------------------------------------------------------------- -- Basic queries -----------------------------------------------------------------------} -type Iteration ip o = NodeInfo KMessageOf ip () -> DHT ip (Either [NodeInfo KMessageOf ip ()] [o ip]) +type Iteration raw dht u ip o = NodeInfo dht ip u -> DHT raw dht u ip (Either [NodeInfo dht ip u] [o ip]) -- | The most basic query. May be used to check if the given node is -- alive or get its 'NodeId'. -pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo KMessageOf ip (), Maybe ReflectedIP) +pingQ :: forall raw dht u ip. + ( DHT.Kademlia dht + , Address ip + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , Default u + , Show u + , Ord (TransactionID dht) + , Serialize (TransactionID dht) + , WireFormat raw dht + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , Show (QueryMethod dht) + ) => NodeAddr ip -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP) pingQ addr = do -#ifdef VERSION_bencoding - (nid, Ping, mip) <- queryNode' addr Ping -#else - (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} -#endif - return (NodeInfo nid addr (), mip) + let ping = DHT.pingMessage (Proxy :: Proxy dht) + (nid, pong, mip) <- queryNode' addr ping + let _ = pong `asTypeOf` ping + -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} + return (NodeInfo nid addr def, mip) -- TODO [robustness] match range of returned node ids with the -- expected range and either filter bad nodes or discard response at -- all throwing an exception -- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo -findNodeQ key NodeInfo {..} = do - NodeFound closest <- FindNode (toNodeId key) <@> nodeAddr +findNodeQ proxy key NodeInfo {..} = do + closest <- fmap DHT.foundNodes $ DHT.findNodeMessage proxy key <@> nodeAddr $(logInfoS) "findNodeQ" $ "NodeFound\n" <> T.pack (L.unlines $ L.map ((' ' :) . show . pPrint) closest) return $ Right closest #ifdef VERSION_bencoding -getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr +getPeersQ :: Address ip => InfoHash -> Iteration BValue KMessageOf () ip PeerAddr getPeersQ topic NodeInfo {..} = do GotPeers {..} <- GetPeers topic <@> nodeAddr let dist = distance (toNodeId topic) nodeId @@ -215,7 +281,7 @@ getPeersQ topic NodeInfo {..} = do <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" } return peers -announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr +announceQ :: Address ip => InfoHash -> PortNumber -> Iteration BValue KMessageOf () ip NodeAddr announceQ ih p NodeInfo {..} = do GotPeers {..} <- GetPeers ih <@> nodeAddr case peers of @@ -232,7 +298,7 @@ announceQ ih p NodeInfo {..} = do -----------------------------------------------------------------------} -ioGetPeers :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip])) +ioGetPeers :: Address ip => InfoHash -> DHT BValue KMessageOf () ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip])) ioGetPeers ih = do session <- ask return $ \ni -> runDHT session $ do @@ -241,17 +307,71 @@ ioGetPeers ih = do Right e -> return $ either (,[]) ([],) e Left e -> let _ = e :: QueryFailure in return ([],[]) -ioFindNode :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [NodeInfo KMessageOf ip ()])) +ioFindNode :: ( DHT.Kademlia dht + , WireFormat raw dht + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , Address ip + , Default u + , Show u + , Show (QueryMethod dht) + , TableKey dht infohash + , Eq (NodeId dht) + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) + , Ord (TransactionID dht) + , Serialize (TransactionID dht) + , SerializableTo raw (Response dht (NodeFound dht ip)) + , SerializableTo raw (Query dht (FindNode dht ip)) + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u])) ioFindNode ih = do session <- ask return $ \ni -> runDHT session $ do - NodeFound ns <- FindNode (toNodeId ih) <@> nodeAddr ni - return $ L.partition (\n -> nodeId n /= toNodeId ih) ns - -isearch :: (Ord r, Ord ip) => - (InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [r]))) - -> InfoHash - -> DHT ip (ThreadId, Search.IterativeSearch ip r) + ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> nodeAddr ni + let ns' = L.map (fmap (const def)) ns + return $ L.partition (\n -> nodeId n /= toNodeId ih) ns' + + +-- | Like ioFindNode, but considers all found nodes to be 'Right' results. +ioFindNodes :: ( DHT.Kademlia dht + , WireFormat raw dht + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , Address ip + , Default u + , Show u + , Show (QueryMethod dht) + , TableKey dht infohash + , Eq (NodeId dht) + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) + , Ord (TransactionID dht) + , Serialize (TransactionID dht) + , SerializableTo raw (Response dht (NodeFound dht ip)) + , SerializableTo raw (Query dht (FindNode dht ip)) + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u])) +ioFindNodes ih = do + session <- ask + return $ \ni -> runDHT session $ do + ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> nodeAddr ni + let ns' = L.map (fmap (const def)) ns + return ([], ns') + +isearch :: ( Ord r + , Ord ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , TableKey dht ih + , Show ih) => + (ih -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r]))) + -> ih + -> DHT raw dht u ip (ThreadId, Search.IterativeSearch dht u ip r) isearch f ih = do qry <- f ih ns <- kclosest 8 ih <$> getTable @@ -263,8 +383,25 @@ isearch f ih = do -- atomically \$ readTVar (Search.searchResults s) return (a, s) - -type Search ip o = Conduit [NodeInfo KMessageOf ip ()] (DHT ip) [o KMessageOf ip ()] +-- | Background search: fill a lazy list using a background thread. +bgsearch f ih = do + (tid, s) <- isearch f ih + let again shown = do + (chk,fin) <- atomically $ do + r <- (Set.\\ shown) <$> readTVar (Search.searchResults s) + if not $ Set.null r + then (,) r <$> Search.searchIsFinished s + else Search.searchIsFinished s >>= check >> return (Set.empty,True) + let ps = Set.toList chk + if fin then return ps + else do + xs <- unsafeInterleaveIO $ again (shown `Set.union` chk) + return $ ps ++ xs + liftIO $ again Set.empty + +type Search raw dht u ip o = Conduit [NodeInfo dht ip u] (DHT raw dht u ip) [o dht ip u] + +#if 0 -- TODO: use reorder and filter (Traversal option) leftovers -- search :: k -> IterationI ip o -> Search ip o @@ -275,17 +412,36 @@ search _ action = do let (nodes, results) = partitionEithers responses $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results))) leftover $ L.concat nodes - mapM_ yield results - -publish :: Address ip => InfoHash -> PortNumber -> DHT ip () -publish ih p = do - nodes <- getClosest ih - r <- asks (optReplication . options) - _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r - return () + let r = mapM_ yield results + _ = (action,r) :: (a -> DHT raw dht u ip (Either [a] o), ConduitM [a] o (DHT raw dht u ip) ()) + r +#endif -probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) +publish = error "todo" +-- publish :: Address ip => InfoHash -> PortNumber -> DHT BValue KMessageOf () ip () +-- publish ih p = do + -- nodes <- getClosest ih + -- r <- asks (optReplication . options) + -- _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r + -- return () + + +probeNode :: ( Default u + , Show u + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , DHT.Kademlia dht + , Address ip + , Ord (TransactionID dht) + , Serialize (TransactionID dht) + , WireFormat raw dht + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , Show (QueryMethod dht) + ) => NodeAddr ip -> DHT raw dht u ip (Bool , Maybe ReflectedIP) probeNode addr = do $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr))) result <- try $ pingQ addr @@ -293,8 +449,16 @@ probeNode addr = do return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result +refreshNodes :: forall raw dht u ip. + ( Address ip + , Ord (NodeId dht) + , Default u + , FiniteBits (NodeId dht) + , Pretty (NodeId dht) + , DHT.Kademlia dht ) => NodeId dht -> DHT raw dht u ip () -- [NodeInfo KMessageOf ip ()] +refreshNodes _ = return () -- TODO +#if 0 -- FIXME do not use getClosest sinse we should /refresh/ them -refreshNodes :: Address ip => NodeId KMessageOf -> DHT ip () -- [NodeInfo KMessageOf ip ()] refreshNodes nid = do $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid))) nodes <- getClosest nid @@ -304,7 +468,7 @@ refreshNodes nid = do -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) () -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) () -- nss <- sourceList [[addr]] \$= search nid (findNodeQ nid) $$ C.consume - nss <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume + nss <- sourceList [nodes] $= search nid (findNodeQ (Proxy :: Proxy dht) nid) $$ C.consume $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes." _ <- queryParallel $ flip L.map (L.concat nss) $ \n -> do $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) @@ -312,8 +476,9 @@ refreshNodes nid = do -- pingQ takes care of inserting the node. return () return () -- \$ L.concat nss +#endif -logc :: Char -> String -> DHT ip () +logc :: Char -> String -> DHT raw dht u ip () logc 'D' = $(logDebugS) "insertNode" . T.pack logc 'W' = $(logWarnS) "insertNode" . T.pack logc 'I' = $(logInfoS) "insertNode" . T.pack @@ -321,12 +486,46 @@ logc c = $(logInfoS) "insertNode" . T.pack . (c :) . (':' :) -- | This operation do not block but acquire exclusive access to -- routing table. -insertNode :: forall ip. Address ip => NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> DHT ip () +insertNode :: forall raw dht u ip. + ( Address ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , Default u + , Show u + , DHT.Kademlia dht + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , Ord (TransactionID dht) + , WireFormat raw dht + , Serialize (TransactionID dht) + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + , Ord (NodeId dht) + , Show (NodeId dht) + , Show (QueryMethod dht) + ) => NodeInfo dht ip u -> Maybe ReflectedIP -> DHT raw dht u ip () insertNode info witnessed_ip0 = do f <- insertNode1 liftIO $ f info witnessed_ip0 -insertNode1 :: forall ip. Address ip => DHT ip (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) +insertNode1 :: forall raw dht u ip. + ( Address ip + , Default u + , Show u + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , DHT.Kademlia dht + , Ord (TransactionID dht) + , WireFormat raw dht + , Serialize (TransactionID dht) + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + , Ord (NodeId dht) + , Show (NodeId dht) + , Show (QueryMethod dht) + ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ()) insertNode1 = do bc <- optBucketCount <$> asks options nid <- asks tentativeNodeId @@ -335,15 +534,17 @@ insertNode1 = do dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state. probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) let probe n = probe0 n >>= runDHT dht_node_state . restoreM + {- changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive ip <- fromSockAddr ip0 :: Maybe ip listToMaybe $ rank id (nodeId $ foreignNode arrival) $ bep42s ip (DHT.fallbackID params) -- warning: recursive + -} params = DHT.TableParameters { maxBuckets = bc :: Int - , fallbackID = nid :: NodeId KMessageOf - , adjustID = changeip :: SockAddr -> Event KMessageOf ip () -> NodeId KMessageOf + , fallbackID = nid :: NodeId dht + , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht , logMessage = logm :: Char -> String -> IO () , pingProbe = probe :: NodeAddr ip -> IO (Bool, Maybe ReflectedIP) } @@ -356,25 +557,75 @@ insertNode1 = do return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0 -- | Throws exception if node is not responding. -queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) - => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b) +queryNode :: forall raw dht u a b ip. + ( Address ip + , KRPC (Query dht a) (Response dht b) + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , Default u + , Show u + , DHT.Kademlia dht + , Ord (TransactionID dht) + , Serialize (TransactionID dht) + , WireFormat raw dht + , SerializableTo raw (Response dht b) + , SerializableTo raw (Query dht a) + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , Show (QueryMethod dht) + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + ) => NodeAddr ip -> a -> DHT raw dht u ip (NodeId dht, b) queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q -queryNode' :: forall a b ip. Address ip => KRPC (Query a) (Response b) - => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b, Maybe ReflectedIP) +queryNode' :: forall raw dht u a b ip. + ( Address ip + , Default u + , Show u + , DHT.Kademlia dht + , KRPC (Query dht a) (Response dht b) + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , Ord (TransactionID dht) + , Serialize (TransactionID dht) + , WireFormat raw dht + , SerializableTo raw (Response dht b) + , SerializableTo raw (Query dht a) + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , Show (QueryMethod dht) + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + ) => NodeAddr ip -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP) queryNode' addr q = do nid <- myNodeIdAccordingTo addr let read_only = False -- TODO: check for NAT issues. (BEP 43) - let KRPC.Method name = KRPC.method :: KRPC.Method (Query a) (Response b) + let KRPC.Method name = KRPC.method :: KRPC.Method (Query dht a) (Response dht b) mgr <- asks manager - (Response remoteId r, witnessed_ip) <- liftIO $ query' mgr name (toSockAddr addr) (Query nid read_only q) + (Response remoteId r, witnessed_ip) <- liftIO $ query' mgr (error "TODO: name") (toSockAddr addr) (Query nid read_only q) -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) -- <> " by " <> T.pack (show (toSockAddr addr)) - _ <- insertNode (NodeInfo remoteId addr ()) witnessed_ip + _ <- insertNode (NodeInfo remoteId addr def) witnessed_ip return (remoteId, r, witnessed_ip) -- | Infix version of 'queryNode' function. -(<@>) :: Address ip => KRPC (Query a) (Response b) - => a -> NodeAddr ip -> DHT ip b +(<@>) :: ( Address ip + , KRPC (Query dht a) (Response dht b) + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , Default u + , Show u + , Show (QueryMethod dht) + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , Ord (TransactionID dht) + , Serialize (TransactionID dht) + , SerializableTo raw (Response dht b) + , SerializableTo raw (Query dht a) + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + , WireFormat raw dht + , Kademlia dht + ) => a -> NodeAddr ip -> DHT raw dht u ip b q <@> addr = snd <$> queryNode addr q {-# INLINE (<@>) #-} diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs index f5cd7834..356f6fd9 100644 --- a/src/Network/BitTorrent/DHT/Search.hs +++ b/src/Network/BitTorrent/DHT/Search.hs @@ -2,6 +2,7 @@ {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE FlexibleContexts #-} module Network.BitTorrent.DHT.Search where import Control.Concurrent @@ -25,27 +26,23 @@ import qualified Data.Wrapper.PSQ as PSQ ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ) import Network.Address hiding (NodeId) import Network.DatagramServer.Types -#ifdef VERSION_bencoding -import Network.DatagramServer.Mainline (KMessageOf) -type Ann = () -#else -import Network.DatagramServer.Tox as Tox -type KMessageOf = Tox.Message -type Ann = Bool -#endif +import Data.Bits -data IterativeSearch ip r = IterativeSearch - { searchTarget :: NodeId KMessageOf - , searchQuery :: NodeInfo KMessageOf ip Ann -> IO ([NodeInfo KMessageOf ip Ann], [r]) +data IterativeSearch dht u ip r = IterativeSearch + { searchTarget :: NodeId dht + , searchQuery :: NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r]) , searchPendingCount :: TVar Int - , searchQueued :: TVar (MinMaxPSQ (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf))) - , searchInformant :: TVar (MinMaxPSQ (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf))) + , searchQueued :: TVar (MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht))) + , searchInformant :: TVar (MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht))) , searchVisited :: TVar (Set (NodeAddr ip)) , searchResults :: TVar (Set r) } -newSearch :: Eq ip => (NodeInfo KMessageOf ip Ann -> IO ([NodeInfo KMessageOf ip Ann], [r])) - -> NodeId KMessageOf -> [NodeInfo KMessageOf ip Ann] -> IO (IterativeSearch ip r) +newSearch :: ( Eq ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r])) + -> NodeId dht -> [NodeInfo dht ip u] -> IO (IterativeSearch dht u ip r) newSearch qry target ns = atomically $ do c <- newTVar 0 q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns @@ -60,9 +57,14 @@ searchAlpha = 3 searchK :: Int searchK = 8 -sendQuery :: forall a ip. (Ord a, Ord ip) => - IterativeSearch ip a - -> Binding (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf)) +sendQuery :: forall a ip dht u. + ( Ord a + , Ord ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => + IterativeSearch dht u ip a + -> Binding (NodeInfo dht ip u) (NodeDistance (NodeId dht)) -> IO () sendQuery IterativeSearch{..} (ni :-> d) = do (ns,rs) <- handle (\(SomeException e) -> return ([],[])) @@ -71,9 +73,9 @@ sendQuery IterativeSearch{..} (ni :-> d) = do modifyTVar searchPendingCount pred vs <- readTVar searchVisited -- We only queue a node if it is not yet visited - let insertFoundNode :: NodeInfo KMessageOf ip u - -> MinMaxPSQ (NodeInfo KMessageOf ip u) (NodeDistance (NodeId KMessageOf)) - -> MinMaxPSQ (NodeInfo KMessageOf ip u) (NodeDistance (NodeId KMessageOf)) + let insertFoundNode :: NodeInfo dht ip u + -> MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht)) + -> MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht)) insertFoundNode n q | nodeAddr n `Set.member` vs = q | otherwise = MM.insertTake searchK n (distance searchTarget $ nodeId n) q @@ -82,7 +84,9 @@ sendQuery IterativeSearch{..} (ni :-> d) = do modifyTVar searchResults $ \s -> foldr Set.insert s rs -searchIsFinished :: Ord ip => IterativeSearch ip r -> STM Bool +searchIsFinished :: ( Ord ip + , Ord (NodeId dht) + ) => IterativeSearch dht u ip r -> STM Bool searchIsFinished IterativeSearch{..} = do q <- readTVar searchQueued cnt <- readTVar searchPendingCount @@ -94,8 +98,8 @@ searchIsFinished IterativeSearch{..} = do <= PSQ.prio (fromJust $ MM.findMin q)))) search :: - (Ord r, Ord ip) => - IterativeSearch ip r -> IO () + (Ord r, Ord ip, Ord (NodeId dht), FiniteBits (NodeId dht)) => + IterativeSearch dht u ip r -> IO () search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do fix $ \again -> do join $ atomically $ do diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index d4794038..f96ba707 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -96,6 +96,7 @@ import Control.Monad.Trans.Control import Control.Monad.Trans.Resource import Data.Typeable import Data.String +import Data.Bits import Data.ByteString import Data.Conduit.Lazy import Data.Default @@ -265,26 +266,17 @@ type AnnounceSet = Set (InfoHash, PortNumber) type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () -- | DHT session keep track state of /this/ node. -data Node ip = Node +data Node raw dht u ip = Node { -- | Session configuration; options :: !Options -- | Pseudo-unique self-assigned session identifier. This value is -- constant during DHT session and (optionally) between sessions. -#ifdef VERSION_bencoding - , tentativeNodeId :: !(NodeId KMessageOf) -#else - , tentativeNodeId :: !(NodeId Tox.Message) -#endif + , tentativeNodeId :: !(NodeId dht) , resources :: !InternalState -#ifdef VERSION_bencoding - , manager :: !(Manager IO BValue KMessageOf) -- ^ RPC manager; - , routingInfo :: !(TVar (Maybe (R.Info KMessageOf ip ()))) -- ^ search table; -#else - , manager :: !(Manager (DHT ip) ByteString Tox.Message) -- ^ RPC manager; - , routingInfo :: !(TVar (Maybe (R.Info Tox.Message ip Bool))) -- ^ search table; -#endif + , manager :: !(Manager raw dht) -- ^ RPC manager; + , routingInfo :: !(TVar (Maybe (R.Info dht ip u))) -- ^ search table; , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. @@ -293,23 +285,23 @@ data Node ip = Node -- | DHT keep track current session and proper resource allocation for -- safe multithreading. -newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) IO a } +newtype DHT raw dht u ip a = DHT { unDHT :: ReaderT (Node raw dht u ip) IO a } deriving ( Functor, Applicative, Monad, MonadIO - , MonadBase IO, MonadReader (Node ip), MonadThrow + , MonadBase IO, MonadReader (Node raw dht u ip), MonadThrow ) #if MIN_VERSION_monad_control(1,0,0) -newtype DHTStM ip a = StM { - unSt :: StM (ReaderT (Node ip) IO) a +newtype DHTStM raw dht u ip a = StM { + unSt :: StM (ReaderT (Node raw dht u ip) IO) a } #endif -instance MonadBaseControl IO (DHT ip) where +instance MonadBaseControl IO (DHT raw dht u ip) where #if MIN_VERSION_monad_control(1,0,0) - type StM (DHT ip) a = DHTStM ip a + type StM (DHT raw dht u ip) a = DHTStM raw dht u ip a #else - newtype StM (DHT ip) a = StM { - unSt :: StM (ReaderT (Node ip) IO) a + newtype StM (DHT raw dht u ip) a = StM { + unSt :: StM (ReaderT (Node raw dht u ip) IO) a } #endif liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> @@ -321,22 +313,22 @@ instance MonadBaseControl IO (DHT ip) where -- | Check is it is possible to run 'queryNode' or handle pending -- query from remote node. -instance MonadActive (DHT ip) where +instance MonadActive (DHT raw dht u ip) where monadActive = getManager >>= liftIO . isActive {-# INLINE monadActive #-} -- | All allocated resources will be closed at 'closeNode'. -instance MonadResource (DHT ip) where +instance MonadResource (DHT raw dht u ip) where liftResourceT m = do s <- asks resources liftIO $ runInternalState m s --- instance MonadKRPC (DHT ip) (DHT ip) BValue KMessageOf where +-- instance MonadKRPC (DHT raw dht u ip) (DHT raw dht u ip) BValue KMessageOf where -getManager :: DHT ip (Manager IO BValue KMessageOf) +getManager :: DHT raw dht u ip (Manager raw dht) getManager = asks manager -instance MonadLogger (DHT ip) where +instance MonadLogger (DHT raw dht u ip) where monadLoggerLog loc src lvl msg = do logger <- asks loggerFun liftIO $ logger loc src lvl (toLogStr msg) @@ -344,7 +336,7 @@ instance MonadLogger (DHT ip) where #ifdef VERSION_bencoding type NodeHandler = Handler IO KMessageOf BValue #else -type NodeHandler ip = Handler (DHT ip) Tox.Message ByteString +type NodeHandler ip = Handler (DHT raw dht u ip) Tox.Message ByteString #endif logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO () @@ -376,17 +368,16 @@ locFromCS cs = case getCallStack cs of -- | Run DHT session. You /must/ properly close session using -- 'closeNode' function, otherwise socket or other scarce resources may -- leak. -newNode :: Address ip +newNode :: ( Address ip + , FiniteBits (NodeId dht) + , Serialize (NodeId dht) + ) => -- [NodeHandler] -- ^ handlers to run on accepted queries; Options -- ^ various dht options; -> NodeAddr ip -- ^ node address to bind; -> LogFun -- ^ invoked on log messages; -#ifdef VERSION_bencoding - -> Maybe (NodeId KMessageOf) -- ^ use this NodeId, if not given a new one is generated. -#else - -> Maybe (NodeId Tox.Message) -- ^ use this NodeId, if not given a new one is generated. -#endif - -> IO (Node ip) -- ^ a new DHT node running at given address. + -> Maybe (NodeId dht) -- ^ use this NodeId, if not given a new one is generated. + -> IO (Node raw dht u ip) -- ^ a new DHT node running at given address. newNode opts naddr logger mbid = do s <- createInternalState runInternalState initNode s @@ -409,11 +400,11 @@ newNode opts naddr logger mbid = do -- | Some resources like listener thread may live for -- some short period of time right after this DHT session closed. -closeNode :: Node ip -> IO () +closeNode :: Node raw dht u ip -> IO () closeNode Node {..} = closeInternalState resources -- | Run DHT operation on the given session. -runDHT :: Node ip -> DHT ip a -> IO a +runDHT :: Node raw dht u ip -> DHT raw dht u ip a -> IO a runDHT node action = runReaderT (unDHT action) node {-# INLINE runDHT #-} @@ -453,24 +444,20 @@ checkToken sessionTokens addr questionableToken = do -----------------------------------------------------------------------} -- | This nodes externally routable address reported by remote peers. -routableAddress :: DHT ip (Maybe SockAddr) +routableAddress :: DHT raw dht u ip (Maybe SockAddr) routableAddress = do info <- asks routingInfo >>= liftIO . atomically . readTVar return $ myAddress <$> info -- | The current NodeId that the given remote node should know us by. -#ifdef VERSION_bencoding -myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId KMessageOf) -#else -myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId Tox.Message) -#endif +myNodeIdAccordingTo :: NodeAddr ip -> DHT raw dht u ip (NodeId dht) myNodeIdAccordingTo _ = do info <- asks routingInfo >>= liftIO . atomically . readTVar maybe (asks tentativeNodeId) (return . myNodeId) info -myNodeIdAccordingTo1 :: DHT ip ( NodeAddr ip -> IO (NodeId KMessageOf) ) +myNodeIdAccordingTo1 :: DHT raw dht u ip ( NodeAddr ip -> IO (NodeId dht) ) myNodeIdAccordingTo1 = do var <- asks routingInfo tid <- asks tentativeNodeId @@ -480,11 +467,7 @@ myNodeIdAccordingTo1 = do -- | Get current routing table. Normally you don't need to use this -- function, but it can be usefull for debugging and profiling purposes. -#ifdef VERSION_bencoding -getTable :: Eq ip => DHT ip (Table KMessageOf ip ()) -#else -getTable :: Eq ip => DHT ip (Table Tox.Message ip Bool) -#endif +getTable :: Eq ip => DHT raw dht u ip (Table dht ip u) getTable = do Node { tentativeNodeId = myId , routingInfo = var @@ -492,18 +475,18 @@ getTable = do let nil = nullTable myId (optBucketCount opts) liftIO (maybe nil R.myBuckets <$> atomically (readTVar var)) -getSwarms :: Ord ip => DHT ip [ (InfoHash, Int, Maybe ByteString) ] +getSwarms :: Ord ip => DHT raw dht u ip [ (InfoHash, Int, Maybe ByteString) ] getSwarms = do store <- asks contactInfo >>= liftIO . atomically . readTVar return $ P.knownSwarms store -savePeerStore :: (Ord ip, Address ip) => DHT ip ByteString +savePeerStore :: (Ord ip, Address ip) => DHT raw dht u ip ByteString savePeerStore = do var <- asks contactInfo peers <- liftIO $ atomically $ readTVar var return $ S.encode peers -mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT ip () +mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw dht u ip () mergeSavedPeers bs = do var <- asks contactInfo case S.decode bs of @@ -511,7 +494,7 @@ mergeSavedPeers bs = do Left _ -> return () -allPeers :: Ord ip => InfoHash -> DHT ip [ PeerAddr ip ] +allPeers :: Ord ip => InfoHash -> DHT raw dht u ip [ PeerAddr ip ] allPeers ih = do store <- asks contactInfo >>= liftIO . atomically . readTVar return $ P.lookup ih store @@ -521,18 +504,20 @@ allPeers ih = do -- -- This operation used for 'find_nodes' query. -- -#ifdef VERSION_bencoding -getClosest :: Eq ip => TableKey KMessageOf k => k -> DHT ip [NodeInfo KMessageOf ip ()] -#else -getClosest :: Eq ip => TableKey Tox.Message k => k -> DHT ip [NodeInfo Tox.Message ip Bool] -#endif +getClosest :: ( Eq ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , TableKey dht k ) => + k -> DHT raw dht u ip [NodeInfo dht ip u] getClosest node = do k <- asks (optK . options) kclosest k node <$> getTable getClosest1 :: ( Eq ip - , TableKey KMessageOf k - ) => DHT ip (k -> IO [NodeInfo KMessageOf ip ()]) + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , TableKey dht k + ) => DHT raw dht u ip (k -> IO [NodeInfo dht ip u]) getClosest1 = do k <- asks (optK . options) nobkts <- asks (optBucketCount . options) @@ -574,13 +559,12 @@ getTimestamp = do -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime))) return $ utcTimeToPOSIXSeconds utcTime - #ifdef VERSION_bencoding -- | Prepare result for 'get_peers' query. -- -- This operation use 'getClosest' as failback so it may block. -- -getPeerList :: Ord ip => InfoHash -> DHT ip (PeerList ip) +getPeerList :: Ord ip => InfoHash -> DHT raw KMessageOf () ip (PeerList ip) getPeerList ih = do var <- asks contactInfo ps <- liftIO $ lookupPeers var ih @@ -588,7 +572,7 @@ getPeerList ih = do then Left <$> getClosest ih else return (Right ps) -getPeerList1 :: Ord ip => DHT ip (InfoHash -> IO (PeerList ip)) +getPeerList1 :: Ord ip => DHT raw KMessageOf () ip (InfoHash -> IO (PeerList ip)) getPeerList1 = do var <- asks contactInfo getclosest <- getClosest1 @@ -599,12 +583,12 @@ getPeerList1 = do else return (Right ps) -insertTopic :: InfoHash -> PortNumber -> DHT ip () +insertTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () insertTopic ih p = do var <- asks announceInfo liftIO $ atomically $ modifyTVar' var (S.insert (ih, p)) -deleteTopic :: InfoHash -> PortNumber -> DHT ip () +deleteTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () deleteTopic ih p = do var <- asks announceInfo liftIO $ atomically $ modifyTVar' var (S.delete (ih, p)) @@ -616,7 +600,7 @@ deleteTopic ih p = do -----------------------------------------------------------------------} -- | Failed queries are ignored. -queryParallel :: [DHT ip a] -> DHT ip [a] +queryParallel :: [DHT raw dht u ip a] -> DHT raw dht u ip [a] queryParallel queries = do -- TODO: use alpha -- alpha <- asks (optAlpha . options) -- cgit v1.2.3