From 4c8f5796d37815ad05e35bcdf0cc09b8f447d0c8 Mon Sep 17 00:00:00 2001 From: joe Date: Tue, 18 Jul 2017 04:24:21 -0400 Subject: Mainline routing table update. --- Mainline.hs | 145 +++++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 125 insertions(+), 20 deletions(-) (limited to 'Mainline.hs') diff --git a/Mainline.hs b/Mainline.hs index ef452f7d..84fe96bf 100644 --- a/Mainline.hs +++ b/Mainline.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE CPP #-} {-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE DeriveFoldable #-} {-# LANGUAGE DeriveFunctor #-} @@ -7,11 +8,13 @@ {-# LANGUAGE LambdaCase #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TupleSections #-} module Mainline where import Control.Applicative import Control.Arrow import Control.Concurrent.STM +import Control.Monad import Crypto.Random import Data.BEncode as BE import qualified Data.BEncode.BDict as BE @@ -38,15 +41,15 @@ import Data.Set (Set) import Data.Torrent import Data.Typeable import Data.Word +import Kademlia import Network.Address (Address, fromSockAddr, setPort, - sockAddrPort, toSockAddr, testIdBit) + sockAddrPort, testIdBit, toSockAddr) import Network.BitTorrent.DHT.ContactInfo as Peers import Network.BitTorrent.DHT.Token as Token import qualified Network.DHT.Routing as R ;import Network.DHT.Routing (Info, Timestamp, getTimestamp) import Network.QueryResponse import Network.Socket -import Kademlia newtype NodeId = NodeId ByteString deriving (Eq,Ord,Show,ByteArrayAccess, BEncode, Bits) @@ -99,6 +102,9 @@ putNodeInfo6 (NodeInfo (NodeId nid) (IPv6 ip) port) putNodeInfo6 _ = return () +-- TODO: We should use a SocketAddrInet6 address for a dual-stack listen +-- socket. Therefore, the behavior of this method should depend on the bind +-- address for outbound packets. nodeAddr :: NodeInfo -> SockAddr nodeAddr (NodeInfo _ ip port) = setPort port $ toSockAddr ip @@ -265,8 +271,9 @@ data Routing = Routing , routing6 :: !( TVar (R.BucketList NodeInfo) ) } -newClient :: - SockAddr -> IO (Client String Method TransactionId NodeInfo (Message BValue)) +type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue) + +newClient :: SockAddr -> IO MainlineClient newClient addr = do udp <- udpTransport addr nid <- NodeId <$> getRandomBytes 20 @@ -281,13 +288,21 @@ newClient addr = do tbl6 <- newTVar $ R.nullTable (comparing nodeId) tenative_info nobkts return $ Routing tenative_info tbl4 tbl6 swarms <- newSwarmsDatabase - let net = onInbound (updateRouting routing) + map_var <- atomically $ newTVar (0, mempty) + let net = onInbound (updateRouting outgoingClient routing) $ layerTransport parsePacket encodePacket $ udp - dispatch tbl = DispatchMethods - { classifyInbound = classify - , lookupHandler = handlers - , tableMethods = tbl + + -- Paranoid: It's safe to define /net/ and /client/ to be mutually + -- recursive since 'updateRouting' does not invoke 'awaitMessage' which + -- which was modified by 'onInbound'. However, I'm going to avoid the + -- mutual reference just to be safe. + outgoingClient = client { clientNet = net { awaitMessage = return Nothing } } + + dispatch = DispatchMethods + { classifyInbound = classify -- :: x -> MessageClass err meth tid + , lookupHandler = handlers -- :: meth -> Maybe (MethodHandler err tid addr x) + , tableMethods = mapT -- :: TransactionMethods tbl tid x } handlers :: Method -> Maybe Handler @@ -298,12 +313,13 @@ newClient addr = do handlers ( Method meth ) = Just $ defaultHandler meth mapT = transactionMethods mapMethods gen + gen :: Word16 -> (TransactionId, Word16) gen cnt = (TransactionId $ S.encode cnt, cnt+1) - map_var <- atomically $ newTVar (0, mempty) - return Client + + client = Client { clientNet = net - , clientDispatcher = dispatch mapT + , clientDispatcher = dispatch , clientErrorReporter = ignoreErrors -- TODO , clientPending = map_var , clientAddress = \maddr -> atomically $ do @@ -314,16 +330,18 @@ newClient addr = do , clientResponseId = return } + return client + defaultHandler :: ByteString -> Handler defaultHandler meth = MethodHandler decodePayload errorPayload returnError where returnError :: NodeInfo -> BValue -> IO Error returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth) -mainlineKademlia :: Kademlia NodeId NodeInfo -mainlineKademlia = Kademlia quietInsertions - mainlineSpace - (vanillaIO (error "var") $ error "pingProbe") +mainlineKademlia :: MainlineClient -> TVar (R.BucketList NodeInfo) -> Kademlia NodeId NodeInfo +mainlineKademlia client var = Kademlia quietInsertions + mainlineSpace + (vanillaIO var $ ping client) mainlineSpace :: R.KademliaSpace NodeId NodeInfo mainlineSpace = R.KademliaSpace @@ -333,10 +351,11 @@ mainlineSpace = R.KademliaSpace } -updateRouting :: Routing -> NodeInfo -> Message BValue -> IO () -updateRouting routing naddr _ = do - error "todo" insertNode - -- TODO Update kademlia table. +updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () +updateRouting client routing naddr _ = do + case prefer4or6 naddr Nothing of + Want_IP4 -> insertNode (mainlineKademlia client $ routing4 routing) naddr + Want_IP6 -> insertNode (mainlineKademlia client $ routing6 routing) naddr -- TODO Update external ip address and update BEP-42 node id. return () @@ -618,3 +637,89 @@ announceH (SwarmsDatabase peers toks _) naddr announcement = do else port announcement } return Announced + +ping :: MainlineClient -> NodeInfo -> IO Bool +ping client addr = fromMaybe False <$> sendQuery client serializer Ping addr + where + serializer = MethodSerializer + { methodTimeout = 5 + , method = Method "ping" + , wrapQuery = encodePayload + , unwrapResponse = const True + } + +data TriadSlot = SlotA | SlotB | SlotC + deriving (Eq,Ord,Enum,Show,Read) + +data TriadCommittee voter a = TriadCommittee + { triadDecider :: TVar TriadSlot + , triadA :: TVar (Maybe (voter,a)) + , triadB :: TVar (Maybe (voter,a)) + , triadC :: TVar (Maybe (voter,a)) + , triadNewDecision :: a -> STM () + } + +triadSlot :: TriadSlot -> TriadCommittee voter a -> TVar (Maybe (voter,a)) +triadSlot SlotA = triadA +triadSlot SlotB = triadB +triadSlot SlotC = triadC + +triadDecision :: a -> TriadCommittee voter a -> STM a +triadDecision fallback triad = do + slot <- readTVar (triadDecider triad) + maybe fallback snd <$> readTVar (triadSlot slot triad) + + +newTriadCommittee :: (a -> STM ()) -> STM (TriadCommittee voter a) +newTriadCommittee onChange = + TriadCommittee <$> newTVar SlotA + <*> newTVar Nothing + <*> newTVar Nothing + <*> newTVar Nothing + <*> pure onChange + +triadCountVotes :: Eq a => TriadCommittee voter a -> STM () +triadCountVotes triad = do + prior <- do + slot <- readTVar (triadDecider triad) + fmap snd <$> readTVar (triadSlot slot triad) + a <- fmap ((SlotA,) . snd) <$> readTVar (triadA triad) + b <- fmap ((SlotB,) . snd) <$> readTVar (triadB triad) + c <- fmap ((SlotC,) . snd) <$> readTVar (triadC triad) + let (slot,vote) = case catMaybes [a,b,c] of + [ (x,xvote) + , (y,yvote) + , (z,zvote) ] -> if xvote == yvote then (x,Just xvote) + else (z,Just zvote) + [] -> (SlotA,Nothing) + ((slot,vote):_) -> (slot, Just vote) + writeTVar (triadDecider triad) slot + case vote of + Just v | vote /= prior -> triadNewDecision triad v + _ -> return () + + +addVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> a -> STM () +addVote triad voter vote = do + a <- (SlotA,) . fmap fst <$> readTVar (triadA triad) + b <- (SlotB,) . fmap fst <$> readTVar (triadB triad) + c <- (SlotC,) . fmap fst <$> readTVar (triadC triad) + let avail (_,Nothing) = True + avail (_,Just x ) = (x == voter) + slots = filter avail [a,b,c] + forM_ (take 1 slots) $ \(slot,_) -> do + writeTVar (triadSlot slot triad) + (Just (voter,vote)) + triadCountVotes triad + +delVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> STM () +delVote triad voter = do + a <- (SlotA,) . fmap fst <$> readTVar (triadA triad) + b <- (SlotB,) . fmap fst <$> readTVar (triadB triad) + c <- (SlotC,) . fmap fst <$> readTVar (triadC triad) + let match (_,Just x ) = (x == voter) + slots = filter match [a,b,c] + forM_ (take 1 slots) $ \(slot,_) -> do + writeTVar (triadSlot slot triad) Nothing + triadCountVotes triad + -- cgit v1.2.3