-- |
--   Copyright   :  (c) Sam Truzjan 2014
--   License     :  BSD3
--   Maintainer  :  pxqr.sta@gmail.com
--   Stability   :  experimental
--   Portability :  portable
--
--   This module provides functions to interact with other nodes.
--   Normally, you don't need to import this module, use
--   "Network.BitTorrent.DHT" instead.
--
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell     #-}
module Network.BitTorrent.DHT.Query
       ( -- * Handler
         -- | To bind a specific set of handlers you need to pass
         -- the handler list to the 'startNode' function.
         pingH
       , findNodeH
       , getPeersH
       , announceH
       , defaultHandlers

         -- * Query
         -- ** Basic
         -- | A basic query performs a single request expecting a
         -- single response.
       , Iteration
       , pingQ
       , findNodeQ
       , getPeersQ
       , announceQ

         -- ** Iterative
         -- | An iterative query performs multiple basic queries,
         -- concatenates their responses, optionally yields results
         -- and continues to the next iteration.
       , Search
       , search
       , publish

         -- ** Routing table
       , insertNode
       , refreshNodes

         -- ** Messaging
       , queryNode
       , (<@>)
       ) where

import Control.Applicative
import Control.Concurrent.Lifted hiding (yield)
import Control.Exception.Lifted hiding (Handler)
import Control.Monad.Reader
import Control.Monad.Logger
import Data.Maybe
import Data.Conduit
import Data.Conduit.List as C hiding (mapMaybe, mapM_)
import Data.Either
import Data.List as L
import Data.Monoid
import Data.Text as T
import Network
import Text.PrettyPrint as PP hiding ((<>), ($$))
import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
import Data.Time
import Data.Time.Clock.POSIX

import Network.KRPC hiding (Options, def)
import Network.KRPC.Message (ReflectedIP(..))
import Data.Torrent
import Network.BitTorrent.Address
import Network.BitTorrent.DHT.Message
import Network.BitTorrent.DHT.Routing as R
import Network.BitTorrent.DHT.Session
import Control.Concurrent.STM

{-----------------------------------------------------------------------
--  Handlers
-----------------------------------------------------------------------}

-- | Wrap a query-specific action into a 'NodeHandler'.
--
-- The sender's socket address is converted with 'fromSockAddr';
-- 'BadAddress' is thrown when that conversion fails. Unless the query
-- is flagged read-only, the sender is handed to 'insertNode' (which
-- forks, so the handler does not wait for the insertion). The reply
-- carries the id returned by 'myNodeIdAccordingTo' for the sender's
-- address together with the result of the supplied action.
nodeHandler :: Address ip => KRPC (Query a) (Response b)
            => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip
nodeHandler action = handler $ \ sockAddr (Query remoteId read_only q) -> do
  case fromSockAddr sockAddr of
    Nothing    -> throwIO BadAddress
    Just naddr -> do
      let ni = NodeInfo remoteId naddr
      -- Do not route read-only nodes. (bep 43)
      if read_only
        then $(logWarnS) "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni)
        else insertNode ni Nothing >> return () -- TODO need to block. why?
      Response <$> myNodeIdAccordingTo naddr <*> action naddr q

-- | Default 'Ping' handler.
pingH :: Address ip => NodeHandler ip
pingH = nodeHandler $ \ _ Ping -> return Ping

-- | Default 'FindNode' handler.
findNodeH :: Address ip => NodeHandler ip
findNodeH = nodeHandler $ \ _ (FindNode nid) -> NodeFound <$> getClosest nid

-- | Default 'GetPeers' handler.
getPeersH :: Address ip => NodeHandler ip
getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do
  ps  <- getPeerList ih
  tok <- grantToken naddr
  $(logDebugS) "getPeersH" $ "INFO-HASH " <> T.pack (show (ih,fmap fromAddr naddr :: NodeAddr (Maybe IP)))
  return $ GotPeers ps tok

-- | Default 'Announce' handler.
--
-- Throws @'InvalidParameter' \"token\"@ when 'checkToken' rejects the
-- session token sent by the announcing node.
announceH :: Address ip => NodeHandler ip
announceH = nodeHandler $ \ naddr@NodeAddr {..} (Announce {..}) -> do
  valid <- checkToken naddr sessionToken
  unless valid $ do
    throwIO $ InvalidParameter "token"

  -- With an implied port the peer is registered under the port the
  -- query came from rather than the port named in the message.
  let annPort  = if impliedPort then nodePort else port
  let peerAddr = PeerAddr Nothing nodeHost annPort
  insertPeer topic peerAddr
  return Announced

-- | Includes all default query handlers.
defaultHandlers :: Address ip => [NodeHandler ip]
defaultHandlers = [pingH, findNodeH, getPeersH, announceH]

{-----------------------------------------------------------------------
--  Basic queries
-----------------------------------------------------------------------}

-- | A single step of an iterative query: ask one node and get back
-- either more nodes ('Left') or results ('Right').
type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip])

-- | The most basic query. May be used to check if the given node is
-- alive or get its 'NodeId'.
pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip, Maybe ReflectedIP)
pingQ addr = do
  (nid, Ping, mip) <- queryNode' addr Ping
  return (NodeInfo nid addr, mip)

-- TODO [robustness] match range of returned node ids with the
-- expected range and either filter bad nodes or discard response at
-- all throwing an exception

-- | Send a 'FindNode' query for the given key to one node. Always
-- answers with 'Right', carrying the nodes listed in the response.
findNodeQ :: Address ip => TableKey key => key -> Iteration ip NodeInfo
findNodeQ key NodeInfo {..} = do
  NodeFound closest <- FindNode (toNodeId key) <@> nodeAddr
  $(logInfoS) "findNodeQ" $ "NodeFound " <> T.pack (show $ L.map pPrint closest)
  return $ Right closest

-- | Send a 'GetPeers' query for the given infohash to one node and
-- return the @peers@ field of its response unchanged.
getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr
getPeersQ topic NodeInfo {..} = do
  GotPeers {..} <- GetPeers topic <@> nodeAddr
  let dist = distance (toNodeId topic) nodeId
  $(logInfoS) "getPeersQ" $ T.pack
         $ "distance: " <> render (pPrint dist) <> " , result: "
        <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" }
  return peers

-- | Try to announce the given infohash and port to one node.
--
-- A 'GetPeers' query is sent first. If the node answers with nodes,
-- they are returned as 'Left' and nothing is announced. If it answers
-- with peers, an 'Announce' using the granted token is sent and the
-- node's address is returned as the single 'Right' result.
announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr
announceQ ih p NodeInfo {..} = do
  GotPeers {..} <- GetPeers ih <@> nodeAddr
  case peers of
    -- TODO check if we can announce
    Left ns -> return (Left ns)
    -- TODO *probably* add to peer cache
    Right _ -> do
      Announced <- Announce False ih p grantedToken <@> nodeAddr
      return (Right [nodeAddr])

{-----------------------------------------------------------------------
--  Iterative queries
-----------------------------------------------------------------------}

-- | A conduit consuming batches of nodes to query and producing
-- batches of results.
type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip]

-- TODO: use reorder and filter (Traversal option) leftovers

-- | Run the given 'Iteration' over every awaited batch of nodes.
--
-- Empty batches are skipped. For a non-empty batch all nodes are
-- queried via 'queryParallel'; the nodes from every 'Left' response
-- are concatenated and pushed back as a leftover (becoming the next
-- batch), and every 'Right' response is yielded downstream. The key
-- argument is currently ignored.
search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o
search _ action = do
  awaitForever $ \ batch -> unless (L.null batch) $ do
    $(logWarnS) "search" "start query"
    responses <- lift $ queryParallel (action <$> batch)
    let (nodes, results) = partitionEithers responses
    $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results)))
    leftover $ L.concat nodes
    mapM_ yield results

-- | Announce the given infohash and port, starting the search from the
-- nodes returned by 'getClosest' and taking at most 'optReplication'
-- result batches from it.
publish :: Address ip => InfoHash -> PortNumber -> DHT ip ()
publish ih p = do
  nodes <- getClosest ih
  r     <- asks (optReplication . options)
  _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r
  return ()

-- | Not implemented yet: the forked thread fails with 'error'.
republish :: DHT ip ThreadId
republish = fork $ do
  _interval <- asks (optReannounce . options)
  error "DHT.republish: not implemented"

-- | Current time as a POSIX timestamp; the time is also logged at
-- debug level.
getTimestamp :: DHT ip Timestamp
getTimestamp = do
  utcTime <- liftIO getCurrentTime
  $(logDebugS) "routing.make_timestamp" (T.pack (render (pPrint utcTime)))
  return $ utcTimeToPOSIXSeconds utcTime

-- | Ping the given address. Returns @(True, reflected ip)@ when the
-- ping succeeds and @(False, Nothing)@ when 'pingQ' throws.
probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP)
probeNode addr = do
  $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr)))
  result <- try $ pingQ addr
  return $ case result of
    -- Deliberately best-effort: any exception counts as "not alive".
    Left (_ :: SomeException) -> (False, Nothing)
    Right (_, mip)            -> (True, mip)

-- FIXME do not use getClosest since we should /refresh/ them

-- | Look up the given node id starting from the nodes returned by
-- 'getClosest', then ping every node found by the search.
refreshNodes :: Address ip => NodeId -> DHT ip () -- [NodeInfo ip]
refreshNodes nid = do
  $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid)))
  nodes <- getClosest nid
  -- forM (L.take 1 nodes) $ \ addr -> do
  --   NodeFound ns <- FindNode nid <@> addr
  --   Expected type: ConduitM [NodeAddr ip] [NodeInfo ip] (DHT ip) ()
  --     Actual type: ConduitM [NodeInfo ip] [NodeInfo ip] (DHT ip) ()
  --   nss <- sourceList [[addr]] $= search nid (findNodeQ nid) $$ C.consume
  nss <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume
  let found = L.concat nss
  $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length found)) <> " nodes."
  _ <- queryParallel $ flip L.map found $ \n -> do
    $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n))
    -- pingQ takes care of inserting the node.
    pingQ (nodeAddr n)
  return () -- $ L.concat nss

-- | This operation does not block but acquires exclusive access to
-- the routing table.
--
-- The work happens in a forked thread whose 'ThreadId' is returned.
-- The node is offered to the routing table as a 'TryInsert' event.
-- Every node listed in a resulting 'CheckPing' request is then probed
-- (in a second forked thread) and the outcome is fed back to the
-- table as a 'PingResult' event stamped with the time the probe
-- finished.
--
-- While no routing info exists yet, an event is ignored unless it
-- comes with a witnessed IP that 'fromSockAddr' can convert; in that
-- case a fresh table is created for that address.
insertNode :: Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId
insertNode info witnessed_ip = fork $ do
    var        <- asks routingInfo
    arrivalTm  <- getTimestamp
    let showTable = do
            t <- getTable
            let logMsg = "Routing table: " <> pPrint t
            $(logDebugS) "insertNode" (T.pack (render logMsg))
    let arrival  = TryInsert info
        arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4)
    $(logDebugS) "insertNode" $ T.pack (show arrival4)
    maxbuckets <- asks (optBucketCount . options)
    fallbackid <- asks tentativeNodeId
    -- The timestamp is an explicit argument so that every event is
    -- recorded with the time it actually occurred at, instead of
    -- silently reusing the timestamp of the initial arrival.
    let atomicInsert tm event witness = do
            minfo <- readTVar var
            let change ip = fromMaybe fallbackid
                          $ listToMaybe
                          $ rank id (nodeId $ foreignNode event)
                          $ bep42s ip fallbackid
            case minfo of
                Just rinfo -> do
                    (ps,t') <- R.insert tm event $ myBuckets rinfo
                    -- TODO: Check witnessed_ip against myAddress.
                    -- If 3 nodes witness a different address, change the table.
                    -- Require these witnesses satisfy bep-42 and that their
                    -- first 3 bits are unique.
                    writeTVar var $ Just $ rinfo { myBuckets = t' }
                    return ps
                -- Ignore non-witnessing nodes until somebody tells
                -- us our ip address.
                Nothing -> fromMaybe (return []) $ do
                    ReflectedIP ip0 <- witness
                    ip              <- fromSockAddr ip0
                    let nil = nullTable (change ip) maxbuckets
                    return $ do
                        (ps,t') <- R.insert tm event nil
                        writeTVar var $ Just $ R.Info t' (change ip) ip
                        return ps
    ps <- liftIO $ atomically $ atomicInsert arrivalTm arrival witnessed_ip
    showTable
    _ <- fork $ forM_ ps $ \(CheckPing ns)-> do
        forM_ ns $ \n -> do
            (b,mip) <- probeNode (nodeAddr n)
            let alive = PingResult n b
            $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b))
            probeTm <- getTimestamp
            _ <- liftIO $ atomically $ atomicInsert probeTm alive mip
            showTable
    return ()

-- | Send a query to the given node and return the remote node id with
-- the response body. Throws exception if node is not responding.
queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b)
          => NodeAddr ip -> a -> DHT ip (NodeId, b)
queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q

-- | Like 'queryNode' but also returns the address the remote node
-- reported for us, if any. The responding node is handed to
-- 'insertNode' together with that witnessed address.
queryNode' :: forall a b ip. Address ip => KRPC (Query a) (Response b)
           => NodeAddr ip -> a -> DHT ip (NodeId, b, Maybe ReflectedIP)
queryNode' addr q = do
  nid <- myNodeIdAccordingTo addr
  let read_only = False -- TODO: check for NAT issues. (BEP 43)
  (Response remoteId r, witnessed_ip) <- query' (toSockAddr addr) (Query nid read_only q)
  $(logInfoS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip)
                         <> " by " <> T.pack (show (toSockAddr addr))
  _ <- insertNode (NodeInfo remoteId addr) witnessed_ip
  return (remoteId, r, witnessed_ip)

-- | Infix version of 'queryNode' function.
(<@>) :: Address ip => KRPC (Query a) (Response b)
      => a -> NodeAddr ip -> DHT ip b
q <@> addr = snd <$> queryNode addr q
{-# INLINE (<@>) #-}