summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/dhtd.hs9
-rw-r--r--src/Network/BitTorrent/MainlineDHT.hs70
-rw-r--r--src/Network/Kademlia/Bootstrap.hs227
-rw-r--r--src/Network/Tox.hs30
-rw-r--r--src/Network/Tox/DHT/Handlers.hs61
5 files changed, 240 insertions, 157 deletions
diff --git a/examples/dhtd.hs b/examples/dhtd.hs
index d1db1938..7d3661e6 100644
--- a/examples/dhtd.hs
+++ b/examples/dhtd.hs
@@ -61,7 +61,7 @@ import Network.Address hiding (NodeId, NodeInfo(..))
61import Network.QueryResponse 61import Network.QueryResponse
62import Network.StreamServer 62import Network.StreamServer
63import Network.Kademlia 63import Network.Kademlia
64import Network.Kademlia.Bootstrap (bootstrap) 64import Network.Kademlia.Bootstrap
65import Network.Kademlia.Search 65import Network.Kademlia.Search
66import qualified Network.BitTorrent.MainlineDHT as Mainline 66import qualified Network.BitTorrent.MainlineDHT as Mainline
67import qualified Network.Tox as Tox 67import qualified Network.Tox as Tox
@@ -1323,7 +1323,12 @@ main = do
1323 case Map.lookup "node" qrys of 1323 case Map.lookup "node" qrys of
1324 Just DHTQuery { qsearch = srch } -> do 1324 Just DHTQuery { qsearch = srch } -> do
1325 case eqT of 1325 case eqT of
1326 Just witness -> bootstrap (isNodesSearch witness srch) bkts ping btSaved fallbackNodes 1326 Just witness -> let strapper = BucketRefresher
1327 { refreshSearch = isNodesSearch witness srch
1328 , refreshBuckets = bkts
1329 , refreshPing = ping
1330 }
1331 in bootstrap strapper btSaved fallbackNodes
1327 _ -> error $ "Missing node-search for "++netname++"." 1332 _ -> error $ "Missing node-search for "++netname++"."
1328 saveNodes netname dht 1333 saveNodes netname dht
1329 Nothing -> return () 1334 Nothing -> return ()
diff --git a/src/Network/BitTorrent/MainlineDHT.hs b/src/Network/BitTorrent/MainlineDHT.hs
index c7ef560c..268cacfb 100644
--- a/src/Network/BitTorrent/MainlineDHT.hs
+++ b/src/Network/BitTorrent/MainlineDHT.hs
@@ -6,6 +6,7 @@
6{-# LANGUAGE FlexibleInstances #-} 6{-# LANGUAGE FlexibleInstances #-}
7{-# LANGUAGE GeneralizedNewtypeDeriving #-} 7{-# LANGUAGE GeneralizedNewtypeDeriving #-}
8{-# LANGUAGE LambdaCase #-} 8{-# LANGUAGE LambdaCase #-}
9{-# LANGUAGE NamedFieldPuns #-}
9{-# LANGUAGE PatternSynonyms #-} 10{-# LANGUAGE PatternSynonyms #-}
10{-# LANGUAGE StandaloneDeriving #-} 11{-# LANGUAGE StandaloneDeriving #-}
11{-# LANGUAGE TupleSections #-} 12{-# LANGUAGE TupleSections #-}
@@ -502,14 +503,24 @@ newSwarmsDatabase = do
502 503
503data Routing = Routing 504data Routing = Routing
504 { tentativeId :: NodeInfo 505 { tentativeId :: NodeInfo
505 , sched4 :: !( TVar (Int.PSQ POSIXTime) )
506 , routing4 :: !( TVar (R.BucketList NodeInfo) )
507 , committee4 :: TriadCommittee NodeId SockAddr 506 , committee4 :: TriadCommittee NodeId SockAddr
508 , sched6 :: !( TVar (Int.PSQ POSIXTime) )
509 , routing6 :: !( TVar (R.BucketList NodeInfo) )
510 , committee6 :: TriadCommittee NodeId SockAddr 507 , committee6 :: TriadCommittee NodeId SockAddr
508 , refresher4 :: BucketRefresher NodeId NodeInfo
509 , refresher6 :: BucketRefresher NodeId NodeInfo
511 } 510 }
512 511
512sched4 :: Routing -> TVar (Int.PSQ POSIXTime)
513sched4 Routing { refresher4 = BucketRefresher { refreshQueue } } = refreshQueue
514
515sched6 :: Routing -> TVar (Int.PSQ POSIXTime)
516sched6 Routing { refresher6 = BucketRefresher { refreshQueue } } = refreshQueue
517
518routing4 :: Routing -> TVar (R.BucketList NodeInfo)
519routing4 Routing { refresher4 = BucketRefresher { refreshBuckets } } = refreshBuckets
520
521routing6 :: Routing -> TVar (R.BucketList NodeInfo)
522routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets
523
513traced :: Show tid => TableMethods t tid -> TableMethods t tid 524traced :: Show tid => TableMethods t tid -> TableMethods t tid
514traced (TableMethods ins del lkup) 525traced (TableMethods ins del lkup)
515 = TableMethods (\tid mvar t -> trace ("insert "++show tid) $ ins tid mvar t) 526 = TableMethods (\tid mvar t -> trace ("insert "++show tid) $ ins tid mvar t)
@@ -537,7 +548,7 @@ newClient swarms addr = do
537 <$> global6 548 <$> global6
538 addr4 <- atomically $ newTChan 549 addr4 <- atomically $ newTChan
539 addr6 <- atomically $ newTChan 550 addr6 <- atomically $ newTChan
540 routing <- atomically $ do 551 mkrouting <- atomically $ do
541 let nobkts = R.defaultBucketCount :: Int 552 let nobkts = R.defaultBucketCount :: Int
542 tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info nobkts 553 tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info nobkts
543 tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 nobkts 554 tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 nobkts
@@ -558,9 +569,22 @@ newClient swarms addr = do
558 committee6 <- newTriadCommittee $ updateIPVote tbl6 addr6 569 committee6 <- newTriadCommittee $ updateIPVote tbl6 addr6
559 sched4 <- newTVar Int.empty 570 sched4 <- newTVar Int.empty
560 sched6 <- newTVar Int.empty 571 sched6 <- newTVar Int.empty
561 return $ Routing tentative_info sched4 tbl4 committee4 sched6 tbl6 committee6 572 return $ \client ->
573 let refresher sched bkts = BucketRefresher
574 { refreshInterval = 15 * 60
575 , refreshQueue = sched
576 , refreshSearch = nodeSearch client
577 , refreshBuckets = bkts
578 , refreshPing = ping client
579 }
580 refresher4 = refresher sched4 tbl4
581 refresher6 = refresher sched6 tbl6
582 in Routing tentative_info committee4 committee6 refresher4 refresher6
562 map_var <- atomically $ newTVar (0, mempty) 583 map_var <- atomically $ newTVar (0, mempty)
563 let net = onInbound (updateRouting outgoingClient routing) 584
585 let routing = mkrouting outgoingClient
586
587 net = onInbound (updateRouting outgoingClient routing)
564 $ layerTransport parsePacket encodePacket 588 $ layerTransport parsePacket encodePacket
565 $ udp 589 $ udp
566 590
@@ -625,14 +649,9 @@ newClient swarms addr = do
625 -- TODO: trigger bootstrap ipv6 649 -- TODO: trigger bootstrap ipv6
626 again 650 again
627 651
628 refresh_thread4 <- forkPollForRefresh 652
629 (15*60) 653 refresh_thread4 <- forkPollForRefresh $ refresher4 routing
630 (sched4 routing) 654 refresh_thread6 <- forkPollForRefresh $ refresher6 routing
631 (refreshBucket (nodeSearch client) (routing4 routing))
632 refresh_thread6 <- forkPollForRefresh
633 (15*60)
634 (sched6 routing)
635 (refreshBucket (nodeSearch client) (routing6 routing))
636 655
637 return (client, routing) 656 return (client, routing)
638 657
@@ -676,14 +695,17 @@ defaultHandler meth = MethodHandler decodePayload errorPayload returnError
676 returnError :: NodeInfo -> BValue -> IO Error 695 returnError :: NodeInfo -> BValue -> IO Error
677 returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth) 696 returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth)
678 697
679mainlineKademlia :: MainlineClient -> TriadCommittee NodeId SockAddr -> TVar (R.BucketList NodeInfo) -> TVar (Int.PSQ POSIXTime) -> Kademlia NodeId NodeInfo 698mainlineKademlia :: MainlineClient
680mainlineKademlia client committee var sched 699 -> TriadCommittee NodeId SockAddr
700 -> BucketRefresher NodeId NodeInfo
701 -> Kademlia NodeId NodeInfo
702mainlineKademlia client committee refresher
681 = Kademlia quietInsertions 703 = Kademlia quietInsertions
682 mainlineSpace 704 mainlineSpace
683 (vanillaIO var $ ping client) 705 (vanillaIO (refreshBuckets refresher) $ ping client)
684 { tblTransition = \tr -> do 706 { tblTransition = \tr -> do
685 io1 <- transitionCommittee committee tr 707 io1 <- transitionCommittee committee tr
686 io2 <- touchBucket mainlineSpace (15*60) var sched tr 708 io2 <- touchBucket refresher tr
687 return $ do 709 return $ do
688 io1 >> io2 710 io1 >> io2
689 {- noisy (timestamp updates are currently reported as transitions to Accepted) 711 {- noisy (timestamp updates are currently reported as transitions to Accepted)
@@ -712,11 +734,11 @@ transitionCommittee committee _ = return $ return ()
712updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () 734updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO ()
713updateRouting client routing naddr msg = do 735updateRouting client routing naddr msg = do
714 case prefer4or6 naddr Nothing of 736 case prefer4or6 naddr Nothing of
715 Want_IP4 -> go (routing4 routing) (committee4 routing) (sched4 routing) 737 Want_IP4 -> go (committee4 routing) (refresher4 routing)
716 Want_IP6 -> go (routing6 routing) (committee6 routing) (sched6 routing) 738 Want_IP6 -> go (committee6 routing) (refresher6 routing)
717 where 739 where
718 go tbl committee sched = do 740 go committee refresher = do
719 self <- atomically $ R.thisNode <$> readTVar tbl 741 self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher)
720 when (nodeIP self /= nodeIP naddr) $ do 742 when (nodeIP self /= nodeIP naddr) $ do
721 case msg of 743 case msg of
722 R { rspReflectedIP = Just sockaddr } 744 R { rspReflectedIP = Just sockaddr }
@@ -724,7 +746,7 @@ updateRouting client routing naddr msg = do
724 -- hPutStrLn stderr $ "External: "++show (nodeId naddr,sockaddr) 746 -- hPutStrLn stderr $ "External: "++show (nodeId naddr,sockaddr)
725 atomically $ addVote committee (nodeId naddr) sockaddr 747 atomically $ addVote committee (nodeId naddr) sockaddr
726 _ -> return () 748 _ -> return ()
727 insertNode (mainlineKademlia client committee tbl sched) naddr 749 insertNode (mainlineKademlia client committee refresher) naddr
728 750
729data Ping = Ping deriving Show 751data Ping = Ping deriving Show
730 752
diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs
index 283e054a..42bff665 100644
--- a/src/Network/Kademlia/Bootstrap.hs
+++ b/src/Network/Kademlia/Bootstrap.hs
@@ -1,11 +1,14 @@
1{-# LANGUAGE CPP #-} 1{-# LANGUAGE CPP #-}
2{-# LANGUAGE ConstraintKinds #-}
2{-# LANGUAGE DeriveFunctor #-} 3{-# LANGUAGE DeriveFunctor #-}
3{-# LANGUAGE DeriveTraversable #-} 4{-# LANGUAGE DeriveTraversable #-}
4{-# LANGUAGE FlexibleContexts #-} 5{-# LANGUAGE FlexibleContexts #-}
5{-# LANGUAGE GADTs #-} 6{-# LANGUAGE GADTs #-}
6{-# LANGUAGE KindSignatures #-} 7{-# LANGUAGE KindSignatures #-}
8{-# LANGUAGE NamedFieldPuns #-}
7{-# LANGUAGE PartialTypeSignatures #-} 9{-# LANGUAGE PartialTypeSignatures #-}
8{-# LANGUAGE PatternSynonyms #-} 10{-# LANGUAGE PatternSynonyms #-}
11{-# LANGUAGE RankNTypes #-}
9{-# LANGUAGE ScopedTypeVariables #-} 12{-# LANGUAGE ScopedTypeVariables #-}
10module Network.Kademlia.Bootstrap where 13module Network.Kademlia.Bootstrap where
11 14
@@ -42,34 +45,90 @@ import Network.Kademlia.Search
42import Control.Concurrent.Tasks 45import Control.Concurrent.Tasks
43import Network.Kademlia 46import Network.Kademlia
44 47
45 48-- From BEP 05:
46-- | > pollForRefresh interval queue refresh 49--
50-- Each bucket should maintain a "last changed" property to indicate how
51-- "fresh" the contents are.
52--
53-- Note: We will use a "time to next refresh" property instead and store it in
54-- a priority search queue.
55--
56-- When...
57--
58-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
59-- >>> bucketEvents =
60-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
61-- >>>
62-- >>> , Stranger :--> Accepted -- or a node is added to a bucket,
63-- >>>
64-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced
65-- >>> , Applicant :--> Accepted -- with another node,
66-- >>> ]
67--
68-- the bucket's last changed property should be updated. Buckets
69-- that have not been changed in 15 minutes should be "refreshed." This is done
70-- by picking a random ID in the range of the bucket and performing a
71-- find_nodes search on it.
72--
73-- The only other possible BucketTouchEvents are as follows:
47-- 74--
48-- Fork a refresh loop. Kill the returned thread to terminate it. The 75-- >>> not_handled =
49-- arguments are: a staleness threshold (if a bucket goes this long without 76-- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered:
50-- being touched, a refresh will be triggered), a TVar with the time-to-refresh 77-- >>> -- (Applicant :--> Stranger)
51-- schedule for each bucket, and a refresh action to be forked when a bucket 78-- >>> -- (Applicant :--> Accepted)
52-- excedes the staleness threshold. 79-- >>> , Accepted :--> Applicant -- Never happens
80-- >>> ]
53-- 81--
54-- TO "touch" a bucket and prevent it from being refreshed, reschedule it's 82
55-- refresh time to some time into the future by modifying the 'Int.PSQ' in the 83type SensibleNodeId nid ni =
56-- TVar. 84 ( Show nid
57forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId 85 , Ord nid
58forkPollForRefresh interval psq refresh = do 86 , Ord ni
59 fork $ do 87 , Hashable nid
88 , Hashable ni )
89
90data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher
91 { -- | A staleness threshold (if a bucket goes this long without being
92 -- touched, a refresh will be triggered).
93 refreshInterval :: POSIXTime
94 -- | A TVar with the time-to-refresh schedule for each bucket.
95 --
96 -- To "touch" a bucket and prevent it from being refreshed, reschedule
97 -- it's refresh time to some time into the future by modifying the
98 -- 'Int.PSQ' in the TVar. (See 'touchBucket').
99 , refreshQueue :: TVar (Int.PSQ POSIXTime)
100 -- | This is the kademlia node search specification.
101 , refreshSearch :: Search nid addr tok ni ni
102 -- | The current kademlia routing table buckets.
103 , refreshBuckets :: TVar (R.BucketList ni)
104 -- | Action to ping a node. This is used only during initial bootstrap
105 -- to get some nodes in our table. A 'True' result is interpreted as a a
106 -- pong, where 'False' is a non-response.
107 , refreshPing :: ni -> IO Bool
108 }
109
110-- | Fork a refresh loop. Kill the returned thread to terminate it.
111forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId
112forkPollForRefresh BucketRefresher{ refreshInterval
113 , refreshQueue
114 , refreshBuckets
115 , refreshSearch } = fork $ do
60 myThreadId >>= flip labelThread "pollForRefresh" 116 myThreadId >>= flip labelThread "pollForRefresh"
61 fix $ \again -> do 117 fix $ \again -> do
62 join $ atomically $ do 118 join $ atomically $ do
63 nextup <- Int.findMin <$> readTVar psq 119 nextup <- Int.findMin <$> readTVar refreshQueue
64 maybe retry (return . go again) nextup 120 maybe retry (return . go again) nextup
65 where 121 where
122 refresh :: Int -> IO Int
123 refresh = refreshBucket refreshSearch refreshBuckets
124
66 go again ( bktnum :-> refresh_time ) = do 125 go again ( bktnum :-> refresh_time ) = do
67 now <- getPOSIXTime 126 now <- getPOSIXTime
68 case fromEnum (refresh_time - now) of 127 case fromEnum (refresh_time - now) of
69 x | x <= 0 -> do -- Refresh time! 128 x | x <= 0 -> do -- Refresh time!
70 -- Move it to the back of the refresh queue. 129 -- Move it to the back of the refresh queue.
71 atomically $ modifyTVar' psq 130 atomically $ modifyTVar' refreshQueue
72 $ Int.insert bktnum (now + interval) 131 $ Int.insert bktnum (now + refreshInterval)
73 -- Now fork the refresh operation. 132 -- Now fork the refresh operation.
74 -- TODO: We should probably propogate the kill signal to this thread. 133 -- TODO: We should probably propogate the kill signal to this thread.
75 fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) 134 fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum)
@@ -79,15 +138,37 @@ forkPollForRefresh interval psq refresh = do
79 seconds -> threadDelay ( seconds * 1000000 ) 138 seconds -> threadDelay ( seconds * 1000000 )
80 again 139 again
81 140
82refreshBucket :: forall nid tok ni addr. 141
83 ( Show nid 142-- | This is a helper to 'refreshBucket' which does some book keeping to decide
84 , Serialize nid 143-- whether or not a bucket is sufficiently refreshed or not. It will return
85 , Ord nid 144-- false when we can terminate a node search.
86 , Ord ni 145checkBucketFull :: Ord ni => KademliaSpace nid ni -- ^ Obtain a node id from a node.
87 , Ord addr 146 -> TVar (BucketList ni) -- ^ The current routing table.
88 , Hashable nid 147 -> TVar (Set.Set ni) -- ^ In-range nodes found so far.
89 , Hashable ni ) => 148 -> TVar Bool -- ^ The result will also be written here.
90 Search nid addr tok ni ni -> TVar (BucketList ni) -> Int -> IO Int 149 -> Int -- ^ The bucket number of interest.
150 -> ni -- ^ A newly found node.
151 -> STM Bool
152checkBucketFull space var resultCounter fin n found_node = do
153 let fullcount = R.defaultBucketSize
154 saveit True = writeTVar fin True >> return True
155 saveit _ = return False
156 tbl <- readTVar var
157 let counts = R.shape tbl
158 nid = kademliaLocation space found_node
159 -- Update the result set with every found node that is in the
160 -- bucket of interest.
161 when (n == R.bucketNumber space nid tbl)
162 $ modifyTVar' resultCounter (Set.insert found_node)
163 resultCount <- readTVar resultCounter
164 saveit $ case drop (n - 1) counts of
165 (cnt:_) | cnt < fullcount -> True -- bucket not full, keep going
166 _ | Set.size resultCount < fullcount -> True -- we haven't got many results, keep going
167 _ -> False -- okay, good enough, let's quit.
168
169
170refreshBucket :: (Hashable a, Hashable t, Ord t, Ord addr, Ord a, Show t) =>
171 Search t addr tok a a -> TVar (BucketList a) -> Int -> IO Int
91refreshBucket sch var n = do 172refreshBucket sch var n = do
92 tbl <- atomically (readTVar var) 173 tbl <- atomically (readTVar var)
93 let count = bktCount tbl 174 let count = bktCount tbl
@@ -100,27 +181,13 @@ refreshBucket sch var n = do
100 (bucketRange n (n + 1 < count)) 181 (bucketRange n (n + 1 < count))
101 fin <- atomically $ newTVar False 182 fin <- atomically $ newTVar False
102 resultCounter <- atomically $ newTVar Set.empty 183 resultCounter <- atomically $ newTVar Set.empty
103 let fullcount = R.defaultBucketSize
104 saveit True = writeTVar fin True >> return True
105 saveit _ = return False
106 checkBucketFull :: ni -> STM Bool
107 checkBucketFull found_node = do
108 tbl <- readTVar var
109 let counts = R.shape tbl
110 when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl)
111 $ modifyTVar' resultCounter (Set.insert found_node)
112 resultCount <- readTVar resultCounter
113 saveit $ case drop (n - 1) counts of
114 (cnt:_) | cnt < fullcount -> True
115 _ | Set.size resultCount < fullcount -> True
116 _ -> False
117 184
118 hPutStrLn stderr $ "Start refresh " ++ show (n,sample) 185 hPutStrLn stderr $ "Start refresh " ++ show (n,sample)
119 186
120 -- Set 15 minute timeout in order to avoid overlapping refreshes. 187 -- Set 15 minute timeout in order to avoid overlapping refreshes.
121 s <- search sch tbl sample $ if n+1 == R.defaultBucketCount 188 s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
122 then const $ return True -- Never short-circuit the last bucket. 189 then const $ return True -- Never short-circuit the last bucket.
123 else checkBucketFull 190 else checkBucketFull (searchSpace sch) var resultCounter fin n
124 _ <- timeout (15*60*1000000) $ do 191 _ <- timeout (15*60*1000000) $ do
125 atomically $ searchIsFinished s >>= check 192 atomically $ searchIsFinished s >>= check
126 atomically $ searchCancel s 193 atomically $ searchCancel s
@@ -131,20 +198,11 @@ refreshBucket sch var n = do
131 return $ if b then 1 else c 198 return $ if b then 1 else c
132 return rcount 199 return rcount
133 200
134 201bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) =>
135bootstrap :: 202 BucketRefresher nid ni -> t1 ni -> t ni -> IO ()
136 ( Show nid 203bootstrap BucketRefresher { refreshSearch = sch
137 , Serialize nid 204 , refreshBuckets = var
138 -- , FiniteBits nid 205 , refreshPing = ping } ns ns0 = do
139 , Hashable ni
140 , Hashable nid
141 , Ord ni
142 , Ord addr
143 , Ord nid
144 , Traversable t1
145 , Traversable t
146 ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO ()
147bootstrap sch var ping ns ns0 = do
148 gotPing <- atomically $ newTVar False 206 gotPing <- atomically $ newTVar False
149 207
150 -- First, ping the given nodes so that they are added to 208 -- First, ping the given nodes so that they are added to
@@ -193,6 +251,9 @@ bootstrap sch var ping ns ns0 = do
193 cnt <- refreshBucket sch var num 251 cnt <- refreshBucket sch var num
194 again cnt 252 again cnt
195 253
254-- | Reschedule a bucket's refresh-time. It should be called whenever a bucket
255-- changes. This will typically be invoked from 'tblTransition'.
256--
196-- XXX: This will be redundantly triggered twice upon every node replacement 257-- XXX: This will be redundantly triggered twice upon every node replacement
197-- because we do not currently distinguish between standalone 258-- because we do not currently distinguish between standalone
198-- insertion/deletion events and an insertion/deletion pair constituting 259-- insertion/deletion events and an insertion/deletion pair constituting
@@ -201,55 +262,21 @@ bootstrap sch var ping ns ns0 = do
201-- It might also be better to pass the timestamp of the transition here and 262-- It might also be better to pass the timestamp of the transition here and
202-- keep the refresh queue in better sync with the routing table by updating it 263-- keep the refresh queue in better sync with the routing table by updating it
203-- within the STM monad. 264-- within the STM monad.
204touchBucket :: KademliaSpace nid ni 265touchBucket :: BucketRefresher nid ni
205 -> POSIXTime 266 -> RoutingTransition ni -- ^ What happened to the bucket?
206 -> TVar (BucketList ni)
207 -> TVar (Int.PSQ POSIXTime)
208 -> RoutingTransition ni
209 -> STM (IO ()) 267 -> STM (IO ())
210touchBucket space interval bkts psq tr 268touchBucket BucketRefresher{ refreshSearch
269 , refreshInterval
270 , refreshBuckets
271 , refreshQueue
272 }
273 tr
211 | (transitionedTo tr == Applicant) 274 | (transitionedTo tr == Applicant)
212 = return $ return () 275 = return $ return ()
213 | otherwise = return $ do 276 | otherwise = return $ do
214 now <- getPOSIXTime 277 now <- getPOSIXTime
215 atomically $ do 278 atomically $ do
216 let nid = kademliaLocation space (transitioningNode tr) 279 let space = searchSpace refreshSearch
217 num <- R.bucketNumber space nid <$> readTVar bkts 280 nid = kademliaLocation space (transitioningNode tr)
218 modifyTVar' psq $ Int.insert num (now + interval) 281 num <- R.bucketNumber space nid <$> readTVar refreshBuckets
219 282 modifyTVar' refreshQueue $ Int.insert num (now + refreshInterval)
220-- TODO: Bootstrap/Refresh
221--
222-- From BEP 05:
223--
224-- Each bucket should maintain a "last changed" property to indicate how
225-- "fresh" the contents are.
226--
227-- Note: We will use a "time to next refresh" property instead and store it in
228-- a priority search queue.
229--
230-- When...
231--
232-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
233-- >>> bucketEvents =
234-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
235-- >>>
236-- >>> , Stranger :--> Accepted -- or a node is added to a bucket,
237-- >>>
238-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced
239-- >>> , Applicant :--> Accepted -- with another node,
240-- >>> ]
241--
242-- the bucket's last changed property should be updated. Buckets
243-- that have not been changed in 15 minutes should be "refreshed." This is done
244-- by picking a random ID in the range of the bucket and performing a
245-- find_nodes search on it.
246--
247-- The only other possible BucketTouchEvents are as follows:
248--
249-- >>> not_handled =
250-- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered:
251-- >>> -- (Applicant :--> Stranger)
252-- >>> -- (Applicant :--> Accepted)
253-- >>> , Accepted :--> Applicant -- Never happens
254-- >>> ]
255--
diff --git a/src/Network/Tox.hs b/src/Network/Tox.hs
index 35eaebb5..15b00780 100644
--- a/src/Network/Tox.hs
+++ b/src/Network/Tox.hs
@@ -167,11 +167,15 @@ w64Key (DHT.TransactionId (Nonce8 w) _) = w
167nonceKey :: DHT.TransactionId -> Nonce8 167nonceKey :: DHT.TransactionId -> Nonce8
168nonceKey (DHT.TransactionId n _) = n 168nonceKey (DHT.TransactionId n _) = n
169 169
170myAddr :: DHT.Routing -> Maybe NodeInfo -> IO NodeInfo 170-- | Return my own address.
171myAddr routing maddr = atomically $ do 171myAddr :: TVar (R.BucketList NodeInfo) -- ^ IPv4 buckets
172 -> TVar (R.BucketList NodeInfo) -- ^ IPv6 buckets
173 -> Maybe NodeInfo -- ^ Interested remote address
174 -> IO NodeInfo
175myAddr routing4 routing6 maddr = atomically $ do
172 let var = case flip DHT.prefer4or6 Nothing <$> maddr of 176 let var = case flip DHT.prefer4or6 Nothing <$> maddr of
173 Just Want_IP6 -> DHT.routing6 routing 177 Just Want_IP6 -> routing4
174 _ -> DHT.routing4 routing 178 _ -> routing6
175 a <- readTVar var 179 a <- readTVar var
176 return $ R.thisNode a 180 return $ R.thisNode a
177 181
@@ -179,7 +183,7 @@ newClient :: (DRG g, Show addr, Show meth) =>
179 g -> Transport String addr x 183 g -> Transport String addr x
180 -> (Client String meth DHT.TransactionId addr x -> x -> MessageClass String meth DHT.TransactionId addr x) 184 -> (Client String meth DHT.TransactionId addr x -> x -> MessageClass String meth DHT.TransactionId addr x)
181 -> (Maybe addr -> IO addr) 185 -> (Maybe addr -> IO addr)
182 -> (meth -> Maybe (MethodHandler String DHT.TransactionId addr x)) 186 -> (Client String meth DHT.TransactionId addr x -> meth -> Maybe (MethodHandler String DHT.TransactionId addr x))
183 -> (forall d. TransactionMethods d DHT.TransactionId addr x -> TransactionMethods d DHT.TransactionId addr x) 187 -> (forall d. TransactionMethods d DHT.TransactionId addr x -> TransactionMethods d DHT.TransactionId addr x)
184 -> (Client String meth DHT.TransactionId addr x -> Transport String addr x -> Transport String addr x) 188 -> (Client String meth DHT.TransactionId addr x -> Transport String addr x -> Transport String addr x)
185 -> IO (Client String meth DHT.TransactionId addr x) 189 -> IO (Client String meth DHT.TransactionId addr x)
@@ -207,7 +211,7 @@ newClient drg net classify selfAddr handlers modifytbl modifynet = do
207 mkclient (tbl,var) handlers = 211 mkclient (tbl,var) handlers =
208 let client = Client 212 let client = Client
209 { clientNet = addHandler (reportParseError eprinter) (handleMessage client) $ modifynet client net 213 { clientNet = addHandler (reportParseError eprinter) (handleMessage client) $ modifynet client net
210 , clientDispatcher = dispatch tbl var handlers client 214 , clientDispatcher = dispatch tbl var (handlers client) client
211 , clientErrorReporter = eprinter { reportTimeout = reportTimeout ignoreErrors } 215 , clientErrorReporter = eprinter { reportTimeout = reportTimeout ignoreErrors }
212 , clientPending = var 216 , clientPending = var
213 , clientAddress = selfAddr 217 , clientAddress = selfAddr
@@ -308,13 +312,15 @@ newTox keydb addr mbSessionsState suppliedDHTKey = do
308 drg <- drgNew 312 drg <- drgNew
309 let lookupClose _ = return Nothing 313 let lookupClose _ = return Nothing
310 314
311 routing <- DHT.newRouting addr crypto updateIP updateIP 315 mkrouting <- DHT.newRouting addr crypto updateIP updateIP
312 let ignoreErrors _ = return () -- Set this to (hPutStrLn stderr) to debug onion route building. 316 let ignoreErrors _ = return () -- Set this to (hPutStrLn stderr) to debug onion route building.
313 orouter <- newOnionRouter ignoreErrors 317 orouter <- newOnionRouter ignoreErrors
314 (dhtcrypt,onioncrypt,dtacrypt,cryptonet) <- toxTransport crypto orouter lookupClose udp 318 (dhtcrypt,onioncrypt,dtacrypt,cryptonet) <- toxTransport crypto orouter lookupClose udp
315 let dhtnet0 = layerTransportM (DHT.decrypt crypto) (DHT.encrypt crypto) dhtcrypt 319 let dhtnet0 = layerTransportM (DHT.decrypt crypto) (DHT.encrypt crypto) dhtcrypt
316 dhtclient <- newClient drg dhtnet0 DHT.classify (myAddr routing) (DHT.handlers crypto routing) id 320 tbl4 = DHT.routing4 $ mkrouting (error "missing client")
317 $ \client net -> onInbound (DHT.updateRouting client routing orouter) net 321 tbl6 = DHT.routing6 $ mkrouting (error "missing client")
322 dhtclient <- newClient drg dhtnet0 DHT.classify (myAddr tbl4 tbl6) (DHT.handlers crypto . mkrouting) id
323 $ \client net -> onInbound (DHT.updateRouting client (mkrouting client) orouter) net
318 324
319 orouter <- forkRouteBuilder orouter $ \nid ni -> fmap (\(_,ns,_)->ns) <$> DHT.getNodes dhtclient nid ni 325 orouter <- forkRouteBuilder orouter $ \nid ni -> fmap (\(_,ns,_)->ns) <$> DHT.getNodes dhtclient nid ni
320 326
@@ -324,8 +330,8 @@ newTox keydb addr mbSessionsState suppliedDHTKey = do
324 oniondrg <- drgNew 330 oniondrg <- drgNew
325 let onionnet = layerTransportM (Onion.decrypt crypto) (Onion.encrypt crypto) onioncrypt 331 let onionnet = layerTransportM (Onion.decrypt crypto) (Onion.encrypt crypto) onioncrypt
326 onionclient <- newClient oniondrg onionnet (const Onion.classify) 332 onionclient <- newClient oniondrg onionnet (const Onion.classify)
327 (getOnionAlias crypto $ R.thisNode <$> readTVar (DHT.routing4 routing)) 333 (getOnionAlias crypto $ R.thisNode <$> readTVar (DHT.routing4 $ mkrouting dhtclient))
328 (Onion.handlers onionnet routing toks keydb) 334 (const $ Onion.handlers onionnet (mkrouting dhtclient) toks keydb)
329 (hookQueries orouter DHT.transactionKey) 335 (hookQueries orouter DHT.transactionKey)
330 (const id) 336 (const id)
331 337
@@ -337,7 +343,7 @@ newTox keydb addr mbSessionsState suppliedDHTKey = do
337 , toxCrypto = addHandler (hPutStrLn stderr) (cryptoNetHandler sessionsState) cryptonet 343 , toxCrypto = addHandler (hPutStrLn stderr) (cryptoNetHandler sessionsState) cryptonet
338 , toxCryptoSessions = sessionsState 344 , toxCryptoSessions = sessionsState
339 , toxCryptoKeys = crypto 345 , toxCryptoKeys = crypto
340 , toxRouting = routing 346 , toxRouting = mkrouting dhtclient
341 , toxTokens = toks 347 , toxTokens = toks
342 , toxAnnouncedKeys = keydb 348 , toxAnnouncedKeys = keydb
343 , toxOnionRoutes = orouter 349 , toxOnionRoutes = orouter
diff --git a/src/Network/Tox/DHT/Handlers.hs b/src/Network/Tox/DHT/Handlers.hs
index 494e319b..d3d36525 100644
--- a/src/Network/Tox/DHT/Handlers.hs
+++ b/src/Network/Tox/DHT/Handlers.hs
@@ -1,7 +1,8 @@
1{-# LANGUAGE CPP #-}
1{-# LANGUAGE GeneralizedNewtypeDeriving #-} 2{-# LANGUAGE GeneralizedNewtypeDeriving #-}
3{-# LANGUAGE NamedFieldPuns #-}
2{-# LANGUAGE PatternSynonyms #-} 4{-# LANGUAGE PatternSynonyms #-}
3{-# LANGUAGE TupleSections #-} 5{-# LANGUAGE TupleSections #-}
4{-# LANGUAGE CPP #-}
5module Network.Tox.DHT.Handlers where 6module Network.Tox.DHT.Handlers where
6 7
7import Network.Tox.DHT.Transport as DHTTransport 8import Network.Tox.DHT.Transport as DHTTransport
@@ -11,7 +12,7 @@ import Crypto.Tox
11import Network.Kademlia.Search 12import Network.Kademlia.Search
12import qualified Data.Wrapper.PSQInt as Int 13import qualified Data.Wrapper.PSQInt as Int
13import Network.Kademlia 14import Network.Kademlia
14import Network.Kademlia.Bootstrap (touchBucket) 15import Network.Kademlia.Bootstrap
15import Network.Address (WantIP (..), ipFamily, testIdBit,fromSockAddr, sockAddrPort) 16import Network.Address (WantIP (..), ipFamily, testIdBit,fromSockAddr, sockAddrPort)
16import qualified Network.Kademlia.Routing as R 17import qualified Network.Kademlia.Routing as R
17import Control.TriadCommittee 18import Control.TriadCommittee
@@ -111,18 +112,28 @@ classify client msg = fromMaybe (IsUnknown "unknown")
111 112
112data Routing = Routing 113data Routing = Routing
113 { tentativeId :: NodeInfo 114 { tentativeId :: NodeInfo
114 , sched4 :: !( TVar (Int.PSQ POSIXTime) )
115 , routing4 :: !( TVar (R.BucketList NodeInfo) )
116 , committee4 :: TriadCommittee NodeId SockAddr 115 , committee4 :: TriadCommittee NodeId SockAddr
117 , sched6 :: !( TVar (Int.PSQ POSIXTime) )
118 , routing6 :: !( TVar (R.BucketList NodeInfo) )
119 , committee6 :: TriadCommittee NodeId SockAddr 116 , committee6 :: TriadCommittee NodeId SockAddr
117 , refresher4 :: BucketRefresher NodeId NodeInfo
118 , refresher6 :: BucketRefresher NodeId NodeInfo
120 } 119 }
121 120
121sched4 :: Routing -> TVar (Int.PSQ POSIXTime)
122sched4 Routing { refresher4 = BucketRefresher { refreshQueue } } = refreshQueue
123
124sched6 :: Routing -> TVar (Int.PSQ POSIXTime)
125sched6 Routing { refresher6 = BucketRefresher { refreshQueue } } = refreshQueue
126
127routing4 :: Routing -> TVar (R.BucketList NodeInfo)
128routing4 Routing { refresher4 = BucketRefresher { refreshBuckets } } = refreshBuckets
129
130routing6 :: Routing -> TVar (R.BucketList NodeInfo)
131routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets
132
122newRouting :: SockAddr -> TransportCrypto 133newRouting :: SockAddr -> TransportCrypto
123 -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv4 change 134 -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv4 change
124 -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv6 change 135 -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv6 change
125 -> IO Routing 136 -> IO (Client -> Routing)
126newRouting addr crypto update4 update6 = do 137newRouting addr crypto update4 update6 = do
127 let tentative_ip4 = fromMaybe (IPv4 $ toEnum 0) (IPv4 <$> fromSockAddr addr) 138 let tentative_ip4 = fromMaybe (IPv4 $ toEnum 0) (IPv4 <$> fromSockAddr addr)
128 tentative_ip6 = fromMaybe (IPv6 $ toEnum 0) (IPv6 <$> fromSockAddr addr) 139 tentative_ip6 = fromMaybe (IPv6 $ toEnum 0) (IPv6 <$> fromSockAddr addr)
@@ -146,7 +157,17 @@ newRouting addr crypto update4 update6 = do
146 committee6 <- newTriadCommittee (update6 tbl6) -- updateIPVote tbl6 addr6 157 committee6 <- newTriadCommittee (update6 tbl6) -- updateIPVote tbl6 addr6
147 sched4 <- newTVar Int.empty 158 sched4 <- newTVar Int.empty
148 sched6 <- newTVar Int.empty 159 sched6 <- newTVar Int.empty
149 return $ Routing tentative_info sched4 tbl4 committee4 sched6 tbl6 committee6 160 return $ \client ->
161 let refresher sched bkts = BucketRefresher
162 { refreshInterval = 15 * 60
163 , refreshQueue = sched
164 , refreshSearch = nodeSearch client
165 , refreshBuckets = bkts
166 , refreshPing = ping client
167 }
168 refresher4 = refresher sched4 tbl6
169 refresher6 = refresher sched6 tbl6
170 in Routing tentative_info committee4 committee6 refresher4 refresher6
150 171
151 172
152-- TODO: This should cover more cases 173-- TODO: This should cover more cases
@@ -322,26 +343,28 @@ updateRouting client routing orouter naddr msg
322 | PacketKind 0x21 <- msgType msg = return () -- ignore lan discovery 343 | PacketKind 0x21 <- msgType msg = return () -- ignore lan discovery
323 | otherwise = do 344 | otherwise = do
324 case prefer4or6 naddr Nothing of 345 case prefer4or6 naddr Nothing of
325 Want_IP4 -> updateTable client naddr orouter (routing4 routing) (committee4 routing) (sched4 routing) 346 Want_IP4 -> updateTable client naddr orouter (committee4 routing) (refresher4 routing)
326 Want_IP6 -> updateTable client naddr orouter (routing6 routing) (committee6 routing) (sched6 routing) 347 Want_IP6 -> updateTable client naddr orouter (committee6 routing) (refresher4 routing)
327 Want_Both -> error $ "BUG:unreachable at " ++ __FILE__ ++ ":" ++ show __LINE__ 348 Want_Both -> error $ "BUG:unreachable at " ++ __FILE__ ++ ":" ++ show __LINE__
328 349
329updateTable :: Client -> NodeInfo -> OnionRouter -> TVar (R.BucketList NodeInfo) -> TriadCommittee NodeId SockAddr -> TVar (Int.PSQ POSIXTime) -> IO () 350updateTable :: Client -> NodeInfo -> OnionRouter -> TriadCommittee NodeId SockAddr -> BucketRefresher NodeId NodeInfo -> IO ()
330updateTable client naddr orouter tbl committee sched = do 351updateTable client naddr orouter committee refresher = do
331 self <- atomically $ R.thisNode <$> readTVar tbl 352 self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher)
332 when (nodeIP self /= nodeIP naddr) $ do 353 when (nodeIP self /= nodeIP naddr) $ do
333 -- TODO: IP address vote? 354 -- TODO: IP address vote?
334 insertNode (toxKademlia client committee orouter tbl sched) naddr 355 insertNode (toxKademlia client committee orouter refresher) naddr
335 356
336toxKademlia :: Client -> TriadCommittee NodeId SockAddr -> OnionRouter -> TVar (R.BucketList NodeInfo) -> TVar (Int.PSQ POSIXTime) -> Kademlia NodeId NodeInfo 357toxKademlia :: Client -> TriadCommittee NodeId SockAddr -> OnionRouter
337toxKademlia client committee orouter var sched 358 -> BucketRefresher NodeId NodeInfo
359 -> Kademlia NodeId NodeInfo
360toxKademlia client committee orouter refresher
338 = Kademlia quietInsertions 361 = Kademlia quietInsertions
339 toxSpace 362 toxSpace
340 (vanillaIO var $ ping client) 363 (vanillaIO (refreshBuckets refresher) $ ping client)
341 { tblTransition = \tr -> do 364 { tblTransition = \tr -> do
342 io1 <- transitionCommittee committee tr 365 io1 <- transitionCommittee committee tr
343 io2 <- touchBucket toxSpace (15*60) var sched tr 366 io2 <- touchBucket refresher tr -- toxSpace (15*60) var sched tr
344 hookBucketList toxSpace var orouter tr 367 hookBucketList toxSpace (refreshBuckets refresher) orouter tr
345 return $ do 368 return $ do
346 io1 >> io2 369 io1 >> io2
347 {- 370 {-