{-# LANGUAGE NondecreasingIndentation #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeFamilies #-}

import Control.Arrow
import Control.Monad
import Control.Monad.Logger
import Control.Monad.Reader
import Data.Char
import Data.Default
import Data.List as L
import Data.Maybe
import Data.String
import qualified Data.Set as Set
import qualified Data.ByteString as B (ByteString,writeFile,readFile)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B8
import System.IO
import System.IO.Error
import Text.PrettyPrint.HughesPJClass
import Text.Printf
import Text.Read hiding (get)
import Control.Monad.Reader.Class
import System.Posix.Process (getProcessID)
import GHC.Stats
import System.Mem
import Data.Word
import Data.Torrent (InfoHash)
import Network.Address
import Network.BitTorrent.DHT
import Network.BitTorrent.DHT.Search
import Network.BitTorrent.DHT.Query
import Network.DHT.Mainline (FindNode(..),NodeFound(..),GetPeers(..),GotPeers(..))
import Network.DatagramServer (QueryFailure(..))
import Network.DatagramServer.Mainline (ReflectedIP(..),KMessageOf)
import qualified Network.DatagramServer.Tox as Tox
import qualified Network.DHT.Routing as R
import Network.BitTorrent.DHT.Session
import Network.SocketLike
import Network.StreamServer
import Control.Exception.Lifted as Lifted
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
import Data.Time ()
import Data.Time.Clock
#else
import Control.Concurrent
#endif
import Control.Concurrent.STM
import System.Environment
import Data.BEncode (BValue)
import Network.DHT.Types
import Network.DatagramServer.Types
import Data.Bits
import Data.Serialize
import Network.KRPC.Method
import Data.Typeable
import GHC.Generics
import Data.Bool
import System.Random

-- | Convert a socket address into an IPv4 'NodeAddr'.
--
-- Fails with a descriptive 'error' when 'fromSockAddr' cannot produce an
-- IPv4 host for the given address.  A missing port falls back to 0.
-- FIXME: this is still partial; callers have no way to recover.
mkNodeAddr :: SockAddr -> NodeAddr IPv4
mkNodeAddr addr = NodeAddr host port
  where
    host = fromMaybe (error $ "mkNodeAddr: not an IPv4 socket address: " ++ show addr)
                     (fromSockAddr addr)
    port = fromMaybe 0 $ sockAddrPort addr

-- | Resolve a bind specification via 'getBindAddress' and convert the
-- result with 'mkNodeAddr'.  Both arguments are passed through unchanged.
btBindAddr :: String -> Bool -> IO (NodeAddr IPv4)
btBindAddr s b = mkNodeAddr <$> getBindAddress s b

-- | Print a key/value report (see 'showReport') to stdout and flush.
printReport :: MonadIO m => [(String,String)] -> m ()
printReport kvs = liftIO $ do
    putStrLn (showReport kvs)
    hFlush stdout

-- | Render key/value pairs one per line, keys left-justified and padded to
-- one column more than the widest key.
--
-- Written in the list monad: each pair contributes the characters of its
-- own line.  @colwidth@ is only forced when at least one pair exists, so an
-- empty input yields the empty string rather than a 'maximum' failure.
showReport :: [(String,String)] -> String
showReport kvs = do
    let colwidth = maximum $ map (length . fst) kvs
    (k,v) <- kvs
    concat [ printf " %-*s" (colwidth+1) k, v, "\n" ]

-- | Render a routing-table entry as its pretty-printed node id followed by
-- its address.  The second tuple component is ignored.
showEnry :: ( Show a
            , Pretty (NodeId dht)
            ) => (NodeInfo dht a u, t) -> [Char]
showEnry (n,_) = unwords
    [ show $ pPrint (nodeId n)
    , show $ nodeAddr n
    ]

-- | Print the routing table (see 'showTable') to stdout and flush.
printTable :: ( Pretty (NodeId dht) ) => DHT raw dht u IPv4 ()
printTable = do
    t <- showTable
    liftIO $ do
        putStrLn t
        hFlush stdout

-- | Render the routing table as a report in which every entry is keyed by
-- the zero-based index of the bucket it came from.
showTable :: ( Pretty (NodeId dht) ) => DHT raw dht u IPv4 String
showTable = do
    nodes <- R.toList <$> getTable
    return $ showReport
           $ map (show *** showEnry)
           $ concat $ zipWith map (map (,) [0::Int ..]) nodes

-- | Resolve 'defaultBootstrapNodes', keeping only the results that
-- 'fromAddr' can convert to IPv4 node addresses.
bootstrapNodes :: IO [NodeAddr IPv4]
bootstrapNodes = mapMaybe fromAddr
    <$> mapM resolveHostName defaultBootstrapNodes

-- ExtendedCaps (Map.singleton

-- | Log filter: accept everything except 'LevelDebug' and 'LevelOther'.
noDebugPrints :: LogSource -> LogLevel -> Bool
noDebugPrints _ = \case
    LevelDebug   -> False
    LevelOther _ -> False
    _            -> True

-- | Log filter: reject every message.
noLogging :: LogSource -> LogLevel -> Bool
noLogging _ _ = False

-- | Log filter: accept every message.
allNoise :: LogSource -> LogLevel -> Bool
allNoise _ _ = True

-- | Try to read the saved node snapshot from @dht-nodes.dat@.
--
-- An 'IOError' is reported on stdout and turned into 'Nothing'.
resume :: DHT raw dht u IPv4 (Maybe B.ByteString)
resume = do
    restore_attempt <- liftIO $ tryIOError $ B.readFile "dht-nodes.dat"
    case restore_attempt of
        Left _ -> do
            liftIO $ putStrLn "Error reading dht-nodes.dat"
            return Nothing
        Right saved_nodes -> return (Just saved_nodes)

-- | Bind to the address described by the first argument and run a DHT
-- session on it.
--
-- The session uses default options with @optTimeout = 5@ and the 'allNoise'
-- log filter.  The tentative node id and the listen address are printed
-- before the continuation is invoked with that address and node id.
godht :: ( Eq (QueryMethod dht)
         , Show (QueryMethod dht)
         , Functor dht
         , Ord (TransactionID dht)
         , Serialize (TransactionID dht)
         , Kademlia dht
         , WireFormat raw dht
         , DataHandlers raw dht
         , SerializableTo raw (Query dht (FindNode dht IPv4))
         , SerializableTo raw (Response dht (NodeFound dht IPv4))
         , SerializableTo raw (Query dht (Ping dht))
         , SerializableTo raw (Response dht (Ping dht))
         , KRPC dht (Query dht (FindNode dht IPv4)) (Response dht (NodeFound dht IPv4))
         , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
         , Ord (NodeId dht)
         , FiniteBits (NodeId dht)
         , Serialize (NodeId dht)
         , Show (NodeId dht)
         , Pretty (NodeId dht)
         , Pretty (NodeInfo dht IPv4 u)
         , Default u
         , Show u
         ) => String -> (NodeAddr IPv4 -> NodeId dht -> DHT raw dht u IPv4 b) -> IO b
godht p f = do
    a <- btBindAddr p False
    dht def { optTimeout = 5 } a allNoise $ do
        me0 <- asks tentativeNodeId
        printReport [ ("tentative node-id", show $ pPrint me0)
                    , ("listen-address", show a)
                    ]
        f a me0

-- | Frame a payload for the console client as @LENGTH:PAYLOAD@, where
-- LENGTH is the decimal character count of the payload.
marshalForClient :: String -> String
marshalForClient s = show (length s) ++ ":" ++ s

-- | Send a framed payload prefixed with @\'.\'@.  'clientSession' uses this
-- for the last (or only) part of a reply.
hPutClient :: Handle -> String -> IO ()
hPutClient h s = hPutStr h ('.' : marshalForClient s)

-- | Send a framed payload prefixed with a space.  'clientSession' uses this
-- for intermediate parts of a reply that is still in progress.
hPutClientChunk :: Handle -> String -> IO ()
hPutClientChunk h s = hPutStr h (' ' : marshalForClient s)

-- | A DHT action issued from the console.
--
-- 'GenericDHT' wraps an action that is polymorphic in the wire format,
-- message type and per-node user data, so it can be run against either
-- network.  'BtDHT' wraps an action that is tied to the mainline
-- (@BValue@ / 'KMessageOf') instantiation.
data GenericDHT ip a
    = GenericDHT
        (forall raw dht u.
            ( Eq (QueryMethod dht)
            , Show (QueryMethod dht)
            , Functor dht
            , Ord (TransactionID dht)
            , Serialize (TransactionID dht)
            , Kademlia dht
            , WireFormat raw dht
            , DataHandlers raw dht
            , SerializableTo raw (Query dht (FindNode dht ip))
            , SerializableTo raw (Response dht (NodeFound dht ip))
            , SerializableTo raw (Query dht (Ping dht))
            , SerializableTo raw (Response dht (Ping dht))
            , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
            , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
            , Ord (NodeId dht)
            , FiniteBits (NodeId dht)
            , Serialize (NodeId dht)
            , Show (NodeId dht)
            , Pretty (NodeId dht)
            , Pretty (NodeInfo dht ip ())
            , Pretty (NodeInfo dht ip u)
            , Default u
            , Show u
            , Read (NodeId dht)
            ) => DHT raw dht u ip a)
    | BtDHT (DHT BValue KMessageOf () ip a)

-- The four helpers below do nothing at run time.  They exist only to tie
-- the type of a value to the @dht@ / @ip@ parameters of the enclosing
-- 'DHT' action so that type inference can resolve polymorphic reads.

-- | Yield a 'Proxy' for the message type of the current DHT action.
dhtType :: DHT raw dht u ip (Proxy dht)
dhtType = return Proxy

-- | Fix a value's type to the 'NodeId' of the current DHT action.
nodeIdType :: NodeId dht -> DHT raw dht u ip ()
nodeIdType _ = return ()

-- | Fix a value's type to a 'NodeAddr' over the current action's @ip@.
nodeAddrType :: NodeAddr ip -> DHT raw dht u ip ()
nodeAddrType _ = return ()

-- | Fix the @dht@ and @ip@ parameters of a two-parameter type.
ipType :: f dht ip -> DHT raw dht u ip ()
ipType _ = return ()

instance Kademlia Tox.Message where
    data DHTData Tox.Message ip = ToxData
    namePing _ = Tox.Ping
    nameFindNodes _ = Tox.GetNodes
    initializeDHTData = return ToxData

instance Pretty (NodeId Tox.Message) where
    pPrint (Tox.NodeId nid) = encodeHexDoc nid

-- | Upper bound on the number of nodes in a Tox get-nodes reply.  The
-- decoder rejects anything larger, so the encoder must not emit more.
toxMaxNodesPerReply :: Int
toxMaxNodesPerReply = 4

-- | Parse a Tox ping or pong body: one discriminator byte (0 for ping,
-- 1 for pong, selected by the first argument) followed by a nonce.  The
-- nonce is wrapped with the third argument and combined with 'Ping' using
-- the second argument.
getToxPing isPong c n = do
    q'r <- get :: Get Word8
    when (bool 0 1 isPong /= q'r) $
        fail "Tox ping/pong parse fail."
    n8 <- get :: Get Tox.Nonce8
    return $ c (n n8) Ping

-- | Serialize a Tox ping or pong body; the inverse of 'getToxPing'.
putToxPing isPong n8 = do
    put (bool 0 1 isPong :: Word8)
    put n8

instance Serialize (Query Tox.Message (Ping Tox.Message)) where
    get = getToxPing False Network.DHT.Types.Query Tox.QueryNonce
    put (Network.DHT.Types.Query extra Ping) = putToxPing False (Tox.qryNonce extra)

instance Serialize (Response Tox.Message (Ping Tox.Message)) where
    get = getToxPing True Network.DHT.Types.Response Tox.ResponseNonce
    put (Network.DHT.Types.Response extra Ping) = putToxPing True (Tox.rspNonce extra)

-- | Repackage a decoded Tox node record as a 'NodeInfo' whose user-data
-- slot carries the record's is-TCP flag.
nodeFormatToNodeInfo nf = NodeInfo nid addr u
  where
    u    = Tox.nodeIsTCP nf
    addr = NodeAddr (Tox.nodeIP nf) (Tox.nodePort nf)
    nid  = Tox.nodePublicKey nf

instance Serialize (Query Tox.Message (FindNode Tox.Message ip)) where
    -- Layout: target node id, then the query nonce.
    get = do
        nid <- get
        n8 <- get
        return $ Network.DHT.Types.Query (Tox.QueryNonce n8) (FindNode nid)
    put (Network.DHT.Types.Query (Tox.QueryNonce n8) (FindNode nid)) = do
        put nid
        put n8

instance Serialize (Response Tox.Message (NodeFound Tox.Message IPv4)) where
    -- Layout: one count byte, that many node records, then the nonce.
    get = do
        num <- get :: Get Word8
        when (fromIntegral num > toxMaxNodesPerReply) $
            fail "Too many nodes in Tox get-nodes reply"
        ns0 <- replicateM (fromIntegral num) (nodeFormatToNodeInfo <$> get)
        -- TODO: Allow tcp and ipv6.  For now filtering to udp ip4...
        let ns = flip mapMaybe ns0 $ \(NodeInfo nid addr u) -> do
                    guard $ not u
                    ip4 <- fromAddr addr
                    return $ NodeInfo nid ip4 ()
        n8 <- get
        return $ Network.DHT.Types.Response (Tox.ResponseNonce n8) $ NodeFound ns
    put (Network.DHT.Types.Response (Tox.ResponseNonce n8) (NodeFound found)) = do
        -- Never emit more nodes than 'get' is willing to accept; an
        -- uncapped count would be rejected by the peer's decoder (and a
        -- count above 255 would silently wrap in the Word8).
        let ns = take toxMaxNodesPerReply found
        put ( fromIntegral (length ns) :: Word8 )
        forM_ ns $ \(NodeInfo nid ip4 ()) -> do
            put Tox.NodeFormat { nodePublicKey = nid
                               , nodeIsTCP     = False
                               , nodeIP        = IPv4 (nodeHost ip4)
                               , nodePort      = nodePort ip4
                               }
        put n8

-- | A Tox response matches a query when both carry the same nonce.
validateToxExchange q r = qnonce == rnonce
  where
    qnonce = Tox.qryNonce . queryExtra . Tox.msgPayload $ q
    rnonce = Tox.rspNonce . responseExtra . Tox.msgPayload $ r

instance KRPC Tox.Message (Query Tox.Message (FindNode Tox.Message IPv4))
                          (Response Tox.Message (NodeFound Tox.Message IPv4)) where
    method = Method Tox.GetNodes
    validateExchange = validateToxExchange
    -- Queries get a fresh random nonce; responses echo the query's nonce.
    makeQueryExtra _ _ _ _ = Tox.QueryNonce <$> randomIO
    makeResponseExtra _ _ q _ = return $ Tox.ResponseNonce $ Tox.qryNonce $ queryExtra q
    messageSender q _ = Tox.msgClient q
    messageResponder _ r = Tox.msgClient r

instance KRPC Tox.Message (Query Tox.Message (Ping Tox.Message))
                          (Response Tox.Message (Ping Tox.Message)) where
    method = Method Tox.Ping
    validateExchange = validateToxExchange
    -- Queries get a fresh random nonce; responses echo the query's nonce.
    makeQueryExtra _ _ _ _ = Tox.QueryNonce <$> randomIO
    makeResponseExtra _ _ q _ = return $ Tox.ResponseNonce $ Tox.qryNonce $ queryExtra q
    messageSender q _ = Tox.msgClient q
    messageResponder _ r = Tox.msgClient r

instance DataHandlers ByteString Tox.Message

-- NOTE(review): orphan instance; needed because 'Bool' is used as the
-- per-node user data of the Tox session and 'godht' requires 'Default u'.
instance Default Bool where def = False

-- | Serve one console connection.
--
-- Reads a single command line (leading whitespace dropped, everything
-- lower-cased), executes it, replies via 'hPutClient' and then loops by
-- calling itself again.  Only @quit@ and @stop@ end the loop; @stop@
-- additionally fills the 'MVar' to ask the daemon to shut down.
--
-- The 'Bool' selects the network that generic commands run against:
-- 'True' for the mainline node (first argument), 'False' for the Tox node
-- (second argument).  The @bt@ and @tox@ commands switch it.  The
-- 'RestrictedSocket' and 'Int' arguments are only threaded through to the
-- recursive call.
clientSession :: Node BValue KMessageOf () IPv4 -> Node B.ByteString Tox.Message Bool IPv4 -> MVar () -> Bool -> RestrictedSocket -> Int -> Handle -> IO ()
clientSession bt tox signalQuit isBt sock n h = do
    line <- map toLower . dropWhile isSpace <$> hGetLine h
    let dht :: Either (Node BValue KMessageOf () IPv4)
                      (Node B.ByteString Tox.Message Bool IPv4)
        dht | isBt      = Left bt
            | otherwise = Right tox

        -- Run a plain IO command, then wait for the next command.
        cmd0 :: IO () -> IO ()
        cmd0 action = action >> clientSession bt tox signalQuit isBt sock n h

        -- Run a DHT action that yields the IO reply, perform that reply,
        -- then wait for the next command.  Generic actions run on the
        -- currently selected network; 'BtDHT' actions always run on the
        -- mainline node.
        cmd :: GenericDHT IPv4 (IO ()) -> IO ()
        cmd (GenericDHT action) = cmd0 $ join $ either (flip runDHT action) (flip runDHT action) dht
        cmd (BtDHT action)      = cmd0 $ join $ runDHT bt action

        -- Command word and the remainder of the line.
        (c,args) = second (dropWhile isSpace) $ break isSpace line

        switchNetwork dest = hPutClient h ("Network: "++if dest then "mainline" else "tox")
                                >> clientSession bt tox signalQuit dest sock n h
    case (c,args) of

        ("bt", _)   -> switchNetwork True
        ("tox", _)  -> switchNetwork False
        ("quit", _) -> hPutClient h "" >> hClose h

        ("stop", _) -> do
            hPutClient h "Terminating DHT Daemon."
            hClose h
            putMVar signalQuit ()

        ("pid", _) -> cmd0 $ do
            pid <- getProcessID
            hPutClient h (show pid)

        -- DHT specific
        ("ls", _) -> cmd $ GenericDHT $ do
            tbl <- getTable
            t <- showTable
            me <- myNodeIdAccordingTo (read "8.8.8.8:6881")
            ip <- routableAddress
            return $ do
                hPutClient h $ unlines
                    [ t
                    , showReport
                        [ ("node-id", show $ pPrint me)
                        , ("internet address", show ip)
                        , ("buckets", show $ R.shape tbl)
                        , ("network", if isBt then "mainline" else "tox")
                        ]
                    ]

        ("external-ip", _) -> cmd $ BtDHT $ do
            ip <- routableAddress
            return $ do
                -- Keep only the part of the shown address before ':'.
                hPutClient h $ maybe "" (takeWhile (/=':') . show) ip

        ("swarms", s) -> cmd $ BtDHT $ do
            -- "-v" lists every swarm; otherwise those whose second tuple
            -- component is zero are hidden.
            let fltr = case s of
                    ('-':'v':cs) | all isSpace (take 1 cs) -> const True
                    _ -> (\(_,cnt,_) -> cnt /= 0)
            ss <- getSwarms
            let r = map (\(ih,cnt,nm) -> (unwords [show ih,show cnt], maybe "" show nm))
                        $ filter fltr ss
            return $ do
                hPutClient h $ showReport r

        -- bittorrent only
        ("peers", s) -> cmd $ BtDHT $ case readEither s of
            Right ih -> do
                ps <- allPeers ih
                seq ih $ return $ do
                    hPutClient h $ showReport $ map (((,) "") . show . pPrint) ps
            Left er -> return $ hPutClient h er

#ifdef THREAD_DEBUG
        ("threads", _) -> cmd0 $ do
            ts <- threadsInformation
            tm <- getCurrentTime
            let r = map (\PerThread{..} -> (show lbl,show (diffUTCTime tm startTime))) ts
            hPutClient h $ showReport r

        ("mem", s) -> cmd0 $ do
            case s of
                "gc" -> do
                    hPutClient h "Performing garbage collection..."
                    performMajorGC
                "" -> do
                    is_enabled <- getGCStatsEnabled
                    if is_enabled
                        then do
                            GCStats{..} <- getGCStats
                            let r = [ ("bytesAllocated", show bytesAllocated)
                                    , ("numGcs", show numGcs)
                                    , ("maxBytesUsed", show maxBytesUsed)
                                    , ("numByteUsageSamples", show numByteUsageSamples)
                                    , ("cumulativeBytesUsed", show cumulativeBytesUsed)
                                    , ("bytesCopied", show bytesCopied)
                                    , ("currentBytesUsed", show currentBytesUsed)
                                    , ("currentBytesSlop", show currentBytesSlop)
                                    , ("maxBytesSlop", show maxBytesSlop)
                                    , ("peakMegabytesAllocated", show peakMegabytesAllocated)
                                    , ("mutatorCpuSeconds", show mutatorCpuSeconds)
                                    , ("mutatorWallSeconds", show mutatorWallSeconds)
                                    , ("gcCpuSeconds", show gcCpuSeconds)
                                    , ("gcWallSeconds", show gcWallSeconds)
                                    , ("cpuSeconds", show cpuSeconds)
                                    , ("wallSeconds", show wallSeconds)
                                    , ("parTotBytesCopied", show parTotBytesCopied)
                                    , ("parMaxBytesCopied", show parMaxBytesCopied)
                                    ]
                            hPutClient h $ showReport r
                        else hPutClient h "Run with +RTS -T to obtain live memory-usage information."
                _ -> hPutClient h "error."
#endif

        -- DHT specific
        ("closest", s) -> cmd $ GenericDHT $ do
            -- Accepts either "ID" (count defaults to 8) or "COUNT ID".
            let (ns,hs) = second (dropWhile isSpace) $ break isSpace s
                parse | null hs   = do
                            ih <- readEither ns
                            return (8 :: Int, ih)
                      | otherwise = do
                            cnt <- readEither ns
                            ih <- readEither hs
                            return (cnt :: Int, ih)
            case parse of
                Right (cnt,ih) -> do
                    nodeIdType ih
                    tbl <- getTable
                    let nodes = R.kclosest cnt ih tbl
                    return $ do
                        -- 'showEnry' never inspects the second component,
                        -- so the placeholder is not evaluated.
                        hPutClient h $ unlines $ map (showEnry . (flip (,) (error "showEnry"))) nodes
                Left er -> return $ hPutClient h er

        -- DHT specific
        ("ping", s) -> cmd $ GenericDHT $ do
            case readEither s of
                Right addr -> do
                    result <- try $ pingQ addr
                    let rs = either (pure . showQueryFail) reportPong result
                    return $ do
                        hPutClient h $ unlines rs
                Left er -> return $ hPutClient h er

        -- DHT specific
        ("find-nodes", s) -> cmd $ GenericDHT $ do
            let (hs,as) = second (dropWhile isSpace) $ break isSpace s
                parse = do
                    -- The target is read as a 'NodeId' of the selected
                    -- network; 'nodeIdType' below fixes that type.
                    ih <- readEither hs
                    a <- readEither as
                    return (ih, a :: NodeAddr IPv4)
            case parse of
                Right (ih,a) -> do
                    nodeIdType ih
                    nodeAddrType a
                    proxy <- dhtType
                    let fn = findNodeMessage proxy ih
                    ipType fn
                    result <- try $ queryNode' a fn
                    -- Type-pinning only; has no run-time effect.
                    either (const $ return ()) (\(nid,nf,_) -> nodeIdType nid >> ipType nf) result
                    let rs = either (pure . showQueryFail) reportNodes result
                    return $ do
                        hPutClient h $ unlines rs
                Left er -> return $ hPutClient h er

        -- bittorrent only
        ("get-peers", s) -> cmd $ BtDHT $ do
            let (hs,as) = second (dropWhile isSpace) $ break isSpace s
                parse = do
                    ih <- readEither hs
                    a <- readEither as
                    return (ih :: InfoHash, a :: NodeAddr IPv4)
            case parse of
                Right (ih,a) -> do
                    result <- try $ queryNode' (a ::NodeAddr IPv4) $ GetPeers ih
                    let rs = either (pure . ( (,) "error" ) . showQueryFail) reportPeers result
                    return $ do
                        hPutClient h $ showReport rs
                Left er -> return $ hPutClient h er

        -- bittorrent only
        ("search-peers", s) -> cmd $ BtDHT $ do
            case readEither s of
                Right ih -> do
                    (_, srch) <- isearch ioGetPeers ih
                    -- Stream results: each round sends the peers not yet
                    -- shown as a chunk; the final round sends the closing
                    -- reply once the search reports that it is finished.
                    flip fix Set.empty $ \again shown -> do
                        (chk,fin) <- liftIO . atomically $ do
                            r <- (Set.\\ shown) <$> readTVar (searchResults srch)
                            if not $ Set.null r
                                then (,) r <$> searchIsFinished srch
                                -- Nothing new: block until the search ends.
                                else searchIsFinished srch >>= check >> return (Set.empty,True)
                        let ps = case Set.toList chk of
                                    [] -> ""
                                    _  -> unlines $ map (show . pPrint) $ Set.toList chk
                        if fin
                            then return $ hPutClient h ps
                            else do
                                liftIO $ hPutClientChunk h ps
                                again (shown `Set.union` chk)
                Left er -> return $ hPutClient h er

        _ -> cmd0 $ hPutClient h "error."
-- | Port used for the mainline node when no @-p PORT@ argument is given.
--
-- Previously this was an unconditional @error \"TODO\"@, so starting the
-- daemon without @-p@ always crashed.  6881 is the port this file already
-- assumes for remote nodes (see the @\"8.8.8.8:6881\"@ literal in the @ls@
-- command).
defaultPort :: IO String
defaultPort = return "6881"

-- | Render a failed query for the console client.
showQueryFail :: QueryFailure -> String
showQueryFail e = show e

-- | Prepend an @(external-ip ...)@ line describing the reflected address.
consip :: ReflectedIP -> [String] -> [String]
consip (ReflectedIP ip) xs = ("(external-ip " ++ show ip ++ ")") : xs

-- | Prepend a @(\"to\", address)@ report row for the reflected address.
consip' :: ReflectedIP -> [(String,String)] -> [(String,String)]
consip' (ReflectedIP ip) xs = ("to", show ip) : xs

-- | Lines describing a successful ping: the optional reflected address
-- followed by the pretty-printed reply.
reportPong :: Pretty a => (a, Maybe ReflectedIP) -> [String]
reportPong (info,myip) = maybe id consip myip [show $ pPrint info]

-- | Lines describing a find-nodes reply: the optional reflected address,
-- the responder's node id, then one line per node found.
reportNodes :: ( Kademlia dht
               , Pretty (NodeInfo dht ip ())
               , Pretty (NodeId dht)
               ) => (NodeId dht, NodeFound dht ip, Maybe ReflectedIP) -> [String]
reportNodes (nid,ns,myip) =
    maybe id consip myip $ show (pPrint nid) : map (show . pPrint) (foundNodes ns)

-- | Report rows describing a get-peers reply: the optional reflected
-- address, the responder, the token, then either the peers or the closer
-- nodes carried by the reply.
reportPeers :: (NodeId KMessageOf, GotPeers IPv4, Maybe ReflectedIP) -> [(String,String)]
reportPeers (nid,GotPeers r tok,myip)
    = maybe id consip' myip $ ("from", show (pPrint nid))
                            : ("token", show tok)
                            : case r of
                                Right ps -> map ( ( (,) "peer" ) . show . pPrint ) ps
                                Left ns  -> map ( ( (,) "node" ) . show . pPrint ) ns

-- | Daemon entry point.
--
-- Starts a Tox node on PORT+1 and a mainline node on PORT (from
-- @-p PORT@, or 'defaultPort').  Inside the mainline session it restores
-- @dht-nodes.dat@ and @bt-peers.dat@ when readable, opens the command
-- console on the unix socket @dht.sock@, bootstraps, prints a status
-- report and then blocks until a console client issues @stop@.  On the way
-- out both data files are rewritten.
main :: IO ()
main = do
    args <- getArgs
    p <- case take 2 (dropWhile (/="-p") args) of
            ["-p",port] | not ("-" `isPrefixOf` port) -> return port
            ("-p":_) -> error "Port not specified! (-p PORT)"
            _        -> defaultPort

    -- Validate the port up front instead of letting a lazy 'read' blow up
    -- with "Prelude.read: no parse" somewhere inside the bind call.
    btPort <- case readMaybe p :: Maybe Int of
            Just portNum -> return portNum
            Nothing      -> error $ "Invalid port (expected a number): " ++ show p

    -- The Tox node listens on the port right after the mainline one.
    tox_state <- godht (show (succ btPort)) $ \_ _ -> ask

    godht p $ \a me0 -> do
        printTable
        bs <- liftIO bootstrapNodes `onException`
            (Lifted.ioError $ userError "unable to resolve bootstrap nodes")
        saved_nodes <- resume

        peers'trial <- liftIO $ tryIOError $ B.readFile "bt-peers.dat"
        saved_peers <- case peers'trial of
            Left _ -> do
                liftIO $ putStrLn "Error reading bt-peers.dat"
                return Nothing
            Right dta -> return (Just dta)

        maybe (return ()) mergeSavedPeers saved_peers

        -- Pre-bootstrap status, shown only when a snapshot was loaded.
        when (isJust saved_nodes) $ do
            b <- isBootstrapped
            tbl <- getTable
            bc <- optBucketCount <$> asks options
            printTable
            me <- case concat $ R.toList tbl of
                    (n,_):_ -> myNodeIdAccordingTo (nodeAddr n)
                    _       -> return me0
            printReport [ ("node-id", show $ pPrint me)
                        , ("listen-address", show a)
                        , ("bootstrapped", show b)
                        , ("buckets", show $ R.shape tbl)
                        , ("optBucketCount", show bc)
                        , ("dht-nodes.dat", "Running bootstrap...")
                        ]

        st <- ask
        -- Start the console server now; the returned action blocks until a
        -- client sends "stop" and then stops listening.
        waitForSignal <- liftIO $ do
            signalQuit <- newEmptyMVar
            srv <- streamServer (withSession $ clientSession st tox_state signalQuit True)
                                (SockAddrUnix "dht.sock")
            return $ liftIO $ do
                () <- takeMVar signalQuit
                quitListening srv

        bootstrap saved_nodes bs

        b <- isBootstrapped
        tbl <- getTable
        bc <- optBucketCount <$> asks options
        printTable
        ip <- routableAddress
        me <- case concat $ R.toList tbl of
                (n,_):_ -> myNodeIdAccordingTo (nodeAddr n)
                _       -> return me0
        printReport [ ("node-id", show $ pPrint me)
                    , ("internet address", show ip)
                    , ("listen-address", show a)
                    , ("bootstrapped", show b)
                    , ("buckets", show $ R.shape tbl)
                    , ("optBucketCount", show bc)
                    ]

        waitForSignal -- Await unix socket to signal termination.

        snapshot >>= liftIO . B.writeFile "dht-nodes.dat"
        savePeerStore >>= liftIO . B.writeFile "bt-peers.dat"