From 3195c0877b443e5ccd4d489f03944fc059d4d7aa Mon Sep 17 00:00:00 2001 From: joe Date: Thu, 29 Jun 2017 10:37:07 -0400 Subject: WIP: Generalizing DHT monad. --- src/Network/BitTorrent/DHT.hs | 42 ++-- src/Network/BitTorrent/DHT/Query.hs | 391 ++++++++++++++++++++++++++++------ src/Network/BitTorrent/DHT/Search.hs | 52 ++--- src/Network/BitTorrent/DHT/Session.hs | 114 +++++----- src/Network/DHT/Mainline.hs | 89 ++++---- src/Network/DHT/Types.hs | 51 ++++- src/Network/DatagramServer.hs | 36 ++-- 7 files changed, 537 insertions(+), 238 deletions(-) (limited to 'src') diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index d9328cea..8bc423a3 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs @@ -15,9 +15,11 @@ -- -- {-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE TypeOperators #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE CPP #-} module Network.BitTorrent.DHT ( -- * Distributed Hash Table DHT @@ -37,7 +39,7 @@ module Network.BitTorrent.DHT , snapshot -- * Operations - , Network.BitTorrent.DHT.lookup + -- , Network.BitTorrent.DHT.lookup , Network.BitTorrent.DHT.insert , Network.BitTorrent.DHT.delete @@ -50,7 +52,7 @@ module Network.BitTorrent.DHT , closeNode -- ** Monad - , MonadDHT (..) + -- , MonadDHT (..) , runDHT ) where @@ -81,11 +83,13 @@ import qualified Network.DatagramServer as KRPC (listen, Protocol(..)) -- DHT types -----------------------------------------------------------------------} +#if 0 class MonadDHT m where - liftDHT :: DHT IPv4 a -> m a + liftDHT :: DHT raw dht u IPv4 a -> m a -instance MonadDHT (DHT IPv4) where +instance MonadDHT (DHT raw dht u IPv4) where liftDHT = id +#endif -- | Convenience method. Pass this to 'dht' to enable full logging. fullLogging :: LogSource -> LogLevel -> Bool @@ -96,7 +100,7 @@ dht :: (Ord ip, Address ip) => Options -- ^ normally you need to use 'Data.Default.def'; -> NodeAddr ip -- ^ address to bind this node; -> (LogSource -> LogLevel -> Bool) -- ^ use 'fullLogging' as a noisy default - -> DHT ip a -- ^ actions to run: 'bootstrap', 'lookup', etc; + -> DHT raw dht u ip a -- ^ actions to run: 'bootstrap', 'lookup', etc; -> IO a -- ^ result. dht opts addr logfilter action = do runStderrLoggingT $ filterLogger logfilter $ LoggingT $ \ logger -> do @@ -175,7 +179,7 @@ resolveHostName NodeAddr {..} = do -- -- This operation do block, use -- 'Control.Concurrent.Async.Lifted.async' if needed. -bootstrap :: forall ip. Address ip => Maybe BS.ByteString -> [NodeAddr ip] -> DHT ip () +bootstrap :: forall raw dht u ip. Address ip => Maybe BS.ByteString -> [NodeAddr ip] -> DHT raw dht u ip () bootstrap mbs startNodes = do restored <- case decode <$> mbs of @@ -187,8 +191,8 @@ bootstrap mbs startNodes = do $(logInfoS) "bootstrap" "Start node bootstrapping" let searchAll aliveNodes = do nid <- myNodeIdAccordingTo (error "FIXME") - nss <- C.sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume - return ( nss :: [[NodeInfo KMessageOf ip ()]] ) + ns <- bgsearch ioFindNodes nid + return ( ns :: [NodeInfo KMessageOf ip ()] ) input_nodes <- (restored ++) . T.toList <$> getTable -- Step 1: Use iterative searches to flesh out the table.. do let knowns = map (map $ nodeAddr . fst) input_nodes @@ -200,10 +204,10 @@ bootstrap mbs startNodes = do -- If our cached nodes are alive and our IP address did not change, it's possible -- we are already bootsrapped, so no need to do any searches. when (not b) $ do - nss <- searchAll $ take 2 alive_knowns + ns <- searchAll $ take 2 alive_knowns -- We only use the supplied bootstrap nodes when we don't know of any -- others to try. - when (null nss) $ do + when (null ns) $ do -- TODO filter duplicated in startNodes list -- TODO retransmissions for startNodes (aliveNodes,_) <- unzip <$> queryParallel (pingQ <$> startNodes) @@ -243,7 +247,7 @@ bootstrap mbs startNodes = do -- -- This operation do not block. -- -isBootstrapped :: Eq ip => DHT ip Bool +isBootstrapped :: Eq ip => DHT raw dht u ip Bool isBootstrapped = T.full <$> getTable {----------------------------------------------------------------------- @@ -254,7 +258,11 @@ isBootstrapped = T.full <$> getTable -- -- This is blocking operation, use -- 'Control.Concurrent.Async.Lifted.async' if needed. -snapshot :: Address ip => DHT ip BS.ByteString +snapshot :: ( Address ip + , Ord (NodeId dht) + , Serialize u + , Serialize (NodeId dht) + ) => DHT raw dht u ip BS.ByteString snapshot = do tbl <- getTable return $ encode tbl @@ -263,15 +271,19 @@ snapshot = do -- Operations -----------------------------------------------------------------------} +#if 0 + -- | Get list of peers which downloading this torrent. -- -- This operation is incremental and do block. -- -lookup :: Address ip => InfoHash -> DHT ip `C.Source` [PeerAddr ip] +lookup :: Address ip => InfoHash -> DHT raw dht u ip `C.Source` [PeerAddr ip] lookup topic = do -- TODO retry getClosest if bucket is empty closest <- lift $ getClosest topic C.sourceList [closest] $= search topic (getPeersQ topic) +#endif + -- TODO do not republish if the topic is already in announceSet -- | Announce that /this/ peer may have some pieces of the specified @@ -281,7 +293,7 @@ lookup topic = do -- TODO retry getClosest if bucket is empty -- This operation is synchronous and do block, use -- 'Control.Concurrent.Async.Lifted.async' if needed. -- -insert :: Address ip => InfoHash -> PortNumber -> DHT ip () +insert :: Address ip => InfoHash -> PortNumber -> DHT raw dht u ip () insert ih p = do publish ih p insertTopic ih p @@ -290,6 +302,6 @@ insert ih p = do -- -- This operation is atomic and may block for a while. -- -delete :: InfoHash -> PortNumber -> DHT ip () +delete :: InfoHash -> PortNumber -> DHT raw dht u ip () delete = deleteTopic {-# INLINE delete #-} diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 254b347c..e5d9bd5f 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs @@ -41,11 +41,13 @@ module Network.BitTorrent.DHT.Query -- concatenate its responses, optionally yielding result and -- continue to the next iteration. , Search - , search + -- , search , publish , ioFindNode + , ioFindNodes , ioGetPeers , isearch + , bgsearch -- ** Routing table , insertNode @@ -57,6 +59,8 @@ module Network.BitTorrent.DHT.Query , (<@>) ) where +import Data.Bits +import Data.Default #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument hiding (yield) #else @@ -102,30 +106,43 @@ import Network.DatagramServer.Tox #endif import Network.Address hiding (NodeId) import Network.DatagramServer.Types as RPC hiding (Query,Response) +import Network.DHT.Types import Control.Monad.Trans.Control +import Data.Typeable +import Data.Serialize +import System.IO.Unsafe (unsafeInterleaveIO) +import Data.String {----------------------------------------------------------------------- -- Handlers -----------------------------------------------------------------------} +{- nodeHandler :: ( Address ip - , KRPC (Query a) (Response b) + , KRPC (Query KMessageOf a) (Response KMessageOf b) ) => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler +-} +nodeHandler :: + (Address addr, WireFormat raw msg, Pretty (NodeInfo dht addr u), + Default u, + IsString t, Functor msg, + SerializableTo raw (Response dht r), + SerializableTo raw (Query dht q)) => + (NodeInfo dht addr u -> Maybe ReflectedIP -> IO ()) + -> (NodeAddr addr -> IO (NodeId dht)) + -> (Char -> t -> Text -> IO ()) + -> QueryMethod msg + -> (NodeAddr addr -> q -> IO r) + -> Handler IO msg raw nodeHandler insertNode myNodeIdAccordingTo logm method action = handler method $ \ sockAddr qry -> do -#ifdef VERSION_bencoding let remoteId = queringNodeId qry read_only = queryIsReadOnly qry q = queryParams qry -#else - let remoteId = msgClient qry - read_only = False - q = msgPayload qry -#endif case fromSockAddr sockAddr of Nothing -> throwIO BadAddress Just naddr -> do - let ni = NodeInfo remoteId naddr () + let ni = NodeInfo remoteId naddr def -- Do not route read-only nodes. (bep 43) if read_only then logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) @@ -135,13 +152,13 @@ nodeHandler insertNode myNodeIdAccordingTo logm method action = handler method $ <*> action naddr q -- | Default 'Ping' handler. -pingH :: NodeAddr ip -> Ping -> IO Ping -pingH _ Ping = return Ping +pingH :: DHT.Kademlia dht => Proxy dht -> NodeAddr ip -> Ping dht -> IO (Ping dht) +pingH dht _ _ = return (DHT.pongMessage dht) -- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True } -- | Default 'FindNode' handler. -findNodeH :: (NodeId KMessageOf -> IO [NodeInfo KMessageOf ip ()]) -> NodeAddr ip -> FindNode ip -> IO (NodeFound ip) -findNodeH getclosest _ (FindNode nid) = NodeFound <$> getclosest nid +findNodeH :: Kademlia dht => (NodeId dht -> IO [NodeInfo dht ip u]) -> NodeAddr ip -> FindNode dht ip -> IO (NodeFound dht ip) +findNodeH getclosest _ msg = foundNodesMessage . L.map (fmap (const ())) <$> getclosest (findWho msg) -- | Default 'GetPeers' handler. getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip) @@ -162,51 +179,100 @@ announceH peers toks naddr @ NodeAddr {..} (Announce {..}) = do insertPeer peers topic announcedName peerAddr return Announced +-- | Includes all Kademlia-related handlers. +kademliaHandlers :: forall raw dht u ip. (Eq ip, Ord ip, Address ip + , Ord (TransactionID dht) + , Ord (NodeId dht) + , Show u + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + , Show (QueryMethod dht) + , Show (NodeId dht) + , FiniteBits (NodeId dht) + , Default u + , Serialize (TransactionID dht) + , WireFormat raw dht + , Kademlia dht + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , Functor dht + , Pretty (NodeInfo dht ip u) + , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) + , SerializableTo raw (Response dht (NodeFound dht ip)) + , SerializableTo raw (Query dht (FindNode dht ip)) + ) => LogFun -> DHT raw dht u ip [Handler IO dht raw] +-- kademliaHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler] +kademliaHandlers logger = do + groknode <- insertNode1 + mynid <- myNodeIdAccordingTo1 + let handler :: ( KRPC (Query dht a) (Response dht b) + , SerializableTo raw (Response dht b) + , SerializableTo raw (Query dht a) + ) => QueryMethod dht -> (NodeAddr ip -> a -> IO b) -> Handler IO dht raw + handler = nodeHandler groknode mynid (logt logger) + dht = Proxy :: Proxy dht + getclosest <- getClosest1 + return [ handler (namePing dht) $ pingH dht + , handler (nameFindNodes dht) $ findNodeH getclosest + ] + + -- | Includes all default query handlers. -defaultHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT ip [NodeHandler] +defaultHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler] defaultHandlers logger = do groknode <- insertNode1 - toks <- asks sessionTokens - getclosest <- getClosest1 mynid <- myNodeIdAccordingTo1 + let handler :: KRPC (Query KMessageOf a) (Response KMessageOf b) => QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler + handler = nodeHandler groknode mynid (logt logger) + toks <- asks sessionTokens peers <- asks contactInfo getpeers <- getPeerList1 - let handler :: KRPC (Query a) (Response b) => QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler - handler = nodeHandler groknode mynid (logt logger) - return [ handler "ping" $ pingH - , handler "find-nodes" $ findNodeH getclosest - , handler "get_peers" $ getPeersH getpeers toks - , handler "announce_peer" $ announceH peers toks ] + hs <- kademliaHandlers logger + return $ hs ++ [ handler "get_peers" $ getPeersH getpeers toks + , handler "announce_peer" $ announceH peers toks ] {----------------------------------------------------------------------- -- Basic queries -----------------------------------------------------------------------} -type Iteration ip o = NodeInfo KMessageOf ip () -> DHT ip (Either [NodeInfo KMessageOf ip ()] [o ip]) +type Iteration raw dht u ip o = NodeInfo dht ip u -> DHT raw dht u ip (Either [NodeInfo dht ip u] [o ip]) -- | The most basic query. May be used to check if the given node is -- alive or get its 'NodeId'. -pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo KMessageOf ip (), Maybe ReflectedIP) +pingQ :: forall raw dht u ip. + ( DHT.Kademlia dht + , Address ip + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , Default u + , Show u + , Ord (TransactionID dht) + , Serialize (TransactionID dht) + , WireFormat raw dht + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , Show (QueryMethod dht) + ) => NodeAddr ip -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP) pingQ addr = do -#ifdef VERSION_bencoding - (nid, Ping, mip) <- queryNode' addr Ping -#else - (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} -#endif - return (NodeInfo nid addr (), mip) + let ping = DHT.pingMessage (Proxy :: Proxy dht) + (nid, pong, mip) <- queryNode' addr ping + let _ = pong `asTypeOf` ping + -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} + return (NodeInfo nid addr def, mip) -- TODO [robustness] match range of returned node ids with the -- expected range and either filter bad nodes or discard response at -- all throwing an exception -- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo -findNodeQ key NodeInfo {..} = do - NodeFound closest <- FindNode (toNodeId key) <@> nodeAddr +findNodeQ proxy key NodeInfo {..} = do + closest <- fmap DHT.foundNodes $ DHT.findNodeMessage proxy key <@> nodeAddr $(logInfoS) "findNodeQ" $ "NodeFound\n" <> T.pack (L.unlines $ L.map ((' ' :) . show . pPrint) closest) return $ Right closest #ifdef VERSION_bencoding -getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr +getPeersQ :: Address ip => InfoHash -> Iteration BValue KMessageOf () ip PeerAddr getPeersQ topic NodeInfo {..} = do GotPeers {..} <- GetPeers topic <@> nodeAddr let dist = distance (toNodeId topic) nodeId @@ -215,7 +281,7 @@ getPeersQ topic NodeInfo {..} = do <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" } return peers -announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr +announceQ :: Address ip => InfoHash -> PortNumber -> Iteration BValue KMessageOf () ip NodeAddr announceQ ih p NodeInfo {..} = do GotPeers {..} <- GetPeers ih <@> nodeAddr case peers of @@ -232,7 +298,7 @@ announceQ ih p NodeInfo {..} = do -----------------------------------------------------------------------} -ioGetPeers :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip])) +ioGetPeers :: Address ip => InfoHash -> DHT BValue KMessageOf () ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip])) ioGetPeers ih = do session <- ask return $ \ni -> runDHT session $ do @@ -241,17 +307,71 @@ ioGetPeers ih = do Right e -> return $ either (,[]) ([],) e Left e -> let _ = e :: QueryFailure in return ([],[]) -ioFindNode :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [NodeInfo KMessageOf ip ()])) +ioFindNode :: ( DHT.Kademlia dht + , WireFormat raw dht + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , Address ip + , Default u + , Show u + , Show (QueryMethod dht) + , TableKey dht infohash + , Eq (NodeId dht) + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) + , Ord (TransactionID dht) + , Serialize (TransactionID dht) + , SerializableTo raw (Response dht (NodeFound dht ip)) + , SerializableTo raw (Query dht (FindNode dht ip)) + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u])) ioFindNode ih = do session <- ask return $ \ni -> runDHT session $ do - NodeFound ns <- FindNode (toNodeId ih) <@> nodeAddr ni - return $ L.partition (\n -> nodeId n /= toNodeId ih) ns - -isearch :: (Ord r, Ord ip) => - (InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [r]))) - -> InfoHash - -> DHT ip (ThreadId, Search.IterativeSearch ip r) + ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> nodeAddr ni + let ns' = L.map (fmap (const def)) ns + return $ L.partition (\n -> nodeId n /= toNodeId ih) ns' + + +-- | Like ioFindNode, but considers all found nodes to be 'Right' results. +ioFindNodes :: ( DHT.Kademlia dht + , WireFormat raw dht + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , Address ip + , Default u + , Show u + , Show (QueryMethod dht) + , TableKey dht infohash + , Eq (NodeId dht) + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) + , Ord (TransactionID dht) + , Serialize (TransactionID dht) + , SerializableTo raw (Response dht (NodeFound dht ip)) + , SerializableTo raw (Query dht (FindNode dht ip)) + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u])) +ioFindNodes ih = do + session <- ask + return $ \ni -> runDHT session $ do + ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> nodeAddr ni + let ns' = L.map (fmap (const def)) ns + return ([], ns') + +isearch :: ( Ord r + , Ord ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , TableKey dht ih + , Show ih) => + (ih -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r]))) + -> ih + -> DHT raw dht u ip (ThreadId, Search.IterativeSearch dht u ip r) isearch f ih = do qry <- f ih ns <- kclosest 8 ih <$> getTable @@ -263,8 +383,25 @@ isearch f ih = do -- atomically \$ readTVar (Search.searchResults s) return (a, s) - -type Search ip o = Conduit [NodeInfo KMessageOf ip ()] (DHT ip) [o KMessageOf ip ()] +-- | Background search: fill a lazy list using a background thread. +bgsearch f ih = do + (tid, s) <- isearch f ih + let again shown = do + (chk,fin) <- atomically $ do + r <- (Set.\\ shown) <$> readTVar (Search.searchResults s) + if not $ Set.null r + then (,) r <$> Search.searchIsFinished s + else Search.searchIsFinished s >>= check >> return (Set.empty,True) + let ps = Set.toList chk + if fin then return ps + else do + xs <- unsafeInterleaveIO $ again (shown `Set.union` chk) + return $ ps ++ xs + liftIO $ again Set.empty + +type Search raw dht u ip o = Conduit [NodeInfo dht ip u] (DHT raw dht u ip) [o dht ip u] + +#if 0 -- TODO: use reorder and filter (Traversal option) leftovers -- search :: k -> IterationI ip o -> Search ip o @@ -275,17 +412,36 @@ search _ action = do let (nodes, results) = partitionEithers responses $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results))) leftover $ L.concat nodes - mapM_ yield results - -publish :: Address ip => InfoHash -> PortNumber -> DHT ip () -publish ih p = do - nodes <- getClosest ih - r <- asks (optReplication . options) - _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r - return () + let r = mapM_ yield results + _ = (action,r) :: (a -> DHT raw dht u ip (Either [a] o), ConduitM [a] o (DHT raw dht u ip) ()) + r +#endif -probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) +publish = error "todo" +-- publish :: Address ip => InfoHash -> PortNumber -> DHT BValue KMessageOf () ip () +-- publish ih p = do + -- nodes <- getClosest ih + -- r <- asks (optReplication . options) + -- _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r + -- return () + + +probeNode :: ( Default u + , Show u + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , DHT.Kademlia dht + , Address ip + , Ord (TransactionID dht) + , Serialize (TransactionID dht) + , WireFormat raw dht + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , Show (QueryMethod dht) + ) => NodeAddr ip -> DHT raw dht u ip (Bool , Maybe ReflectedIP) probeNode addr = do $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr))) result <- try $ pingQ addr @@ -293,8 +449,16 @@ probeNode addr = do return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result +refreshNodes :: forall raw dht u ip. + ( Address ip + , Ord (NodeId dht) + , Default u + , FiniteBits (NodeId dht) + , Pretty (NodeId dht) + , DHT.Kademlia dht ) => NodeId dht -> DHT raw dht u ip () -- [NodeInfo KMessageOf ip ()] +refreshNodes _ = return () -- TODO +#if 0 -- FIXME do not use getClosest sinse we should /refresh/ them -refreshNodes :: Address ip => NodeId KMessageOf -> DHT ip () -- [NodeInfo KMessageOf ip ()] refreshNodes nid = do $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid))) nodes <- getClosest nid @@ -304,7 +468,7 @@ refreshNodes nid = do -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) () -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) () -- nss <- sourceList [[addr]] \$= search nid (findNodeQ nid) $$ C.consume - nss <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume + nss <- sourceList [nodes] $= search nid (findNodeQ (Proxy :: Proxy dht) nid) $$ C.consume $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes." _ <- queryParallel $ flip L.map (L.concat nss) $ \n -> do $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) @@ -312,8 +476,9 @@ refreshNodes nid = do -- pingQ takes care of inserting the node. return () return () -- \$ L.concat nss +#endif -logc :: Char -> String -> DHT ip () +logc :: Char -> String -> DHT raw dht u ip () logc 'D' = $(logDebugS) "insertNode" . T.pack logc 'W' = $(logWarnS) "insertNode" . T.pack logc 'I' = $(logInfoS) "insertNode" . T.pack @@ -321,12 +486,46 @@ logc c = $(logInfoS) "insertNode" . T.pack . (c :) . (':' :) -- | This operation do not block but acquire exclusive access to -- routing table. -insertNode :: forall ip. Address ip => NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> DHT ip () +insertNode :: forall raw dht u ip. + ( Address ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , Default u + , Show u + , DHT.Kademlia dht + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , Ord (TransactionID dht) + , WireFormat raw dht + , Serialize (TransactionID dht) + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + , Ord (NodeId dht) + , Show (NodeId dht) + , Show (QueryMethod dht) + ) => NodeInfo dht ip u -> Maybe ReflectedIP -> DHT raw dht u ip () insertNode info witnessed_ip0 = do f <- insertNode1 liftIO $ f info witnessed_ip0 -insertNode1 :: forall ip. Address ip => DHT ip (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) +insertNode1 :: forall raw dht u ip. + ( Address ip + , Default u + , Show u + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , DHT.Kademlia dht + , Ord (TransactionID dht) + , WireFormat raw dht + , Serialize (TransactionID dht) + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + , Ord (NodeId dht) + , Show (NodeId dht) + , Show (QueryMethod dht) + ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ()) insertNode1 = do bc <- optBucketCount <$> asks options nid <- asks tentativeNodeId @@ -335,15 +534,17 @@ insertNode1 = do dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state. probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) let probe n = probe0 n >>= runDHT dht_node_state . restoreM + {- changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive ip <- fromSockAddr ip0 :: Maybe ip listToMaybe $ rank id (nodeId $ foreignNode arrival) $ bep42s ip (DHT.fallbackID params) -- warning: recursive + -} params = DHT.TableParameters { maxBuckets = bc :: Int - , fallbackID = nid :: NodeId KMessageOf - , adjustID = changeip :: SockAddr -> Event KMessageOf ip () -> NodeId KMessageOf + , fallbackID = nid :: NodeId dht + , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht , logMessage = logm :: Char -> String -> IO () , pingProbe = probe :: NodeAddr ip -> IO (Bool, Maybe ReflectedIP) } @@ -356,25 +557,75 @@ insertNode1 = do return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0 -- | Throws exception if node is not responding. -queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) - => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b) +queryNode :: forall raw dht u a b ip. + ( Address ip + , KRPC (Query dht a) (Response dht b) + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , Default u + , Show u + , DHT.Kademlia dht + , Ord (TransactionID dht) + , Serialize (TransactionID dht) + , WireFormat raw dht + , SerializableTo raw (Response dht b) + , SerializableTo raw (Query dht a) + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , Show (QueryMethod dht) + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + ) => NodeAddr ip -> a -> DHT raw dht u ip (NodeId dht, b) queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q -queryNode' :: forall a b ip. Address ip => KRPC (Query a) (Response b) - => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b, Maybe ReflectedIP) +queryNode' :: forall raw dht u a b ip. + ( Address ip + , Default u + , Show u + , DHT.Kademlia dht + , KRPC (Query dht a) (Response dht b) + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , Ord (TransactionID dht) + , Serialize (TransactionID dht) + , WireFormat raw dht + , SerializableTo raw (Response dht b) + , SerializableTo raw (Query dht a) + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , Show (QueryMethod dht) + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + ) => NodeAddr ip -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP) queryNode' addr q = do nid <- myNodeIdAccordingTo addr let read_only = False -- TODO: check for NAT issues. (BEP 43) - let KRPC.Method name = KRPC.method :: KRPC.Method (Query a) (Response b) + let KRPC.Method name = KRPC.method :: KRPC.Method (Query dht a) (Response dht b) mgr <- asks manager - (Response remoteId r, witnessed_ip) <- liftIO $ query' mgr name (toSockAddr addr) (Query nid read_only q) + (Response remoteId r, witnessed_ip) <- liftIO $ query' mgr (error "TODO: name") (toSockAddr addr) (Query nid read_only q) -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) -- <> " by " <> T.pack (show (toSockAddr addr)) - _ <- insertNode (NodeInfo remoteId addr ()) witnessed_ip + _ <- insertNode (NodeInfo remoteId addr def) witnessed_ip return (remoteId, r, witnessed_ip) -- | Infix version of 'queryNode' function. -(<@>) :: Address ip => KRPC (Query a) (Response b) - => a -> NodeAddr ip -> DHT ip b +(<@>) :: ( Address ip + , KRPC (Query dht a) (Response dht b) + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , Default u + , Show u + , Show (QueryMethod dht) + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Show (NodeId dht) + , Ord (TransactionID dht) + , Serialize (TransactionID dht) + , SerializableTo raw (Response dht b) + , SerializableTo raw (Query dht a) + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + , WireFormat raw dht + , Kademlia dht + ) => a -> NodeAddr ip -> DHT raw dht u ip b q <@> addr = snd <$> queryNode addr q {-# INLINE (<@>) #-} diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs index f5cd7834..356f6fd9 100644 --- a/src/Network/BitTorrent/DHT/Search.hs +++ b/src/Network/BitTorrent/DHT/Search.hs @@ -2,6 +2,7 @@ {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE FlexibleContexts #-} module Network.BitTorrent.DHT.Search where import Control.Concurrent @@ -25,27 +26,23 @@ import qualified Data.Wrapper.PSQ as PSQ ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ) import Network.Address hiding (NodeId) import Network.DatagramServer.Types -#ifdef VERSION_bencoding -import Network.DatagramServer.Mainline (KMessageOf) -type Ann = () -#else -import Network.DatagramServer.Tox as Tox -type KMessageOf = Tox.Message -type Ann = Bool -#endif +import Data.Bits -data IterativeSearch ip r = IterativeSearch - { searchTarget :: NodeId KMessageOf - , searchQuery :: NodeInfo KMessageOf ip Ann -> IO ([NodeInfo KMessageOf ip Ann], [r]) +data IterativeSearch dht u ip r = IterativeSearch + { searchTarget :: NodeId dht + , searchQuery :: NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r]) , searchPendingCount :: TVar Int - , searchQueued :: TVar (MinMaxPSQ (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf))) - , searchInformant :: TVar (MinMaxPSQ (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf))) + , searchQueued :: TVar (MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht))) + , searchInformant :: TVar (MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht))) , searchVisited :: TVar (Set (NodeAddr ip)) , searchResults :: TVar (Set r) } -newSearch :: Eq ip => (NodeInfo KMessageOf ip Ann -> IO ([NodeInfo KMessageOf ip Ann], [r])) - -> NodeId KMessageOf -> [NodeInfo KMessageOf ip Ann] -> IO (IterativeSearch ip r) +newSearch :: ( Eq ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r])) + -> NodeId dht -> [NodeInfo dht ip u] -> IO (IterativeSearch dht u ip r) newSearch qry target ns = atomically $ do c <- newTVar 0 q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns @@ -60,9 +57,14 @@ searchAlpha = 3 searchK :: Int searchK = 8 -sendQuery :: forall a ip. (Ord a, Ord ip) => - IterativeSearch ip a - -> Binding (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf)) +sendQuery :: forall a ip dht u. + ( Ord a + , Ord ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => + IterativeSearch dht u ip a + -> Binding (NodeInfo dht ip u) (NodeDistance (NodeId dht)) -> IO () sendQuery IterativeSearch{..} (ni :-> d) = do (ns,rs) <- handle (\(SomeException e) -> return ([],[])) @@ -71,9 +73,9 @@ sendQuery IterativeSearch{..} (ni :-> d) = do modifyTVar searchPendingCount pred vs <- readTVar searchVisited -- We only queue a node if it is not yet visited - let insertFoundNode :: NodeInfo KMessageOf ip u - -> MinMaxPSQ (NodeInfo KMessageOf ip u) (NodeDistance (NodeId KMessageOf)) - -> MinMaxPSQ (NodeInfo KMessageOf ip u) (NodeDistance (NodeId KMessageOf)) + let insertFoundNode :: NodeInfo dht ip u + -> MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht)) + -> MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht)) insertFoundNode n q | nodeAddr n `Set.member` vs = q | otherwise = MM.insertTake searchK n (distance searchTarget $ nodeId n) q @@ -82,7 +84,9 @@ sendQuery IterativeSearch{..} (ni :-> d) = do modifyTVar searchResults $ \s -> foldr Set.insert s rs -searchIsFinished :: Ord ip => IterativeSearch ip r -> STM Bool +searchIsFinished :: ( Ord ip + , Ord (NodeId dht) + ) => IterativeSearch dht u ip r -> STM Bool searchIsFinished IterativeSearch{..} = do q <- readTVar searchQueued cnt <- readTVar searchPendingCount @@ -94,8 +98,8 @@ searchIsFinished IterativeSearch{..} = do <= PSQ.prio (fromJust $ MM.findMin q)))) search :: - (Ord r, Ord ip) => - IterativeSearch ip r -> IO () + (Ord r, Ord ip, Ord (NodeId dht), FiniteBits (NodeId dht)) => + IterativeSearch dht u ip r -> IO () search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do fix $ \again -> do join $ atomically $ do diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index d4794038..f96ba707 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -96,6 +96,7 @@ import Control.Monad.Trans.Control import Control.Monad.Trans.Resource import Data.Typeable import Data.String +import Data.Bits import Data.ByteString import Data.Conduit.Lazy import Data.Default @@ -265,26 +266,17 @@ type AnnounceSet = Set (InfoHash, PortNumber) type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () -- | DHT session keep track state of /this/ node. -data Node ip = Node +data Node raw dht u ip = Node { -- | Session configuration; options :: !Options -- | Pseudo-unique self-assigned session identifier. This value is -- constant during DHT session and (optionally) between sessions. -#ifdef VERSION_bencoding - , tentativeNodeId :: !(NodeId KMessageOf) -#else - , tentativeNodeId :: !(NodeId Tox.Message) -#endif + , tentativeNodeId :: !(NodeId dht) , resources :: !InternalState -#ifdef VERSION_bencoding - , manager :: !(Manager IO BValue KMessageOf) -- ^ RPC manager; - , routingInfo :: !(TVar (Maybe (R.Info KMessageOf ip ()))) -- ^ search table; -#else - , manager :: !(Manager (DHT ip) ByteString Tox.Message) -- ^ RPC manager; - , routingInfo :: !(TVar (Maybe (R.Info Tox.Message ip Bool))) -- ^ search table; -#endif + , manager :: !(Manager raw dht) -- ^ RPC manager; + , routingInfo :: !(TVar (Maybe (R.Info dht ip u))) -- ^ search table; , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. @@ -293,23 +285,23 @@ data Node ip = Node -- | DHT keep track current session and proper resource allocation for -- safe multithreading. -newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) IO a } +newtype DHT raw dht u ip a = DHT { unDHT :: ReaderT (Node raw dht u ip) IO a } deriving ( Functor, Applicative, Monad, MonadIO - , MonadBase IO, MonadReader (Node ip), MonadThrow + , MonadBase IO, MonadReader (Node raw dht u ip), MonadThrow ) #if MIN_VERSION_monad_control(1,0,0) -newtype DHTStM ip a = StM { - unSt :: StM (ReaderT (Node ip) IO) a +newtype DHTStM raw dht u ip a = StM { + unSt :: StM (ReaderT (Node raw dht u ip) IO) a } #endif -instance MonadBaseControl IO (DHT ip) where +instance MonadBaseControl IO (DHT raw dht u ip) where #if MIN_VERSION_monad_control(1,0,0) - type StM (DHT ip) a = DHTStM ip a + type StM (DHT raw dht u ip) a = DHTStM raw dht u ip a #else - newtype StM (DHT ip) a = StM { - unSt :: StM (ReaderT (Node ip) IO) a + newtype StM (DHT raw dht u ip) a = StM { + unSt :: StM (ReaderT (Node raw dht u ip) IO) a } #endif liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> @@ -321,22 +313,22 @@ instance MonadBaseControl IO (DHT ip) where -- | Check is it is possible to run 'queryNode' or handle pending -- query from remote node. -instance MonadActive (DHT ip) where +instance MonadActive (DHT raw dht u ip) where monadActive = getManager >>= liftIO . isActive {-# INLINE monadActive #-} -- | All allocated resources will be closed at 'closeNode'. -instance MonadResource (DHT ip) where +instance MonadResource (DHT raw dht u ip) where liftResourceT m = do s <- asks resources liftIO $ runInternalState m s --- instance MonadKRPC (DHT ip) (DHT ip) BValue KMessageOf where +-- instance MonadKRPC (DHT raw dht u ip) (DHT raw dht u ip) BValue KMessageOf where -getManager :: DHT ip (Manager IO BValue KMessageOf) +getManager :: DHT raw dht u ip (Manager raw dht) getManager = asks manager -instance MonadLogger (DHT ip) where +instance MonadLogger (DHT raw dht u ip) where monadLoggerLog loc src lvl msg = do logger <- asks loggerFun liftIO $ logger loc src lvl (toLogStr msg) @@ -344,7 +336,7 @@ instance MonadLogger (DHT ip) where #ifdef VERSION_bencoding type NodeHandler = Handler IO KMessageOf BValue #else -type NodeHandler ip = Handler (DHT ip) Tox.Message ByteString +type NodeHandler ip = Handler (DHT raw dht u ip) Tox.Message ByteString #endif logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO () @@ -376,17 +368,16 @@ locFromCS cs = case getCallStack cs of -- | Run DHT session. You /must/ properly close session using -- 'closeNode' function, otherwise socket or other scarce resources may -- leak. -newNode :: Address ip +newNode :: ( Address ip + , FiniteBits (NodeId dht) + , Serialize (NodeId dht) + ) => -- [NodeHandler] -- ^ handlers to run on accepted queries; Options -- ^ various dht options; -> NodeAddr ip -- ^ node address to bind; -> LogFun -- ^ invoked on log messages; -#ifdef VERSION_bencoding - -> Maybe (NodeId KMessageOf) -- ^ use this NodeId, if not given a new one is generated. -#else - -> Maybe (NodeId Tox.Message) -- ^ use this NodeId, if not given a new one is generated. -#endif - -> IO (Node ip) -- ^ a new DHT node running at given address. + -> Maybe (NodeId dht) -- ^ use this NodeId, if not given a new one is generated. + -> IO (Node raw dht u ip) -- ^ a new DHT node running at given address. newNode opts naddr logger mbid = do s <- createInternalState runInternalState initNode s @@ -409,11 +400,11 @@ newNode opts naddr logger mbid = do -- | Some resources like listener thread may live for -- some short period of time right after this DHT session closed. -closeNode :: Node ip -> IO () +closeNode :: Node raw dht u ip -> IO () closeNode Node {..} = closeInternalState resources -- | Run DHT operation on the given session. -runDHT :: Node ip -> DHT ip a -> IO a +runDHT :: Node raw dht u ip -> DHT raw dht u ip a -> IO a runDHT node action = runReaderT (unDHT action) node {-# INLINE runDHT #-} @@ -453,24 +444,20 @@ checkToken sessionTokens addr questionableToken = do -----------------------------------------------------------------------} -- | This nodes externally routable address reported by remote peers. -routableAddress :: DHT ip (Maybe SockAddr) +routableAddress :: DHT raw dht u ip (Maybe SockAddr) routableAddress = do info <- asks routingInfo >>= liftIO . atomically . readTVar return $ myAddress <$> info -- | The current NodeId that the given remote node should know us by. -#ifdef VERSION_bencoding -myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId KMessageOf) -#else -myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId Tox.Message) -#endif +myNodeIdAccordingTo :: NodeAddr ip -> DHT raw dht u ip (NodeId dht) myNodeIdAccordingTo _ = do info <- asks routingInfo >>= liftIO . atomically . readTVar maybe (asks tentativeNodeId) (return . myNodeId) info -myNodeIdAccordingTo1 :: DHT ip ( NodeAddr ip -> IO (NodeId KMessageOf) ) +myNodeIdAccordingTo1 :: DHT raw dht u ip ( NodeAddr ip -> IO (NodeId dht) ) myNodeIdAccordingTo1 = do var <- asks routingInfo tid <- asks tentativeNodeId @@ -480,11 +467,7 @@ myNodeIdAccordingTo1 = do -- | Get current routing table. Normally you don't need to use this -- function, but it can be usefull for debugging and profiling purposes. -#ifdef VERSION_bencoding -getTable :: Eq ip => DHT ip (Table KMessageOf ip ()) -#else -getTable :: Eq ip => DHT ip (Table Tox.Message ip Bool) -#endif +getTable :: Eq ip => DHT raw dht u ip (Table dht ip u) getTable = do Node { tentativeNodeId = myId , routingInfo = var @@ -492,18 +475,18 @@ getTable = do let nil = nullTable myId (optBucketCount opts) liftIO (maybe nil R.myBuckets <$> atomically (readTVar var)) -getSwarms :: Ord ip => DHT ip [ (InfoHash, Int, Maybe ByteString) ] +getSwarms :: Ord ip => DHT raw dht u ip [ (InfoHash, Int, Maybe ByteString) ] getSwarms = do store <- asks contactInfo >>= liftIO . atomically . readTVar return $ P.knownSwarms store -savePeerStore :: (Ord ip, Address ip) => DHT ip ByteString +savePeerStore :: (Ord ip, Address ip) => DHT raw dht u ip ByteString savePeerStore = do var <- asks contactInfo peers <- liftIO $ atomically $ readTVar var return $ S.encode peers -mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT ip () +mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw dht u ip () mergeSavedPeers bs = do var <- asks contactInfo case S.decode bs of @@ -511,7 +494,7 @@ mergeSavedPeers bs = do Left _ -> return () -allPeers :: Ord ip => InfoHash -> DHT ip [ PeerAddr ip ] +allPeers :: Ord ip => InfoHash -> DHT raw dht u ip [ PeerAddr ip ] allPeers ih = do store <- asks contactInfo >>= liftIO . atomically . readTVar return $ P.lookup ih store @@ -521,18 +504,20 @@ allPeers ih = do -- -- This operation used for 'find_nodes' query. -- -#ifdef VERSION_bencoding -getClosest :: Eq ip => TableKey KMessageOf k => k -> DHT ip [NodeInfo KMessageOf ip ()] -#else -getClosest :: Eq ip => TableKey Tox.Message k => k -> DHT ip [NodeInfo Tox.Message ip Bool] -#endif +getClosest :: ( Eq ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , TableKey dht k ) => + k -> DHT raw dht u ip [NodeInfo dht ip u] getClosest node = do k <- asks (optK . options) kclosest k node <$> getTable getClosest1 :: ( Eq ip - , TableKey KMessageOf k - ) => DHT ip (k -> IO [NodeInfo KMessageOf ip ()]) + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , TableKey dht k + ) => DHT raw dht u ip (k -> IO [NodeInfo dht ip u]) getClosest1 = do k <- asks (optK . options) nobkts <- asks (optBucketCount . options) @@ -574,13 +559,12 @@ getTimestamp = do -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime))) return $ utcTimeToPOSIXSeconds utcTime - #ifdef VERSION_bencoding -- | Prepare result for 'get_peers' query. -- -- This operation use 'getClosest' as failback so it may block. -- -getPeerList :: Ord ip => InfoHash -> DHT ip (PeerList ip) +getPeerList :: Ord ip => InfoHash -> DHT raw KMessageOf () ip (PeerList ip) getPeerList ih = do var <- asks contactInfo ps <- liftIO $ lookupPeers var ih @@ -588,7 +572,7 @@ getPeerList ih = do then Left <$> getClosest ih else return (Right ps) -getPeerList1 :: Ord ip => DHT ip (InfoHash -> IO (PeerList ip)) +getPeerList1 :: Ord ip => DHT raw KMessageOf () ip (InfoHash -> IO (PeerList ip)) getPeerList1 = do var <- asks contactInfo getclosest <- getClosest1 @@ -599,12 +583,12 @@ getPeerList1 = do else return (Right ps) -insertTopic :: InfoHash -> PortNumber -> DHT ip () +insertTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () insertTopic ih p = do var <- asks announceInfo liftIO $ atomically $ modifyTVar' var (S.insert (ih, p)) -deleteTopic :: InfoHash -> PortNumber -> DHT ip () +deleteTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () deleteTopic ih p = do var <- asks announceInfo liftIO $ atomically $ modifyTVar' var (S.delete (ih, p)) @@ -616,7 +600,7 @@ deleteTopic ih p = do -----------------------------------------------------------------------} -- | Failed queries are ignored. -queryParallel :: [DHT ip a] -> DHT ip [a] +queryParallel :: [DHT raw dht u ip a] -> DHT raw dht u ip [a] queryParallel queries = do -- TODO: use alpha -- alpha <- asks (optAlpha . options) diff --git a/src/Network/DHT/Mainline.hs b/src/Network/DHT/Mainline.hs index d118ceb0..29d4231d 100644 --- a/src/Network/DHT/Mainline.hs +++ b/src/Network/DHT/Mainline.hs @@ -123,6 +123,8 @@ import Network.BitTorrent.DHT.Token import Network.DatagramServer () #endif import Network.DatagramServer.Types hiding (Query,Response) +import Network.DHT.Types +import Network.DHT.Routing {----------------------------------------------------------------------- -- envelopes @@ -140,15 +142,7 @@ read_only_key = "ro" #ifdef VERSION_bencoding --- | All queries have an \"id\" key and value containing the node ID --- of the querying node. -data Query a = Query - { queringNodeId :: NodeId KMessageOf -- ^ node id of /quering/ node; - , queryIsReadOnly :: Bool -- ^ node is read-only as per BEP 43 - , queryParams :: a -- ^ query parameters. - } deriving (Show, Eq, Typeable) - -instance BEncode a => BEncode (Query a) where +instance BEncode a => BEncode (Query KMessageOf a) where toBEncode Query {..} = toDict $ BDict.union ( node_id_key .=! queringNodeId .: read_only_key .=? bool Nothing (Just (1 :: Integer)) queryIsReadOnly @@ -167,14 +161,7 @@ data Query a = Query a #endif #ifdef VERSION_bencoding --- | All responses have an \"id\" key and value containing the node ID --- of the responding node. -data Response a = Response - { queredNodeId :: NodeId KMessageOf -- ^ node id of /quered/ node; - , responseVals :: a -- ^ query result. - } deriving (Show, Eq, Typeable) - -instance BEncode a => BEncode (Response a) where +instance BEncode a => BEncode (Response KMessageOf a) where toBEncode = toBEncode . toQuery where toQuery (Response nid a) = Query nid False a @@ -183,28 +170,23 @@ instance BEncode a => BEncode (Response a) where where fromQuery (Query nid _ a) = Response nid a #else -data Response a = Response a +data Response KMessageOf a = Response KMessageOf a #endif {----------------------------------------------------------------------- -- ping method -----------------------------------------------------------------------} --- | The most basic query is a ping. Ping query is used to check if a +-- / The most basic query is a ping. Ping query is used to check if a -- quered node is still alive. -#ifdef VERSION_bencoding -data Ping = Ping -#else -data Ping = Ping Tox.Nonce8 -#endif - deriving (Show, Eq, Typeable) +-- data Ping = Ping Tox.Nonce8 deriving (Show, Eq, Typeable) #ifdef VERSION_bencoding -instance BEncode Ping where +instance BEncode (Ping KMessageOf) where toBEncode Ping = toDict endDict fromBEncode _ = pure Ping #else -instance Serialize (Query Ping) where +instance Serialize (Query (Ping KMessageOf)) where get = do b <- get when ( (b::Word8) /= 0) $ fail "Bad ping request" @@ -225,7 +207,7 @@ instance Serialize (Response Ping) where #endif -- | \"q\" = \"ping\" -instance KRPC (Query Ping) (Response Ping) where +instance KRPC (Query KMessageOf (Ping KMessageOf)) (Response KMessageOf (Ping KMessageOf)) where #ifdef VERSION_bencoding method = "ping" #else @@ -236,24 +218,20 @@ instance KRPC (Query Ping) (Response Ping) where -- find_node method -----------------------------------------------------------------------} --- | Find node is used to find the contact information for a node +-- / Find node is used to find the contact information for a node -- given its ID. -#ifdef VERSION_bencoding -newtype FindNode ip = FindNode (NodeId KMessageOf) -#else -data FindNode ip = FindNode (NodeId Tox.Message) Tox.Nonce8 -- Tox: Get Nodes -#endif - deriving (Show, Eq, Typeable) +-- data FindNode KMessageOf ip = FindNode (NodeId Tox.Message) Tox.Nonce8 -- Tox: Get Nodes + -- deriving (Show, Eq, Typeable) target_key :: BKey target_key = "target" #ifdef VERSION_bencoding -instance Typeable ip => BEncode (FindNode ip) where +instance Typeable ip => BEncode (FindNode KMessageOf ip) where toBEncode (FindNode nid) = toDict $ target_key .=! nid .: endDict fromBEncode = fromDict $ FindNode <$>! target_key #else -instance Serialize (Query (FindNode ip)) where +instance Serialize (Query KMessageOf (FindNode KMessageOf ip)) where get = do nid <- get nonce <- get @@ -268,12 +246,11 @@ instance Serialize (Query (FindNode ip)) where -- nodes in its own routing table. -- #ifdef VERSION_bencoding -newtype NodeFound ip = NodeFound [NodeInfo KMessageOf ip ()] +-- newtype NodeFound KMessageOf ip = NodeFound [NodeInfo KMessageOf ip ()] deriving (Show, Eq, Typeable) #else -data NodeFound ip = NodeFound [Tox.NodeFormat] Tox.Nonce8 +data NodeFound KMessageOf ip = NodeFound [Tox.NodeFormat] Tox.Nonce8 deriving (Show, Eq, Typeable) #endif -- Tox: send_nodes - deriving (Show, Eq, Typeable) nodes_key :: BKey nodes_key = "nodes" @@ -290,7 +267,7 @@ binary k = field (req k) >>= either (fail . format) return . where format str = "fail to deserialize " ++ show k ++ " field: " ++ str -instance Address ip => BEncode (NodeFound ip) where +instance Address ip => BEncode (NodeFound KMessageOf ip) where toBEncode (NodeFound ns) = toDict $ nodes_key .=! runPut (mapM_ put ns) .: endDict @@ -298,7 +275,7 @@ instance Address ip => BEncode (NodeFound ip) where -- TODO: handle IPv6 by reading the "nodes6" key (see bep 32) fromBEncode bval = NodeFound <$> (traverse from4 =<< fromDict (binary nodes_key) bval) #else -instance Serialize (Response (NodeFound ip)) where +instance Serialize (Response KMessageOf (NodeFound KMessageOf ip)) where get = do count <- get :: Get Word8 nodes <- sequence $ replicate (fromIntegral count) get @@ -314,7 +291,7 @@ instance Serialize (Response (NodeFound ip)) where -- | \"q\" == \"find_node\" instance (Address ip, Typeable ip) - => KRPC (Query (FindNode ip)) (Response (NodeFound ip)) where + => KRPC (Query KMessageOf (FindNode KMessageOf ip)) (Response KMessageOf (NodeFound KMessageOf ip)) where #ifdef VERSION_bencoding method = "find_node" #else @@ -383,7 +360,7 @@ instance (Typeable ip, Serialize ip) => BEncode (GotPeers ip) where -- | \"q" = \"get_peers\" instance (Typeable ip, Serialize ip) => - KRPC (Query (GetPeers ip)) (Response (GotPeers ip)) where + KRPC (Query KMessageOf (GetPeers ip)) (Response KMessageOf (GotPeers ip)) where method = "get_peers" {----------------------------------------------------------------------- @@ -455,7 +432,7 @@ instance BEncode Announced where fromBEncode _ = pure Announced -- | \"q" = \"announce\" -instance KRPC (Query Announce) (Response Announced) where +instance KRPC (Query KMessageOf Announce) (Response KMessageOf Announced) where method = "announce_peer" -- endif VERSION_bencoding @@ -495,3 +472,25 @@ bep42 addr (NodeId r) where msk | BS.length ip == 4 = ip4mask | otherwise = ip6mask +instance Kademlia KMessageOf where + data Ping KMessageOf = Ping + deriving (Show, Eq, Typeable) + newtype FindNode KMessageOf ip = FindNode (NodeId KMessageOf) + deriving (Show, Eq, Typeable) + newtype NodeFound KMessageOf ip = NodeFound [NodeInfo KMessageOf ip ()] + deriving (Show, Eq, Typeable) + pingMessage _ = Ping + pongMessage _ = Ping + findNodeMessage _ k = FindNode (toNodeId k) + foundNodes (NodeFound ns) = ns + + dhtAdjustID _ fallback ip0 arrival + = fromMaybe fallback $ do + ip <- fromSockAddr ip0 -- :: Maybe ip + let _ = ip `asTypeOf` nodeAddr (foreignNode arrival) + listToMaybe + $ rank id (nodeId $ foreignNode arrival) + $ bep42s ip fallback + + namePing _ = "ping" + nameFindNodes _ = "find-nodes" diff --git a/src/Network/DHT/Types.hs b/src/Network/DHT/Types.hs index ed2dc175..79f9e1d3 100644 --- a/src/Network/DHT/Types.hs +++ b/src/Network/DHT/Types.hs @@ -1,8 +1,17 @@ -module Network.DHT.Types where +{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE FlexibleContexts #-} +module Network.DHT.Types + ( module Network.DHT.Types + , TableKey + , toNodeId + ) where import Network.Socket (SockAddr) import Network.DatagramServer.Types import Network.DHT.Routing +import Data.Typeable data TableParameters msg ip u = TableParameters { maxBuckets :: Int @@ -11,3 +20,43 @@ data TableParameters msg ip u = TableParameters , logMessage :: Char -> String -> IO () , adjustID :: SockAddr -> Event msg ip u -> NodeId msg } + +-- | All queries have an \"id\" key and value containing the node ID +-- of the querying node. +data Query dht a = Query + { queringNodeId :: NodeId dht -- ^ node id of /quering/ node; + , queryIsReadOnly :: Bool -- ^ node is read-only as per BEP 43 + , queryParams :: a -- ^ query parameters. + } deriving (Typeable) + +deriving instance (Eq (NodeId dht), Eq a ) => Eq (Query dht a) +deriving instance (Show (NodeId dht), Show a ) => Show (Query dht a) + +-- | All responses have an \"id\" key and value containing the node ID +-- of the responding node. +data Response dht a = Response + { queredNodeId :: NodeId dht -- ^ node id of /quered/ node; + , responseVals :: a -- ^ query result. + } deriving (Typeable) + +deriving instance (Eq (NodeId dht), Eq a ) => Eq (Response dht a) +deriving instance (Show (NodeId dht), Show a ) => Show (Response dht a) + + +class Kademlia dht where + -- | The most basic query is a ping. Ping query is used to check if a + -- quered node is still alive. + data Ping dht + -- | Find node is used to find the contact information for a node + -- given its ID. + data FindNode dht ip + data NodeFound dht ip + pingMessage :: Proxy dht -> Ping dht + pongMessage :: Proxy dht -> Ping dht + findNodeMessage :: TableKey dht k => Proxy dht -> k -> FindNode dht ip + foundNodesMessage :: [NodeInfo dht ip ()] -> NodeFound dht ip + foundNodes :: NodeFound dht ip -> [NodeInfo dht ip ()] + findWho :: FindNode dht ip -> NodeId dht + dhtAdjustID :: Address ip => Proxy dht -> NodeId dht -> SockAddr -> Event dht ip u -> NodeId dht + namePing :: Proxy dht -> QueryMethod dht + nameFindNodes :: Proxy dht -> QueryMethod dht diff --git a/src/Network/DatagramServer.hs b/src/Network/DatagramServer.hs index 2140e2cd..91efa443 100644 --- a/src/Network/DatagramServer.hs +++ b/src/Network/DatagramServer.hs @@ -190,13 +190,13 @@ type Handler h msg v = (QueryMethod msg, HandlerBody h msg v) -- | Keep track pending queries made by /this/ node and handle queries -- made by /remote/ nodes. -data Manager h raw msg = Manager +data Manager raw msg = Manager { sock :: !Socket , options :: !Options , listenerThread :: !(MVar ThreadId) , transactionCounter :: {-# UNPACK #-} !TransactionCounter , pendingCalls :: {-# UNPACK #-} !(PendingCalls msg raw) - , handlers :: [Handler h msg raw] -- TODO delete this, it's not used + -- , handlers :: [Handler h msg raw] -- TODO delete this, it's not used , logMsg :: Char -> String -> T.Text -> IO () } @@ -212,14 +212,14 @@ newManager :: Options -- ^ various protocol options; -> (Char -> String -> T.Text -> IO ()) -- ^ loging function -> SockAddr -- ^ address to listen on; -> [Handler h msg raw] -- ^ handlers to run on incoming queries. - -> IO (Manager h raw msg) -- ^ new rpc manager. + -> IO (Manager raw msg) -- ^ new rpc manager. newManager opts @ Options {..} logmsg servAddr handlers = do validateOptions opts sock <- bindServ tref <- newEmptyMVar tran <- newIORef optSeedTransaction calls <- newIORef M.empty - return $ Manager sock opts tref tran calls handlers logmsg + return $ Manager sock opts tref tran calls logmsg where bindServ = do let family = sockAddrFamily servAddr @@ -230,7 +230,7 @@ newManager opts @ Options {..} logmsg servAddr handlers = do return sock -- | Unblock all pending calls and close socket. -closeManager :: Manager m raw msg -> IO () +closeManager :: Manager raw msg -> IO () closeManager Manager {..} = do maybe (return ()) killThread =<< tryTakeMVar listenerThread -- TODO unblock calls @@ -238,7 +238,7 @@ closeManager Manager {..} = do -- | Check if the manager is still active. Manager becomes active -- until 'closeManager' called. -isActive :: Manager m raw msg -> IO Bool +isActive :: Manager raw msg -> IO Bool isActive Manager {..} = liftIO $ isBound sock {-# INLINE isActive #-} @@ -246,7 +246,7 @@ isActive Manager {..} = liftIO $ isBound sock -- | Normally you should use Control.Monad.Trans.Resource.allocate -- function. withManager :: Options -> SockAddr -> [Handler h msg raw] - -> (Manager h raw msg -> IO a) -> IO a + -> (Manager raw msg -> IO a) -> IO a withManager opts addr hs = bracket (newManager opts addr hs) closeManager #endif @@ -289,7 +289,7 @@ genTransactionId ref = do uniqueTransactionId cur -- | How many times 'query' call have been performed. -getQueryCount :: Manager h raw msg -> IO Int +getQueryCount :: Manager raw msg -> IO Int getQueryCount mgr@Manager{..} = do curTrans <- readIORef transactionCounter return $ curTrans - optSeedTransaction options @@ -320,21 +320,21 @@ sendQuery sock addr q = handle sockError $ sendMessage sock addr q -- This function should throw 'QueryFailure' exception if quered node -- respond with @error@ message or the query timeout expires. -- -query :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager h raw msg -> QueryMethod msg -> SockAddr -> a -> IO b +query :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager raw msg -> QueryMethod msg -> SockAddr -> a -> IO b query mgr meth addr params = queryK mgr meth addr params (\_ x _ -> x) -- | Like 'query' but possibly returns your externally routable IP address. -query' :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager h raw msg -> QueryMethod msg -> SockAddr -> a -> IO (b, Maybe ReflectedIP) +query' :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager raw msg -> QueryMethod msg -> SockAddr -> a -> IO (b, Maybe ReflectedIP) query' mgr meth addr params = queryK mgr meth addr params (const (,)) -- | Enqueue a query, but give us the complete BEncoded content sent by the -- remote Node. This is useful for handling extensions that this library does -- not otherwise support. -queryRaw :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager h raw msg -> QueryMethod msg -> SockAddr -> a -> IO (b, raw) +queryRaw :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager raw msg -> QueryMethod msg -> SockAddr -> a -> IO (b, raw) queryRaw mgr meth addr params = queryK mgr meth addr params (\raw x _ -> (x,raw)) queryK :: forall h a b x raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => - Manager h raw msg -> QueryMethod msg -> SockAddr -> a -> (raw -> b -> Maybe ReflectedIP -> x) -> IO x + Manager raw msg -> QueryMethod msg -> SockAddr -> a -> (raw -> b -> Maybe ReflectedIP -> x) -> IO x queryK mgr@Manager{..} meth addr params kont = do tid <- liftIO $ genTransactionId transactionCounter -- let queryMethod = method :: Method a b @@ -424,7 +424,7 @@ handler name body = (name, wrapper) runHandler :: ( Envelope msg , Show (QueryMethod msg) , Serialize (TransactionID msg)) - => Manager IO raw msg -> QueryMethod msg -> HandlerBody IO msg raw -> SockAddr -> msg raw -> IO (KResult msg raw) + => Manager raw msg -> QueryMethod msg -> HandlerBody IO msg raw -> SockAddr -> msg raw -> IO (KResult msg raw) runHandler mgr@Manager{..} meth h addr m = Lifted.catches wrapper failbacks where signature = querySignature meth (envelopeTransaction m) addr @@ -462,7 +462,7 @@ dispatchHandler :: ( Eq (QueryMethod msg) , Show (QueryMethod msg) , Serialize (TransactionID msg) , Envelope msg - ) => Manager IO raw msg -> [Handler IO msg raw] -> QueryMethod msg -> msg raw -> SockAddr -> IO (KResult msg raw) + ) => Manager raw msg -> [Handler IO msg raw] -> QueryMethod msg -> msg raw -> SockAddr -> IO (KResult msg raw) dispatchHandler mgr handlers meth q addr = do case L.lookup meth handlers of Nothing -> return $ Left $ KError MethodUnknown ("Unknown method " <> BC.pack (show meth)) (envelopeTransaction q) @@ -483,7 +483,7 @@ handleQuery :: ( WireFormat raw msg , Eq (QueryMethod msg) , Show (QueryMethod msg) , Serialize (TransactionID msg) - ) => Manager IO raw msg -> [Handler IO msg raw] -> QueryMethod msg -> raw -> msg raw -> SockAddr -> IO () + ) => Manager raw msg -> [Handler IO msg raw] -> QueryMethod msg -> raw -> msg raw -> SockAddr -> IO () handleQuery mgr@Manager{..} hs meth raw q addr = void $ fork $ do myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" res <- dispatchHandler mgr hs meth q addr @@ -501,7 +501,7 @@ handleQuery mgr@Manager{..} hs meth raw q addr = void $ fork $ do handleResponse :: ( Ord (TransactionID msg) , Envelope msg - ) => Manager IO raw msg -> raw -> KResult msg raw -> SockAddr -> IO () + ) => Manager raw msg -> raw -> KResult msg raw -> SockAddr -> IO () handleResponse mgr@Manager{..} raw result addr = do liftIO $ do let resultId = either errorId envelopeTransaction result @@ -520,7 +520,7 @@ listener :: forall raw msg. , Eq (QueryMethod msg) , Show (QueryMethod msg) , Serialize (TransactionID msg) - ) => Manager IO raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO () + ) => Manager raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO () listener mgr@Manager{..} hs p = do fix $ \again -> do let ctx = error "TODO TOX ToxCipherContext or () for Mainline" @@ -551,7 +551,7 @@ listen :: ( WireFormat raw msg , Eq (QueryMethod msg) , Show (QueryMethod msg) , Serialize (TransactionID msg) - ) => Manager IO raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO () + ) => Manager raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO () listen mgr@Manager{..} hs p = do tid <- fork $ do myThreadId >>= liftIO . flip labelThread "KRPC.listen" -- cgit v1.2.3