{-# LANGUAGE NondecreasingIndentation #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE CPP #-} import Control.Arrow import Control.Monad import Control.Monad.Logger import Control.Monad.Reader import Data.Char import Data.Default import Data.List as L import Data.Maybe import Data.String import qualified Data.Set as Set import qualified Data.ByteString as B (ByteString,writeFile,readFile) ; import Data.ByteString (ByteString) import qualified Data.ByteString.Char8 as B8 import System.IO import System.IO.Error import Text.PrettyPrint.HughesPJClass import Text.Printf import Text.Read import Control.Monad.Reader.Class import System.Posix.Process (getProcessID) import GHC.Stats import System.Mem import Data.Torrent (InfoHash) import Network.BitTorrent.Address import Network.BitTorrent.DHT import Network.BitTorrent.DHT.Search import Network.BitTorrent.DHT.Query import Network.BitTorrent.DHT.Message (FindNode(..),NodeFound(..),GetPeers(..),GotPeers(..)) import Network.KRPC.Manager (QueryFailure(..)) import Network.KRPC.Message (ReflectedIP(..),KMessageOf) import qualified Network.BitTorrent.DHT.Routing as R import Network.BitTorrent.DHT.Session import Network.SocketLike import Network.StreamServer import Control.Exception.Lifted as Lifted #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument import Data.Time () import Data.Time.Clock #else import Control.Concurrent #endif import Control.Concurrent.STM import System.Environment mkNodeAddr :: SockAddr -> NodeAddr IPv4 mkNodeAddr addr = NodeAddr (fromJust $ fromSockAddr addr) (fromMaybe 0 $ sockAddrPort addr) -- FIXME btBindAddr :: String -> Bool -> IO (NodeAddr IPv4) btBindAddr s b = mkNodeAddr <$> getBindAddress s b printReport :: MonadIO m => [(String,String)] -> m () printReport kvs = liftIO $ do putStrLn (showReport kvs) hFlush stdout showReport :: [(String,String)] -> String showReport kvs = do let colwidth = maximum $ map (length . fst) kvs (k,v) <- kvs concat [ printf " %-*s" (colwidth+1) k, v, "\n" ] showEnry :: Show a => (NodeInfo KMessageOf a (), t) -> [Char] showEnry (n,_) = intercalate " " [ show $ pPrint (nodeId n) , show $ nodeAddr n ] printTable :: DHT IPv4 () printTable = do t <- showTable liftIO $ do putStrLn t hFlush stdout showTable :: DHT IPv4 String showTable = do nodes <- R.toList <$> getTable return $ showReport $ map (show *** showEnry) $ concat $ zipWith map (map (,) [0::Int ..]) nodes bootstrapNodes :: IO [NodeAddr IPv4] bootstrapNodes = mapMaybe fromAddr <$> mapM resolveHostName defaultBootstrapNodes -- ExtendedCaps (Map.singleton noDebugPrints :: LogSource -> LogLevel -> Bool noDebugPrints _ = \case LevelDebug -> False LevelOther _ -> False _ -> True noLogging :: LogSource -> LogLevel -> Bool noLogging _ _ = False allNoise :: LogSource -> LogLevel -> Bool allNoise _ _ = True resume :: DHT IPv4 (Maybe B.ByteString) resume = do restore_attempt <- liftIO $ tryIOError $ B.readFile "dht-nodes.dat" saved_nodes <- either (const $ do liftIO $ putStrLn "Error reading dht-nodes.dat" return Nothing) (return . Just) restore_attempt return saved_nodes godht :: String -> (NodeAddr IPv4 -> NodeId -> DHT IPv4 b) -> IO b godht p f = do a <- btBindAddr p False dht def { optTimeout = 5 } a noDebugPrints $ do me0 <- asks tentativeNodeId printReport [("tentative node-id",show $ pPrint me0) ,("listen-address", show a) ] f a me0 marshalForClient :: String -> String marshalForClient s = show (length s) ++ ":" ++ s hPutClient :: Handle -> String -> IO () hPutClient h s = hPutStr h ('.' : marshalForClient s) hPutClientChunk :: Handle -> String -> IO () hPutClientChunk h s = hPutStr h (' ' : marshalForClient s) clientSession :: Node IPv4 -> MVar () -> RestrictedSocket -> Int -> Handle -> IO () clientSession st signalQuit sock n h = do line <- map toLower . dropWhile isSpace <$> hGetLine h let cmd0 action = action >> clientSession st signalQuit sock n h cmd action = cmd0 $ join $ runDHT st action (c,args) = second (dropWhile isSpace) $ break isSpace line case (c,args) of ("quit", _) -> hPutClient h "" >> hClose h ("stop", _) -> do hPutClient h "Terminating DHT Daemon." hClose h putMVar signalQuit () ("ls", _) -> cmd $ do tbl <- getTable t <- showTable me <- myNodeIdAccordingTo (read "8.8.8.8:6881") ip <- routableAddress return $ do hPutClient h $ unlines [ t , showReport [ ("node-id", show $ pPrint me) , ("internet address", show ip) , ("buckets", show $ R.shape tbl)] ] ("external-ip", _) -> cmd $ do ip <- routableAddress return $ do hPutClient h $ maybe "" (takeWhile (/=':') . show) ip ("swarms", s) -> cmd $ do let fltr = case s of ('-':'v':cs) | all isSpace (take 1 cs) -> const True _ -> (\(h,c,n) -> c/=0 ) ss <- getSwarms let r = map (\(h,c,n) -> (unwords [show h,show c], maybe "" show n)) $ filter fltr ss return $ do hPutClient h $ showReport r ("peers", s) -> cmd $ case readEither s of Right ih -> do ps <- allPeers ih seq ih $ return $ do hPutClient h $ showReport $ map (((,) "") . show . pPrint) ps Left er -> return $ hPutClient h er ("pid", _) -> cmd $ return $ do pid <- getProcessID hPutClient h (show pid) #ifdef THREAD_DEBUG ("threads", _) -> cmd $ return $ do ts <- threadsInformation tm <- getCurrentTime let r = map (\PerThread{..} -> (show lbl,show (diffUTCTime tm startTime))) ts hPutClient h $ showReport r ("mem", s) -> cmd $ return $ do case s of "gc" -> do hPutClient h "Performing garbage collection..." performMajorGC "" -> do is_enabled <- getGCStatsEnabled if is_enabled then do GCStats{..} <- getGCStats let r = [ ("bytesAllocated", show bytesAllocated) , ("numGcs", show numGcs) , ("maxBytesUsed", show maxBytesUsed) , ("numByteUsageSamples", show numByteUsageSamples) , ("cumulativeBytesUsed", show cumulativeBytesUsed) , ("bytesCopied", show bytesCopied) , ("currentBytesUsed", show currentBytesUsed) , ("currentBytesSlop", show currentBytesSlop) , ("maxBytesSlop", show maxBytesSlop) , ("peakMegabytesAllocated", show peakMegabytesAllocated) , ("mutatorCpuSeconds", show mutatorCpuSeconds) , ("mutatorWallSeconds", show mutatorWallSeconds) , ("gcCpuSeconds", show gcCpuSeconds) , ("gcWallSeconds", show gcWallSeconds) , ("cpuSeconds", show cpuSeconds) , ("wallSeconds", show wallSeconds) , ("parTotBytesCopied", show parTotBytesCopied) , ("parMaxBytesCopied", show parMaxBytesCopied) ] hPutClient h $ showReport r else hPutClient h "Run with +RTS -T to obtain live memory-usage information." _ -> hPutClient h "error." #endif ("closest", s) -> cmd $ do let (ns,hs) = second (dropWhile isSpace) $ break isSpace s parse | null hs = do ih <- readEither ns return (8 :: Int, ih :: InfoHash) | otherwise = do n <- readEither ns ih <- readEither hs return (n :: Int, ih :: InfoHash) case parse of Right (n,ih) -> do tbl <- getTable let nodes = R.kclosest n ih tbl return $ do hPutClient h $ unlines $ map (showEnry . (flip (,) 0)) nodes Left er -> return $ hPutClient h er ("ping", s) -> cmd $ do case readEither s of Right addr -> do result <- try $ pingQ addr let rs = either (pure . showQueryFail) reportPong result return $ do hPutClient h $ unlines rs Left er -> return $ hPutClient h er ("find-nodes", s) -> cmd $ do let (hs,as) = second (dropWhile isSpace) $ break isSpace s parse = do ih <- readEither hs a <- readEither as -- XXX: using 'InfoHash' only because 'NodeId' currently -- has no 'Read' instance. return (ih :: InfoHash, a :: NodeAddr IPv4) case parse of Right (ih,a) -> do result <- try $ queryNode' (a ::NodeAddr IPv4) $ FindNode (R.toNodeId ih) let rs = either (pure . showQueryFail) reportNodes result return $ do hPutClient h $ unlines rs Left er -> return $ hPutClient h er ("get-peers", s) -> cmd $ do let (hs,as) = second (dropWhile isSpace) $ break isSpace s parse = do ih <- readEither hs a <- readEither as return (ih :: InfoHash, a :: NodeAddr IPv4) case parse of Right (ih,a) -> do result <- try $ queryNode' (a ::NodeAddr IPv4) $ GetPeers ih let rs = either (pure . ( (,) "error" ) . showQueryFail) reportPeers result return $ do hPutClient h $ showReport rs Left er -> return $ hPutClient h er ("search-peers", s) -> cmd $ do case readEither s of Right ih -> do (tid, s) <- isearch ioGetPeers ih flip fix Set.empty $ \again shown -> do (chk,fin) <- liftIO . atomically $ do r <- (Set.\\ shown) <$> readTVar (searchResults s) if not $ Set.null r then (,) r <$> searchIsFinished s else searchIsFinished s >>= check >> return (Set.empty,True) let ps = case Set.toList chk of [] -> "" _ -> unlines $ map (show . pPrint) $ Set.toList chk if fin then return $ hPutClient h ps else do liftIO $ hPutClientChunk h ps again (shown `Set.union` chk) Left er -> return $ hPutClient h er _ -> cmd0 $ hPutClient h "error." defaultPort = error "TODO defaultPort" showQueryFail :: QueryFailure -> String showQueryFail e = show e consip (ReflectedIP ip) xs = ("(external-ip " ++ show ip ++ ")") : xs consip' (ReflectedIP ip) xs = ("to", show ip) : xs reportPong (info,myip) = maybe id consip myip [show $ pPrint info] reportNodes :: (NodeId, NodeFound IPv4, Maybe ReflectedIP) -> [String] reportNodes (nid,NodeFound ns,myip) = maybe id consip myip $ show (pPrint nid) : map (show . pPrint) ns reportPeers :: (NodeId, GotPeers IPv4, Maybe ReflectedIP) -> [(String,String)] reportPeers (nid,GotPeers r tok,myip) = maybe id consip' myip $ ("from", show (pPrint nid)) : ("token", show tok) : case r of Right ps -> map ( ( (,) "peer" ) . show . pPrint ) ps Left ns -> map ( ( (,) "node" ) . show . pPrint ) ns main :: IO () main = do args <- getArgs p <- case take 2 (dropWhile (/="-p") args) of ["-p",port] | not ("-" `isPrefixOf` port) -> return port ("-p":_) -> error "Port not specified! (-p PORT)" _ -> defaultPort godht p $ \a me0 -> do printTable bs <- liftIO bootstrapNodes `onException` (Lifted.ioError $ userError "unable to resolve bootstrap nodes") saved_nodes <- resume peers'trial <- liftIO $ tryIOError $ B.readFile "bt-peers.dat" saved_peers <- either (const $ do liftIO $ putStrLn "Error reading bt-peers.dat" return Nothing) (return . Just) peers'trial maybe (return ()) mergeSavedPeers saved_peers when (isJust saved_nodes) $ do b <- isBootstrapped tbl <- getTable bc <- optBucketCount <$> asks options printTable me <- case concat $ R.toList tbl of (n,_):_ -> myNodeIdAccordingTo (nodeAddr n) _ -> return me0 printReport [("node-id",show $ pPrint me) ,("listen-address", show a) ,("bootstrapped", show b) ,("buckets", show $ R.shape tbl) ,("optBucketCount", show bc) ,("dht-nodes.dat", "Running bootstrap...") ] st <- ask waitForSignal <- liftIO $ do signalQuit <- newEmptyMVar srv <- streamServer (withSession $ clientSession st signalQuit) (SockAddrUnix "dht.sock") return $ liftIO $ do () <- takeMVar signalQuit quitListening srv bootstrap saved_nodes bs b <- isBootstrapped tbl <- getTable bc <- optBucketCount <$> asks options printTable ip <- routableAddress me <- case concat $ R.toList tbl of (n,_):_ -> myNodeIdAccordingTo (nodeAddr n) _ -> return me0 printReport [("node-id",show $ pPrint me) ,("internet address", show ip) ,("listen-address", show a) ,("bootstrapped", show b) ,("buckets", show $ R.shape tbl) ,("optBucketCount", show bc) ] waitForSignal -- Await unix socket to signal termination. snapshot >>= liftIO . B.writeFile "dht-nodes.dat" savePeerStore >>= liftIO . B.writeFile "bt-peers.dat"