summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Mainline.hs87
-rw-r--r--Tox.hs79
-rw-r--r--TriadCommittee.hs88
3 files changed, 160 insertions, 94 deletions
diff --git a/Mainline.hs b/Mainline.hs
index 77c0d5f1..4ce4f4da 100644
--- a/Mainline.hs
+++ b/Mainline.hs
@@ -61,7 +61,7 @@ import Network.BitTorrent.DHT.ContactInfo as Peers
61import Network.BitTorrent.DHT.Search (Search (..)) 61import Network.BitTorrent.DHT.Search (Search (..))
62import Network.BitTorrent.DHT.Token as Token 62import Network.BitTorrent.DHT.Token as Token
63import qualified Network.DHT.Routing as R 63import qualified Network.DHT.Routing as R
64 ;import Network.DHT.Routing (Info, Timestamp, getTimestamp) 64 ;import Network.DHT.Routing (Timestamp, getTimestamp)
65import Network.QueryResponse 65import Network.QueryResponse
66import Network.Socket 66import Network.Socket
67import System.IO 67import System.IO
@@ -79,6 +79,7 @@ import qualified Data.Aeson as JSON
79 ;import Data.Aeson (FromJSON, ToJSON, (.=)) 79 ;import Data.Aeson (FromJSON, ToJSON, (.=))
80import Text.Read 80import Text.Read
81import Global6 81import Global6
82import TriadCommittee
82 83
83newtype NodeId = NodeId ByteString 84newtype NodeId = NodeId ByteString
84 deriving (Eq,Ord,ByteArrayAccess, Bits, Hashable) 85 deriving (Eq,Ord,ByteArrayAccess, Bits, Hashable)
@@ -495,8 +496,6 @@ newSwarmsDatabase = do
495 <*> newTVar toks 496 <*> newTVar toks
496 <*> newTVar def 497 <*> newTVar def
497 498
498type RoutingInfo = Info NodeInfo NodeId
499
500data Routing = Routing 499data Routing = Routing
501 { tentativeId :: NodeInfo 500 { tentativeId :: NodeInfo
502 , sched4 :: !( TVar (Int.PSQ POSIXTime) ) 501 , sched4 :: !( TVar (Int.PSQ POSIXTime) )
@@ -596,7 +595,7 @@ newClient addr = do
596 let var = case flip prefer4or6 Nothing <$> maddr of 595 let var = case flip prefer4or6 Nothing <$> maddr of
597 Just Want_IP6 -> routing6 routing 596 Just Want_IP6 -> routing6 routing
598 _ -> routing4 routing 597 _ -> routing4 routing
599 R.selfNode <$> readTVar var 598 R.thisNode <$> readTVar var
600 , clientResponseId = return 599 , clientResponseId = return
601 } 600 }
602 601
@@ -1027,84 +1026,6 @@ getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just W
1027 1026
1028unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, tok) 1027unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, tok)
1029 1028
1030data TriadSlot = SlotA | SlotB | SlotC
1031 deriving (Eq,Ord,Enum,Show,Read)
1032
1033data TriadCommittee voter a = TriadCommittee
1034 { triadDecider :: TVar TriadSlot
1035 , triadA :: TVar (Maybe (voter,a))
1036 , triadB :: TVar (Maybe (voter,a))
1037 , triadC :: TVar (Maybe (voter,a))
1038 , triadNewDecision :: a -> STM ()
1039 }
1040
1041triadSlot :: TriadSlot -> TriadCommittee voter a -> TVar (Maybe (voter,a))
1042triadSlot SlotA = triadA
1043triadSlot SlotB = triadB
1044triadSlot SlotC = triadC
1045
1046triadDecision :: a -> TriadCommittee voter a -> STM a
1047triadDecision fallback triad = do
1048 slot <- readTVar (triadDecider triad)
1049 maybe fallback snd <$> readTVar (triadSlot slot triad)
1050
1051
1052newTriadCommittee :: (a -> STM ()) -> STM (TriadCommittee voter a)
1053newTriadCommittee onChange =
1054 TriadCommittee <$> newTVar SlotA
1055 <*> newTVar Nothing
1056 <*> newTVar Nothing
1057 <*> newTVar Nothing
1058 <*> pure onChange
1059
1060triadCountVotes :: Eq a => Maybe a -> TriadCommittee voter a -> STM ()
1061triadCountVotes prior triad = do
1062 a <- fmap ((SlotA,) . snd) <$> readTVar (triadA triad)
1063 b <- fmap ((SlotB,) . snd) <$> readTVar (triadB triad)
1064 c <- fmap ((SlotC,) . snd) <$> readTVar (triadC triad)
1065 let (slot,vote) = case catMaybes [a,b,c] of
1066 [ (x,xvote)
1067 , (y,yvote)
1068 , (z,zvote) ] -> if xvote == yvote then (x,Just xvote)
1069 else (z,Just zvote)
1070 [] -> (SlotA,Nothing)
1071 ((slot,vote):_) -> (slot, Just vote)
1072 writeTVar (triadDecider triad) slot
1073 case vote of
1074 Just v | vote /= prior -> triadNewDecision triad v
1075 _ -> return ()
1076
1077
1078addVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> a -> STM ()
1079addVote triad voter vote = do
1080 a <- (SlotA,) . fmap fst <$> readTVar (triadA triad)
1081 b <- (SlotB,) . fmap fst <$> readTVar (triadB triad)
1082 c <- (SlotC,) . fmap fst <$> readTVar (triadC triad)
1083 let avail (_,Nothing) = True
1084 avail (_,Just x ) = (x == voter)
1085 slots = filter avail [a,b,c]
1086 forM_ (take 1 slots) $ \(slot,_) -> do
1087 prior <- do
1088 slotp <- readTVar (triadDecider triad)
1089 fmap snd <$> readTVar (triadSlot slotp triad)
1090 writeTVar (triadSlot slot triad)
1091 (Just (voter,vote))
1092 triadCountVotes prior triad
1093
1094delVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> STM ()
1095delVote triad voter = do
1096 a <- (SlotA,) . fmap fst <$> readTVar (triadA triad)
1097 b <- (SlotB,) . fmap fst <$> readTVar (triadB triad)
1098 c <- (SlotC,) . fmap fst <$> readTVar (triadC triad)
1099 let match (_,Just x ) = (x == voter)
1100 slots = filter match [a,b,c]
1101 forM_ (take 1 slots) $ \(slot,_) -> do
1102 prior <- do
1103 slotp <- readTVar (triadDecider triad)
1104 fmap snd <$> readTVar (triadSlot slotp triad)
1105 writeTVar (triadSlot slot triad) Nothing
1106 triadCountVotes prior triad
1107
1108mainlineSearch qry = Search 1029mainlineSearch qry = Search
1109 { searchSpace = mainlineSpace 1030 { searchSpace = mainlineSpace
1110 , searchNodeAddress = nodeIP &&& nodePort 1031 , searchNodeAddress = nodeIP &&& nodePort
@@ -1154,3 +1075,5 @@ resolve want hostAndPort = do
1154 -- pattern matching never fails. 1075 -- pattern matching never fails.
1155 info : _ <- getAddrInfo (Just hints) (Just host) port 1076 info : _ <- getAddrInfo (Just hints) (Just host) port
1156 return $ addrAddress info 1077 return $ addrAddress info
1078
1079
diff --git a/Tox.hs b/Tox.hs
index 25c650b3..34e5d6f3 100644
--- a/Tox.hs
+++ b/Tox.hs
@@ -41,7 +41,7 @@ import Foreign.Ptr
41import Foreign.Storable 41import Foreign.Storable
42import GHC.Generics (Generic) 42import GHC.Generics (Generic)
43import Network.Address (Address, fromSockAddr, sockAddrPort, 43import Network.Address (Address, fromSockAddr, sockAddrPort,
44 toSockAddr, setPort,un4map) 44 toSockAddr, setPort, un4map, WantIP(..), ipFamily)
45import Network.QueryResponse 45import Network.QueryResponse
46import Network.Socket 46import Network.Socket
47import System.Endian 47import System.Endian
@@ -50,9 +50,16 @@ import Data.Bits
50import Data.Bits.ByteString () 50import Data.Bits.ByteString ()
51import qualified Text.ParserCombinators.ReadP as RP 51import qualified Text.ParserCombinators.ReadP as RP
52import Data.Char 52import Data.Char
53import TriadCommittee
54import qualified Network.DHT.Routing as R
55import qualified Data.Wrapper.PSQInt as Int
56import Data.Time.Clock.POSIX (POSIXTime)
57import Global6
58import Data.Ord
59import System.IO
53 60
54newtype NodeId = NodeId ByteString 61newtype NodeId = NodeId ByteString
55 deriving (Eq,Ord,ByteArrayAccess, Bits) 62 deriving (Eq,Ord,ByteArrayAccess, Bits, Hashable)
56 63
57instance Show NodeId where 64instance Show NodeId where
58 show (NodeId bs) = C8.unpack $ Base16.encode bs 65 show (NodeId bs) = C8.unpack $ Base16.encode bs
@@ -350,6 +357,18 @@ encodePacket :: SecretKey -> SecretsCache -> Message ByteString -> NodeInfo -> (
350encodePacket sk cache msg ni = ( S.runPut . putMessage $ encryptMessage sk cache (nodeId ni) msg 357encodePacket sk cache msg ni = ( S.runPut . putMessage $ encryptMessage sk cache (nodeId ni) msg
351 , nodeAddr ni ) 358 , nodeAddr ni )
352 359
360
361data Routing = Routing
362 { tentativeId :: NodeInfo
363 , sched4 :: !( TVar (Int.PSQ POSIXTime) )
364 , routing4 :: !( TVar (R.BucketList NodeInfo) )
365 , committee4 :: TriadCommittee NodeId SockAddr
366 , sched6 :: !( TVar (Int.PSQ POSIXTime) )
367 , routing6 :: !( TVar (R.BucketList NodeInfo) )
368 , committee6 :: TriadCommittee NodeId SockAddr
369 }
370
371
353newClient :: SockAddr -> IO (Client String Method TransactionId NodeInfo (Message ByteString)) 372newClient :: SockAddr -> IO (Client String Method TransactionId NodeInfo (Message ByteString))
354newClient addr = do 373newClient addr = do
355 udp <- udpTransport addr 374 udp <- udpTransport addr
@@ -357,9 +376,32 @@ newClient addr = do
357 let pubkey = key2id $ toPublic secret 376 let pubkey = key2id $ toPublic secret
358 cache <- newEmptyCache 377 cache <- newEmptyCache
359 drg <- getSystemDRG 378 drg <- getSystemDRG
360 self <- atomically $ newTVar 379 let tentative_info = NodeInfo
361 $ NodeInfo pubkey (fromMaybe (toEnum 0) $ fromSockAddr addr) 380 { nodeId = pubkey
362 (fromMaybe 0 $ sockAddrPort addr) 381 , nodeIP = fromMaybe (toEnum 0) $ fromSockAddr addr
382 , nodePort = fromMaybe 0 $ sockAddrPort addr
383 }
384 tentative_info6 <-
385 maybe tentative_info
386 (\ip6 -> tentative_info { nodeIP = IPv6 ip6 })
387 <$> global6
388 addr4 <- atomically $ newTChan
389 addr6 <- atomically $ newTChan
390 routing <- atomically $ do
391 let nobkts = R.defaultBucketCount :: Int
392 tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info nobkts
393 tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 nobkts
394 let updateIPVote tblvar addrvar a = do
395 bkts <- readTVar tblvar
396 case nodeInfo (nodeId (R.thisNode bkts)) a of
397 Right ni -> writeTVar tblvar (bkts { R.thisNode = ni })
398 Left _ -> return ()
399 writeTChan addrvar (a,map fst $ concat $ R.toList bkts)
400 committee4 <- newTriadCommittee $ updateIPVote tbl4 addr4
401 committee6 <- newTriadCommittee $ updateIPVote tbl6 addr6
402 sched4 <- newTVar Int.empty
403 sched6 <- newTVar Int.empty
404 return $ Routing tentative_info sched4 tbl4 committee4 sched6 tbl6 committee6
363 let net = layerTransport (parsePacket secret cache) 405 let net = layerTransport (parsePacket secret cache)
364 (encodePacket secret cache) 406 (encodePacket secret cache)
365 udp 407 udp
@@ -368,6 +410,12 @@ newClient addr = do
368 , lookupHandler = handlers 410 , lookupHandler = handlers
369 , tableMethods = tbl 411 , tableMethods = tbl
370 } 412 }
413
414 handlers :: Method -> Maybe Handler
415 handlers PingType = handler PongType pingH
416 handlers GetNodesType = handler SendNodesType $ getNodesH routing
417 handlers _ = Nothing
418
371 genNonce24 var (TransactionId nonce8 _) = atomically $ do 419 genNonce24 var (TransactionId nonce8 _) = atomically $ do
372 (g,pending) <- readTVar var 420 (g,pending) <- readTVar var
373 let (bs, g') = randomBytesGenerate 24 g 421 let (bs, g') = randomBytesGenerate 24 g
@@ -376,9 +424,13 @@ newClient addr = do
376 client tbl var = Client 424 client tbl var = Client
377 { clientNet = net 425 { clientNet = net
378 , clientDispatcher = dispatch tbl 426 , clientDispatcher = dispatch tbl
379 , clientErrorReporter = ignoreErrors -- TODO 427 , clientErrorReporter = printErrors stderr
380 , clientPending = var 428 , clientPending = var
381 , clientAddress = \maddr -> atomically (readTVar self) 429 , clientAddress = \maddr -> atomically $ do
430 let var = case flip prefer4or6 Nothing <$> maddr of
431 Just Want_IP6 -> routing6 routing
432 _ -> routing4 routing
433 R.thisNode <$> readTVar var
382 , clientResponseId = genNonce24 var 434 , clientResponseId = genNonce24 var
383 } 435 }
384 if fitsInInt (Proxy :: Proxy Word64) 436 if fitsInInt (Proxy :: Proxy Word64)
@@ -426,12 +478,9 @@ encodePayload typ (TransactionId (Nonce8 tid) nonce) self dest b
426decodePayload :: S.Serialize a => Message ByteString -> Either String a 478decodePayload :: S.Serialize a => Message ByteString -> Either String a
427decodePayload msg = S.decode $ dropEnd8 $ msgPayload msg 479decodePayload msg = S.decode $ dropEnd8 $ msgPayload msg
428 480
429handler typ f = Just $ MethodHandler decodePayload (encodePayload typ) f 481type Handler = MethodHandler String TransactionId NodeInfo (Message ByteString)
430 482
431handlers :: Method -> Maybe (MethodHandler String TransactionId NodeInfo (Message ByteString)) 483handler typ f = Just $ MethodHandler decodePayload (encodePayload typ) f
432handlers PingType = handler PingType pingH
433handlers GetNodesType = error "find_node"
434handlers _ = Nothing
435 484
436data Ping = Ping deriving Show 485data Ping = Ping deriving Show
437data Pong = Pong deriving Show 486data Pong = Pong deriving Show
@@ -471,6 +520,12 @@ instance S.Serialize SendNodes where
471pingH :: NodeInfo -> Ping -> IO Pong 520pingH :: NodeInfo -> Ping -> IO Pong
472pingH _ Ping = return Pong 521pingH _ Ping = return Pong
473 522
523prefer4or6 :: NodeInfo -> Maybe WantIP -> WantIP
524prefer4or6 addr iptyp = fromMaybe (ipFamily $ nodeIP addr) iptyp
525
526getNodesH :: Routing -> NodeInfo -> GetNodes -> IO SendNodes
527getNodesH = error "todo: getNodesH"
528
474intKey :: TransactionId -> Int 529intKey :: TransactionId -> Int
475intKey (TransactionId (Nonce8 w) _) = fromIntegral w 530intKey (TransactionId (Nonce8 w) _) = fromIntegral w
476 531
diff --git a/TriadCommittee.hs b/TriadCommittee.hs
new file mode 100644
index 00000000..3fe3ebe6
--- /dev/null
+++ b/TriadCommittee.hs
@@ -0,0 +1,88 @@
1{-# LANGUAGE TupleSections #-}
2module TriadCommittee where
3
4import Control.Concurrent.STM
5import Control.Monad
6import Data.Maybe
7
8
9data TriadSlot = SlotA | SlotB | SlotC
10 deriving (Eq,Ord,Enum,Show,Read)
11
12data TriadCommittee voter a = TriadCommittee
13 { triadDecider :: TVar TriadSlot
14 , triadA :: TVar (Maybe (voter,a))
15 , triadB :: TVar (Maybe (voter,a))
16 , triadC :: TVar (Maybe (voter,a))
17 , triadNewDecision :: a -> STM ()
18 }
19
20triadSlot :: TriadSlot -> TriadCommittee voter a -> TVar (Maybe (voter,a))
21triadSlot SlotA = triadA
22triadSlot SlotB = triadB
23triadSlot SlotC = triadC
24
25triadDecision :: a -> TriadCommittee voter a -> STM a
26triadDecision fallback triad = do
27 slot <- readTVar (triadDecider triad)
28 maybe fallback snd <$> readTVar (triadSlot slot triad)
29
30
31newTriadCommittee :: (a -> STM ()) -> STM (TriadCommittee voter a)
32newTriadCommittee onChange =
33 TriadCommittee <$> newTVar SlotA
34 <*> newTVar Nothing
35 <*> newTVar Nothing
36 <*> newTVar Nothing
37 <*> pure onChange
38
39
40triadCountVotes :: Eq a => Maybe a -> TriadCommittee voter a -> STM ()
41triadCountVotes prior triad = do
42 a <- fmap ((SlotA,) . snd) <$> readTVar (triadA triad)
43 b <- fmap ((SlotB,) . snd) <$> readTVar (triadB triad)
44 c <- fmap ((SlotC,) . snd) <$> readTVar (triadC triad)
45 let (slot,vote) = case catMaybes [a,b,c] of
46 [ (x,xvote)
47 , (y,yvote)
48 , (z,zvote) ] -> if xvote == yvote then (x,Just xvote)
49 else (z,Just zvote)
50 [] -> (SlotA,Nothing)
51 ((slot,vote):_) -> (slot, Just vote)
52 writeTVar (triadDecider triad) slot
53 case vote of
54 Just v | vote /= prior -> triadNewDecision triad v
55 _ -> return ()
56
57
58addVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> a -> STM ()
59addVote triad voter vote = do
60 a <- (SlotA,) . fmap fst <$> readTVar (triadA triad)
61 b <- (SlotB,) . fmap fst <$> readTVar (triadB triad)
62 c <- (SlotC,) . fmap fst <$> readTVar (triadC triad)
63 let avail (_,Nothing) = True
64 avail (_,Just x ) = (x == voter)
65 slots = filter avail [a,b,c]
66 forM_ (take 1 slots) $ \(slot,_) -> do
67 prior <- do
68 slotp <- readTVar (triadDecider triad)
69 fmap snd <$> readTVar (triadSlot slotp triad)
70 writeTVar (triadSlot slot triad)
71 (Just (voter,vote))
72 triadCountVotes prior triad
73
74
75delVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> STM ()
76delVote triad voter = do
77 a <- (SlotA,) . fmap fst <$> readTVar (triadA triad)
78 b <- (SlotB,) . fmap fst <$> readTVar (triadB triad)
79 c <- (SlotC,) . fmap fst <$> readTVar (triadC triad)
80 let match (_,Just x ) = (x == voter)
81 slots = filter match [a,b,c]
82 forM_ (take 1 slots) $ \(slot,_) -> do
83 prior <- do
84 slotp <- readTVar (triadDecider triad)
85 fmap snd <$> readTVar (triadSlot slotp triad)
86 writeTVar (triadSlot slot triad) Nothing
87 triadCountVotes prior triad
88