diff options
-rw-r--r-- | Mainline.hs | 87 | ||||
-rw-r--r-- | Tox.hs | 79 | ||||
-rw-r--r-- | TriadCommittee.hs | 88 |
3 files changed, 160 insertions, 94 deletions
diff --git a/Mainline.hs b/Mainline.hs index 77c0d5f1..4ce4f4da 100644 --- a/Mainline.hs +++ b/Mainline.hs | |||
@@ -61,7 +61,7 @@ import Network.BitTorrent.DHT.ContactInfo as Peers | |||
61 | import Network.BitTorrent.DHT.Search (Search (..)) | 61 | import Network.BitTorrent.DHT.Search (Search (..)) |
62 | import Network.BitTorrent.DHT.Token as Token | 62 | import Network.BitTorrent.DHT.Token as Token |
63 | import qualified Network.DHT.Routing as R | 63 | import qualified Network.DHT.Routing as R |
64 | ;import Network.DHT.Routing (Info, Timestamp, getTimestamp) | 64 | ;import Network.DHT.Routing (Timestamp, getTimestamp) |
65 | import Network.QueryResponse | 65 | import Network.QueryResponse |
66 | import Network.Socket | 66 | import Network.Socket |
67 | import System.IO | 67 | import System.IO |
@@ -79,6 +79,7 @@ import qualified Data.Aeson as JSON | |||
79 | ;import Data.Aeson (FromJSON, ToJSON, (.=)) | 79 | ;import Data.Aeson (FromJSON, ToJSON, (.=)) |
80 | import Text.Read | 80 | import Text.Read |
81 | import Global6 | 81 | import Global6 |
82 | import TriadCommittee | ||
82 | 83 | ||
83 | newtype NodeId = NodeId ByteString | 84 | newtype NodeId = NodeId ByteString |
84 | deriving (Eq,Ord,ByteArrayAccess, Bits, Hashable) | 85 | deriving (Eq,Ord,ByteArrayAccess, Bits, Hashable) |
@@ -495,8 +496,6 @@ newSwarmsDatabase = do | |||
495 | <*> newTVar toks | 496 | <*> newTVar toks |
496 | <*> newTVar def | 497 | <*> newTVar def |
497 | 498 | ||
498 | type RoutingInfo = Info NodeInfo NodeId | ||
499 | |||
500 | data Routing = Routing | 499 | data Routing = Routing |
501 | { tentativeId :: NodeInfo | 500 | { tentativeId :: NodeInfo |
502 | , sched4 :: !( TVar (Int.PSQ POSIXTime) ) | 501 | , sched4 :: !( TVar (Int.PSQ POSIXTime) ) |
@@ -596,7 +595,7 @@ newClient addr = do | |||
596 | let var = case flip prefer4or6 Nothing <$> maddr of | 595 | let var = case flip prefer4or6 Nothing <$> maddr of |
597 | Just Want_IP6 -> routing6 routing | 596 | Just Want_IP6 -> routing6 routing |
598 | _ -> routing4 routing | 597 | _ -> routing4 routing |
599 | R.selfNode <$> readTVar var | 598 | R.thisNode <$> readTVar var |
600 | , clientResponseId = return | 599 | , clientResponseId = return |
601 | } | 600 | } |
602 | 601 | ||
@@ -1027,84 +1026,6 @@ getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just W | |||
1027 | 1026 | ||
1028 | unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, tok) | 1027 | unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, tok) |
1029 | 1028 | ||
1030 | data TriadSlot = SlotA | SlotB | SlotC | ||
1031 | deriving (Eq,Ord,Enum,Show,Read) | ||
1032 | |||
1033 | data TriadCommittee voter a = TriadCommittee | ||
1034 | { triadDecider :: TVar TriadSlot | ||
1035 | , triadA :: TVar (Maybe (voter,a)) | ||
1036 | , triadB :: TVar (Maybe (voter,a)) | ||
1037 | , triadC :: TVar (Maybe (voter,a)) | ||
1038 | , triadNewDecision :: a -> STM () | ||
1039 | } | ||
1040 | |||
1041 | triadSlot :: TriadSlot -> TriadCommittee voter a -> TVar (Maybe (voter,a)) | ||
1042 | triadSlot SlotA = triadA | ||
1043 | triadSlot SlotB = triadB | ||
1044 | triadSlot SlotC = triadC | ||
1045 | |||
1046 | triadDecision :: a -> TriadCommittee voter a -> STM a | ||
1047 | triadDecision fallback triad = do | ||
1048 | slot <- readTVar (triadDecider triad) | ||
1049 | maybe fallback snd <$> readTVar (triadSlot slot triad) | ||
1050 | |||
1051 | |||
1052 | newTriadCommittee :: (a -> STM ()) -> STM (TriadCommittee voter a) | ||
1053 | newTriadCommittee onChange = | ||
1054 | TriadCommittee <$> newTVar SlotA | ||
1055 | <*> newTVar Nothing | ||
1056 | <*> newTVar Nothing | ||
1057 | <*> newTVar Nothing | ||
1058 | <*> pure onChange | ||
1059 | |||
1060 | triadCountVotes :: Eq a => Maybe a -> TriadCommittee voter a -> STM () | ||
1061 | triadCountVotes prior triad = do | ||
1062 | a <- fmap ((SlotA,) . snd) <$> readTVar (triadA triad) | ||
1063 | b <- fmap ((SlotB,) . snd) <$> readTVar (triadB triad) | ||
1064 | c <- fmap ((SlotC,) . snd) <$> readTVar (triadC triad) | ||
1065 | let (slot,vote) = case catMaybes [a,b,c] of | ||
1066 | [ (x,xvote) | ||
1067 | , (y,yvote) | ||
1068 | , (z,zvote) ] -> if xvote == yvote then (x,Just xvote) | ||
1069 | else (z,Just zvote) | ||
1070 | [] -> (SlotA,Nothing) | ||
1071 | ((slot,vote):_) -> (slot, Just vote) | ||
1072 | writeTVar (triadDecider triad) slot | ||
1073 | case vote of | ||
1074 | Just v | vote /= prior -> triadNewDecision triad v | ||
1075 | _ -> return () | ||
1076 | |||
1077 | |||
1078 | addVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> a -> STM () | ||
1079 | addVote triad voter vote = do | ||
1080 | a <- (SlotA,) . fmap fst <$> readTVar (triadA triad) | ||
1081 | b <- (SlotB,) . fmap fst <$> readTVar (triadB triad) | ||
1082 | c <- (SlotC,) . fmap fst <$> readTVar (triadC triad) | ||
1083 | let avail (_,Nothing) = True | ||
1084 | avail (_,Just x ) = (x == voter) | ||
1085 | slots = filter avail [a,b,c] | ||
1086 | forM_ (take 1 slots) $ \(slot,_) -> do | ||
1087 | prior <- do | ||
1088 | slotp <- readTVar (triadDecider triad) | ||
1089 | fmap snd <$> readTVar (triadSlot slotp triad) | ||
1090 | writeTVar (triadSlot slot triad) | ||
1091 | (Just (voter,vote)) | ||
1092 | triadCountVotes prior triad | ||
1093 | |||
1094 | delVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> STM () | ||
1095 | delVote triad voter = do | ||
1096 | a <- (SlotA,) . fmap fst <$> readTVar (triadA triad) | ||
1097 | b <- (SlotB,) . fmap fst <$> readTVar (triadB triad) | ||
1098 | c <- (SlotC,) . fmap fst <$> readTVar (triadC triad) | ||
1099 | let match (_,Just x ) = (x == voter) | ||
1100 | slots = filter match [a,b,c] | ||
1101 | forM_ (take 1 slots) $ \(slot,_) -> do | ||
1102 | prior <- do | ||
1103 | slotp <- readTVar (triadDecider triad) | ||
1104 | fmap snd <$> readTVar (triadSlot slotp triad) | ||
1105 | writeTVar (triadSlot slot triad) Nothing | ||
1106 | triadCountVotes prior triad | ||
1107 | |||
1108 | mainlineSearch qry = Search | 1029 | mainlineSearch qry = Search |
1109 | { searchSpace = mainlineSpace | 1030 | { searchSpace = mainlineSpace |
1110 | , searchNodeAddress = nodeIP &&& nodePort | 1031 | , searchNodeAddress = nodeIP &&& nodePort |
@@ -1154,3 +1075,5 @@ resolve want hostAndPort = do | |||
1154 | -- pattern matching never fails. | 1075 | -- pattern matching never fails. |
1155 | info : _ <- getAddrInfo (Just hints) (Just host) port | 1076 | info : _ <- getAddrInfo (Just hints) (Just host) port |
1156 | return $ addrAddress info | 1077 | return $ addrAddress info |
1078 | |||
1079 | |||
@@ -41,7 +41,7 @@ import Foreign.Ptr | |||
41 | import Foreign.Storable | 41 | import Foreign.Storable |
42 | import GHC.Generics (Generic) | 42 | import GHC.Generics (Generic) |
43 | import Network.Address (Address, fromSockAddr, sockAddrPort, | 43 | import Network.Address (Address, fromSockAddr, sockAddrPort, |
44 | toSockAddr, setPort,un4map) | 44 | toSockAddr, setPort, un4map, WantIP(..), ipFamily) |
45 | import Network.QueryResponse | 45 | import Network.QueryResponse |
46 | import Network.Socket | 46 | import Network.Socket |
47 | import System.Endian | 47 | import System.Endian |
@@ -50,9 +50,16 @@ import Data.Bits | |||
50 | import Data.Bits.ByteString () | 50 | import Data.Bits.ByteString () |
51 | import qualified Text.ParserCombinators.ReadP as RP | 51 | import qualified Text.ParserCombinators.ReadP as RP |
52 | import Data.Char | 52 | import Data.Char |
53 | import TriadCommittee | ||
54 | import qualified Network.DHT.Routing as R | ||
55 | import qualified Data.Wrapper.PSQInt as Int | ||
56 | import Data.Time.Clock.POSIX (POSIXTime) | ||
57 | import Global6 | ||
58 | import Data.Ord | ||
59 | import System.IO | ||
53 | 60 | ||
54 | newtype NodeId = NodeId ByteString | 61 | newtype NodeId = NodeId ByteString |
55 | deriving (Eq,Ord,ByteArrayAccess, Bits) | 62 | deriving (Eq,Ord,ByteArrayAccess, Bits, Hashable) |
56 | 63 | ||
57 | instance Show NodeId where | 64 | instance Show NodeId where |
58 | show (NodeId bs) = C8.unpack $ Base16.encode bs | 65 | show (NodeId bs) = C8.unpack $ Base16.encode bs |
@@ -350,6 +357,18 @@ encodePacket :: SecretKey -> SecretsCache -> Message ByteString -> NodeInfo -> ( | |||
350 | encodePacket sk cache msg ni = ( S.runPut . putMessage $ encryptMessage sk cache (nodeId ni) msg | 357 | encodePacket sk cache msg ni = ( S.runPut . putMessage $ encryptMessage sk cache (nodeId ni) msg |
351 | , nodeAddr ni ) | 358 | , nodeAddr ni ) |
352 | 359 | ||
360 | |||
361 | data Routing = Routing | ||
362 | { tentativeId :: NodeInfo | ||
363 | , sched4 :: !( TVar (Int.PSQ POSIXTime) ) | ||
364 | , routing4 :: !( TVar (R.BucketList NodeInfo) ) | ||
365 | , committee4 :: TriadCommittee NodeId SockAddr | ||
366 | , sched6 :: !( TVar (Int.PSQ POSIXTime) ) | ||
367 | , routing6 :: !( TVar (R.BucketList NodeInfo) ) | ||
368 | , committee6 :: TriadCommittee NodeId SockAddr | ||
369 | } | ||
370 | |||
371 | |||
353 | newClient :: SockAddr -> IO (Client String Method TransactionId NodeInfo (Message ByteString)) | 372 | newClient :: SockAddr -> IO (Client String Method TransactionId NodeInfo (Message ByteString)) |
354 | newClient addr = do | 373 | newClient addr = do |
355 | udp <- udpTransport addr | 374 | udp <- udpTransport addr |
@@ -357,9 +376,32 @@ newClient addr = do | |||
357 | let pubkey = key2id $ toPublic secret | 376 | let pubkey = key2id $ toPublic secret |
358 | cache <- newEmptyCache | 377 | cache <- newEmptyCache |
359 | drg <- getSystemDRG | 378 | drg <- getSystemDRG |
360 | self <- atomically $ newTVar | 379 | let tentative_info = NodeInfo |
361 | $ NodeInfo pubkey (fromMaybe (toEnum 0) $ fromSockAddr addr) | 380 | { nodeId = pubkey |
362 | (fromMaybe 0 $ sockAddrPort addr) | 381 | , nodeIP = fromMaybe (toEnum 0) $ fromSockAddr addr |
382 | , nodePort = fromMaybe 0 $ sockAddrPort addr | ||
383 | } | ||
384 | tentative_info6 <- | ||
385 | maybe tentative_info | ||
386 | (\ip6 -> tentative_info { nodeIP = IPv6 ip6 }) | ||
387 | <$> global6 | ||
388 | addr4 <- atomically $ newTChan | ||
389 | addr6 <- atomically $ newTChan | ||
390 | routing <- atomically $ do | ||
391 | let nobkts = R.defaultBucketCount :: Int | ||
392 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info nobkts | ||
393 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 nobkts | ||
394 | let updateIPVote tblvar addrvar a = do | ||
395 | bkts <- readTVar tblvar | ||
396 | case nodeInfo (nodeId (R.thisNode bkts)) a of | ||
397 | Right ni -> writeTVar tblvar (bkts { R.thisNode = ni }) | ||
398 | Left _ -> return () | ||
399 | writeTChan addrvar (a,map fst $ concat $ R.toList bkts) | ||
400 | committee4 <- newTriadCommittee $ updateIPVote tbl4 addr4 | ||
401 | committee6 <- newTriadCommittee $ updateIPVote tbl6 addr6 | ||
402 | sched4 <- newTVar Int.empty | ||
403 | sched6 <- newTVar Int.empty | ||
404 | return $ Routing tentative_info sched4 tbl4 committee4 sched6 tbl6 committee6 | ||
363 | let net = layerTransport (parsePacket secret cache) | 405 | let net = layerTransport (parsePacket secret cache) |
364 | (encodePacket secret cache) | 406 | (encodePacket secret cache) |
365 | udp | 407 | udp |
@@ -368,6 +410,12 @@ newClient addr = do | |||
368 | , lookupHandler = handlers | 410 | , lookupHandler = handlers |
369 | , tableMethods = tbl | 411 | , tableMethods = tbl |
370 | } | 412 | } |
413 | |||
414 | handlers :: Method -> Maybe Handler | ||
415 | handlers PingType = handler PongType pingH | ||
416 | handlers GetNodesType = handler SendNodesType $ getNodesH routing | ||
417 | handlers _ = Nothing | ||
418 | |||
371 | genNonce24 var (TransactionId nonce8 _) = atomically $ do | 419 | genNonce24 var (TransactionId nonce8 _) = atomically $ do |
372 | (g,pending) <- readTVar var | 420 | (g,pending) <- readTVar var |
373 | let (bs, g') = randomBytesGenerate 24 g | 421 | let (bs, g') = randomBytesGenerate 24 g |
@@ -376,9 +424,13 @@ newClient addr = do | |||
376 | client tbl var = Client | 424 | client tbl var = Client |
377 | { clientNet = net | 425 | { clientNet = net |
378 | , clientDispatcher = dispatch tbl | 426 | , clientDispatcher = dispatch tbl |
379 | , clientErrorReporter = ignoreErrors -- TODO | 427 | , clientErrorReporter = printErrors stderr |
380 | , clientPending = var | 428 | , clientPending = var |
381 | , clientAddress = \maddr -> atomically (readTVar self) | 429 | , clientAddress = \maddr -> atomically $ do |
430 | let var = case flip prefer4or6 Nothing <$> maddr of | ||
431 | Just Want_IP6 -> routing6 routing | ||
432 | _ -> routing4 routing | ||
433 | R.thisNode <$> readTVar var | ||
382 | , clientResponseId = genNonce24 var | 434 | , clientResponseId = genNonce24 var |
383 | } | 435 | } |
384 | if fitsInInt (Proxy :: Proxy Word64) | 436 | if fitsInInt (Proxy :: Proxy Word64) |
@@ -426,12 +478,9 @@ encodePayload typ (TransactionId (Nonce8 tid) nonce) self dest b | |||
426 | decodePayload :: S.Serialize a => Message ByteString -> Either String a | 478 | decodePayload :: S.Serialize a => Message ByteString -> Either String a |
427 | decodePayload msg = S.decode $ dropEnd8 $ msgPayload msg | 479 | decodePayload msg = S.decode $ dropEnd8 $ msgPayload msg |
428 | 480 | ||
429 | handler typ f = Just $ MethodHandler decodePayload (encodePayload typ) f | 481 | type Handler = MethodHandler String TransactionId NodeInfo (Message ByteString) |
430 | 482 | ||
431 | handlers :: Method -> Maybe (MethodHandler String TransactionId NodeInfo (Message ByteString)) | 483 | handler typ f = Just $ MethodHandler decodePayload (encodePayload typ) f |
432 | handlers PingType = handler PingType pingH | ||
433 | handlers GetNodesType = error "find_node" | ||
434 | handlers _ = Nothing | ||
435 | 484 | ||
436 | data Ping = Ping deriving Show | 485 | data Ping = Ping deriving Show |
437 | data Pong = Pong deriving Show | 486 | data Pong = Pong deriving Show |
@@ -471,6 +520,12 @@ instance S.Serialize SendNodes where | |||
471 | pingH :: NodeInfo -> Ping -> IO Pong | 520 | pingH :: NodeInfo -> Ping -> IO Pong |
472 | pingH _ Ping = return Pong | 521 | pingH _ Ping = return Pong |
473 | 522 | ||
523 | prefer4or6 :: NodeInfo -> Maybe WantIP -> WantIP | ||
524 | prefer4or6 addr iptyp = fromMaybe (ipFamily $ nodeIP addr) iptyp | ||
525 | |||
526 | getNodesH :: Routing -> NodeInfo -> GetNodes -> IO SendNodes | ||
527 | getNodesH = error "todo: getNodesH" | ||
528 | |||
474 | intKey :: TransactionId -> Int | 529 | intKey :: TransactionId -> Int |
475 | intKey (TransactionId (Nonce8 w) _) = fromIntegral w | 530 | intKey (TransactionId (Nonce8 w) _) = fromIntegral w |
476 | 531 | ||
diff --git a/TriadCommittee.hs b/TriadCommittee.hs new file mode 100644 index 00000000..3fe3ebe6 --- /dev/null +++ b/TriadCommittee.hs | |||
@@ -0,0 +1,88 @@ | |||
1 | {-# LANGUAGE TupleSections #-} | ||
2 | module TriadCommittee where | ||
3 | |||
4 | import Control.Concurrent.STM | ||
5 | import Control.Monad | ||
6 | import Data.Maybe | ||
7 | |||
8 | |||
9 | data TriadSlot = SlotA | SlotB | SlotC | ||
10 | deriving (Eq,Ord,Enum,Show,Read) | ||
11 | |||
12 | data TriadCommittee voter a = TriadCommittee | ||
13 | { triadDecider :: TVar TriadSlot | ||
14 | , triadA :: TVar (Maybe (voter,a)) | ||
15 | , triadB :: TVar (Maybe (voter,a)) | ||
16 | , triadC :: TVar (Maybe (voter,a)) | ||
17 | , triadNewDecision :: a -> STM () | ||
18 | } | ||
19 | |||
20 | triadSlot :: TriadSlot -> TriadCommittee voter a -> TVar (Maybe (voter,a)) | ||
21 | triadSlot SlotA = triadA | ||
22 | triadSlot SlotB = triadB | ||
23 | triadSlot SlotC = triadC | ||
24 | |||
25 | triadDecision :: a -> TriadCommittee voter a -> STM a | ||
26 | triadDecision fallback triad = do | ||
27 | slot <- readTVar (triadDecider triad) | ||
28 | maybe fallback snd <$> readTVar (triadSlot slot triad) | ||
29 | |||
30 | |||
31 | newTriadCommittee :: (a -> STM ()) -> STM (TriadCommittee voter a) | ||
32 | newTriadCommittee onChange = | ||
33 | TriadCommittee <$> newTVar SlotA | ||
34 | <*> newTVar Nothing | ||
35 | <*> newTVar Nothing | ||
36 | <*> newTVar Nothing | ||
37 | <*> pure onChange | ||
38 | |||
39 | |||
40 | triadCountVotes :: Eq a => Maybe a -> TriadCommittee voter a -> STM () | ||
41 | triadCountVotes prior triad = do | ||
42 | a <- fmap ((SlotA,) . snd) <$> readTVar (triadA triad) | ||
43 | b <- fmap ((SlotB,) . snd) <$> readTVar (triadB triad) | ||
44 | c <- fmap ((SlotC,) . snd) <$> readTVar (triadC triad) | ||
45 | let (slot,vote) = case catMaybes [a,b,c] of | ||
46 | [ (x,xvote) | ||
47 | , (y,yvote) | ||
48 | , (z,zvote) ] -> if xvote == yvote then (x,Just xvote) | ||
49 | else (z,Just zvote) | ||
50 | [] -> (SlotA,Nothing) | ||
51 | ((slot,vote):_) -> (slot, Just vote) | ||
52 | writeTVar (triadDecider triad) slot | ||
53 | case vote of | ||
54 | Just v | vote /= prior -> triadNewDecision triad v | ||
55 | _ -> return () | ||
56 | |||
57 | |||
58 | addVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> a -> STM () | ||
59 | addVote triad voter vote = do | ||
60 | a <- (SlotA,) . fmap fst <$> readTVar (triadA triad) | ||
61 | b <- (SlotB,) . fmap fst <$> readTVar (triadB triad) | ||
62 | c <- (SlotC,) . fmap fst <$> readTVar (triadC triad) | ||
63 | let avail (_,Nothing) = True | ||
64 | avail (_,Just x ) = (x == voter) | ||
65 | slots = filter avail [a,b,c] | ||
66 | forM_ (take 1 slots) $ \(slot,_) -> do | ||
67 | prior <- do | ||
68 | slotp <- readTVar (triadDecider triad) | ||
69 | fmap snd <$> readTVar (triadSlot slotp triad) | ||
70 | writeTVar (triadSlot slot triad) | ||
71 | (Just (voter,vote)) | ||
72 | triadCountVotes prior triad | ||
73 | |||
74 | |||
75 | delVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> STM () | ||
76 | delVote triad voter = do | ||
77 | a <- (SlotA,) . fmap fst <$> readTVar (triadA triad) | ||
78 | b <- (SlotB,) . fmap fst <$> readTVar (triadB triad) | ||
79 | c <- (SlotC,) . fmap fst <$> readTVar (triadC triad) | ||
80 | let match (_,Just x ) = (x == voter) | ||
81 | slots = filter match [a,b,c] | ||
82 | forM_ (take 1 slots) $ \(slot,_) -> do | ||
83 | prior <- do | ||
84 | slotp <- readTVar (triadDecider triad) | ||
85 | fmap snd <$> readTVar (triadSlot slotp triad) | ||
86 | writeTVar (triadSlot slot triad) Nothing | ||
87 | triadCountVotes prior triad | ||
88 | |||