From 4060c2c717eeac95dd16f9222184d6b4e998cb7f Mon Sep 17 00:00:00 2001 From: joe Date: Tue, 25 Jul 2017 06:17:46 -0400 Subject: Fixes to IPv4 bootstrap. --- Mainline.hs | 265 ++++++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 204 insertions(+), 61 deletions(-) (limited to 'Mainline.hs') diff --git a/Mainline.hs b/Mainline.hs index 76e914b3..911f0dbc 100644 --- a/Mainline.hs +++ b/Mainline.hs @@ -16,49 +16,69 @@ import Control.Arrow import Control.Concurrent.STM import Control.Monad import Crypto.Random -import Data.BEncode as BE -import qualified Data.BEncode.BDict as BE - ;import Data.BEncode.BDict (BKey) -import Data.BEncode.Types (BDict) +import Data.BEncode as BE +import qualified Data.BEncode.BDict as BE + ;import Data.BEncode.BDict (BKey) +import Data.BEncode.Pretty +import Data.BEncode.Types (BDict) import Data.Bits import Data.Bits.ByteString import Data.Bool -import qualified Data.ByteArray as BA - ;import Data.ByteArray (ByteArrayAccess) -import qualified Data.ByteString as B - ;import Data.ByteString (ByteString) -import Data.ByteString.Lazy (toStrict) +import qualified Data.ByteArray as BA + ;import Data.ByteArray (ByteArrayAccess) +import qualified Data.ByteString as B + ;import Data.ByteString (ByteString) +import qualified Data.ByteString.Base16 as Base16 +import qualified Data.ByteString.Char8 as Char8 +import Data.ByteString.Lazy (toStrict) +import qualified Data.ByteString.Lazy.Char8 as L8 import Data.Coerce import Data.Data import Data.Default +import Data.Digest.CRC32C +import Data.Function (fix) import Data.Hashable import Data.IP import Data.List import Data.Maybe import Data.Monoid import Data.Ord -import qualified Data.Serialize as S -import Data.Set (Set) -import Data.Time.Clock.POSIX (POSIXTime) +import qualified Data.Serialize as S +import Data.Set (Set) +import Data.Time.Clock.POSIX (POSIXTime) import Data.Torrent import Data.Typeable import Data.Word -import qualified Data.Wrapper.PSQInt as Int +import qualified Data.Wrapper.PSQInt as Int +import Debug.Trace import Kademlia -import Network.Address (Address, fromSockAddr, setPort, - sockAddrPort, testIdBit, toSockAddr) -import Network.BitTorrent.DHT.ContactInfo as Peers -import Network.BitTorrent.DHT.Search (Search (..)) -import Network.BitTorrent.DHT.Token as Token -import qualified Network.DHT.Routing as R - ;import Network.DHT.Routing (Info, Timestamp, getTimestamp) +import Network.Address (Address, fromAddr, fromSockAddr, + setPort, sockAddrPort, testIdBit, + toSockAddr) +import Network.BitTorrent.DHT.ContactInfo as Peers +import Network.BitTorrent.DHT.Search (Search (..)) +import Network.BitTorrent.DHT.Token as Token +import Network.DatagramServer.Types (genBucketSample') +import qualified Network.DHT.Routing as R + ;import Network.DHT.Routing (Info, Timestamp, getTimestamp) import Network.QueryResponse import Network.Socket +import System.IO import System.IO.Error -import System.IO.Unsafe (unsafeInterleaveIO) +import System.IO.Unsafe (unsafeInterleaveIO) +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument +#else +import Control.Concurrent.Lifted +import GHC.Conc (labelThread) +#endif +import Control.Exception (SomeException(..),handle) newtype NodeId = NodeId ByteString - deriving (Eq,Ord,Show,ByteArrayAccess, BEncode, Bits, Hashable) + deriving (Eq,Ord,ByteArrayAccess, BEncode, Bits, Hashable) + +instance Show NodeId where + show (NodeId bs) = Char8.unpack $ Base16.encode bs instance S.Serialize NodeId where get = NodeId <$> S.getBytes 20 @@ -84,6 +104,13 @@ instance Hashable NodeInfo where {-# INLINE hashWithSalt #-} +instance Show NodeInfo where + show (NodeInfo (NodeId nid) ip port) = + Char8.unpack (Base16.encode nid) ++ "@" ++ show ip' ++ ":" ++ show port + where + ip' | IPv6 ip6 <- ip + , Just ip4 <- un4map ip6 = IPv4 ip4 + | otherwise = ip {- @@ -188,9 +215,20 @@ data Message a = Q { msgOrigin :: NodeId , rspPayload :: Either Error a , rspReflectedIP :: Maybe SockAddr } +showBE bval = L8.unpack (showBEncode bval) + instance BE.BEncode (Message BValue) where - toBEncode = encodeMessage - fromBEncode = decodeMessage + toBEncode m = encodeMessage m + {- + in case m of + Q {} -> trace ("encoded(query): "++showBE r) r + R {} -> trace ("encoded(response): "++showBE r) r -} + fromBEncode bval = decodeMessage bval + {- + in case r of + Left e -> trace (show e) r + Right (Q {}) -> trace ("decoded(query): "++showBE bval) r + Right (R {}) -> trace ("decoded(response): "++showBE bval) r -} decodeMessage :: BValue -> Either String (Message BValue) decodeMessage = fromDict $ do @@ -238,14 +276,19 @@ encodeMessage (R origin tid v ip) encodeAddr :: SockAddr -> ByteString encodeAddr (SockAddrInet port addr) - = S.runPut (S.putWord32host addr >> S.put (fromIntegral port :: Word16)) -encodeAddr (SockAddrInet6 port _ addr _) - = S.runPut (S.put addr >> S.put (fromIntegral port :: Word16)) -encodeAddr _ = B.empty + = S.runPut (S.putWord32host addr >> S.putWord16be (fromIntegral port)) +encodeAddr saddr@(SockAddrInet6 port _ addr _) + | Just ip4 <- (fromSockAddr saddr >>= un4map) = encodeAddr (setPort port $ toSockAddr ip4) + | otherwise = S.runPut (S.put addr >> S.putWord16be (fromIntegral port)) +encodeAddr _ = B.empty decodeAddr :: ByteString -> Either String SockAddr -decodeAddr = S.runGet $ do - error "decodeAddr" +decodeAddr bs = S.runGet g bs + where + g | (B.length bs == 6) = flip SockAddrInet <$> S.getWord32host <*> (fromIntegral <$> S.getWord16be) + | otherwise = do host <- S.get -- TODO: Is this right? + port <- fromIntegral <$> S.getWord16be + return $ SockAddrInet6 port 0 host 0 genericArgs :: BEncode a => a -> Bool -> BDict genericArgs nodeid ro = @@ -289,8 +332,12 @@ classify :: Message BValue -> MessageClass String Method TransactionId classify (Q { msgID = tid, qryMethod = meth }) = IsQuery meth tid classify (R { msgID = tid }) = IsResponse tid -encodePayload :: BEncode a => TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue -encodePayload tid self dest b = R (nodeId self) tid (Right $ BE.toBEncode b) (Just $ nodeAddr dest) +encodeResponsePayload :: BEncode a => TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue +encodeResponsePayload tid self dest b = R (nodeId self) tid (Right $ BE.toBEncode b) (Just $ nodeAddr dest) + +encodeQueryPayload :: BEncode a => + Method -> Bool -> TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue +encodeQueryPayload meth isReadonly tid self dest b = Q (nodeId self) tid (BE.toBEncode b) meth isReadonly errorPayload :: TransactionId -> NodeInfo -> NodeInfo -> Error -> Message a errorPayload tid self dest e = R (nodeId self) tid (Left e) (Just $ nodeAddr dest) @@ -304,7 +351,7 @@ handler :: ( BEncode a , BEncode b ) => (NodeInfo -> a -> IO b) -> Maybe Handler -handler f = Just $ MethodHandler decodePayload encodePayload f +handler f = Just $ MethodHandler decodePayload encodeResponsePayload f handlerE :: ( BEncode a @@ -314,7 +361,7 @@ handlerE :: ( BEncode a handlerE f = Just $ MethodHandler decodePayload enc f where enc tid self dest (Left e) = errorPayload tid self dest e - enc tid self dest (Right b) = encodePayload tid self dest b + enc tid self dest (Right b) = encodeResponsePayload tid self dest b type AnnounceSet = Set (InfoHash, PortNumber) @@ -344,6 +391,13 @@ data Routing = Routing , committee6 :: TriadCommittee NodeId SockAddr } +traced :: Show tid => TableMethods t tid -> TableMethods t tid +traced (TableMethods ins del lkup) + = TableMethods (\tid mvar t -> trace ("insert "++show tid) $ ins tid mvar t) + (\tid t -> trace ("del "++show tid) $ del tid t) + (\tid t -> trace ("lookup "++show tid) $ lkup tid t) + + type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue) newClient :: SockAddr -> IO (MainlineClient, Routing) @@ -355,24 +409,34 @@ newClient addr = do , nodeIP = fromMaybe (toEnum 0) $ fromSockAddr addr , nodePort = fromMaybe 0 $ sockAddrPort addr } + addr4 <- atomically $ newTChan + addr6 <- atomically $ newTChan + fork $ fix $ \again -> do + myThreadId >>= flip labelThread "addr6" + addr <- atomically $ readTChan addr6 + hPutStrLn stderr $ "External IPv6: "++show addr + again routing <- atomically $ do let nobkts = R.defaultBucketCount :: Int tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tenative_info nobkts tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tenative_info nobkts - committee4 <- newTriadCommittee (const $ return ()) -- TODO: update tbl4 - committee6 <- newTriadCommittee (const $ return ()) -- TODO: update tbl6 + committee4 <- newTriadCommittee $ \a -> do + t4 <- readTVar tbl4 + case bep42 a (nodeId $ R.thisNode t4) of + Just nid -> do + let tbl = R.nullTable (comparing nodeId) + (\s -> hashWithSalt s . nodeId) + (NodeInfo nid + (fromMaybe (toEnum 0) $ fromSockAddr a) + (fromMaybe 0 $ sockAddrPort a)) + nobkts + writeTVar tbl4 tbl + writeTChan addr4 (a,map fst $ concat $ R.toList t4) + Nothing -> return () + committee6 <- newTriadCommittee (writeTChan addr6) -- TODO: update tbl6 sched4 <- newTVar Int.empty sched6 <- newTVar Int.empty return $ Routing tenative_info sched4 tbl4 committee4 sched6 tbl6 committee6 - -- TODO: Provide some means of shutting down these two auxillary threads: - refresh_thread4 <- forkPollForRefresh - (15*60) - (sched4 routing) - (refreshBucket nodeSearch (routing4 routing) (nodeId tenative_info)) - refresh_thread6 <- forkPollForRefresh - (15*60) - (sched6 routing) - (refreshBucket nodeSearch (routing6 routing) (nodeId tenative_info)) swarms <- newSwarmsDatabase map_var <- atomically $ newTVar (0, mempty) let net = onInbound (updateRouting outgoingClient routing) @@ -406,7 +470,7 @@ newClient addr = do client = Client { clientNet = net , clientDispatcher = dispatch - , clientErrorReporter = ignoreErrors -- TODO + , clientErrorReporter = printErrors stderr , clientPending = map_var , clientAddress = \maddr -> atomically $ do let var = case flip prefer4or6 Nothing <$> maddr of @@ -416,8 +480,50 @@ newClient addr = do , clientResponseId = return } + fork $ fix $ \again -> do + myThreadId >>= flip labelThread "addr4" + (addr, ns) <- atomically $ readTChan addr4 + hPutStrLn stderr $ "External IPv4: "++show (addr, length ns) + forM_ ns $ \n -> do + hPutStrLn stderr $ "Change IP, ping: "++show n + ping outgoingClient n + again + + -- TODO: Provide some means of shutting down these two auxillary threads: + refresh_thread4 <- forkPollForRefresh + (15*60) + (sched4 routing) + (refreshBucket (nodeSearch client) (routing4 routing) (nodeId tenative_info)) + refresh_thread6 <- forkPollForRefresh + (15*60) + (sched6 routing) + (refreshBucket (nodeSearch client) (routing6 routing) (nodeId tenative_info)) + return (client, routing) +-- | Modifies a purely random 'NodeId' to one that is related to a given +-- routable address in accordance with BEP 42. +bep42 :: SockAddr -> NodeId -> Maybe NodeId +bep42 addr (NodeId r) + | Just ip <- fmap S.encode (fromSockAddr addr :: Maybe IPv4) + <|> fmap S.encode (fromSockAddr addr :: Maybe IPv6) + = genBucketSample' retr (NodeId $ crc $ applyMask ip) (3,0x07,0) + | otherwise + = Nothing + where + ip4mask = "\x03\x0f\x3f\xff" :: ByteString + ip6mask = "\x01\x03\x07\x0f\x1f\x3f\x7f\xff" :: ByteString + nbhood_select = (B.last r :: Word8) .&. 7 + nodeIdSize = 20 + retr n = pure $ B.drop (nodeIdSize - n) $ S.encode r + crc = (`B.append` B.replicate 16 0) . S.encode . crc32c . B.pack + applyMask ip = case B.zipWith (.&.) msk ip of + (b:bs) -> (b .|. shiftL nbhood_select 5) : bs + bs -> bs + where msk | B.length ip == 4 = ip4mask + | otherwise = ip6mask + + defaultHandler :: ByteString -> Handler defaultHandler meth = MethodHandler decodePayload errorPayload returnError where @@ -432,7 +538,9 @@ mainlineKademlia client committee var sched { tblTransition = \tr -> do io1 <- transitionCommittee committee tr io2 <- touchBucket mainlineSpace (15*60) var sched tr - return $ io1 >> io2 + return $ do + io1 >> io2 + hPutStrLn stderr ("Buckets: "++show tr) } @@ -446,7 +554,8 @@ mainlineSpace = R.KademliaSpace transitionCommittee :: TriadCommittee NodeId SockAddr -> RoutingTransition NodeInfo -> STM (IO ()) transitionCommittee committee (RoutingTransition ni Stranger) = do delVote committee (nodeId ni) - return $ return () + return $ do + hPutStrLn stderr $ "delVote "++show (nodeId ni) transitionCommittee committee _ = return $ return () updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () @@ -458,7 +567,9 @@ updateRouting client routing naddr msg = do go tbl committee sched = do case msg of R { rspReflectedIP = Just sockaddr } - -> atomically $ addVote committee (nodeId naddr) sockaddr + -> do + -- hPutStrLn stderr $ "External: "++show (nodeId naddr,sockaddr) + atomically $ addVote committee (nodeId naddr) sockaddr _ -> return () insertNode (mainlineKademlia client committee tbl sched) naddr @@ -517,8 +628,9 @@ instance BEncode NodeFound where fromBEncode bval = NodeFound <$> ns4 <*> ns6 where - ns4 = fromDict (binary getNodeInfo4 nodes_key) bval - ns6 = fromDict (binary getNodeInfo6 nodes6_key) bval + opt ns = fromMaybe [] <$> optional ns + ns4 = opt $ fromDict (binary getNodeInfo4 nodes_key) bval + ns6 = opt $ fromDict (binary getNodeInfo6 nodes6_key) bval binary :: S.Get a -> BKey -> BE.Get [a] binary get k = field (req k) >>= either (fail . format) return . @@ -752,16 +864,33 @@ announceH (SwarmsDatabase peers toks _) naddr announcement = do } return Announced +isReadonlyClient client = False -- TODO + ping :: MainlineClient -> NodeInfo -> IO Bool ping client addr = fromMaybe False <$> sendQuery client serializer Ping addr where serializer = MethodSerializer { methodTimeout = 5 , method = Method "ping" - , wrapQuery = encodePayload + , wrapQuery = encodeQueryPayload (Method "ping") (isReadonlyClient client) , unwrapResponse = const True } +-- searchQuery :: ni -> IO ([ni], [r]) +getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO ([NodeInfo],[NodeInfo]) +getNodes client nid addr = + fromMaybe ([],[]) <$> sendQuery client serializer (FindNode nid (Just Want_Both)) addr + where + serializer = MethodSerializer + { methodTimeout = 5 + , method = Method "find_node" + , wrapQuery = encodeQueryPayload (Method "find_node") (isReadonlyClient client) + , unwrapResponse = \case + R { rspPayload = Right bval } | Right (NodeFound ns4 ns6) <- BE.fromBEncode bval + -> (ns4++ns6, ns4++ns6) + _ -> ([],[]) + } + data TriadSlot = SlotA | SlotB | SlotC deriving (Eq,Ord,Enum,Show,Read) @@ -792,11 +921,8 @@ newTriadCommittee onChange = <*> newTVar Nothing <*> pure onChange -triadCountVotes :: Eq a => TriadCommittee voter a -> STM () -triadCountVotes triad = do - prior <- do - slot <- readTVar (triadDecider triad) - fmap snd <$> readTVar (triadSlot slot triad) +triadCountVotes :: Eq a => Maybe a -> TriadCommittee voter a -> STM () +triadCountVotes prior triad = do a <- fmap ((SlotA,) . snd) <$> readTVar (triadA triad) b <- fmap ((SlotB,) . snd) <$> readTVar (triadB triad) c <- fmap ((SlotC,) . snd) <$> readTVar (triadC triad) @@ -822,9 +948,12 @@ addVote triad voter vote = do avail (_,Just x ) = (x == voter) slots = filter avail [a,b,c] forM_ (take 1 slots) $ \(slot,_) -> do + prior <- do + slotp <- readTVar (triadDecider triad) + fmap snd <$> readTVar (triadSlot slotp triad) writeTVar (triadSlot slot triad) (Just (voter,vote)) - triadCountVotes triad + triadCountVotes prior triad delVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> STM () delVote triad voter = do @@ -834,13 +963,27 @@ delVote triad voter = do let match (_,Just x ) = (x == voter) slots = filter match [a,b,c] forM_ (take 1 slots) $ \(slot,_) -> do + prior <- do + slotp <- readTVar (triadDecider triad) + fmap snd <$> readTVar (triadSlot slotp triad) writeTVar (triadSlot slot triad) Nothing - triadCountVotes triad + triadCountVotes prior triad -nodeSearch = Search +nodeSearch client = Search { searchSpace = mainlineSpace , searchNodeAddress = nodeIP &&& nodePort - , searchQuery = error "searchQuery" + , searchQuery = \nid ni -> do + hPutStrLn stderr $ "findNodes "++show nid++" --> "++show ni + handle (\(SomeException e) -> do + hPutStrLn stderr $ "got "++show e + -- threadDelay 1000000 + return ([],[])) + $ do + (xs,y) <- getNodes client nid ni + forM_ xs $ \x -> do + hPutStrLn stderr $ "got "++show x + -- threadDelay 1000000 + return (xs,y) } -- | List of bootstrap nodes maintained by different bittorrent -- cgit v1.2.3