summaryrefslogtreecommitdiff
path: root/Mainline.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Mainline.hs')
-rw-r--r--Mainline.hs265
1 files changed, 204 insertions, 61 deletions
diff --git a/Mainline.hs b/Mainline.hs
index 76e914b3..911f0dbc 100644
--- a/Mainline.hs
+++ b/Mainline.hs
@@ -16,49 +16,69 @@ import Control.Arrow
16import Control.Concurrent.STM 16import Control.Concurrent.STM
17import Control.Monad 17import Control.Monad
18import Crypto.Random 18import Crypto.Random
19import Data.BEncode as BE 19import Data.BEncode as BE
20import qualified Data.BEncode.BDict as BE 20import qualified Data.BEncode.BDict as BE
21 ;import Data.BEncode.BDict (BKey) 21 ;import Data.BEncode.BDict (BKey)
22import Data.BEncode.Types (BDict) 22import Data.BEncode.Pretty
23import Data.BEncode.Types (BDict)
23import Data.Bits 24import Data.Bits
24import Data.Bits.ByteString 25import Data.Bits.ByteString
25import Data.Bool 26import Data.Bool
26import qualified Data.ByteArray as BA 27import qualified Data.ByteArray as BA
27 ;import Data.ByteArray (ByteArrayAccess) 28 ;import Data.ByteArray (ByteArrayAccess)
28import qualified Data.ByteString as B 29import qualified Data.ByteString as B
29 ;import Data.ByteString (ByteString) 30 ;import Data.ByteString (ByteString)
30import Data.ByteString.Lazy (toStrict) 31import qualified Data.ByteString.Base16 as Base16
32import qualified Data.ByteString.Char8 as Char8
33import Data.ByteString.Lazy (toStrict)
34import qualified Data.ByteString.Lazy.Char8 as L8
31import Data.Coerce 35import Data.Coerce
32import Data.Data 36import Data.Data
33import Data.Default 37import Data.Default
38import Data.Digest.CRC32C
39import Data.Function (fix)
34import Data.Hashable 40import Data.Hashable
35import Data.IP 41import Data.IP
36import Data.List 42import Data.List
37import Data.Maybe 43import Data.Maybe
38import Data.Monoid 44import Data.Monoid
39import Data.Ord 45import Data.Ord
40import qualified Data.Serialize as S 46import qualified Data.Serialize as S
41import Data.Set (Set) 47import Data.Set (Set)
42import Data.Time.Clock.POSIX (POSIXTime) 48import Data.Time.Clock.POSIX (POSIXTime)
43import Data.Torrent 49import Data.Torrent
44import Data.Typeable 50import Data.Typeable
45import Data.Word 51import Data.Word
46import qualified Data.Wrapper.PSQInt as Int 52import qualified Data.Wrapper.PSQInt as Int
53import Debug.Trace
47import Kademlia 54import Kademlia
48import Network.Address (Address, fromSockAddr, setPort, 55import Network.Address (Address, fromAddr, fromSockAddr,
49 sockAddrPort, testIdBit, toSockAddr) 56 setPort, sockAddrPort, testIdBit,
50import Network.BitTorrent.DHT.ContactInfo as Peers 57 toSockAddr)
51import Network.BitTorrent.DHT.Search (Search (..)) 58import Network.BitTorrent.DHT.ContactInfo as Peers
52import Network.BitTorrent.DHT.Token as Token 59import Network.BitTorrent.DHT.Search (Search (..))
53import qualified Network.DHT.Routing as R 60import Network.BitTorrent.DHT.Token as Token
54 ;import Network.DHT.Routing (Info, Timestamp, getTimestamp) 61import Network.DatagramServer.Types (genBucketSample')
62import qualified Network.DHT.Routing as R
63 ;import Network.DHT.Routing (Info, Timestamp, getTimestamp)
55import Network.QueryResponse 64import Network.QueryResponse
56import Network.Socket 65import Network.Socket
66import System.IO
57import System.IO.Error 67import System.IO.Error
58import System.IO.Unsafe (unsafeInterleaveIO) 68import System.IO.Unsafe (unsafeInterleaveIO)
69#ifdef THREAD_DEBUG
70import Control.Concurrent.Lifted.Instrument
71#else
72import Control.Concurrent.Lifted
73import GHC.Conc (labelThread)
74#endif
75import Control.Exception (SomeException(..),handle)
59 76
60newtype NodeId = NodeId ByteString 77newtype NodeId = NodeId ByteString
61 deriving (Eq,Ord,Show,ByteArrayAccess, BEncode, Bits, Hashable) 78 deriving (Eq,Ord,ByteArrayAccess, BEncode, Bits, Hashable)
79
80instance Show NodeId where
81 show (NodeId bs) = Char8.unpack $ Base16.encode bs
62 82
63instance S.Serialize NodeId where 83instance S.Serialize NodeId where
64 get = NodeId <$> S.getBytes 20 84 get = NodeId <$> S.getBytes 20
@@ -84,6 +104,13 @@ instance Hashable NodeInfo where
84 {-# INLINE hashWithSalt #-} 104 {-# INLINE hashWithSalt #-}
85 105
86 106
107instance Show NodeInfo where
108 show (NodeInfo (NodeId nid) ip port) =
109 Char8.unpack (Base16.encode nid) ++ "@" ++ show ip' ++ ":" ++ show port
110 where
111 ip' | IPv6 ip6 <- ip
112 , Just ip4 <- un4map ip6 = IPv4 ip4
113 | otherwise = ip
87 114
88{- 115{-
89 116
@@ -188,9 +215,20 @@ data Message a = Q { msgOrigin :: NodeId
188 , rspPayload :: Either Error a 215 , rspPayload :: Either Error a
189 , rspReflectedIP :: Maybe SockAddr } 216 , rspReflectedIP :: Maybe SockAddr }
190 217
218showBE bval = L8.unpack (showBEncode bval)
219
191instance BE.BEncode (Message BValue) where 220instance BE.BEncode (Message BValue) where
192 toBEncode = encodeMessage 221 toBEncode m = encodeMessage m
193 fromBEncode = decodeMessage 222 {-
223 in case m of
224 Q {} -> trace ("encoded(query): "++showBE r) r
225 R {} -> trace ("encoded(response): "++showBE r) r -}
226 fromBEncode bval = decodeMessage bval
227 {-
228 in case r of
229 Left e -> trace (show e) r
230 Right (Q {}) -> trace ("decoded(query): "++showBE bval) r
231 Right (R {}) -> trace ("decoded(response): "++showBE bval) r -}
194 232
195decodeMessage :: BValue -> Either String (Message BValue) 233decodeMessage :: BValue -> Either String (Message BValue)
196decodeMessage = fromDict $ do 234decodeMessage = fromDict $ do
@@ -238,14 +276,19 @@ encodeMessage (R origin tid v ip)
238 276
239encodeAddr :: SockAddr -> ByteString 277encodeAddr :: SockAddr -> ByteString
240encodeAddr (SockAddrInet port addr) 278encodeAddr (SockAddrInet port addr)
241 = S.runPut (S.putWord32host addr >> S.put (fromIntegral port :: Word16)) 279 = S.runPut (S.putWord32host addr >> S.putWord16be (fromIntegral port))
242encodeAddr (SockAddrInet6 port _ addr _) 280encodeAddr saddr@(SockAddrInet6 port _ addr _)
243 = S.runPut (S.put addr >> S.put (fromIntegral port :: Word16)) 281 | Just ip4 <- (fromSockAddr saddr >>= un4map) = encodeAddr (setPort port $ toSockAddr ip4)
244encodeAddr _ = B.empty 282 | otherwise = S.runPut (S.put addr >> S.putWord16be (fromIntegral port))
283encodeAddr _ = B.empty
245 284
246decodeAddr :: ByteString -> Either String SockAddr 285decodeAddr :: ByteString -> Either String SockAddr
247decodeAddr = S.runGet $ do 286decodeAddr bs = S.runGet g bs
248 error "decodeAddr" 287 where
288 g | (B.length bs == 6) = flip SockAddrInet <$> S.getWord32host <*> (fromIntegral <$> S.getWord16be)
289 | otherwise = do host <- S.get -- TODO: Is this right?
290 port <- fromIntegral <$> S.getWord16be
291 return $ SockAddrInet6 port 0 host 0
249 292
250genericArgs :: BEncode a => a -> Bool -> BDict 293genericArgs :: BEncode a => a -> Bool -> BDict
251genericArgs nodeid ro = 294genericArgs nodeid ro =
@@ -289,8 +332,12 @@ classify :: Message BValue -> MessageClass String Method TransactionId
289classify (Q { msgID = tid, qryMethod = meth }) = IsQuery meth tid 332classify (Q { msgID = tid, qryMethod = meth }) = IsQuery meth tid
290classify (R { msgID = tid }) = IsResponse tid 333classify (R { msgID = tid }) = IsResponse tid
291 334
292encodePayload :: BEncode a => TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue 335encodeResponsePayload :: BEncode a => TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue
293encodePayload tid self dest b = R (nodeId self) tid (Right $ BE.toBEncode b) (Just $ nodeAddr dest) 336encodeResponsePayload tid self dest b = R (nodeId self) tid (Right $ BE.toBEncode b) (Just $ nodeAddr dest)
337
338encodeQueryPayload :: BEncode a =>
339 Method -> Bool -> TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue
340encodeQueryPayload meth isReadonly tid self dest b = Q (nodeId self) tid (BE.toBEncode b) meth isReadonly
294 341
295errorPayload :: TransactionId -> NodeInfo -> NodeInfo -> Error -> Message a 342errorPayload :: TransactionId -> NodeInfo -> NodeInfo -> Error -> Message a
296errorPayload tid self dest e = R (nodeId self) tid (Left e) (Just $ nodeAddr dest) 343errorPayload tid self dest e = R (nodeId self) tid (Left e) (Just $ nodeAddr dest)
@@ -304,7 +351,7 @@ handler :: ( BEncode a
304 , BEncode b 351 , BEncode b
305 ) => 352 ) =>
306 (NodeInfo -> a -> IO b) -> Maybe Handler 353 (NodeInfo -> a -> IO b) -> Maybe Handler
307handler f = Just $ MethodHandler decodePayload encodePayload f 354handler f = Just $ MethodHandler decodePayload encodeResponsePayload f
308 355
309 356
310handlerE :: ( BEncode a 357handlerE :: ( BEncode a
@@ -314,7 +361,7 @@ handlerE :: ( BEncode a
314handlerE f = Just $ MethodHandler decodePayload enc f 361handlerE f = Just $ MethodHandler decodePayload enc f
315 where 362 where
316 enc tid self dest (Left e) = errorPayload tid self dest e 363 enc tid self dest (Left e) = errorPayload tid self dest e
317 enc tid self dest (Right b) = encodePayload tid self dest b 364 enc tid self dest (Right b) = encodeResponsePayload tid self dest b
318 365
319type AnnounceSet = Set (InfoHash, PortNumber) 366type AnnounceSet = Set (InfoHash, PortNumber)
320 367
@@ -344,6 +391,13 @@ data Routing = Routing
344 , committee6 :: TriadCommittee NodeId SockAddr 391 , committee6 :: TriadCommittee NodeId SockAddr
345 } 392 }
346 393
394traced :: Show tid => TableMethods t tid -> TableMethods t tid
395traced (TableMethods ins del lkup)
396 = TableMethods (\tid mvar t -> trace ("insert "++show tid) $ ins tid mvar t)
397 (\tid t -> trace ("del "++show tid) $ del tid t)
398 (\tid t -> trace ("lookup "++show tid) $ lkup tid t)
399
400
347type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue) 401type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue)
348 402
349newClient :: SockAddr -> IO (MainlineClient, Routing) 403newClient :: SockAddr -> IO (MainlineClient, Routing)
@@ -355,24 +409,34 @@ newClient addr = do
355 , nodeIP = fromMaybe (toEnum 0) $ fromSockAddr addr 409 , nodeIP = fromMaybe (toEnum 0) $ fromSockAddr addr
356 , nodePort = fromMaybe 0 $ sockAddrPort addr 410 , nodePort = fromMaybe 0 $ sockAddrPort addr
357 } 411 }
412 addr4 <- atomically $ newTChan
413 addr6 <- atomically $ newTChan
414 fork $ fix $ \again -> do
415 myThreadId >>= flip labelThread "addr6"
416 addr <- atomically $ readTChan addr6
417 hPutStrLn stderr $ "External IPv6: "++show addr
418 again
358 routing <- atomically $ do 419 routing <- atomically $ do
359 let nobkts = R.defaultBucketCount :: Int 420 let nobkts = R.defaultBucketCount :: Int
360 tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tenative_info nobkts 421 tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tenative_info nobkts
361 tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tenative_info nobkts 422 tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tenative_info nobkts
362 committee4 <- newTriadCommittee (const $ return ()) -- TODO: update tbl4 423 committee4 <- newTriadCommittee $ \a -> do
363 committee6 <- newTriadCommittee (const $ return ()) -- TODO: update tbl6 424 t4 <- readTVar tbl4
425 case bep42 a (nodeId $ R.thisNode t4) of
426 Just nid -> do
427 let tbl = R.nullTable (comparing nodeId)
428 (\s -> hashWithSalt s . nodeId)
429 (NodeInfo nid
430 (fromMaybe (toEnum 0) $ fromSockAddr a)
431 (fromMaybe 0 $ sockAddrPort a))
432 nobkts
433 writeTVar tbl4 tbl
434 writeTChan addr4 (a,map fst $ concat $ R.toList t4)
435 Nothing -> return ()
436 committee6 <- newTriadCommittee (writeTChan addr6) -- TODO: update tbl6
364 sched4 <- newTVar Int.empty 437 sched4 <- newTVar Int.empty
365 sched6 <- newTVar Int.empty 438 sched6 <- newTVar Int.empty
366 return $ Routing tenative_info sched4 tbl4 committee4 sched6 tbl6 committee6 439 return $ Routing tenative_info sched4 tbl4 committee4 sched6 tbl6 committee6
367 -- TODO: Provide some means of shutting down these two auxillary threads:
368 refresh_thread4 <- forkPollForRefresh
369 (15*60)
370 (sched4 routing)
371 (refreshBucket nodeSearch (routing4 routing) (nodeId tenative_info))
372 refresh_thread6 <- forkPollForRefresh
373 (15*60)
374 (sched6 routing)
375 (refreshBucket nodeSearch (routing6 routing) (nodeId tenative_info))
376 swarms <- newSwarmsDatabase 440 swarms <- newSwarmsDatabase
377 map_var <- atomically $ newTVar (0, mempty) 441 map_var <- atomically $ newTVar (0, mempty)
378 let net = onInbound (updateRouting outgoingClient routing) 442 let net = onInbound (updateRouting outgoingClient routing)
@@ -406,7 +470,7 @@ newClient addr = do
406 client = Client 470 client = Client
407 { clientNet = net 471 { clientNet = net
408 , clientDispatcher = dispatch 472 , clientDispatcher = dispatch
409 , clientErrorReporter = ignoreErrors -- TODO 473 , clientErrorReporter = printErrors stderr
410 , clientPending = map_var 474 , clientPending = map_var
411 , clientAddress = \maddr -> atomically $ do 475 , clientAddress = \maddr -> atomically $ do
412 let var = case flip prefer4or6 Nothing <$> maddr of 476 let var = case flip prefer4or6 Nothing <$> maddr of
@@ -416,8 +480,50 @@ newClient addr = do
416 , clientResponseId = return 480 , clientResponseId = return
417 } 481 }
418 482
483 fork $ fix $ \again -> do
484 myThreadId >>= flip labelThread "addr4"
485 (addr, ns) <- atomically $ readTChan addr4
486 hPutStrLn stderr $ "External IPv4: "++show (addr, length ns)
487 forM_ ns $ \n -> do
488 hPutStrLn stderr $ "Change IP, ping: "++show n
489 ping outgoingClient n
490 again
491
492 -- TODO: Provide some means of shutting down these two auxillary threads:
493 refresh_thread4 <- forkPollForRefresh
494 (15*60)
495 (sched4 routing)
496 (refreshBucket (nodeSearch client) (routing4 routing) (nodeId tenative_info))
497 refresh_thread6 <- forkPollForRefresh
498 (15*60)
499 (sched6 routing)
500 (refreshBucket (nodeSearch client) (routing6 routing) (nodeId tenative_info))
501
419 return (client, routing) 502 return (client, routing)
420 503
504-- | Modifies a purely random 'NodeId' to one that is related to a given
505-- routable address in accordance with BEP 42.
506bep42 :: SockAddr -> NodeId -> Maybe NodeId
507bep42 addr (NodeId r)
508 | Just ip <- fmap S.encode (fromSockAddr addr :: Maybe IPv4)
509 <|> fmap S.encode (fromSockAddr addr :: Maybe IPv6)
510 = genBucketSample' retr (NodeId $ crc $ applyMask ip) (3,0x07,0)
511 | otherwise
512 = Nothing
513 where
514 ip4mask = "\x03\x0f\x3f\xff" :: ByteString
515 ip6mask = "\x01\x03\x07\x0f\x1f\x3f\x7f\xff" :: ByteString
516 nbhood_select = (B.last r :: Word8) .&. 7
517 nodeIdSize = 20
518 retr n = pure $ B.drop (nodeIdSize - n) $ S.encode r
519 crc = (`B.append` B.replicate 16 0) . S.encode . crc32c . B.pack
520 applyMask ip = case B.zipWith (.&.) msk ip of
521 (b:bs) -> (b .|. shiftL nbhood_select 5) : bs
522 bs -> bs
523 where msk | B.length ip == 4 = ip4mask
524 | otherwise = ip6mask
525
526
421defaultHandler :: ByteString -> Handler 527defaultHandler :: ByteString -> Handler
422defaultHandler meth = MethodHandler decodePayload errorPayload returnError 528defaultHandler meth = MethodHandler decodePayload errorPayload returnError
423 where 529 where
@@ -432,7 +538,9 @@ mainlineKademlia client committee var sched
432 { tblTransition = \tr -> do 538 { tblTransition = \tr -> do
433 io1 <- transitionCommittee committee tr 539 io1 <- transitionCommittee committee tr
434 io2 <- touchBucket mainlineSpace (15*60) var sched tr 540 io2 <- touchBucket mainlineSpace (15*60) var sched tr
435 return $ io1 >> io2 541 return $ do
542 io1 >> io2
543 hPutStrLn stderr ("Buckets: "++show tr)
436 } 544 }
437 545
438 546
@@ -446,7 +554,8 @@ mainlineSpace = R.KademliaSpace
446transitionCommittee :: TriadCommittee NodeId SockAddr -> RoutingTransition NodeInfo -> STM (IO ()) 554transitionCommittee :: TriadCommittee NodeId SockAddr -> RoutingTransition NodeInfo -> STM (IO ())
447transitionCommittee committee (RoutingTransition ni Stranger) = do 555transitionCommittee committee (RoutingTransition ni Stranger) = do
448 delVote committee (nodeId ni) 556 delVote committee (nodeId ni)
449 return $ return () 557 return $ do
558 hPutStrLn stderr $ "delVote "++show (nodeId ni)
450transitionCommittee committee _ = return $ return () 559transitionCommittee committee _ = return $ return ()
451 560
452updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () 561updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO ()
@@ -458,7 +567,9 @@ updateRouting client routing naddr msg = do
458 go tbl committee sched = do 567 go tbl committee sched = do
459 case msg of 568 case msg of
460 R { rspReflectedIP = Just sockaddr } 569 R { rspReflectedIP = Just sockaddr }
461 -> atomically $ addVote committee (nodeId naddr) sockaddr 570 -> do
571 -- hPutStrLn stderr $ "External: "++show (nodeId naddr,sockaddr)
572 atomically $ addVote committee (nodeId naddr) sockaddr
462 _ -> return () 573 _ -> return ()
463 insertNode (mainlineKademlia client committee tbl sched) naddr 574 insertNode (mainlineKademlia client committee tbl sched) naddr
464 575
@@ -517,8 +628,9 @@ instance BEncode NodeFound where
517 628
518 fromBEncode bval = NodeFound <$> ns4 <*> ns6 629 fromBEncode bval = NodeFound <$> ns4 <*> ns6
519 where 630 where
520 ns4 = fromDict (binary getNodeInfo4 nodes_key) bval 631 opt ns = fromMaybe [] <$> optional ns
521 ns6 = fromDict (binary getNodeInfo6 nodes6_key) bval 632 ns4 = opt $ fromDict (binary getNodeInfo4 nodes_key) bval
633 ns6 = opt $ fromDict (binary getNodeInfo6 nodes6_key) bval
522 634
523binary :: S.Get a -> BKey -> BE.Get [a] 635binary :: S.Get a -> BKey -> BE.Get [a]
524binary get k = field (req k) >>= either (fail . format) return . 636binary get k = field (req k) >>= either (fail . format) return .
@@ -752,16 +864,33 @@ announceH (SwarmsDatabase peers toks _) naddr announcement = do
752 } 864 }
753 return Announced 865 return Announced
754 866
867isReadonlyClient client = False -- TODO
868
755ping :: MainlineClient -> NodeInfo -> IO Bool 869ping :: MainlineClient -> NodeInfo -> IO Bool
756ping client addr = fromMaybe False <$> sendQuery client serializer Ping addr 870ping client addr = fromMaybe False <$> sendQuery client serializer Ping addr
757 where 871 where
758 serializer = MethodSerializer 872 serializer = MethodSerializer
759 { methodTimeout = 5 873 { methodTimeout = 5
760 , method = Method "ping" 874 , method = Method "ping"
761 , wrapQuery = encodePayload 875 , wrapQuery = encodeQueryPayload (Method "ping") (isReadonlyClient client)
762 , unwrapResponse = const True 876 , unwrapResponse = const True
763 } 877 }
764 878
879-- searchQuery :: ni -> IO ([ni], [r])
880getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO ([NodeInfo],[NodeInfo])
881getNodes client nid addr =
882 fromMaybe ([],[]) <$> sendQuery client serializer (FindNode nid (Just Want_Both)) addr
883 where
884 serializer = MethodSerializer
885 { methodTimeout = 5
886 , method = Method "find_node"
887 , wrapQuery = encodeQueryPayload (Method "find_node") (isReadonlyClient client)
888 , unwrapResponse = \case
889 R { rspPayload = Right bval } | Right (NodeFound ns4 ns6) <- BE.fromBEncode bval
890 -> (ns4++ns6, ns4++ns6)
891 _ -> ([],[])
892 }
893
765data TriadSlot = SlotA | SlotB | SlotC 894data TriadSlot = SlotA | SlotB | SlotC
766 deriving (Eq,Ord,Enum,Show,Read) 895 deriving (Eq,Ord,Enum,Show,Read)
767 896
@@ -792,11 +921,8 @@ newTriadCommittee onChange =
792 <*> newTVar Nothing 921 <*> newTVar Nothing
793 <*> pure onChange 922 <*> pure onChange
794 923
795triadCountVotes :: Eq a => TriadCommittee voter a -> STM () 924triadCountVotes :: Eq a => Maybe a -> TriadCommittee voter a -> STM ()
796triadCountVotes triad = do 925triadCountVotes prior triad = do
797 prior <- do
798 slot <- readTVar (triadDecider triad)
799 fmap snd <$> readTVar (triadSlot slot triad)
800 a <- fmap ((SlotA,) . snd) <$> readTVar (triadA triad) 926 a <- fmap ((SlotA,) . snd) <$> readTVar (triadA triad)
801 b <- fmap ((SlotB,) . snd) <$> readTVar (triadB triad) 927 b <- fmap ((SlotB,) . snd) <$> readTVar (triadB triad)
802 c <- fmap ((SlotC,) . snd) <$> readTVar (triadC triad) 928 c <- fmap ((SlotC,) . snd) <$> readTVar (triadC triad)
@@ -822,9 +948,12 @@ addVote triad voter vote = do
822 avail (_,Just x ) = (x == voter) 948 avail (_,Just x ) = (x == voter)
823 slots = filter avail [a,b,c] 949 slots = filter avail [a,b,c]
824 forM_ (take 1 slots) $ \(slot,_) -> do 950 forM_ (take 1 slots) $ \(slot,_) -> do
951 prior <- do
952 slotp <- readTVar (triadDecider triad)
953 fmap snd <$> readTVar (triadSlot slotp triad)
825 writeTVar (triadSlot slot triad) 954 writeTVar (triadSlot slot triad)
826 (Just (voter,vote)) 955 (Just (voter,vote))
827 triadCountVotes triad 956 triadCountVotes prior triad
828 957
829delVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> STM () 958delVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> STM ()
830delVote triad voter = do 959delVote triad voter = do
@@ -834,13 +963,27 @@ delVote triad voter = do
834 let match (_,Just x ) = (x == voter) 963 let match (_,Just x ) = (x == voter)
835 slots = filter match [a,b,c] 964 slots = filter match [a,b,c]
836 forM_ (take 1 slots) $ \(slot,_) -> do 965 forM_ (take 1 slots) $ \(slot,_) -> do
966 prior <- do
967 slotp <- readTVar (triadDecider triad)
968 fmap snd <$> readTVar (triadSlot slotp triad)
837 writeTVar (triadSlot slot triad) Nothing 969 writeTVar (triadSlot slot triad) Nothing
838 triadCountVotes triad 970 triadCountVotes prior triad
839 971
840nodeSearch = Search 972nodeSearch client = Search
841 { searchSpace = mainlineSpace 973 { searchSpace = mainlineSpace
842 , searchNodeAddress = nodeIP &&& nodePort 974 , searchNodeAddress = nodeIP &&& nodePort
843 , searchQuery = error "searchQuery" 975 , searchQuery = \nid ni -> do
976 hPutStrLn stderr $ "findNodes "++show nid++" --> "++show ni
977 handle (\(SomeException e) -> do
978 hPutStrLn stderr $ "got "++show e
979 -- threadDelay 1000000
980 return ([],[]))
981 $ do
982 (xs,y) <- getNodes client nid ni
983 forM_ xs $ \x -> do
984 hPutStrLn stderr $ "got "++show x
985 -- threadDelay 1000000
986 return (xs,y)
844 } 987 }
845 988
846-- | List of bootstrap nodes maintained by different bittorrent 989-- | List of bootstrap nodes maintained by different bittorrent