-- | -- Copyright : (c) Sam Truzjan 2014 -- License : BSD3 -- Maintainer : pxqr.sta@gmail.com -- Stability : experimental -- Portability : portable -- -- This module provides functions to interact with other nodes. -- Normally, you don't need to import this module, use -- "Network.BitTorrent.DHT" instead. -- {-# LANGUAGE CPP #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE TupleSections #-} {-# LANGUAGE PartialTypeSignatures #-} {-# LANGUAGE GADTs #-} module Network.BitTorrent.DHT.Query ( -- * Handler -- | To bind specific set of handlers you need to pass -- handler list to the 'startNode' function. pingH , findNodeH , getPeersH , announceH , defaultHandlers -- * Query -- ** Basic -- | A basic query perform a single request expecting a -- single response. , Iteration , pingQ , findNodeQ , getPeersQ , announceQ -- ** Iterative -- | An iterative query perform multiple basic queries, -- concatenate its responses, optionally yielding result and -- continue to the next iteration. 
, Search -- , search , publish , ioFindNode , ioFindNodes , ioGetPeers , isearch , bgsearch -- ** Routing table , insertNode , refreshNodes -- ** Messaging , queryNode , queryNode' , (<@>) ) where import Data.Bits import Data.Default #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument hiding (yield) #else import GHC.Conc (labelThread) import Control.Concurrent.Lifted hiding (yield) #endif import Control.Exception.Lifted hiding (Handler) import Control.Monad.Reader import Control.Monad.Logger import Data.Maybe import Data.Conduit import Data.Conduit.List as C hiding (mapMaybe, mapM_) import Data.Either import Data.List as L import Data.Monoid import Data.Text as T import qualified Data.Set as Set ;import Data.Set (Set) import Network import Text.PrettyPrint as PP hiding ((<>), ($$)) import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) import Data.Time import Data.Time.Clock.POSIX import Data.Hashable (Hashable) import Network.DatagramServer as KRPC hiding (Options, def) import Network.KRPC.Method as KRPC import Network.DatagramServer.Mainline (ReflectedIP(..)) import Network.DatagramServer (QueryFailure(..)) import Data.Torrent import qualified Network.DHT as DHT import Network.DHT.Mainline import Network.DHT.Routing as R import Network.BitTorrent.DHT.Session import Control.Concurrent.STM import qualified Network.BitTorrent.DHT.Search as Search #ifdef VERSION_bencoding import Data.BEncode (BValue) import Network.DatagramServer.Mainline (KMessageOf) #else import Data.ByteString (ByteString) import Network.DatagramServer.Tox #endif import Network.Address hiding (NodeId) import Network.DatagramServer.Types as RPC hiding (Query,Response) import Network.DHT.Types import Control.Monad.Trans.Control import Data.Typeable import Data.Serialize import System.IO.Unsafe (unsafeInterleaveIO) import Data.String {----------------------------------------------------------------------- -- Handlers 
-----------------------------------------------------------------------}

{-
nodeHandler :: ( Address ip
               , KRPC (Query KMessageOf a) (Response KMessageOf b)
               )
            => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ())
            -> (NodeAddr ip -> IO (NodeId KMessageOf))
            -> (Char -> String -> Text -> IO ())
            -> QueryMethod KMessageOf
            -> (NodeAddr ip -> a -> IO b)
            -> NodeHandler
-}

-- | Lift a payload-level action into a 'Handler' for the given method.
--
-- For every incoming query the handler:
--
--   1. converts the sender's socket address with 'fromSockAddr',
--      throwing 'BadAddress' when the conversion fails;
--
--   2. passes the sender to the node-insertion callback (first argument),
--      unless the query is flagged read-only, in which case it only emits
--      a @\'W\'@ message through the logging callback (third argument);
--
--   3. runs the action and wraps its result in 'Response' together with
--      the node id produced by the second argument for that address.
nodeHandler :: (Address addr, WireFormat raw msg, Pretty (NodeInfo dht addr u),
                Default u,
                IsString t, Functor msg,
                SerializableTo raw (Response dht r),
                SerializableTo raw (Query dht q)) =>
               (NodeInfo dht addr u -> Maybe ReflectedIP -> IO ())
               -> (NodeAddr addr -> IO (NodeId dht))
               -> (Char -> t -> Text -> IO ())
               -> QueryMethod msg
               -> (NodeAddr addr -> q -> IO r)
               -> Handler IO msg raw
nodeHandler routeNode ownIdFor logm method action =
    handler method $ \sockAddr qry ->
        case fromSockAddr sockAddr of
            Nothing    -> throwIO BadAddress
            Just naddr -> do
                let sender = NodeInfo (queringNodeId qry) naddr def
                -- Do not route read-only nodes.  (bep 43)
                if queryIsReadOnly qry
                    then logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint sender)
                    else routeNode sender Nothing -- TODO need to block. why?
                Response <$> ownIdFor naddr <*> action naddr (queryParams qry)

-- | Default 'Ping' handler.
--
-- Ignores both the sender address and the incoming payload and always
-- answers with 'DHT.pongMessage' for the given protocol proxy.
pingH :: DHT.Kademlia dht => Proxy dht -> NodeAddr ip -> Ping dht -> IO (Ping dht)
pingH dht _sender _ping = pure (DHT.pongMessage dht)
-- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True }

-- | Default 'FindNode' handler.
--
-- Asks the supplied lookup for the nodes closest to the id named by the
-- query ('findWho'), replaces each node's payload with @()@ and wraps the
-- list with 'foundNodesMessage'.  The sender address is not used.
findNodeH :: Kademlia dht => (NodeId dht -> IO [NodeInfo dht ip u]) -> NodeAddr ip -> FindNode dht ip -> IO (NodeFound dht ip)
findNodeH getclosest _sender msg = do
    nearest <- getclosest (findWho msg)
    return $ foundNodesMessage (L.map (fmap (const ())) nearest)

-- | Default 'GetPeers' handler.
getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip)
getPeersH getPeerList toks naddr (GetPeers ih) = do
    -- Look up the answer for the requested info-hash, then grant the
    -- querying address a token and return both together.
    ps <- getPeerList ih
    tok <- grantToken toks naddr
    return $ GotPeers ps tok

-- | Default 'Announce' handler.
--
-- Validates the token carried by the announce with 'checkToken' and throws
-- @InvalidParameter "token"@ when it does not check out.  Otherwise the
-- announcing peer is stored with 'insertPeer'; the stored port is the
-- sender's own port when @impliedPort@ is set and the announced @port@
-- otherwise.
announceH :: ( Ord ip, Hashable ip ) => TVar (PeerStore ip) -> TVar SessionTokens -> NodeAddr ip -> Announce -> IO Announced
-- The as-pattern is written without spaces around '@': GHC 9.0 and later
-- treat a space-surrounded '@' as an operator, not as an as-pattern.
announceH peers toks naddr@NodeAddr{..} (Announce {..}) = do
    valid <- checkToken toks naddr sessionToken
    unless valid $ do
        throwIO $ InvalidParameter "token"
    let annPort = if impliedPort then nodePort else port
        peerAddr = PeerAddr Nothing nodeHost annPort
    insertPeer peers topic announcedName peerAddr
    return Announced

-- | Includes all Kademlia-related handlers.
--
-- Builds the ping and find-node handlers on top of 'nodeHandler', wiring
-- them to the session's node-insertion function ('insertNode1'), own-id
-- lookup ('myNodeIdAccordingTo1') and closest-node lookup ('getClosest1').
kademliaHandlers :: forall raw dht u ip. (Eq ip, Ord ip, Address ip
                    , Ord (TransactionID dht)
                    , Ord (NodeId dht)
                    , Show u
                    , SerializableTo raw (Response dht (Ping dht))
                    , SerializableTo raw (Query dht (Ping dht))
                    , Show (QueryMethod dht)
                    , Show (NodeId dht)
                    , FiniteBits (NodeId dht)
                    , Default u
                    , Serialize (TransactionID dht)
                    , WireFormat raw dht
                    , Kademlia dht
                    , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
                    , Functor dht
                    , Pretty (NodeInfo dht ip u)
                    , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
                    , SerializableTo raw (Response dht (NodeFound dht ip))
                    , SerializableTo raw (Query dht (FindNode dht ip))
                    ) => LogFun -> DHT raw dht u ip [Handler IO dht raw]
-- kademliaHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler]
kademliaHandlers logger = do
    groknode <- insertNode1
    mynid    <- myNodeIdAccordingTo1
    let -- 'nodeHandler' specialised to this session's callbacks.
        handler :: ( KRPC (Query dht a) (Response dht b)
                   , SerializableTo raw (Response dht b)
                   , SerializableTo raw (Query dht a)
                   ) => QueryMethod dht -> (NodeAddr ip -> a -> IO b) -> Handler IO dht raw
        handler = nodeHandler groknode mynid (logt logger)
        dht = Proxy :: Proxy dht
    getclosest <- getClosest1
    return [ handler (namePing dht)      $ pingH dht
           , handler (nameFindNodes dht) $ findNodeH getclosest
           ]

-- | Includes all default query handlers.
--
-- These are the handlers from 'kademliaHandlers' followed by the
-- @get_peers@ and @announce_peer@ handlers, which share the session's
-- token store and peer store.
defaultHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler]
defaultHandlers logger = do
    groknode <- insertNode1
    mynid    <- myNodeIdAccordingTo1
    let handler :: KRPC (Query KMessageOf a) (Response KMessageOf b) => QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler
        handler = nodeHandler groknode mynid (logt logger)
    toks     <- asks sessionTokens
    peers    <- asks contactInfo
    getpeers <- getPeerList1
    hs       <- kademliaHandlers logger
    return $ hs ++ [ handler "get_peers"     $ getPeersH getpeers toks
                   , handler "announce_peer" $ announceH peers toks
                   ]

{-----------------------------------------------------------------------
-- Basic queries
-----------------------------------------------------------------------}

-- | One step of an iterative query against a single node: the result is
-- either a list of further nodes ('Left') or a list of results ('Right').
type Iteration raw dht u ip o = NodeInfo dht ip u -> DHT raw dht u ip (Either [NodeInfo dht ip u] [o ip])

-- | The most basic query. May be used to check if the given node is
-- alive or get its 'NodeId'.
--
-- Returns the node info built from the responder's id together with the
-- reflected address reported by 'queryNode'', if any.
pingQ :: forall raw dht u ip.
         ( DHT.Kademlia dht
         , Address ip
         , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
         , Default u
         , Show u
         , Ord (TransactionID dht)
         , Serialize (TransactionID dht)
         , WireFormat raw dht
         , SerializableTo raw (Response dht (Ping dht))
         , SerializableTo raw (Query dht (Ping dht))
         , Ord (NodeId dht)
         , FiniteBits (NodeId dht)
         , Show (NodeId dht)
         , Show (QueryMethod dht)
         ) => NodeAddr ip -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP)
pingQ addr = do
    let ping = DHT.pingMessage (Proxy :: Proxy dht)
    (nid, pong, mip) <- queryNode' addr ping
    -- Only pins the response type to the request type; the pong payload
    -- itself is not inspected.
    let _ = pong `asTypeOf` ping
    -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid}
    return (NodeInfo nid addr def, mip)

-- TODO [robustness] match range of returned node ids with the
-- expected range and either filter bad nodes or discard response at
-- all throwing an exception
-- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo
--
-- Sends a find-node message for the key to the given node, logs the
-- returned nodes at info level and yields them as a 'Right' result.
findNodeQ proxy key NodeInfo {..} = do
    closest <- fmap DHT.foundNodes $ DHT.findNodeMessage proxy key <@> nodeAddr
    $(logInfoS) "findNodeQ" $ "NodeFound\n"
        <> T.pack (L.unlines $ L.map ((' ' :) . show . pPrint) closest)
    return $ Right closest

#ifdef VERSION_bencoding
-- Asks one node for the peers of a topic.  The reply's @peers@ field is
-- returned unchanged: 'Left' carries nodes, 'Right' carries peers.
getPeersQ :: Address ip => InfoHash -> Iteration BValue KMessageOf () ip PeerAddr
getPeersQ topic NodeInfo {..} = do
    GotPeers {..} <- GetPeers topic <@> nodeAddr
    let dist = distance (toNodeId topic) nodeId
    $(logInfoS) "getPeersQ" $ T.pack
         $ "distance: " <> render (pPrint dist) <> " , result: "
        <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" }
    return peers

-- Asks one node for peers of the info-hash; when the node answers with
-- peers, announces the given port to it using the token it granted and
-- reports the node's address as the result.  When the node answers with
-- nodes instead, those are handed back for further iteration.
announceQ :: Address ip => InfoHash -> PortNumber -> Iteration BValue KMessageOf () ip NodeAddr
announceQ ih p NodeInfo {..} = do
    GotPeers {..} <- GetPeers ih <@> nodeAddr
    case peers of
        -- TODO check if we can announce
        Left ns -> return (Left ns)
        Right _ -> do -- TODO *probably* add to peer cache
            Announced <- Announce False ih Nothing p grantedToken <@> nodeAddr
            return (Right [nodeAddr])
#endif

{-----------------------------------------------------------------------
-- Iterative queries
-----------------------------------------------------------------------}

-- Builds an 'IO' step function for a get-peers search, closed over the
-- current session.  Each step runs 'getPeersQ' against one node and
-- returns @(nodes, [])@ or @([], peers)@.  A 'QueryFailure' raised by the
-- query is caught and reported as @([], [])@.
ioGetPeers :: Address ip => InfoHash -> DHT BValue KMessageOf () ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip]))
ioGetPeers ih = do
    session <- ask
    return $ \ni -> runDHT session $ do
        r <- try $ getPeersQ ih ni
        case r of
            Right e -> return $ either (,[]) ([],) e
            Left e -> let _ = e :: QueryFailure in return ([],[])

-- Builds an 'IO' step function for a find-node search, closed over the
-- current session.  Each step queries one node and splits the returned
-- nodes: those whose id differs from the target go into the first
-- component, those whose id equals the target into the second.
ioFindNode :: ( DHT.Kademlia dht
              , WireFormat raw dht
              , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
              , Address ip
              , Default u
              , Show u
              , Show (QueryMethod dht)
              , TableKey dht infohash
              , Eq (NodeId dht)
              , Ord (NodeId dht)
              , FiniteBits (NodeId dht)
              , Show (NodeId dht)
              , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
              , Ord (TransactionID dht)
              , Serialize (TransactionID dht)
              , SerializableTo raw (Response dht (NodeFound dht ip))
              , SerializableTo raw (Query dht (FindNode dht ip))
              , SerializableTo raw (Response dht (Ping dht))
              , SerializableTo raw (Query dht (Ping dht))
              ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u]))
ioFindNode ih = do
    session <- ask
    return $ \ni -> runDHT session $ do
        ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> nodeAddr ni
        -- Replace each returned node's payload with the default value.
        let ns' = L.map (fmap (const def)) ns
        return $ L.partition (\n -> nodeId n /= toNodeId ih) ns'

-- | Like ioFindNode, but considers all found nodes to be 'Right' results.
ioFindNodes :: ( DHT.Kademlia dht
               , WireFormat raw dht
               , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
               , Address ip
               , Default u
               , Show u
               , Show (QueryMethod dht)
               , TableKey dht infohash
               , Eq (NodeId dht)
               , Ord (NodeId dht)
               , FiniteBits (NodeId dht)
               , Show (NodeId dht)
               , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
               , Ord (TransactionID dht)
               , Serialize (TransactionID dht)
               , SerializableTo raw (Response dht (NodeFound dht ip))
               , SerializableTo raw (Query dht (FindNode dht ip))
               , SerializableTo raw (Response dht (Ping dht))
               , SerializableTo raw (Query dht (Ping dht))
               ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u]))
ioFindNodes ih = do
    session <- ask
    return $ \ni -> runDHT session $ do
        ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> nodeAddr ni
        let ns' = L.map (fmap (const def)) ns
        return ([], ns')

-- Starts an iterative search in a freshly forked thread.
--
-- The step function is obtained from the first argument, the search is
-- seeded with the 8 nodes from the routing table closest to the key, and
-- the worker thread is labelled @"search." ++ show ih@.  Returns the
-- worker's 'ThreadId' together with the search handle.
isearch :: ( Ord r
           , Ord ip
           , Ord (NodeId dht)
           , FiniteBits (NodeId dht)
           , TableKey dht ih
           , Show ih) => (ih -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r]))) -> ih -> DHT raw dht u ip (ThreadId, Search.IterativeSearch dht u ip r)
isearch f ih = do
    qry <- f ih
    ns <- kclosest 8 ih <$> getTable
    liftIO $ do s <- Search.newSearch qry (toNodeId ih) ns
                a <- fork $ do
                    tid <- myThreadId
                    labelThread tid ("search."++show ih)
                    Search.search s
                    -- atomically \$ readTVar (Search.searchResults s)
                return (a, s)

-- | Background search: fill a lazy list using a background thread.
-- Starts a search with 'isearch' and returns its results as a list that
-- is produced on demand: each chunk holds the results that were not part
-- of an earlier chunk, and the tail is deferred with
-- 'unsafeInterleaveIO' until it is actually consumed.  When nothing new
-- is available the STM transaction blocks (via 'check') until the search
-- reports that it has finished.
bgsearch f ih = do
    (_worker, srch) <- isearch f ih
    let drain seen = do
            (fresh, done) <- atomically $ do
                found <- readTVar (Search.searchResults srch)
                let unseen = found Set.\\ seen
                if Set.null unseen
                    then do
                        -- Nothing new: retry until the search is over.
                        finished <- Search.searchIsFinished srch
                        check finished
                        return (Set.empty, True)
                    else do
                        finished <- Search.searchIsFinished srch
                        return (unseen, finished)
            let batch = Set.toList fresh
            if done
                then return batch
                else do
                    rest <- unsafeInterleaveIO $ drain (Set.union seen fresh)
                    return (batch ++ rest)
    liftIO $ drain Set.empty

-- Conduit stage of an iterative search: consumes batches of nodes and
-- produces batches of results inside the 'DHT' monad.
type Search raw dht u ip o = Conduit [NodeInfo dht ip u] (DHT raw dht u ip) [o dht ip u]

#if 0
-- TODO: use reorder and filter (Traversal option) leftovers
-- search :: k -> IterationI ip o -> Search ip o
search _ action = do
  awaitForever $ \ batch -> unless (L.null batch) $ do
    $(logWarnS) "search" "start query"
    responses <- lift $ queryParallel (action <$> batch)
    let (nodes, results) = partitionEithers responses
    $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results)))
    leftover $ L.concat nodes
    let r = mapM_ yield results
        _ = (action,r) :: (a -> DHT raw dht u ip (Either [a] o), ConduitM [a] o (DHT raw dht u ip) ())
    r
#endif

-- Placeholder: evaluating 'publish' raises @error "todo"@.  The intended
-- implementation is sketched in the commented-out code that follows.
publish = error "todo"
-- publish :: Address ip => InfoHash -> PortNumber -> DHT BValue KMessageOf () ip ()
-- publish ih p = do
--   nodes <- getClosest ih
--   r     <- asks (optReplication .
--                   options)
--   _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r
--   return ()

-- Pings a node to find out whether it still answers.
--
-- Logs the address at debug level, runs 'pingQ' and reports
-- @(True, reflected address)@ on success.  A 'QueryFailure' raised by the
-- ping is caught and reported as @(False, Nothing)@.
probeNode :: ( Default u
             , Show u
             , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
             , DHT.Kademlia dht
             , Address ip
             , Ord (TransactionID dht)
             , Serialize (TransactionID dht)
             , WireFormat raw dht
             , SerializableTo raw (Response dht (Ping dht))
             , SerializableTo raw (Query dht (Ping dht))
             , Ord (NodeId dht)
             , FiniteBits (NodeId dht)
             , Show (NodeId dht)
             , Show (QueryMethod dht)
             ) => NodeAddr ip -> DHT raw dht u ip (Bool , Maybe ReflectedIP)
probeNode addr = do
    $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr)))
    outcome <- try (pingQ addr)
    return $ case outcome of
        Left (_ :: QueryFailure) -> (False, Nothing)
        Right (_, mip)           -> (True, mip)

-- Currently a no-op: the live definition ignores its argument and returns
-- unit.  The earlier implementation is kept below, disabled by @#if 0@.
refreshNodes :: forall raw dht u ip.
                ( Address ip
                , Ord (NodeId dht)
                , Default u
                , FiniteBits (NodeId dht)
                , Pretty (NodeId dht)
                , DHT.Kademlia dht ) => NodeId dht -> DHT raw dht u ip () -- [NodeInfo KMessageOf ip ()]
refreshNodes _ = return () -- TODO
#if 0
-- FIXME do not use getClosest sinse we should /refresh/ them
refreshNodes nid = do
  $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid)))
  nodes <- getClosest nid
  do
    -- forM (L.take 1 nodes) \$ \ addr -> do
    --   NodeFound ns <- FindNode nid <@> addr
    -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) ()
    -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) ()
    -- nss <- sourceList [[addr]] \$= search nid (findNodeQ nid) $$ C.consume
    nss <- sourceList [nodes] $= search nid (findNodeQ (Proxy :: Proxy dht) nid) $$ C.consume
    $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes."
    _ <- queryParallel $ flip L.map (L.concat nss) $ \n -> do
      $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n))
      pingQ (nodeAddr n)
      -- pingQ takes care of inserting the node.
    return ()
  return () -- \$ L.concat nss
#endif

-- Logs a message under the "insertNode" source at a level chosen by a
-- one-character tag: 'D' is debug, 'W' is warning, 'I' is info.  Any other
-- tag is logged at info level with the tag and a colon prepended to the
-- message.
logc :: Char -> String -> DHT raw dht u ip ()
logc level = case level of
    'D' -> $(logDebugS) "insertNode" . T.pack
    'W' -> $(logWarnS)  "insertNode" . T.pack
    'I' -> $(logInfoS)  "insertNode" . T.pack
    _   -> $(logInfoS)  "insertNode" . T.pack . ((level : ":") ++)

-- | This operation do not block but acquire exclusive access to
--   routing table.
--
-- Thin wrapper that obtains the insertion function from 'insertNode1' and
-- runs it in 'IO' on the given node and reflected address.
insertNode :: forall raw dht u ip.
              ( Address ip
              , Ord (NodeId dht)
              , FiniteBits (NodeId dht)
              , Show (NodeId dht)
              , Default u
              , Show u
              , DHT.Kademlia dht
              , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
              , Ord (TransactionID dht)
              , WireFormat raw dht
              , Serialize (TransactionID dht)
              , SerializableTo raw (Response dht (Ping dht))
              , SerializableTo raw (Query dht (Ping dht))
              , Ord (NodeId dht)
              , Show (NodeId dht)
              , Show (QueryMethod dht)
              ) => NodeInfo dht ip u -> Maybe ReflectedIP -> DHT raw dht u ip ()
insertNode info witnessed_ip0 =
    insertNode1 >>= \store -> liftIO (store info witnessed_ip0)

-- Builds the 'IO'-level node-insertion function for the current session.
--
-- The returned function calls 'DHT.insertNode' with table parameters taken
-- from the session: the configured bucket count, the tentative node id as
-- fallback id, 'logc' as the logger and 'probeNode' as the ping probe.
-- The code below is kept exactly as written: @params@ and @state@ are
-- self-referential records and the probe relies on the session captured
-- with 'ask'.
insertNode1 :: forall raw dht u ip.
               ( Address ip
               , Default u
               , Show u
               , Ord (NodeId dht)
               , FiniteBits (NodeId dht)
               , Show (NodeId dht)
               , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
               , DHT.Kademlia dht
               , Ord (TransactionID dht)
               , WireFormat raw dht
               , Serialize (TransactionID dht)
               , SerializableTo raw (Response dht (Ping dht))
               , SerializableTo raw (Query dht (Ping dht))
               , Ord (NodeId dht)
               , Show (NodeId dht)
               , Show (QueryMethod dht)
               ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ())
insertNode1 = do
    bc  <- optBucketCount <$> asks options
    nid <- asks tentativeNodeId
    logm0 <- embed_ (uncurry logc)
    let logm c = logm0 . (c,)
    dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state.
    probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP)
    let probe n = probe0 n >>= runDHT dht_node_state . restoreM
        {-
        changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive
            ip <- fromSockAddr ip0 :: Maybe ip
            listToMaybe
                $ rank id (nodeId $ foreignNode arrival)
                $ bep42s ip (DHT.fallbackID params) -- warning: recursive
        -}
        -- NOTE: 'adjustID' refers back to @params@ itself.
        params = DHT.TableParameters
            { maxBuckets = bc :: Int
            , fallbackID = nid :: NodeId dht
            , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht
            , logMessage = logm :: Char -> String -> IO ()
            , pingProbe = probe :: NodeAddr ip -> IO (Bool, Maybe ReflectedIP)
            }
    tbl <- asks routingInfo
    let -- NOTE: 'grokNode' refers back to @state@ itself.
        state = DHT.TableKeeper
            { routingInfo = tbl
            , grokNode = DHT.insertNode params state
            , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO ()
            }
    return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0

-- | Throws exception if node is not responding.
--
-- Same as 'queryNode'' with the reflected address dropped from the result.
queryNode :: forall raw dht u a b ip.
             ( Address ip
             , KRPC (Query dht a) (Response dht b)
             , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
             , Default u
             , Show u
             , DHT.Kademlia dht
             , Ord (TransactionID dht)
             , Serialize (TransactionID dht)
             , WireFormat raw dht
             , SerializableTo raw (Response dht b)
             , SerializableTo raw (Query dht a)
             , Ord (NodeId dht)
             , FiniteBits (NodeId dht)
             , Show (NodeId dht)
             , Show (QueryMethod dht)
             , SerializableTo raw (Response dht (Ping dht))
             , SerializableTo raw (Query dht (Ping dht))
             ) => NodeAddr ip -> a -> DHT raw dht u ip (NodeId dht, b)
queryNode addr q = do
    (remote, reply, _) <- queryNode' addr q
    return (remote, reply)

-- Sends one query to the node at the given address and waits for its
-- response.
--
-- The query carries our node id as seen from that address and a read-only
-- flag that is currently always 'False'.  After the response arrives, the
-- responder is passed to 'insertNode' together with the reflected address,
-- and the triple (responder id, response payload, reflected address) is
-- returned.
queryNode' :: forall raw dht u a b ip.
              ( Address ip
              , Default u
              , Show u
              , DHT.Kademlia dht
              , KRPC (Query dht a) (Response dht b)
              , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
              , Ord (TransactionID dht)
              , Serialize (TransactionID dht)
              , WireFormat raw dht
              , SerializableTo raw (Response dht b)
              , SerializableTo raw (Query dht a)
              , Ord (NodeId dht)
              , FiniteBits (NodeId dht)
              , Show (NodeId dht)
              , Show (QueryMethod dht)
              , SerializableTo raw (Response dht (Ping dht))
              , SerializableTo raw (Query dht (Ping dht))
              ) => NodeAddr ip -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP)
queryNode' addr q = do
    nid <- myNodeIdAccordingTo addr
    let read_only = False -- TODO: check for NAT issues. (BEP 43)
    -- NOTE(review): @name@ is bound here but never used; the call below
    -- passes an 'error' placeholder in its place.
    let KRPC.Method name = KRPC.method :: KRPC.Method (Query dht a) (Response dht b)
    mgr <- asks manager
    (Response remoteId r, witnessed_ip) <- liftIO $ query' mgr (error "TODO: name") (toSockAddr addr) (Query nid read_only q)
    -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip)
    --                          <> " by " <> T.pack (show (toSockAddr addr))
    insertNode (NodeInfo remoteId addr def) witnessed_ip
    return (remoteId, r, witnessed_ip)

-- | Infix version of 'queryNode' function.
--
-- Takes the query first and the address second, and returns only the
-- response payload.
(<@>) :: ( Address ip
         , KRPC (Query dht a) (Response dht b)
         , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
         , Default u
         , Show u
         , Show (QueryMethod dht)
         , Ord (NodeId dht)
         , FiniteBits (NodeId dht)
         , Show (NodeId dht)
         , Ord (TransactionID dht)
         , Serialize (TransactionID dht)
         , SerializableTo raw (Response dht b)
         , SerializableTo raw (Query dht a)
         , SerializableTo raw (Response dht (Ping dht))
         , SerializableTo raw (Query dht (Ping dht))
         , WireFormat raw dht
         , Kademlia dht
         ) => a -> NodeAddr ip -> DHT raw dht u ip b
q <@> addr = fmap snd (queryNode addr q)
{-# INLINE (<@>) #-}