summaryrefslogtreecommitdiff
path: root/Mainline.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-07-18 04:24:21 -0400
committerjoe <joe@jerkface.net>2017-07-18 04:24:21 -0400
commit4c8f5796d37815ad05e35bcdf0cc09b8f447d0c8 (patch)
tree948bce8d91725537342d2d3ece3edd47cb8b5b2b /Mainline.hs
parent8c47549a75399fe0f375fbf1ad1de9f6cf2e1f44 (diff)
Mainline routing table update.
Diffstat (limited to 'Mainline.hs')
-rw-r--r--Mainline.hs145
1 files changed, 125 insertions, 20 deletions
diff --git a/Mainline.hs b/Mainline.hs
index ef452f7d..84fe96bf 100644
--- a/Mainline.hs
+++ b/Mainline.hs
@@ -1,3 +1,4 @@
1{-# LANGUAGE CPP #-}
1{-# LANGUAGE DeriveDataTypeable #-} 2{-# LANGUAGE DeriveDataTypeable #-}
2{-# LANGUAGE DeriveFoldable #-} 3{-# LANGUAGE DeriveFoldable #-}
3{-# LANGUAGE DeriveFunctor #-} 4{-# LANGUAGE DeriveFunctor #-}
@@ -7,11 +8,13 @@
7{-# LANGUAGE LambdaCase #-} 8{-# LANGUAGE LambdaCase #-}
8{-# LANGUAGE PatternSynonyms #-} 9{-# LANGUAGE PatternSynonyms #-}
9{-# LANGUAGE StandaloneDeriving #-} 10{-# LANGUAGE StandaloneDeriving #-}
11{-# LANGUAGE TupleSections #-}
10module Mainline where 12module Mainline where
11 13
12import Control.Applicative 14import Control.Applicative
13import Control.Arrow 15import Control.Arrow
14import Control.Concurrent.STM 16import Control.Concurrent.STM
17import Control.Monad
15import Crypto.Random 18import Crypto.Random
16import Data.BEncode as BE 19import Data.BEncode as BE
17import qualified Data.BEncode.BDict as BE 20import qualified Data.BEncode.BDict as BE
@@ -38,15 +41,15 @@ import Data.Set (Set)
38import Data.Torrent 41import Data.Torrent
39import Data.Typeable 42import Data.Typeable
40import Data.Word 43import Data.Word
44import Kademlia
41import Network.Address (Address, fromSockAddr, setPort, 45import Network.Address (Address, fromSockAddr, setPort,
42 sockAddrPort, toSockAddr, testIdBit) 46 sockAddrPort, testIdBit, toSockAddr)
43import Network.BitTorrent.DHT.ContactInfo as Peers 47import Network.BitTorrent.DHT.ContactInfo as Peers
44import Network.BitTorrent.DHT.Token as Token 48import Network.BitTorrent.DHT.Token as Token
45import qualified Network.DHT.Routing as R 49import qualified Network.DHT.Routing as R
46 ;import Network.DHT.Routing (Info, Timestamp, getTimestamp) 50 ;import Network.DHT.Routing (Info, Timestamp, getTimestamp)
47import Network.QueryResponse 51import Network.QueryResponse
48import Network.Socket 52import Network.Socket
49import Kademlia
50 53
51newtype NodeId = NodeId ByteString 54newtype NodeId = NodeId ByteString
52 deriving (Eq,Ord,Show,ByteArrayAccess, BEncode, Bits) 55 deriving (Eq,Ord,Show,ByteArrayAccess, BEncode, Bits)
@@ -99,6 +102,9 @@ putNodeInfo6 (NodeInfo (NodeId nid) (IPv6 ip) port)
99putNodeInfo6 _ = return () 102putNodeInfo6 _ = return ()
100 103
101 104
105-- TODO: We should use a SocketAddrInet6 address for a dual-stack listen
106-- socket. Therefore, the behavior of this method should depend on the bind
107-- address for outbound packets.
102nodeAddr :: NodeInfo -> SockAddr 108nodeAddr :: NodeInfo -> SockAddr
103nodeAddr (NodeInfo _ ip port) = setPort port $ toSockAddr ip 109nodeAddr (NodeInfo _ ip port) = setPort port $ toSockAddr ip
104 110
@@ -265,8 +271,9 @@ data Routing = Routing
265 , routing6 :: !( TVar (R.BucketList NodeInfo) ) 271 , routing6 :: !( TVar (R.BucketList NodeInfo) )
266 } 272 }
267 273
268newClient :: 274type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue)
269 SockAddr -> IO (Client String Method TransactionId NodeInfo (Message BValue)) 275
276newClient :: SockAddr -> IO MainlineClient
270newClient addr = do 277newClient addr = do
271 udp <- udpTransport addr 278 udp <- udpTransport addr
272 nid <- NodeId <$> getRandomBytes 20 279 nid <- NodeId <$> getRandomBytes 20
@@ -281,13 +288,21 @@ newClient addr = do
281 tbl6 <- newTVar $ R.nullTable (comparing nodeId) tenative_info nobkts 288 tbl6 <- newTVar $ R.nullTable (comparing nodeId) tenative_info nobkts
282 return $ Routing tenative_info tbl4 tbl6 289 return $ Routing tenative_info tbl4 tbl6
283 swarms <- newSwarmsDatabase 290 swarms <- newSwarmsDatabase
284 let net = onInbound (updateRouting routing) 291 map_var <- atomically $ newTVar (0, mempty)
292 let net = onInbound (updateRouting outgoingClient routing)
285 $ layerTransport parsePacket encodePacket 293 $ layerTransport parsePacket encodePacket
286 $ udp 294 $ udp
287 dispatch tbl = DispatchMethods 295
288 { classifyInbound = classify 296 -- Paranoid: It's safe to define /net/ and /client/ to be mutually
289 , lookupHandler = handlers 297 -- recursive since 'updateRouting' does not invoke 'awaitMessage' which
290 , tableMethods = tbl 298 -- which was modified by 'onInbound'. However, I'm going to avoid the
299 -- mutual reference just to be safe.
300 outgoingClient = client { clientNet = net { awaitMessage = return Nothing } }
301
302 dispatch = DispatchMethods
303 { classifyInbound = classify -- :: x -> MessageClass err meth tid
304 , lookupHandler = handlers -- :: meth -> Maybe (MethodHandler err tid addr x)
305 , tableMethods = mapT -- :: TransactionMethods tbl tid x
291 } 306 }
292 307
293 handlers :: Method -> Maybe Handler 308 handlers :: Method -> Maybe Handler
@@ -298,12 +313,13 @@ newClient addr = do
298 handlers ( Method meth ) = Just $ defaultHandler meth 313 handlers ( Method meth ) = Just $ defaultHandler meth
299 314
300 mapT = transactionMethods mapMethods gen 315 mapT = transactionMethods mapMethods gen
316
301 gen :: Word16 -> (TransactionId, Word16) 317 gen :: Word16 -> (TransactionId, Word16)
302 gen cnt = (TransactionId $ S.encode cnt, cnt+1) 318 gen cnt = (TransactionId $ S.encode cnt, cnt+1)
303 map_var <- atomically $ newTVar (0, mempty) 319
304 return Client 320 client = Client
305 { clientNet = net 321 { clientNet = net
306 , clientDispatcher = dispatch mapT 322 , clientDispatcher = dispatch
307 , clientErrorReporter = ignoreErrors -- TODO 323 , clientErrorReporter = ignoreErrors -- TODO
308 , clientPending = map_var 324 , clientPending = map_var
309 , clientAddress = \maddr -> atomically $ do 325 , clientAddress = \maddr -> atomically $ do
@@ -314,16 +330,18 @@ newClient addr = do
314 , clientResponseId = return 330 , clientResponseId = return
315 } 331 }
316 332
333 return client
334
317defaultHandler :: ByteString -> Handler 335defaultHandler :: ByteString -> Handler
318defaultHandler meth = MethodHandler decodePayload errorPayload returnError 336defaultHandler meth = MethodHandler decodePayload errorPayload returnError
319 where 337 where
320 returnError :: NodeInfo -> BValue -> IO Error 338 returnError :: NodeInfo -> BValue -> IO Error
321 returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth) 339 returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth)
322 340
323mainlineKademlia :: Kademlia NodeId NodeInfo 341mainlineKademlia :: MainlineClient -> TVar (R.BucketList NodeInfo) -> Kademlia NodeId NodeInfo
324mainlineKademlia = Kademlia quietInsertions 342mainlineKademlia client var = Kademlia quietInsertions
325 mainlineSpace 343 mainlineSpace
326 (vanillaIO (error "var") $ error "pingProbe") 344 (vanillaIO var $ ping client)
327 345
328mainlineSpace :: R.KademliaSpace NodeId NodeInfo 346mainlineSpace :: R.KademliaSpace NodeId NodeInfo
329mainlineSpace = R.KademliaSpace 347mainlineSpace = R.KademliaSpace
@@ -333,10 +351,11 @@ mainlineSpace = R.KademliaSpace
333 } 351 }
334 352
335 353
336updateRouting :: Routing -> NodeInfo -> Message BValue -> IO () 354updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO ()
337updateRouting routing naddr _ = do 355updateRouting client routing naddr _ = do
338 error "todo" insertNode 356 case prefer4or6 naddr Nothing of
339 -- TODO Update kademlia table. 357 Want_IP4 -> insertNode (mainlineKademlia client $ routing4 routing) naddr
358 Want_IP6 -> insertNode (mainlineKademlia client $ routing6 routing) naddr
340 -- TODO Update external ip address and update BEP-42 node id. 359 -- TODO Update external ip address and update BEP-42 node id.
341 return () 360 return ()
342 361
@@ -618,3 +637,89 @@ announceH (SwarmsDatabase peers toks _) naddr announcement = do
618 else port announcement 637 else port announcement
619 } 638 }
620 return Announced 639 return Announced
640
641ping :: MainlineClient -> NodeInfo -> IO Bool
642ping client addr = fromMaybe False <$> sendQuery client serializer Ping addr
643 where
644 serializer = MethodSerializer
645 { methodTimeout = 5
646 , method = Method "ping"
647 , wrapQuery = encodePayload
648 , unwrapResponse = const True
649 }
650
651data TriadSlot = SlotA | SlotB | SlotC
652 deriving (Eq,Ord,Enum,Show,Read)
653
654data TriadCommittee voter a = TriadCommittee
655 { triadDecider :: TVar TriadSlot
656 , triadA :: TVar (Maybe (voter,a))
657 , triadB :: TVar (Maybe (voter,a))
658 , triadC :: TVar (Maybe (voter,a))
659 , triadNewDecision :: a -> STM ()
660 }
661
662triadSlot :: TriadSlot -> TriadCommittee voter a -> TVar (Maybe (voter,a))
663triadSlot SlotA = triadA
664triadSlot SlotB = triadB
665triadSlot SlotC = triadC
666
667triadDecision :: a -> TriadCommittee voter a -> STM a
668triadDecision fallback triad = do
669 slot <- readTVar (triadDecider triad)
670 maybe fallback snd <$> readTVar (triadSlot slot triad)
671
672
673newTriadCommittee :: (a -> STM ()) -> STM (TriadCommittee voter a)
674newTriadCommittee onChange =
675 TriadCommittee <$> newTVar SlotA
676 <*> newTVar Nothing
677 <*> newTVar Nothing
678 <*> newTVar Nothing
679 <*> pure onChange
680
681triadCountVotes :: Eq a => TriadCommittee voter a -> STM ()
682triadCountVotes triad = do
683 prior <- do
684 slot <- readTVar (triadDecider triad)
685 fmap snd <$> readTVar (triadSlot slot triad)
686 a <- fmap ((SlotA,) . snd) <$> readTVar (triadA triad)
687 b <- fmap ((SlotB,) . snd) <$> readTVar (triadB triad)
688 c <- fmap ((SlotC,) . snd) <$> readTVar (triadC triad)
689 let (slot,vote) = case catMaybes [a,b,c] of
690 [ (x,xvote)
691 , (y,yvote)
692 , (z,zvote) ] -> if xvote == yvote then (x,Just xvote)
693 else (z,Just zvote)
694 [] -> (SlotA,Nothing)
695 ((slot,vote):_) -> (slot, Just vote)
696 writeTVar (triadDecider triad) slot
697 case vote of
698 Just v | vote /= prior -> triadNewDecision triad v
699 _ -> return ()
700
701
702addVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> a -> STM ()
703addVote triad voter vote = do
704 a <- (SlotA,) . fmap fst <$> readTVar (triadA triad)
705 b <- (SlotB,) . fmap fst <$> readTVar (triadB triad)
706 c <- (SlotC,) . fmap fst <$> readTVar (triadC triad)
707 let avail (_,Nothing) = True
708 avail (_,Just x ) = (x == voter)
709 slots = filter avail [a,b,c]
710 forM_ (take 1 slots) $ \(slot,_) -> do
711 writeTVar (triadSlot slot triad)
712 (Just (voter,vote))
713 triadCountVotes triad
714
715delVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> STM ()
716delVote triad voter = do
717 a <- (SlotA,) . fmap fst <$> readTVar (triadA triad)
718 b <- (SlotB,) . fmap fst <$> readTVar (triadB triad)
719 c <- (SlotC,) . fmap fst <$> readTVar (triadC triad)
720 let match (_,Just x ) = (x == voter)
721 slots = filter match [a,b,c]
722 forM_ (take 1 slots) $ \(slot,_) -> do
723 writeTVar (triadSlot slot triad) Nothing
724 triadCountVotes triad
725