-- | -- Copyright : (c) Sam Truzjan 2014 -- License : BSD3 -- Maintainer : pxqr.sta@gmail.com -- Stability : experimental -- Portability : portable -- -- This module provides functions to interact with other nodes. -- Normally, you don't need to import this module, use -- "Network.BitTorrent.DHT" instead. -- {-# LANGUAGE CPP #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE TupleSections #-} {-# LANGUAGE PartialTypeSignatures #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE MultiParamTypeClasses #-} module Network.BitTorrent.DHT.Query ( -- * Handler -- | To bind specific set of handlers you need to pass -- handler list to the 'startNode' function. pingH , findNodeH , getPeersH , announceH , defaultHandlers -- * Query -- ** Basic -- | A basic query perform a single request expecting a -- single response. , Iteration , pingQ , coldPingQ , findNodeQ , getPeersQ , announceQ -- ** Iterative -- | An iterative query perform multiple basic queries, -- concatenate its responses, optionally yielding result and -- continue to the next iteration. , Search -- , search , publish , ioFindNode , ioFindNodes , ioGetPeers , isearch , bgsearch -- ** Routing table , insertNode , refreshNodes -- ** Messaging , queryNode , queryNode' , (<@>) ) where import Data.Bits import Data.Default #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument hiding (yield) #else import GHC.Conc (labelThread) import Control.Concurrent.Lifted hiding (yield) #endif import Control.Exception.Lifted hiding (Handler) import Control.Monad.Reader import Control.Monad.Logger import Data.Maybe import Data.Conduit import Data.Conduit.List as C hiding (mapMaybe, mapM_) import Data.Either import Data.List as L import Data.Monoid import Data.Text as T import qualified Data.Set as Set ;import Data.Set (Set) import Network import Text.PrettyPrint as PP hiding ((<>), ($$)) import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) import Data.Time import Data.Time.Clock.POSIX import Data.Hashable (Hashable) import Data.Serialize import Data.Hashable import Network.DatagramServer as KRPC hiding (Options, def) import Network.KRPC.Method as KRPC import Network.DatagramServer.Mainline (ReflectedIP(..), QueryExtra(..), ResponseExtra(..)) import Network.DatagramServer (QueryFailure(..)) import Data.Torrent import qualified Network.DHT as DHT import Network.DHT.Mainline import Network.DHT.Routing as R import Network.BitTorrent.DHT.Session import Control.Concurrent.STM import qualified Network.BitTorrent.DHT.Search as Search #ifdef VERSION_bencoding import Data.BEncode (BValue) import Network.DatagramServer.Mainline (KMessageOf) #else import Data.ByteString (ByteString) import Network.DatagramServer.Tox #endif import Network.Address hiding (NodeId) import Network.DatagramServer.Types as RPC hiding (Query,Response) import Network.DHT.Types import Control.Monad.Trans.Control import Data.Typeable import Data.Serialize import System.IO.Unsafe (unsafeInterleaveIO) import Data.String {----------------------------------------------------------------------- -- Handlers -----------------------------------------------------------------------} {- nodeHandler :: ( Address ip , KRPC dht (Query KMessageOf a) (Response KMessageOf b) ) => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler -} nodeHandler :: forall raw dht addr u t q r. (Address addr, WireFormat raw dht, Pretty (NodeInfo dht addr u), Default u, IsString t, Functor dht, KRPC dht (Query dht q) (Response dht r), SerializableTo raw (Response dht r), SerializableTo raw (Query dht q), Show (QueryMethod dht)) => (NodeInfo dht addr u -> Maybe ReflectedIP -> IO ()) -> (NodeAddr addr -> IO (NodeId dht)) -> (Char -> t -> Text -> IO ()) -> DHTData dht addr -> QueryMethod dht -> (NodeAddr addr -> q -> IO r) -> Handler IO dht raw nodeHandler insertNode myNodeIdAccordingTo logm dta method action = handler (\sockaddr -> myNodeIdAccordingTo (error "todo")) method $ \ sockAddr msg -> do let remoteId = messageSender (msg :: dht (Query dht q)) resptype qextra = queryExtra qry resptype = Proxy :: Proxy (Response dht r) q = queryParams qry qry = envelopePayload msg :: Query dht q case fromSockAddr sockAddr of Nothing -> throwIO BadAddress Just naddr -> do logm 'D' "nodeHandler" $ "Received query: " <> T.pack (show $ method) me <- myNodeIdAccordingTo naddr rextra <- liftIO $ makeResponseExtra dta me qry resptype let ni = NodeInfo remoteId naddr def -- Do not route read-only nodes. (bep 43) if fromRoutableNode qextra then insertNode ni Nothing >> return () -- TODO need to block. why? else logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) Response <$> pure rextra <*> action naddr q -- | Default 'Ping' handler. pingH :: DHT.Kademlia dht => Proxy dht -> NodeAddr ip -> Ping dht -> IO (Ping dht) pingH dht _ _ = return (DHT.pongMessage dht) -- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True } -- | Default 'FindNode' handler. findNodeH :: Kademlia dht => (NodeId dht -> IO [NodeInfo dht ip u]) -> NodeAddr ip -> FindNode dht ip -> IO (NodeFound dht ip) findNodeH getclosest _ msg = foundNodesMessage . L.map (fmap (const ())) <$> getclosest (findWho msg) -- | Default 'GetPeers' handler. getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip) getPeersH getPeerList toks naddr (GetPeers ih) = do ps <- getPeerList ih tok <- grantToken toks naddr return $ GotPeers ps tok -- | Default 'Announce' handler. announceH :: ( Ord ip, Hashable ip ) => TVar (PeerStore ip) -> TVar SessionTokens -> NodeAddr ip -> Announce -> IO Announced announceH peers toks naddr @ NodeAddr {..} (Announce {..}) = do valid <- checkToken toks naddr sessionToken unless valid $ do throwIO $ InvalidParameter "token" let annPort = if impliedPort then nodePort else port peerAddr = PeerAddr Nothing nodeHost annPort insertPeer peers topic announcedName peerAddr return Announced -- | Includes all Kademlia-related handlers. kademliaHandlers :: forall raw dht u ip. (Eq ip, Ord ip, Address ip , Ord (TransactionID dht) , Ord (NodeId dht) , Show u , SerializableTo raw (Response dht (Ping dht)) , SerializableTo raw (Query dht (Ping dht)) , Show (QueryMethod dht) , Show (NodeId dht) , FiniteBits (NodeId dht) , Default u , Serialize (TransactionID dht) , WireFormat raw dht , Kademlia dht , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) , Functor dht , Pretty (NodeInfo dht ip u) , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) , SerializableTo raw (Response dht (NodeFound dht ip)) , SerializableTo raw (Query dht (FindNode dht ip)) ) => LogFun -> DHT raw dht u ip [Handler IO dht raw] -- kademliaHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler] kademliaHandlers logger = do groknode <- insertNode1 mynid <- myNodeIdAccordingTo1 dta <- asks dhtData let handler :: ( KRPC dht (Query dht a) (Response dht b) , SerializableTo raw (Response dht b) , SerializableTo raw (Query dht a) ) => QueryMethod dht -> (NodeAddr ip -> a -> IO b) -> Handler IO dht raw handler = nodeHandler groknode mynid (logt logger) dta dht = Proxy :: Proxy dht getclosest <- getClosest1 return [ handler (namePing dht) $ pingH dht , handler (nameFindNodes dht) $ findNodeH getclosest ] instance DataHandlers BValue KMessageOf where dataHandlers = bthandlers bthandlers :: ( Ord ip , Hashable ip, Typeable ip, Serialize ip) => (NodeId KMessageOf -> IO [NodeInfo KMessageOf ip ()]) -> DHTData KMessageOf ip -> [MethodHandler BValue KMessageOf ip] bthandlers getclosest dta = [ MethodHandler "get_peers" $ getPeersH (getpeers dta) (sessionTokens dta) , MethodHandler "announce_peer" $ announceH (contactInfo dta) (sessionTokens dta) ] where getpeers dta ih = do ps <- lookupPeers (contactInfo dta) ih if L.null ps then Left <$> getclosest (toNodeId ih) else return (Right ps) -- | Includes all default query handlers. defaultHandlers :: forall raw dht u ip. ( Ord (TransactionID dht) , Ord (NodeId dht) , Show u , SerializableTo raw (Response dht (Ping dht)) , SerializableTo raw (Query dht (Ping dht)) , Show (QueryMethod dht) , Show (NodeId dht) , FiniteBits (NodeId dht) , Default u , Serialize (TransactionID dht) , WireFormat raw dht , Kademlia dht , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) , Functor dht , Pretty (NodeInfo dht ip u) , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) , SerializableTo raw (Response dht (NodeFound dht ip)) , SerializableTo raw (Query dht (FindNode dht ip)) , Eq ip, Ord ip, Address ip, DataHandlers raw dht ) => LogFun -> DHT raw dht u ip [Handler IO dht raw] defaultHandlers logger = do groknode <- insertNode1 mynid <- myNodeIdAccordingTo1 dta <- asks dhtData let handler :: MethodHandler raw dht ip -> Handler IO dht raw handler (MethodHandler name action) = nodeHandler groknode mynid (logt logger) dta name action getclosest <- getClosest1 hs <- kademliaHandlers logger return $ hs ++ L.map handler (dataHandlers (fmap (fmap (fmap (const ()))) . getclosest) dta) {----------------------------------------------------------------------- -- Basic queries -----------------------------------------------------------------------} type Iteration raw dht u ip o = NodeInfo dht ip u -> DHT raw dht u ip (Either [NodeInfo dht ip u] [o ip]) -- | The most basic query. May be used to check if the given node is -- alive or get its 'NodeId'. pingQ :: forall raw dht u ip. ( DHT.Kademlia dht , Address ip , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) , Default u , Show u , Ord (TransactionID dht) , Serialize (TransactionID dht) , WireFormat raw dht , SerializableTo raw (Response dht (Ping dht)) , SerializableTo raw (Query dht (Ping dht)) , Ord (NodeId dht) , FiniteBits (NodeId dht) , Show (NodeId dht) , Show (QueryMethod dht) ) => NodeInfo dht ip u -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP) pingQ ni = do let ping = DHT.pingMessage (Proxy :: Proxy dht) (nid, pong, mip) <- queryNode' ni ping let _ = pong `asTypeOf` ping -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} return (NodeInfo nid (nodeAddr ni) def, mip) -- | The most basic query. May be used to check if the given node is -- alive or get its 'NodeId'. coldPingQ :: forall raw dht u ip. ( DHT.Kademlia dht , Address ip , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) , Default u , Show u , Ord (TransactionID dht) , Serialize (TransactionID dht) , WireFormat raw dht , SerializableTo raw (Response dht (Ping dht)) , SerializableTo raw (Query dht (Ping dht)) , Ord (NodeId dht) , FiniteBits (NodeId dht) , Show (NodeId dht) , Show (QueryMethod dht) ) => PacketDestination dht -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP) coldPingQ dest = do let ping = DHT.pingMessage (Proxy :: Proxy dht) naddr <- maybe (throwIO $ QueryFailed ProtocolError "unable to construct NodeAddr from PacketDestination") return $ fromAddr dest (nid, pong, mip) <- coldQueryNode' naddr dest ping let _ = pong `asTypeOf` ping -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} return (NodeInfo nid naddr def, mip) -- TODO [robustness] match range of returned node ids with the -- expected range and either filter bad nodes or discard response at -- all throwing an exception -- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo findNodeQ proxy key ni = do closest <- fmap DHT.foundNodes $ DHT.findNodeMessage proxy key <@> ni $(logInfoS) "findNodeQ" $ "NodeFound\n" <> T.pack (L.unlines $ L.map ((' ' :) . show . pPrint) closest) return $ Right closest #ifdef VERSION_bencoding getPeersQ :: Address ip => InfoHash -> Iteration BValue KMessageOf () ip PeerAddr getPeersQ topic ni = do GotPeers {..} <- GetPeers topic <@> ni let dist = distance (toNodeId topic) (nodeId ni) $(logInfoS) "getPeersQ" $ T.pack $ "distance: " <> render (pPrint dist) <> " , result: " <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" } return peers announceQ :: Address ip => InfoHash -> PortNumber -> Iteration BValue KMessageOf () ip NodeAddr announceQ ih p ni = do GotPeers {..} <- GetPeers ih <@> ni case peers of Left ns | False -> undefined -- TODO check if we can announce | otherwise -> return (Left ns) Right _ -> do -- TODO *probably* add to peer cache Announced <- Announce False ih Nothing p grantedToken <@> ni return (Right [nodeAddr ni]) #endif {----------------------------------------------------------------------- -- Iterative queries -----------------------------------------------------------------------} ioGetPeers :: Address ip => InfoHash -> DHT BValue KMessageOf () ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip])) ioGetPeers ih = do session <- ask return $ \ni -> runDHT session $ do r <- try $ getPeersQ ih ni case r of Right e -> return $ either (,[]) ([],) e Left e -> let _ = e :: QueryFailure in return ([],[]) ioFindNode :: ( DHT.Kademlia dht , WireFormat raw dht , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) , Address ip , Default u , Show u , Show (QueryMethod dht) , TableKey dht infohash , Eq (NodeId dht) , Ord (NodeId dht) , FiniteBits (NodeId dht) , Show (NodeId dht) , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) , Ord (TransactionID dht) , Serialize (TransactionID dht) , SerializableTo raw (Response dht (NodeFound dht ip)) , SerializableTo raw (Query dht (FindNode dht ip)) , SerializableTo raw (Response dht (Ping dht)) , SerializableTo raw (Query dht (Ping dht)) ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u])) ioFindNode ih = do session <- ask return $ \ni -> runDHT session $ do ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> ni let ns' = L.map (fmap (const def)) ns return $ L.partition (\n -> nodeId n /= toNodeId ih) ns' -- | Like ioFindNode, but considers all found nodes to be 'Right' results. ioFindNodes :: ( DHT.Kademlia dht , WireFormat raw dht , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) , Address ip , Default u , Show u , Show (QueryMethod dht) , TableKey dht infohash , Eq (NodeId dht) , Ord (NodeId dht) , FiniteBits (NodeId dht) , Show (NodeId dht) , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) , Ord (TransactionID dht) , Serialize (TransactionID dht) , SerializableTo raw (Response dht (NodeFound dht ip)) , SerializableTo raw (Query dht (FindNode dht ip)) , SerializableTo raw (Response dht (Ping dht)) , SerializableTo raw (Query dht (Ping dht)) ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u])) ioFindNodes ih = do session <- ask return $ \ni -> runDHT session $ do ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> ni let ns' = L.map (fmap (const def)) ns return ([], ns') isearch :: ( Ord r , Ord ip , Ord (NodeId dht) , FiniteBits (NodeId dht) , TableKey dht ih , Show ih) => (ih -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r]))) -> ih -> DHT raw dht u ip (ThreadId, Search.IterativeSearch dht u ip r) isearch f ih = do qry <- f ih ns <- kclosest 8 ih <$> getTable liftIO $ do s <- Search.newSearch qry (toNodeId ih) ns a <- fork $ do tid <- myThreadId labelThread tid ("search."++show ih) Search.search s -- atomically \$ readTVar (Search.searchResults s) return (a, s) -- | Background search: fill a lazy list using a background thread. bgsearch f ih = do (tid, s) <- isearch f ih let again shown = do (chk,fin) <- atomically $ do r <- (Set.\\ shown) <$> readTVar (Search.searchResults s) if not $ Set.null r then (,) r <$> Search.searchIsFinished s else Search.searchIsFinished s >>= check >> return (Set.empty,True) let ps = Set.toList chk if fin then return ps else do xs <- unsafeInterleaveIO $ again (shown `Set.union` chk) return $ ps ++ xs liftIO $ again Set.empty type Search raw dht u ip o = Conduit [NodeInfo dht ip u] (DHT raw dht u ip) [o dht ip u] #if 0 -- TODO: use reorder and filter (Traversal option) leftovers -- search :: k -> IterationI ip o -> Search ip o search _ action = do awaitForever $ \ batch -> unless (L.null batch) $ do $(logWarnS) "search" "start query" responses <- lift $ queryParallel (action <$> batch) let (nodes, results) = partitionEithers responses $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results))) leftover $ L.concat nodes let r = mapM_ yield results _ = (action,r) :: (a -> DHT raw dht u ip (Either [a] o), ConduitM [a] o (DHT raw dht u ip) ()) r #endif publish = error "todo" -- publish :: Address ip => InfoHash -> PortNumber -> DHT BValue KMessageOf () ip () -- publish ih p = do -- nodes <- getClosest ih -- r <- asks (optReplication . options) -- _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r -- return () probeNode :: ( Default u , Show u , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) , DHT.Kademlia dht , Address ip , Ord (TransactionID dht) , Serialize (TransactionID dht) , WireFormat raw dht , SerializableTo raw (Response dht (Ping dht)) , SerializableTo raw (Query dht (Ping dht)) , Ord (NodeId dht) , FiniteBits (NodeId dht) , Show (NodeId dht) , Show (QueryMethod dht) ) => NodeInfo dht ip u -> DHT raw dht u ip (Bool , Maybe ReflectedIP) probeNode addr = do $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint $ nodeAddr addr))) result <- try $ pingQ addr let _ = fmap (const ()) result :: Either QueryFailure () return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result refreshNodes :: forall raw dht u ip. ( Address ip , Ord (NodeId dht) , Default u , FiniteBits (NodeId dht) , Pretty (NodeId dht) , DHT.Kademlia dht , Ord ip , Ord (TransactionID dht) , SerializableTo raw (Response dht (NodeFound dht ip)) , SerializableTo raw (Query dht (FindNode dht ip)) , SerializableTo raw (Response dht (Ping dht)) , SerializableTo raw (Query dht (Ping dht)) , Pretty (NodeInfo dht ip u) , Show (NodeId dht) , Show u , Show (QueryMethod dht) , Serialize (TransactionID dht) , WireFormat raw dht , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) ) => NodeId dht -> DHT raw dht u ip () -- [NodeInfo KMessageOf ip ()] -- FIXME do not use getClosest sinse we should /refresh/ them refreshNodes nid = do $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid))) nodes <- getClosest nid do -- forM (L.take 1 nodes) \$ \ addr -> do -- NodeFound ns <- FindNode nid <@> addr -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) () -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) () -- nss <- sourceList [[addr]] \$= search nid (findNodeQ nid) $$ C.consume -- nss <- sourceList [nodes] \$= search nid (findNodeQ (Proxy :: Proxy dht) nid) $$ C.consume ns <- bgsearch ioFindNodes nid $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length ns)) <> " nodes." _ <- queryParallel $ flip L.map ns $ \n -> do $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) pingQ n -- pingQ takes care of inserting the node. return () return () -- \$ L.concat nss logc :: Char -> String -> DHT raw dht u ip () logc 'D' = $(logDebugS) "insertNode" . T.pack logc 'W' = $(logWarnS) "insertNode" . T.pack logc 'I' = $(logInfoS) "insertNode" . T.pack logc c = $(logInfoS) "insertNode" . T.pack . (c :) . (':' :) -- | This operation do not block but acquire exclusive access to -- routing table. insertNode :: forall raw dht u ip. ( Address ip , Ord (NodeId dht) , FiniteBits (NodeId dht) , Show (NodeId dht) , Default u , Show u , DHT.Kademlia dht , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) , Ord (TransactionID dht) , WireFormat raw dht , Serialize (TransactionID dht) , SerializableTo raw (Response dht (Ping dht)) , SerializableTo raw (Query dht (Ping dht)) , Ord (NodeId dht) , Show (NodeId dht) , Show (QueryMethod dht) ) => NodeInfo dht ip u -> Maybe ReflectedIP -> DHT raw dht u ip () insertNode info witnessed_ip0 = do f <- insertNode1 liftIO $ f info witnessed_ip0 insertNode1 :: forall raw dht u ip. ( Address ip , Default u , Show u , Ord (NodeId dht) , FiniteBits (NodeId dht) , Show (NodeId dht) , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) , DHT.Kademlia dht , Ord (TransactionID dht) , WireFormat raw dht , Serialize (TransactionID dht) , SerializableTo raw (Response dht (Ping dht)) , SerializableTo raw (Query dht (Ping dht)) , Ord (NodeId dht) , Show (NodeId dht) , Show (QueryMethod dht) ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ()) insertNode1 = do bc <- optBucketCount <$> asks options nid <- asks tentativeNodeId logm0 <- embed_ (uncurry logc) let logm c = logm0 . (c,) dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state. probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) let probe n = probe0 n >>= runDHT dht_node_state . restoreM {- changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive ip <- fromSockAddr ip0 :: Maybe ip listToMaybe $ rank id (nodeId $ foreignNode arrival) $ bep42s ip (DHT.fallbackID params) -- warning: recursive -} params = DHT.TableParameters { maxBuckets = bc :: Int , fallbackID = nid :: NodeId dht , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht , logMessage = logm :: Char -> String -> IO () , pingProbe = probe :: NodeInfo dht ip u -> IO (Bool, Maybe ReflectedIP) } tbl <- asks routingInfo let state = DHT.TableKeeper { routingInfo = tbl , grokNode = DHT.insertNode params state , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO () } return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0 -- | Throws exception if node is not responding. queryNode :: forall raw dht u a b ip. ( Address ip , KRPC dht (Query dht a) (Response dht b) , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) , Default u , Show u , DHT.Kademlia dht , Ord (TransactionID dht) , Serialize (TransactionID dht) , WireFormat raw dht , SerializableTo raw (Response dht b) , SerializableTo raw (Query dht a) , Ord (NodeId dht) , FiniteBits (NodeId dht) , Show (NodeId dht) , Show (QueryMethod dht) , SerializableTo raw (Response dht (Ping dht)) , SerializableTo raw (Query dht (Ping dht)) ) => NodeInfo dht ip u -> a -> DHT raw dht u ip (NodeId dht, b) queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q queryNode' :: forall raw dht u a b ip. ( Address ip , Default u , Show u , DHT.Kademlia dht , KRPC dht (Query dht a) (Response dht b) , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) , Ord (TransactionID dht) , Serialize (TransactionID dht) , WireFormat raw dht , SerializableTo raw (Response dht b) , SerializableTo raw (Query dht a) , Ord (NodeId dht) , FiniteBits (NodeId dht) , Show (NodeId dht) , Show (QueryMethod dht) , SerializableTo raw (Response dht (Ping dht)) , SerializableTo raw (Query dht (Ping dht)) ) => NodeInfo dht ip u -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP) queryNode' ni q = do let addr = nodeAddr ni dest = makeAddress (Left $ nodeId ni) (toSockAddr addr) coldQueryNode' addr dest q coldQueryNode' :: forall raw dht u a b ip. ( Address ip , Default u , Show u , DHT.Kademlia dht , KRPC dht (Query dht a) (Response dht b) , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) , Ord (TransactionID dht) , Serialize (TransactionID dht) , WireFormat raw dht , SerializableTo raw (Response dht b) , SerializableTo raw (Query dht a) , Ord (NodeId dht) , FiniteBits (NodeId dht) , Show (NodeId dht) , Show (QueryMethod dht) , SerializableTo raw (Response dht (Ping dht)) , SerializableTo raw (Query dht (Ping dht)) ) => NodeAddr ip -> PacketDestination dht -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP) coldQueryNode' addr dest q = do nid <- myNodeIdAccordingTo $ fromMaybe (error "TODO: coldQueryNode' myNodeIdAccordingTo") $ fromAddr dest dta <- asks dhtData qextra <- liftIO $ makeQueryExtra dta nid (Proxy :: Proxy (Query dht q)) (Proxy :: Proxy (Response dht b)) let read_only = False -- TODO: check for NAT issues. (BEP 43) -- let KRPC.Method name = KRPC.method :: KRPC.Method dht (Query dht a) (Response dht b) mgr <- asks manager (Response rextra r, remoteId, witnessed_ip) <- liftIO $ query' mgr dest (Query qextra q) -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) -- <> " by " <> T.pack (show (toSockAddr addr)) _ <- insertNode (NodeInfo remoteId addr def) witnessed_ip return (remoteId, r, witnessed_ip) -- | Infix version of 'queryNode' function. (<@>) :: ( Address ip , KRPC dht (Query dht a) (Response dht b) , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) , Default u , Show u , Show (QueryMethod dht) , Ord (NodeId dht) , FiniteBits (NodeId dht) , Show (NodeId dht) , Ord (TransactionID dht) , Serialize (TransactionID dht) , SerializableTo raw (Response dht b) , SerializableTo raw (Query dht a) , SerializableTo raw (Response dht (Ping dht)) , SerializableTo raw (Query dht (Ping dht)) , WireFormat raw dht , Kademlia dht ) => a -> NodeInfo dht ip u -> DHT raw dht u ip b q <@> addr = snd <$> queryNode addr q {-# INLINE (<@>) #-}