-- |
--   Copyright   :  (c) Sam Truzjan 2014
--   License     :  BSD3
--   Maintainer  :  pxqr.sta@gmail.com
--   Stability   :  experimental
--   Portability :  portable
--
--   This module provides functions to interact with other nodes.
--   Normally, you don't need to import this module, use
--   "Network.BitTorrent.DHT" instead.
--
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE GADTs #-}
module Network.BitTorrent.DHT.Query
       ( -- * Handler
         -- | To bind a specific set of handlers you need to pass
         -- the handler list to the 'startNode' function.
         pingH
       , findNodeH
       , getPeersH
       , announceH
       , defaultHandlers

         -- * Query
         -- ** Basic
         -- | A basic query performs a single request expecting a
         -- single response.
       , Iteration
       , pingQ
       , findNodeQ
       , getPeersQ
       , announceQ

         -- ** Iterative
         -- | An iterative query performs multiple basic queries,
         -- concatenates their responses, optionally yields results and
         -- continues to the next iteration.
       , Search
       , search
       , publish
       , ioFindNode
       , ioGetPeers
       , isearch

         -- ** Routing table
       , insertNode
       , refreshNodes

         -- ** Messaging
       , queryNode
       , queryNode'
       , (<@>)
       ) where

#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument hiding (yield)
#else
import GHC.Conc (labelThread)
import Control.Concurrent.Lifted hiding (yield)
#endif
import Control.Exception.Lifted hiding (Handler)
import Control.Monad.Reader
import Control.Monad.Logger
import Data.Maybe
import Data.Conduit
import Data.Conduit.List as C hiding (mapMaybe, mapM_)
import Data.Either
import Data.List as L
import Data.Monoid
import Data.Text as T
import qualified Data.Set as Set
import Data.Set (Set)
import Network
import Text.PrettyPrint as PP hiding ((<>), ($$))
import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
import Data.Time
import Data.Time.Clock.POSIX

import Network.DatagramServer as KRPC hiding (Options, def)
import Network.KRPC.Method as KRPC
import Network.DatagramServer.Mainline (ReflectedIP(..))
import Network.DatagramServer (QueryFailure(..))
import Data.Torrent
import Network.DHT.Mainline
import Network.DHT.Routing as R
import Network.BitTorrent.DHT.Session
import Control.Concurrent.STM
import qualified Network.BitTorrent.DHT.Search as Search
#ifdef VERSION_bencoding
import Data.BEncode (BValue)
import Network.DatagramServer.Mainline (KMessageOf)
#else
import Data.ByteString (ByteString)
import Network.DatagramServer.Tox
#endif
import Network.Address hiding (NodeId)
import Network.DatagramServer.Types as RPC hiding (Query,Response)

{-----------------------------------------------------------------------
--  Handlers
-----------------------------------------------------------------------}

-- | Build a 'NodeHandler' for the given method name from an action that
-- maps the sender's address and the query parameters to a reply.
--
-- Before the action runs, the querying node is passed to 'insertNode',
-- unless the query is flagged read-only, in which case only a warning is
-- logged.  The reply is wrapped in 'Response' together with the node id
-- obtained from 'myNodeIdAccordingTo'.
--
-- Throws 'BadAddress' when 'fromSockAddr' cannot convert the sender's
-- socket address.
nodeHandler :: ( Address ip
               , KRPC (Query a) (Response b)
#ifdef VERSION_bencoding
               , KRPC.Envelope (Query a) (Response b) ~ BValue )
#else
               , KRPC.Envelope (Query a) (Response b) ~ ByteString )
#endif
            => QueryMethod KMessageOf -> (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip
nodeHandler method action = handler method $ \ sockAddr qry -> do
#ifdef VERSION_bencoding
  let remoteId  = queringNodeId qry
      read_only = queryIsReadOnly qry
      q         = queryParams qry
#else
  let remoteId  = msgClient qry
      read_only = False
      q         = msgPayload qry
#endif
  case fromSockAddr sockAddr of
    Nothing    -> throwIO BadAddress
    Just naddr -> do
      let ni = NodeInfo remoteId naddr ()
      -- Do not route read-only nodes.  (bep 43)
      if read_only
        then $(logWarnS) "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni)
        else insertNode ni Nothing -- TODO need to block. why?
      Response <$> myNodeIdAccordingTo naddr <*> action naddr q

-- | Default 'Ping' handler.
pingH :: Address ip => NodeHandler ip
#ifdef VERSION_bencoding
pingH = nodeHandler "ping" $ \ _ Ping -> return Ping
#else
-- NOTE(review): 'nodeHandler' takes the method name as its first
-- argument, which is not passed here -- confirm this branch still builds.
pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True }
#endif

-- | Default 'FindNode' handler.  Replies with the result of 'getClosest'
-- for the requested node id.
--
-- The handler is registered under @find_node@, the query name used by
-- BEP 5, matching the naming of the other default handlers.
findNodeH :: Address ip => NodeHandler ip
findNodeH = nodeHandler "find_node" $ \ _ (FindNode nid) -> do
  NodeFound <$> getClosest nid

#ifdef VERSION_bencoding
-- | Default 'GetPeers' handler.  Replies with the peer list returned by
-- 'getPeerList' and a token granted to the sender by 'grantToken'.
getPeersH :: Ord ip => Address ip => NodeHandler ip
getPeersH = nodeHandler "get_peers" $ \ naddr (GetPeers ih) -> do
  ps  <- getPeerList ih
  tok <- grantToken naddr
  return $ GotPeers ps tok

-- | Default 'Announce' handler.
--
-- The session token is validated with 'checkToken'; an invalid token
-- results in @InvalidParameter "token"@ being thrown.  The stored peer
-- uses the sender's host, and the sender's own port when @impliedPort@ is
-- set, otherwise the announced port.
announceH :: Ord ip => Address ip => NodeHandler ip
announceH = nodeHandler "announce_peer" $ \ naddr@NodeAddr {..} (Announce {..}) -> do
  valid <- checkToken naddr sessionToken
  unless valid $ do
    throwIO $ InvalidParameter "token"

  let annPort  = if impliedPort then nodePort else port
      peerAddr = PeerAddr Nothing nodeHost annPort
  insertPeer topic announcedName peerAddr
  return Announced

-- | Includes all default query handlers.
defaultHandlers :: Ord ip => Address ip => [NodeHandler ip]
defaultHandlers = [pingH, findNodeH, getPeersH, announceH]
#else
-- | Includes all default query handlers.
defaultHandlers :: Ord ip => Address ip => [NodeHandler ip]
defaultHandlers = [pingH, findNodeH]
#endif

{-----------------------------------------------------------------------
--  Basic queries
-----------------------------------------------------------------------}

-- | One step of an iterative query against a single node.  In 'search',
-- 'Left' values are fed back as further nodes to query and 'Right'
-- values are yielded as results.
type Iteration ip o = NodeInfo KMessageOf ip () -> DHT ip (Either [NodeInfo KMessageOf ip ()] [o ip])

-- | The most basic query. May be used to check if the given node is
-- alive or get its 'NodeId'.  Also returns the reflected address
-- reported by 'queryNode''.
pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo KMessageOf ip (), Maybe ReflectedIP)
pingQ addr = do
#ifdef VERSION_bencoding
  (nid, Ping, mip) <- queryNode' addr Ping
#else
  -- NOTE(review): pid is not bound anywhere in this module; presumably it
  -- is expected from an import -- confirm.
  (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid}
#endif
  return (NodeInfo nid addr (), mip)

-- | Send a 'FindNode' query for the given key to one node, log the
-- returned nodes and hand them back as 'Right' results.
--
-- TODO [robustness] match range of returned node ids with the
-- expected range and either filter bad nodes or discard response at
-- all throwing an exception
--
-- The signature is left to inference (IterationI is not defined in this
-- module):
-- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo
findNodeQ key NodeInfo {..} = do
  NodeFound closest <- FindNode (toNodeId key) <@> nodeAddr
  $(logInfoS) "findNodeQ" $ "NodeFound\n"
    <> T.pack (L.unlines $ L.map ((' ' :) . show . pPrint) closest)
  return $ Right closest

#ifdef VERSION_bencoding
-- | Send a 'GetPeers' query for the given infohash to one node and
-- return the @peers@ field of the reply unchanged, after logging the
-- distance to the node and whether nodes or peers came back.
getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr
getPeersQ topic NodeInfo {..} = do
  GotPeers {..} <- GetPeers topic <@> nodeAddr
  let dist = distance (toNodeId topic) nodeId
  $(logInfoS) "getPeersQ" $ T.pack
         $ "distance: " <> render (pPrint dist) <> " , result: "
        <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" }
  return peers

-- | Ask one node for peers of the infohash.  If it answers with more
-- nodes, return them as 'Left'; if it answers with peers, send an
-- 'Announce' using the granted token and return the node's address as
-- the single 'Right' result.
announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr
announceQ ih p NodeInfo {..} = do
  GotPeers {..} <- GetPeers ih <@> nodeAddr
  case peers of
    -- TODO check if we can announce
    Left ns -> return (Left ns)
    Right _ -> do -- TODO *probably* add to peer cache
      Announced <- Announce False ih Nothing p grantedToken <@> nodeAddr
      return (Right [nodeAddr])
#endif

{-----------------------------------------------------------------------
--  Iterative queries
-----------------------------------------------------------------------}

-- | Capture the current session and return an 'IO' function that runs
-- 'getPeersQ' against a node.  The result is a pair of (further nodes,
-- peers); a 'QueryFailure' is caught and reported as two empty lists.
ioGetPeers :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip]))
ioGetPeers ih = do
    session <- ask
    return $ \ni -> runDHT session $ do
        r <- try $ getPeersQ ih ni
        case r of
            Right e -> return $ either (,[]) ([],) e
            Left e  -> let _ = e :: QueryFailure in return ([],[])

-- | Capture the current session and return an 'IO' function that sends
-- a 'FindNode' query to a node.  The returned nodes are split into those
-- whose id differs from the target (first component) and those whose id
-- equals it (second component).
--
-- NOTE(review): unlike 'ioGetPeers', a 'QueryFailure' is not caught
-- here -- confirm this is intended.
ioFindNode :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [NodeInfo KMessageOf ip ()]))
ioFindNode ih = do
    session <- ask
    return $ \ni -> runDHT session $ do
        NodeFound ns <- FindNode (toNodeId ih) <@> nodeAddr ni
        return $ L.partition (\n -> nodeId n /= toNodeId ih) ns

-- | Start an iterative search in a freshly forked thread.
--
-- The query function is obtained by running the first argument on the
-- infohash; the search is seeded with @kclosest 8 ih@ of the current
-- routing table.  Returns the id of the search thread (labelled
-- @search.\<infohash\>@) together with the search state.
isearch :: (Ord r, Ord ip) =>
    (InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [r])))
    -> InfoHash
    -> DHT ip (ThreadId, Search.IterativeSearch ip r)
isearch f ih = do
    qry <- f ih
    ns  <- kclosest 8 ih <$> getTable
    liftIO $ do
        s <- Search.newSearch qry (toNodeId ih) ns
        a <- fork $ do
            tid <- myThreadId
            labelThread tid ("search."++show ih)
            Search.search s
            -- atomically $ readTVar (Search.searchResults s)
        return (a, s)

-- | A conduit that consumes batches of nodes to query and produces
-- batches of results.
type Search ip o = Conduit [NodeInfo KMessageOf ip ()] (DHT ip) [o KMessageOf ip ()]

-- | Drive an iteration over incoming batches of nodes.  Every non-empty
-- batch is queried with 'queryParallel'; nodes returned as 'Left' are
-- pushed back with 'leftover' and each 'Right' result list is yielded
-- downstream.  The first argument is ignored.
--
-- TODO: use reorder and filter (Traversal option) leftovers
--
-- The signature is left to inference (IterationI is not defined in this
-- module):
-- search :: k -> IterationI ip o -> Search ip o
search _ action = do
  awaitForever $ \ batch -> unless (L.null batch) $ do
    $(logWarnS) "search" "start query"
    responses <- lift $ queryParallel (action <$> batch)
    let (nodes, results) = partitionEithers responses
    $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results)))
    leftover $ L.concat nodes
    mapM_ yield results

-- | Announce the infohash and port, starting from the nodes returned by
-- 'getClosest'.  At most @optReplication@ result batches are taken from
-- the search.
publish :: Address ip => InfoHash -> PortNumber -> DHT ip ()
publish ih p = do
  nodes <- getClosest ih
  r     <- asks (optReplication . options)
  _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r
  return ()

-- | Ping a node.  Yields @(True, reflected address)@ when the ping
-- succeeds and @(False, Nothing)@ when it fails with a 'QueryFailure'.
probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP)
probeNode addr = do
  $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr)))
  result <- try $ pingQ addr
  -- Pins the exception type caught by 'try' to 'QueryFailure'.
  let _ = fmap (const ()) result :: Either QueryFailure ()
  return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result

-- | Run a 'findNodeQ' search for the given node id, seeded with the
-- result of 'getClosest', and then ping every node that was found.
--
-- FIXME do not use getClosest since we should /refresh/ them
refreshNodes :: Address ip => NodeId KMessageOf -> DHT ip ()
refreshNodes nid = do
  $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid)))
  nodes <- getClosest nid
  nss   <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume
  let found = L.concat nss
  $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length found)) <> " nodes."
  -- pingQ takes care of inserting the node.
  _ <- queryParallel $ flip L.map found $ \n -> do
    $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n))
    pingQ (nodeAddr n)
  return ()

-- | This operation does not block but acquires exclusive access to the
--   routing table.
--
-- The node is offered to the routing table inside an STM transaction.
-- The optional second argument is our own address as reported by that
-- node:
--
-- * If routing info already exists, the node is inserted into its
--   buckets; a reported address that differs from @myAddress@ is only
--   logged.
--
-- * If no routing info exists yet, the node is dropped (with a warning)
--   unless it reported our address, in which case a new table is created
--   from that address and the node is inserted into it.
--
-- Any @CheckPing@ requests produced by the insertion are handled in a
-- forked thread, which probes each listed node with 'probeNode' and
-- feeds the outcome back into the table as a @PingResult@.
insertNode :: forall ip. Address ip => NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> DHT ip ()
insertNode info witnessed_ip0 = do
  var <- asks routingInfo
  tm  <- getTimestamp
  let showTable = do
        t <- getTable
        let logMsg = "Routing table: " <> pPrint t
        $(logDebugS) "insertNode" (T.pack (render logMsg))
  let arrival0 = TryInsert info
      -- Only used for the debug message below.
      arrival4 = TryInsert (mapAddress fromAddr info) :: Event _ (Maybe IPv4) _
  $(logDebugS) "insertNode" $ T.pack (show arrival4)
  maxbuckets <- asks (optBucketCount . options)
  fallbackid <- asks tentativeNodeId
  -- atomicInsert runs in STM and returns a DHT action (logging plus the
  -- list of follow-up requests) to be executed after the transaction.
  let atomicInsert arrival witnessed_ip = do
        minfo <- readTVar var
        -- Choose our node id for a reported address: the first candidate
        -- from bep42s after ranking, or the tentative id as a fallback.
        let change ip0 = fromMaybe fallbackid $ do
              ip <- fromSockAddr ip0 :: Maybe ip
              listToMaybe
                  $ rank id (nodeId $ foreignNode arrival)
                  $ bep42s ip fallbackid
        case minfo of
          Just inf -> do
            (ps,t') <- R.insert tm arrival $ myBuckets inf
            writeTVar var $ Just $ inf { myBuckets = t' }
            return $ do
                case witnessed_ip of
                    Just (ReflectedIP ip)
                      | ip /= myAddress inf
                      -> $(logInfo) ( T.pack $ L.unwords
                            $ [ "Possible NAT?"
                              , show (toSockAddr $ nodeAddr $ foreignNode arrival)
                              , "reports my address:"
                              , show ip ] )
                        -- TODO: Let routing table vote on my IP/NodeId.
                    _ -> return ()
                return ps
          Nothing ->
            let dropped = return $ do
                    -- Ignore non-witnessing nodes until somebody tells
                    -- us our ip address.
                    $(logWarnS) "insertNode" ("Dropped "
                        <> T.pack (show (toSockAddr $ nodeAddr $ foreignNode arrival)))
                    return []
            in fromMaybe dropped $ do
                ReflectedIP ip <- witnessed_ip
                let nil = nullTable (change ip) maxbuckets
                return $ do
                    (ps,t') <- R.insert tm arrival nil
                    let new_info = R.Info t' (change ip) ip
                    writeTVar var $ Just new_info
                    return $ do
                        $(logInfo) ( T.pack $ L.unwords
                                        [ "External IP address:"
                                        , show ip
                                        , "(reported by"
                                        , show (toSockAddr $ nodeAddr $ foreignNode arrival)
                                          <> ")"
                                        ] )
                        return ps
  ps <- join $ liftIO $ atomically $ atomicInsert arrival0 witnessed_ip0
  showTable
  _ <- fork $ do
    myThreadId >>= liftIO . flip labelThread "DHT.insertNode.pingResults"
    forM_ ps $ \(CheckPing ns)-> do
      forM_ ns $ \n -> do
        (b,mip) <- probeNode (nodeAddr n)
        let alive = PingResult n b
        $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b))
        _ <- join $ liftIO $ atomically $ atomicInsert alive mip
        showTable
  return ()

-- | Send a query to a node and return the responder's node id together
-- with the reply.  Throws exception if node is not responding.
queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b)
          => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b)
queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q

-- | Like 'queryNode', but additionally returns our own address as
-- reported by the remote node, if any.  The responding node is passed to
-- 'insertNode' before the result is returned.
queryNode' :: forall a b ip. Address ip => KRPC (Query a) (Response b)
           => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b, Maybe ReflectedIP)
queryNode' addr q = do
  nid <- myNodeIdAccordingTo addr
  let read_only = False -- TODO: check for NAT issues. (BEP 43)
  let KRPC.Method name = KRPC.method :: KRPC.Method (Query a) (Response b)
  (Response remoteId r, witnessed_ip) <- query' name (toSockAddr addr) (Query nid read_only q)
  -- Disabled debug trace:
  --   logDebugS "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip)
  --                         <> " by " <> T.pack (show (toSockAddr addr))
  insertNode (NodeInfo remoteId addr ()) witnessed_ip
  return (remoteId, r, witnessed_ip)

-- | Infix version of 'queryNode' function; returns only the reply.
(<@>) :: Address ip => KRPC (Query a) (Response b)
      => a -> NodeAddr ip -> DHT ip b
q <@> addr = snd <$> queryNode addr q
{-# INLINE (<@>) #-}