diff options
Diffstat (limited to 'Mainline.hs')
-rw-r--r-- | Mainline.hs | 265 |
1 files changed, 204 insertions, 61 deletions
diff --git a/Mainline.hs b/Mainline.hs index 76e914b3..911f0dbc 100644 --- a/Mainline.hs +++ b/Mainline.hs | |||
@@ -16,49 +16,69 @@ import Control.Arrow | |||
16 | import Control.Concurrent.STM | 16 | import Control.Concurrent.STM |
17 | import Control.Monad | 17 | import Control.Monad |
18 | import Crypto.Random | 18 | import Crypto.Random |
19 | import Data.BEncode as BE | 19 | import Data.BEncode as BE |
20 | import qualified Data.BEncode.BDict as BE | 20 | import qualified Data.BEncode.BDict as BE |
21 | ;import Data.BEncode.BDict (BKey) | 21 | ;import Data.BEncode.BDict (BKey) |
22 | import Data.BEncode.Types (BDict) | 22 | import Data.BEncode.Pretty |
23 | import Data.BEncode.Types (BDict) | ||
23 | import Data.Bits | 24 | import Data.Bits |
24 | import Data.Bits.ByteString | 25 | import Data.Bits.ByteString |
25 | import Data.Bool | 26 | import Data.Bool |
26 | import qualified Data.ByteArray as BA | 27 | import qualified Data.ByteArray as BA |
27 | ;import Data.ByteArray (ByteArrayAccess) | 28 | ;import Data.ByteArray (ByteArrayAccess) |
28 | import qualified Data.ByteString as B | 29 | import qualified Data.ByteString as B |
29 | ;import Data.ByteString (ByteString) | 30 | ;import Data.ByteString (ByteString) |
30 | import Data.ByteString.Lazy (toStrict) | 31 | import qualified Data.ByteString.Base16 as Base16 |
32 | import qualified Data.ByteString.Char8 as Char8 | ||
33 | import Data.ByteString.Lazy (toStrict) | ||
34 | import qualified Data.ByteString.Lazy.Char8 as L8 | ||
31 | import Data.Coerce | 35 | import Data.Coerce |
32 | import Data.Data | 36 | import Data.Data |
33 | import Data.Default | 37 | import Data.Default |
38 | import Data.Digest.CRC32C | ||
39 | import Data.Function (fix) | ||
34 | import Data.Hashable | 40 | import Data.Hashable |
35 | import Data.IP | 41 | import Data.IP |
36 | import Data.List | 42 | import Data.List |
37 | import Data.Maybe | 43 | import Data.Maybe |
38 | import Data.Monoid | 44 | import Data.Monoid |
39 | import Data.Ord | 45 | import Data.Ord |
40 | import qualified Data.Serialize as S | 46 | import qualified Data.Serialize as S |
41 | import Data.Set (Set) | 47 | import Data.Set (Set) |
42 | import Data.Time.Clock.POSIX (POSIXTime) | 48 | import Data.Time.Clock.POSIX (POSIXTime) |
43 | import Data.Torrent | 49 | import Data.Torrent |
44 | import Data.Typeable | 50 | import Data.Typeable |
45 | import Data.Word | 51 | import Data.Word |
46 | import qualified Data.Wrapper.PSQInt as Int | 52 | import qualified Data.Wrapper.PSQInt as Int |
53 | import Debug.Trace | ||
47 | import Kademlia | 54 | import Kademlia |
48 | import Network.Address (Address, fromSockAddr, setPort, | 55 | import Network.Address (Address, fromAddr, fromSockAddr, |
49 | sockAddrPort, testIdBit, toSockAddr) | 56 | setPort, sockAddrPort, testIdBit, |
50 | import Network.BitTorrent.DHT.ContactInfo as Peers | 57 | toSockAddr) |
51 | import Network.BitTorrent.DHT.Search (Search (..)) | 58 | import Network.BitTorrent.DHT.ContactInfo as Peers |
52 | import Network.BitTorrent.DHT.Token as Token | 59 | import Network.BitTorrent.DHT.Search (Search (..)) |
53 | import qualified Network.DHT.Routing as R | 60 | import Network.BitTorrent.DHT.Token as Token |
54 | ;import Network.DHT.Routing (Info, Timestamp, getTimestamp) | 61 | import Network.DatagramServer.Types (genBucketSample') |
62 | import qualified Network.DHT.Routing as R | ||
63 | ;import Network.DHT.Routing (Info, Timestamp, getTimestamp) | ||
55 | import Network.QueryResponse | 64 | import Network.QueryResponse |
56 | import Network.Socket | 65 | import Network.Socket |
66 | import System.IO | ||
57 | import System.IO.Error | 67 | import System.IO.Error |
58 | import System.IO.Unsafe (unsafeInterleaveIO) | 68 | import System.IO.Unsafe (unsafeInterleaveIO) |
69 | #ifdef THREAD_DEBUG | ||
70 | import Control.Concurrent.Lifted.Instrument | ||
71 | #else | ||
72 | import Control.Concurrent.Lifted | ||
73 | import GHC.Conc (labelThread) | ||
74 | #endif | ||
75 | import Control.Exception (SomeException(..),handle) | ||
59 | 76 | ||
60 | newtype NodeId = NodeId ByteString | 77 | newtype NodeId = NodeId ByteString |
61 | deriving (Eq,Ord,Show,ByteArrayAccess, BEncode, Bits, Hashable) | 78 | deriving (Eq,Ord,ByteArrayAccess, BEncode, Bits, Hashable) |
79 | |||
80 | instance Show NodeId where | ||
81 | show (NodeId bs) = Char8.unpack $ Base16.encode bs | ||
62 | 82 | ||
63 | instance S.Serialize NodeId where | 83 | instance S.Serialize NodeId where |
64 | get = NodeId <$> S.getBytes 20 | 84 | get = NodeId <$> S.getBytes 20 |
@@ -84,6 +104,13 @@ instance Hashable NodeInfo where | |||
84 | {-# INLINE hashWithSalt #-} | 104 | {-# INLINE hashWithSalt #-} |
85 | 105 | ||
86 | 106 | ||
107 | instance Show NodeInfo where | ||
108 | show (NodeInfo (NodeId nid) ip port) = | ||
109 | Char8.unpack (Base16.encode nid) ++ "@" ++ show ip' ++ ":" ++ show port | ||
110 | where | ||
111 | ip' | IPv6 ip6 <- ip | ||
112 | , Just ip4 <- un4map ip6 = IPv4 ip4 | ||
113 | | otherwise = ip | ||
87 | 114 | ||
88 | {- | 115 | {- |
89 | 116 | ||
@@ -188,9 +215,20 @@ data Message a = Q { msgOrigin :: NodeId | |||
188 | , rspPayload :: Either Error a | 215 | , rspPayload :: Either Error a |
189 | , rspReflectedIP :: Maybe SockAddr } | 216 | , rspReflectedIP :: Maybe SockAddr } |
190 | 217 | ||
218 | showBE bval = L8.unpack (showBEncode bval) | ||
219 | |||
191 | instance BE.BEncode (Message BValue) where | 220 | instance BE.BEncode (Message BValue) where |
192 | toBEncode = encodeMessage | 221 | toBEncode m = encodeMessage m |
193 | fromBEncode = decodeMessage | 222 | {- |
223 | in case m of | ||
224 | Q {} -> trace ("encoded(query): "++showBE r) r | ||
225 | R {} -> trace ("encoded(response): "++showBE r) r -} | ||
226 | fromBEncode bval = decodeMessage bval | ||
227 | {- | ||
228 | in case r of | ||
229 | Left e -> trace (show e) r | ||
230 | Right (Q {}) -> trace ("decoded(query): "++showBE bval) r | ||
231 | Right (R {}) -> trace ("decoded(response): "++showBE bval) r -} | ||
194 | 232 | ||
195 | decodeMessage :: BValue -> Either String (Message BValue) | 233 | decodeMessage :: BValue -> Either String (Message BValue) |
196 | decodeMessage = fromDict $ do | 234 | decodeMessage = fromDict $ do |
@@ -238,14 +276,19 @@ encodeMessage (R origin tid v ip) | |||
238 | 276 | ||
239 | encodeAddr :: SockAddr -> ByteString | 277 | encodeAddr :: SockAddr -> ByteString |
240 | encodeAddr (SockAddrInet port addr) | 278 | encodeAddr (SockAddrInet port addr) |
241 | = S.runPut (S.putWord32host addr >> S.put (fromIntegral port :: Word16)) | 279 | = S.runPut (S.putWord32host addr >> S.putWord16be (fromIntegral port)) |
242 | encodeAddr (SockAddrInet6 port _ addr _) | 280 | encodeAddr saddr@(SockAddrInet6 port _ addr _) |
243 | = S.runPut (S.put addr >> S.put (fromIntegral port :: Word16)) | 281 | | Just ip4 <- (fromSockAddr saddr >>= un4map) = encodeAddr (setPort port $ toSockAddr ip4) |
244 | encodeAddr _ = B.empty | 282 | | otherwise = S.runPut (S.put addr >> S.putWord16be (fromIntegral port)) |
283 | encodeAddr _ = B.empty | ||
245 | 284 | ||
246 | decodeAddr :: ByteString -> Either String SockAddr | 285 | decodeAddr :: ByteString -> Either String SockAddr |
247 | decodeAddr = S.runGet $ do | 286 | decodeAddr bs = S.runGet g bs |
248 | error "decodeAddr" | 287 | where |
288 | g | (B.length bs == 6) = flip SockAddrInet <$> S.getWord32host <*> (fromIntegral <$> S.getWord16be) | ||
289 | | otherwise = do host <- S.get -- TODO: Is this right? | ||
290 | port <- fromIntegral <$> S.getWord16be | ||
291 | return $ SockAddrInet6 port 0 host 0 | ||
249 | 292 | ||
250 | genericArgs :: BEncode a => a -> Bool -> BDict | 293 | genericArgs :: BEncode a => a -> Bool -> BDict |
251 | genericArgs nodeid ro = | 294 | genericArgs nodeid ro = |
@@ -289,8 +332,12 @@ classify :: Message BValue -> MessageClass String Method TransactionId | |||
289 | classify (Q { msgID = tid, qryMethod = meth }) = IsQuery meth tid | 332 | classify (Q { msgID = tid, qryMethod = meth }) = IsQuery meth tid |
290 | classify (R { msgID = tid }) = IsResponse tid | 333 | classify (R { msgID = tid }) = IsResponse tid |
291 | 334 | ||
292 | encodePayload :: BEncode a => TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue | 335 | encodeResponsePayload :: BEncode a => TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue |
293 | encodePayload tid self dest b = R (nodeId self) tid (Right $ BE.toBEncode b) (Just $ nodeAddr dest) | 336 | encodeResponsePayload tid self dest b = R (nodeId self) tid (Right $ BE.toBEncode b) (Just $ nodeAddr dest) |
337 | |||
338 | encodeQueryPayload :: BEncode a => | ||
339 | Method -> Bool -> TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue | ||
340 | encodeQueryPayload meth isReadonly tid self dest b = Q (nodeId self) tid (BE.toBEncode b) meth isReadonly | ||
294 | 341 | ||
295 | errorPayload :: TransactionId -> NodeInfo -> NodeInfo -> Error -> Message a | 342 | errorPayload :: TransactionId -> NodeInfo -> NodeInfo -> Error -> Message a |
296 | errorPayload tid self dest e = R (nodeId self) tid (Left e) (Just $ nodeAddr dest) | 343 | errorPayload tid self dest e = R (nodeId self) tid (Left e) (Just $ nodeAddr dest) |
@@ -304,7 +351,7 @@ handler :: ( BEncode a | |||
304 | , BEncode b | 351 | , BEncode b |
305 | ) => | 352 | ) => |
306 | (NodeInfo -> a -> IO b) -> Maybe Handler | 353 | (NodeInfo -> a -> IO b) -> Maybe Handler |
307 | handler f = Just $ MethodHandler decodePayload encodePayload f | 354 | handler f = Just $ MethodHandler decodePayload encodeResponsePayload f |
308 | 355 | ||
309 | 356 | ||
310 | handlerE :: ( BEncode a | 357 | handlerE :: ( BEncode a |
@@ -314,7 +361,7 @@ handlerE :: ( BEncode a | |||
314 | handlerE f = Just $ MethodHandler decodePayload enc f | 361 | handlerE f = Just $ MethodHandler decodePayload enc f |
315 | where | 362 | where |
316 | enc tid self dest (Left e) = errorPayload tid self dest e | 363 | enc tid self dest (Left e) = errorPayload tid self dest e |
317 | enc tid self dest (Right b) = encodePayload tid self dest b | 364 | enc tid self dest (Right b) = encodeResponsePayload tid self dest b |
318 | 365 | ||
319 | type AnnounceSet = Set (InfoHash, PortNumber) | 366 | type AnnounceSet = Set (InfoHash, PortNumber) |
320 | 367 | ||
@@ -344,6 +391,13 @@ data Routing = Routing | |||
344 | , committee6 :: TriadCommittee NodeId SockAddr | 391 | , committee6 :: TriadCommittee NodeId SockAddr |
345 | } | 392 | } |
346 | 393 | ||
394 | traced :: Show tid => TableMethods t tid -> TableMethods t tid | ||
395 | traced (TableMethods ins del lkup) | ||
396 | = TableMethods (\tid mvar t -> trace ("insert "++show tid) $ ins tid mvar t) | ||
397 | (\tid t -> trace ("del "++show tid) $ del tid t) | ||
398 | (\tid t -> trace ("lookup "++show tid) $ lkup tid t) | ||
399 | |||
400 | |||
347 | type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue) | 401 | type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue) |
348 | 402 | ||
349 | newClient :: SockAddr -> IO (MainlineClient, Routing) | 403 | newClient :: SockAddr -> IO (MainlineClient, Routing) |
@@ -355,24 +409,34 @@ newClient addr = do | |||
355 | , nodeIP = fromMaybe (toEnum 0) $ fromSockAddr addr | 409 | , nodeIP = fromMaybe (toEnum 0) $ fromSockAddr addr |
356 | , nodePort = fromMaybe 0 $ sockAddrPort addr | 410 | , nodePort = fromMaybe 0 $ sockAddrPort addr |
357 | } | 411 | } |
412 | addr4 <- atomically $ newTChan | ||
413 | addr6 <- atomically $ newTChan | ||
414 | fork $ fix $ \again -> do | ||
415 | myThreadId >>= flip labelThread "addr6" | ||
416 | addr <- atomically $ readTChan addr6 | ||
417 | hPutStrLn stderr $ "External IPv6: "++show addr | ||
418 | again | ||
358 | routing <- atomically $ do | 419 | routing <- atomically $ do |
359 | let nobkts = R.defaultBucketCount :: Int | 420 | let nobkts = R.defaultBucketCount :: Int |
360 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tenative_info nobkts | 421 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tenative_info nobkts |
361 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tenative_info nobkts | 422 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tenative_info nobkts |
362 | committee4 <- newTriadCommittee (const $ return ()) -- TODO: update tbl4 | 423 | committee4 <- newTriadCommittee $ \a -> do |
363 | committee6 <- newTriadCommittee (const $ return ()) -- TODO: update tbl6 | 424 | t4 <- readTVar tbl4 |
425 | case bep42 a (nodeId $ R.thisNode t4) of | ||
426 | Just nid -> do | ||
427 | let tbl = R.nullTable (comparing nodeId) | ||
428 | (\s -> hashWithSalt s . nodeId) | ||
429 | (NodeInfo nid | ||
430 | (fromMaybe (toEnum 0) $ fromSockAddr a) | ||
431 | (fromMaybe 0 $ sockAddrPort a)) | ||
432 | nobkts | ||
433 | writeTVar tbl4 tbl | ||
434 | writeTChan addr4 (a,map fst $ concat $ R.toList t4) | ||
435 | Nothing -> return () | ||
436 | committee6 <- newTriadCommittee (writeTChan addr6) -- TODO: update tbl6 | ||
364 | sched4 <- newTVar Int.empty | 437 | sched4 <- newTVar Int.empty |
365 | sched6 <- newTVar Int.empty | 438 | sched6 <- newTVar Int.empty |
366 | return $ Routing tenative_info sched4 tbl4 committee4 sched6 tbl6 committee6 | 439 | return $ Routing tenative_info sched4 tbl4 committee4 sched6 tbl6 committee6 |
367 | -- TODO: Provide some means of shutting down these two auxillary threads: | ||
368 | refresh_thread4 <- forkPollForRefresh | ||
369 | (15*60) | ||
370 | (sched4 routing) | ||
371 | (refreshBucket nodeSearch (routing4 routing) (nodeId tenative_info)) | ||
372 | refresh_thread6 <- forkPollForRefresh | ||
373 | (15*60) | ||
374 | (sched6 routing) | ||
375 | (refreshBucket nodeSearch (routing6 routing) (nodeId tenative_info)) | ||
376 | swarms <- newSwarmsDatabase | 440 | swarms <- newSwarmsDatabase |
377 | map_var <- atomically $ newTVar (0, mempty) | 441 | map_var <- atomically $ newTVar (0, mempty) |
378 | let net = onInbound (updateRouting outgoingClient routing) | 442 | let net = onInbound (updateRouting outgoingClient routing) |
@@ -406,7 +470,7 @@ newClient addr = do | |||
406 | client = Client | 470 | client = Client |
407 | { clientNet = net | 471 | { clientNet = net |
408 | , clientDispatcher = dispatch | 472 | , clientDispatcher = dispatch |
409 | , clientErrorReporter = ignoreErrors -- TODO | 473 | , clientErrorReporter = printErrors stderr |
410 | , clientPending = map_var | 474 | , clientPending = map_var |
411 | , clientAddress = \maddr -> atomically $ do | 475 | , clientAddress = \maddr -> atomically $ do |
412 | let var = case flip prefer4or6 Nothing <$> maddr of | 476 | let var = case flip prefer4or6 Nothing <$> maddr of |
@@ -416,8 +480,50 @@ newClient addr = do | |||
416 | , clientResponseId = return | 480 | , clientResponseId = return |
417 | } | 481 | } |
418 | 482 | ||
483 | fork $ fix $ \again -> do | ||
484 | myThreadId >>= flip labelThread "addr4" | ||
485 | (addr, ns) <- atomically $ readTChan addr4 | ||
486 | hPutStrLn stderr $ "External IPv4: "++show (addr, length ns) | ||
487 | forM_ ns $ \n -> do | ||
488 | hPutStrLn stderr $ "Change IP, ping: "++show n | ||
489 | ping outgoingClient n | ||
490 | again | ||
491 | |||
492 | -- TODO: Provide some means of shutting down these two auxillary threads: | ||
493 | refresh_thread4 <- forkPollForRefresh | ||
494 | (15*60) | ||
495 | (sched4 routing) | ||
496 | (refreshBucket (nodeSearch client) (routing4 routing) (nodeId tenative_info)) | ||
497 | refresh_thread6 <- forkPollForRefresh | ||
498 | (15*60) | ||
499 | (sched6 routing) | ||
500 | (refreshBucket (nodeSearch client) (routing6 routing) (nodeId tenative_info)) | ||
501 | |||
419 | return (client, routing) | 502 | return (client, routing) |
420 | 503 | ||
504 | -- | Modifies a purely random 'NodeId' to one that is related to a given | ||
505 | -- routable address in accordance with BEP 42. | ||
506 | bep42 :: SockAddr -> NodeId -> Maybe NodeId | ||
507 | bep42 addr (NodeId r) | ||
508 | | Just ip <- fmap S.encode (fromSockAddr addr :: Maybe IPv4) | ||
509 | <|> fmap S.encode (fromSockAddr addr :: Maybe IPv6) | ||
510 | = genBucketSample' retr (NodeId $ crc $ applyMask ip) (3,0x07,0) | ||
511 | | otherwise | ||
512 | = Nothing | ||
513 | where | ||
514 | ip4mask = "\x03\x0f\x3f\xff" :: ByteString | ||
515 | ip6mask = "\x01\x03\x07\x0f\x1f\x3f\x7f\xff" :: ByteString | ||
516 | nbhood_select = (B.last r :: Word8) .&. 7 | ||
517 | nodeIdSize = 20 | ||
518 | retr n = pure $ B.drop (nodeIdSize - n) $ S.encode r | ||
519 | crc = (`B.append` B.replicate 16 0) . S.encode . crc32c . B.pack | ||
520 | applyMask ip = case B.zipWith (.&.) msk ip of | ||
521 | (b:bs) -> (b .|. shiftL nbhood_select 5) : bs | ||
522 | bs -> bs | ||
523 | where msk | B.length ip == 4 = ip4mask | ||
524 | | otherwise = ip6mask | ||
525 | |||
526 | |||
421 | defaultHandler :: ByteString -> Handler | 527 | defaultHandler :: ByteString -> Handler |
422 | defaultHandler meth = MethodHandler decodePayload errorPayload returnError | 528 | defaultHandler meth = MethodHandler decodePayload errorPayload returnError |
423 | where | 529 | where |
@@ -432,7 +538,9 @@ mainlineKademlia client committee var sched | |||
432 | { tblTransition = \tr -> do | 538 | { tblTransition = \tr -> do |
433 | io1 <- transitionCommittee committee tr | 539 | io1 <- transitionCommittee committee tr |
434 | io2 <- touchBucket mainlineSpace (15*60) var sched tr | 540 | io2 <- touchBucket mainlineSpace (15*60) var sched tr |
435 | return $ io1 >> io2 | 541 | return $ do |
542 | io1 >> io2 | ||
543 | hPutStrLn stderr ("Buckets: "++show tr) | ||
436 | } | 544 | } |
437 | 545 | ||
438 | 546 | ||
@@ -446,7 +554,8 @@ mainlineSpace = R.KademliaSpace | |||
446 | transitionCommittee :: TriadCommittee NodeId SockAddr -> RoutingTransition NodeInfo -> STM (IO ()) | 554 | transitionCommittee :: TriadCommittee NodeId SockAddr -> RoutingTransition NodeInfo -> STM (IO ()) |
447 | transitionCommittee committee (RoutingTransition ni Stranger) = do | 555 | transitionCommittee committee (RoutingTransition ni Stranger) = do |
448 | delVote committee (nodeId ni) | 556 | delVote committee (nodeId ni) |
449 | return $ return () | 557 | return $ do |
558 | hPutStrLn stderr $ "delVote "++show (nodeId ni) | ||
450 | transitionCommittee committee _ = return $ return () | 559 | transitionCommittee committee _ = return $ return () |
451 | 560 | ||
452 | updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () | 561 | updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () |
@@ -458,7 +567,9 @@ updateRouting client routing naddr msg = do | |||
458 | go tbl committee sched = do | 567 | go tbl committee sched = do |
459 | case msg of | 568 | case msg of |
460 | R { rspReflectedIP = Just sockaddr } | 569 | R { rspReflectedIP = Just sockaddr } |
461 | -> atomically $ addVote committee (nodeId naddr) sockaddr | 570 | -> do |
571 | -- hPutStrLn stderr $ "External: "++show (nodeId naddr,sockaddr) | ||
572 | atomically $ addVote committee (nodeId naddr) sockaddr | ||
462 | _ -> return () | 573 | _ -> return () |
463 | insertNode (mainlineKademlia client committee tbl sched) naddr | 574 | insertNode (mainlineKademlia client committee tbl sched) naddr |
464 | 575 | ||
@@ -517,8 +628,9 @@ instance BEncode NodeFound where | |||
517 | 628 | ||
518 | fromBEncode bval = NodeFound <$> ns4 <*> ns6 | 629 | fromBEncode bval = NodeFound <$> ns4 <*> ns6 |
519 | where | 630 | where |
520 | ns4 = fromDict (binary getNodeInfo4 nodes_key) bval | 631 | opt ns = fromMaybe [] <$> optional ns |
521 | ns6 = fromDict (binary getNodeInfo6 nodes6_key) bval | 632 | ns4 = opt $ fromDict (binary getNodeInfo4 nodes_key) bval |
633 | ns6 = opt $ fromDict (binary getNodeInfo6 nodes6_key) bval | ||
522 | 634 | ||
523 | binary :: S.Get a -> BKey -> BE.Get [a] | 635 | binary :: S.Get a -> BKey -> BE.Get [a] |
524 | binary get k = field (req k) >>= either (fail . format) return . | 636 | binary get k = field (req k) >>= either (fail . format) return . |
@@ -752,16 +864,33 @@ announceH (SwarmsDatabase peers toks _) naddr announcement = do | |||
752 | } | 864 | } |
753 | return Announced | 865 | return Announced |
754 | 866 | ||
867 | isReadonlyClient client = False -- TODO | ||
868 | |||
755 | ping :: MainlineClient -> NodeInfo -> IO Bool | 869 | ping :: MainlineClient -> NodeInfo -> IO Bool |
756 | ping client addr = fromMaybe False <$> sendQuery client serializer Ping addr | 870 | ping client addr = fromMaybe False <$> sendQuery client serializer Ping addr |
757 | where | 871 | where |
758 | serializer = MethodSerializer | 872 | serializer = MethodSerializer |
759 | { methodTimeout = 5 | 873 | { methodTimeout = 5 |
760 | , method = Method "ping" | 874 | , method = Method "ping" |
761 | , wrapQuery = encodePayload | 875 | , wrapQuery = encodeQueryPayload (Method "ping") (isReadonlyClient client) |
762 | , unwrapResponse = const True | 876 | , unwrapResponse = const True |
763 | } | 877 | } |
764 | 878 | ||
879 | -- searchQuery :: ni -> IO ([ni], [r]) | ||
880 | getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO ([NodeInfo],[NodeInfo]) | ||
881 | getNodes client nid addr = | ||
882 | fromMaybe ([],[]) <$> sendQuery client serializer (FindNode nid (Just Want_Both)) addr | ||
883 | where | ||
884 | serializer = MethodSerializer | ||
885 | { methodTimeout = 5 | ||
886 | , method = Method "find_node" | ||
887 | , wrapQuery = encodeQueryPayload (Method "find_node") (isReadonlyClient client) | ||
888 | , unwrapResponse = \case | ||
889 | R { rspPayload = Right bval } | Right (NodeFound ns4 ns6) <- BE.fromBEncode bval | ||
890 | -> (ns4++ns6, ns4++ns6) | ||
891 | _ -> ([],[]) | ||
892 | } | ||
893 | |||
765 | data TriadSlot = SlotA | SlotB | SlotC | 894 | data TriadSlot = SlotA | SlotB | SlotC |
766 | deriving (Eq,Ord,Enum,Show,Read) | 895 | deriving (Eq,Ord,Enum,Show,Read) |
767 | 896 | ||
@@ -792,11 +921,8 @@ newTriadCommittee onChange = | |||
792 | <*> newTVar Nothing | 921 | <*> newTVar Nothing |
793 | <*> pure onChange | 922 | <*> pure onChange |
794 | 923 | ||
795 | triadCountVotes :: Eq a => TriadCommittee voter a -> STM () | 924 | triadCountVotes :: Eq a => Maybe a -> TriadCommittee voter a -> STM () |
796 | triadCountVotes triad = do | 925 | triadCountVotes prior triad = do |
797 | prior <- do | ||
798 | slot <- readTVar (triadDecider triad) | ||
799 | fmap snd <$> readTVar (triadSlot slot triad) | ||
800 | a <- fmap ((SlotA,) . snd) <$> readTVar (triadA triad) | 926 | a <- fmap ((SlotA,) . snd) <$> readTVar (triadA triad) |
801 | b <- fmap ((SlotB,) . snd) <$> readTVar (triadB triad) | 927 | b <- fmap ((SlotB,) . snd) <$> readTVar (triadB triad) |
802 | c <- fmap ((SlotC,) . snd) <$> readTVar (triadC triad) | 928 | c <- fmap ((SlotC,) . snd) <$> readTVar (triadC triad) |
@@ -822,9 +948,12 @@ addVote triad voter vote = do | |||
822 | avail (_,Just x ) = (x == voter) | 948 | avail (_,Just x ) = (x == voter) |
823 | slots = filter avail [a,b,c] | 949 | slots = filter avail [a,b,c] |
824 | forM_ (take 1 slots) $ \(slot,_) -> do | 950 | forM_ (take 1 slots) $ \(slot,_) -> do |
951 | prior <- do | ||
952 | slotp <- readTVar (triadDecider triad) | ||
953 | fmap snd <$> readTVar (triadSlot slotp triad) | ||
825 | writeTVar (triadSlot slot triad) | 954 | writeTVar (triadSlot slot triad) |
826 | (Just (voter,vote)) | 955 | (Just (voter,vote)) |
827 | triadCountVotes triad | 956 | triadCountVotes prior triad |
828 | 957 | ||
829 | delVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> STM () | 958 | delVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> STM () |
830 | delVote triad voter = do | 959 | delVote triad voter = do |
@@ -834,13 +963,27 @@ delVote triad voter = do | |||
834 | let match (_,Just x ) = (x == voter) | 963 | let match (_,Just x ) = (x == voter) |
835 | slots = filter match [a,b,c] | 964 | slots = filter match [a,b,c] |
836 | forM_ (take 1 slots) $ \(slot,_) -> do | 965 | forM_ (take 1 slots) $ \(slot,_) -> do |
966 | prior <- do | ||
967 | slotp <- readTVar (triadDecider triad) | ||
968 | fmap snd <$> readTVar (triadSlot slotp triad) | ||
837 | writeTVar (triadSlot slot triad) Nothing | 969 | writeTVar (triadSlot slot triad) Nothing |
838 | triadCountVotes triad | 970 | triadCountVotes prior triad |
839 | 971 | ||
840 | nodeSearch = Search | 972 | nodeSearch client = Search |
841 | { searchSpace = mainlineSpace | 973 | { searchSpace = mainlineSpace |
842 | , searchNodeAddress = nodeIP &&& nodePort | 974 | , searchNodeAddress = nodeIP &&& nodePort |
843 | , searchQuery = error "searchQuery" | 975 | , searchQuery = \nid ni -> do |
976 | hPutStrLn stderr $ "findNodes "++show nid++" --> "++show ni | ||
977 | handle (\(SomeException e) -> do | ||
978 | hPutStrLn stderr $ "got "++show e | ||
979 | -- threadDelay 1000000 | ||
980 | return ([],[])) | ||
981 | $ do | ||
982 | (xs,y) <- getNodes client nid ni | ||
983 | forM_ xs $ \x -> do | ||
984 | hPutStrLn stderr $ "got "++show x | ||
985 | -- threadDelay 1000000 | ||
986 | return (xs,y) | ||
844 | } | 987 | } |
845 | 988 | ||
846 | -- | List of bootstrap nodes maintained by different bittorrent | 989 | -- | List of bootstrap nodes maintained by different bittorrent |