author    James Crayne <jim.crayne@gmail.com>    2019-09-28 13:43:29 -0400
committer Joe Crayne <joe@jerkface.net>          2020-01-01 19:27:53 -0500
commit    11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch)
tree      5716463275c2d3e902889db619908ded2a73971c /kad
parent    add2c76bced51fde5e9917e7449ef52be70faf87 (diff)
Factor out some new libraries
word64-map: Data.Word64Map
network-addr: Network.Address
tox-crypto: Crypto.Tox
lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument
psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ
minmax-psq: Data.MinMaxPSQ
tasks: Control.Concurrent.Tasks
kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search
Diffstat (limited to 'kad')
-rw-r--r--  kad/CHANGELOG.md                            5
-rw-r--r--  kad/LICENSE                                30
-rw-r--r--  kad/Setup.hs                                2
-rw-r--r--  kad/kad.cabal                              76
-rw-r--r--  kad/src/DebugTag.hs                        24
-rw-r--r--  kad/src/Network/Kademlia.hs               163
-rw-r--r--  kad/src/Network/Kademlia/Bootstrap.hs     439
-rw-r--r--  kad/src/Network/Kademlia/CommonAPI.hs      84
-rw-r--r--  kad/src/Network/Kademlia/Persistence.hs    52
-rw-r--r--  kad/src/Network/Kademlia/Routing.hs       809
-rw-r--r--  kad/src/Network/Kademlia/Search.hs        236
11 files changed, 1920 insertions, 0 deletions
diff --git a/kad/CHANGELOG.md b/kad/CHANGELOG.md
new file mode 100644
index 00000000..6255a362
--- /dev/null
+++ b/kad/CHANGELOG.md
@@ -0,0 +1,5 @@
1# Revision history for kad
2
3## 0.1.0.0 -- YYYY-mm-dd
4
5* First version. Released on an unsuspecting world.
diff --git a/kad/LICENSE b/kad/LICENSE
new file mode 100644
index 00000000..e8eaef49
--- /dev/null
+++ b/kad/LICENSE
@@ -0,0 +1,30 @@
1Copyright (c) 2019, James Crayne
2
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions are met:
7
8 * Redistributions of source code must retain the above copyright
9 notice, this list of conditions and the following disclaimer.
10
11 * Redistributions in binary form must reproduce the above
12 copyright notice, this list of conditions and the following
13 disclaimer in the documentation and/or other materials provided
14 with the distribution.
15
16 * Neither the name of James Crayne nor the names of other
17 contributors may be used to endorse or promote products derived
18 from this software without specific prior written permission.
19
20THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/kad/Setup.hs b/kad/Setup.hs
new file mode 100644
index 00000000..9a994af6
--- /dev/null
+++ b/kad/Setup.hs
@@ -0,0 +1,2 @@
1import Distribution.Simple
2main = defaultMain
diff --git a/kad/kad.cabal b/kad/kad.cabal
new file mode 100644
index 00000000..5babda13
--- /dev/null
+++ b/kad/kad.cabal
@@ -0,0 +1,76 @@
1-- Initial kad.cabal generated by cabal init. For further documentation,
2-- see http://haskell.org/cabal/users-guide/
3
4name: kad
5version: 0.1.0.0
6-- synopsis:
7-- description:
8license: BSD3
9license-file: LICENSE
10author: James Crayne
11maintainer: jim.crayne@gmail.com
12-- copyright:
13-- category:
14build-type: Simple
15extra-source-files: CHANGELOG.md
16cabal-version: >=1.10
17
18library
19 cpp-options: -DTHREAD_DEBUG
20 exposed-modules:
21 Network.Kademlia
22 , Network.Kademlia.Bootstrap
23 , Network.Kademlia.Routing
24 , Network.Kademlia.CommonAPI
25 , Network.Kademlia.Persistence
26 , Network.Kademlia.Search
27 other-modules: DebugTag
28 other-extensions:
29 CPP
30 , ConstraintKinds
31 , DeriveFunctor
32 , DeriveTraversable
33 , FlexibleContexts
34 , GADTs
35 , KindSignatures
36 , LambdaCase
37 , NamedFieldPuns
38 , PartialTypeSignatures
39 , PatternSynonyms
40 , RankNTypes
41 , ScopedTypeVariables
42 , RecordWildCards
43 , BangPatterns
44 , ViewPatterns
45 , TypeOperators
46 , DeriveGeneric
47 , TupleSections
48 , StandaloneDeriving
49 , MultiParamTypeClasses
50 , FlexibleInstances
51 , ExistentialQuantification
52 build-depends:
53 base
54 , tox-crypto
55 , entropy
56 , lifted-base
57 , lifted-concurrent
58 , aeson
59 , vector
60 , containers
61 , unordered-containers
62 , dput-hslogger
63 , time
64 , stm
65 , pretty
66 , bytestring
67 , hashable
68 , contravariant
69 , reflection
70 , psq-wrap
71 , minmax-psq
72 , network-addr
73 , cereal
74 , tasks
75 hs-source-dirs: src
76 default-language: Haskell2010
diff --git a/kad/src/DebugTag.hs b/kad/src/DebugTag.hs
new file mode 100644
index 00000000..9ac04bb0
--- /dev/null
+++ b/kad/src/DebugTag.hs
@@ -0,0 +1,24 @@
1module DebugTag where
2
3import Data.Typeable
4
5-- | Debug tags. Add more as needed, but ensure XAnnounce is always first and XMisc last.
6data DebugTag
7 = XAnnounce
8 | XBitTorrent
9 | XDHT
10 | XLan
11 | XMan
12 | XNetCrypto
13 | XNetCryptoOut
14 | XOnion
15 | XRoutes
16 | XPing
17 | XRefresh
18 | XJabber
19 | XTCP
20 | XMisc
21 | XNodeinfoSearch
22 | XUnexpected -- Used only for special anomalous errors that we didn't expect to happen.
23 | XUnused -- Never commit code that uses XUnused.
24 deriving (Eq, Ord, Show, Read, Enum, Bounded,Typeable)
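Note: the tags above are consumed by the 'dput' logger from the dput-hslogger dependency (imported as DPut in Network.Kademlia.Bootstrap below). A minimal usage sketch, assuming 'dput :: DebugTag -> String -> IO ()' as it is applied in that module:

    import DPut
    import DebugTag

    -- Tag log output so it can be filtered per subsystem.
    logRefresh :: Int -> IO ()
    logRefresh n = dput XRefresh $ "refreshing bucket " ++ show n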
diff --git a/kad/src/Network/Kademlia.hs b/kad/src/Network/Kademlia.hs
new file mode 100644
index 00000000..e61afe9b
--- /dev/null
+++ b/kad/src/Network/Kademlia.hs
@@ -0,0 +1,163 @@
1{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
2{-# LANGUAGE KindSignatures #-}
3{-# LANGUAGE DeriveFunctor, DeriveTraversable #-}
4-- {-# LANGUAGE TypeFamilies #-}
5{-# LANGUAGE GADTs #-}
6{-# LANGUAGE PatternSynonyms #-}
7module Network.Kademlia where
8
9import Data.Maybe
10import Data.Time.Clock.POSIX
11import Network.Kademlia.Routing as R
12#ifdef THREAD_DEBUG
13import Control.Concurrent.Lifted.Instrument
14#else
15import Control.Concurrent.Lifted
16import GHC.Conc (labelThread)
17#endif
18import Control.Concurrent.STM
19import Control.Monad
20import Data.Time.Clock.POSIX (POSIXTime)
21
22-- | The status of a given node with respect to a given routing table.
23data RoutingStatus
24 = Stranger -- ^ The node is unknown to the Kademlia routing table.
25 | Applicant -- ^ The node may be inserted pending a ping timeout.
26 | Accepted -- ^ The node has a slot in one of the Kademlia buckets.
27 deriving (Eq,Ord,Enum,Show,Read)
28
29-- | A change occurred in the Kademlia routing table.
30data RoutingTransition ni = RoutingTransition
31 { transitioningNode :: ni
32 , transitionedTo :: !RoutingStatus
33 }
34 deriving (Eq,Ord,Show,Read)
35
36data InsertionReporter ni = InsertionReporter
37 { -- | Called on every inbound packet. Accepts:
38 --
39 -- * Origin of packet.
40 --
41 -- * List of nodes to be pinged as a result.
42 reportArrival :: POSIXTime
43 -> ni
44 -> [ni]
45 -> IO ()
46 -- | Called on every ping probe. Accepts:
47 --
48 -- * Who was pinged.
49 --
50     --  * 'True' if they ponged.
51 , reportPingResult :: POSIXTime
52 -> ni
53 -> Bool
54 -> IO ()
55 }
56
57quietInsertions :: InsertionReporter ni
58quietInsertions = InsertionReporter
59 { reportArrival = \_ _ _ -> return ()
60 , reportPingResult = \_ _ _ -> return ()
61 }
62
63contramapIR :: (t -> ni) -> InsertionReporter ni -> InsertionReporter t
64contramapIR f ir = InsertionReporter
65 { reportArrival = \tm ni nis -> reportArrival ir tm (f ni) (map f nis)
66 , reportPingResult = \tm ni b -> reportPingResult ir tm (f ni) b
67 }
68
69-- | All the IO operations necessary to maintain a Kademlia routing table.
70data TableStateIO ni = TableStateIO
71 { -- | Write the routing table. Typically 'writeTVar'.
72 tblWrite :: R.BucketList ni -> STM ()
73
74 -- | Read the routing table. Typically 'readTVar'.
75 , tblRead :: STM (R.BucketList ni)
76
77 -- | Issue a ping to a remote node and report 'True' if the node
78 -- responded within an acceptable time and 'False' otherwise.
79 , tblPing :: ni -> IO Bool
80
81 -- | Convenience method provided to assist in maintaining state
82 -- consistent with the routing table. It will be invoked in the same
83   -- transaction that 'tblRead'\/'tblWrite' occurred but only when there was
84 -- an interesting change. The returned IO action will be triggered soon
85 -- afterward.
86 --
87 -- It is not necessary to do anything interesting here. The following
88 -- trivial implementation is fine:
89 --
90 -- > tblTransition = const $ return $ return ()
91 , tblTransition :: RoutingTransition ni -> STM (IO ())
92 }
93
94vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO ni
95vanillaIO var ping = TableStateIO
96 { tblRead = readTVar var
97 , tblWrite = writeTVar var
98 , tblPing = ping
99 , tblTransition = const $ return $ return ()
100 }
101
102-- | Everything necessary to maintain a routing table of /ni/ (node
103-- information) entries.
104data Kademlia nid ni = Kademlia { kademInsertionReporter :: InsertionReporter ni
105 , kademSpace :: KademliaSpace nid ni
106 , kademIO :: TableStateIO ni
107 }
108
109
110-- Helper to 'insertNode'.
111--
112-- Adapts the return value from 'updateForPingResult' into a
113-- more easily grokked list of transitions.
114transition :: (ni,Maybe (t,ni)) -> [RoutingTransition ni]
115transition (x,m) =
116 -- Just _ <- m = Node transition: Accepted --> Stranger
117 -- Nothing <- m = Node transition: Applicant --> Stranger
118 RoutingTransition x Stranger
119 : maybeToList (accepted <$> m)
120
121-- Helper to 'transition'
122--
123-- Node transition: Applicant --> Accepted
124accepted :: (t,ni) -> RoutingTransition ni
125accepted (_,y) = RoutingTransition y Accepted
126
127
128insertNode :: Kademlia nid ni -> ni -> IO ()
129insertNode (Kademlia reporter space io) node = do
130
131 tm <- getPOSIXTime
132
133 (ps,reaction) <- atomically $ do
134 tbl <- tblRead io
135 let (inserted, ps,t') = R.updateForInbound space tm node tbl
136 tblWrite io t'
137 reaction <- case ps of
138 _ | inserted -> -- Node transition: Stranger --> Accepted
139 tblTransition io $ RoutingTransition node Accepted
140 (_:_) -> -- Node transition: Stranger --> Applicant
141 tblTransition io $ RoutingTransition node Applicant
142 _ -> return $ return ()
143 return (ps, reaction)
144
145 reportArrival reporter tm node ps
146 reaction
147
148 _ <- fork $ do
149 myThreadId >>= flip labelThread "pingResults"
150 forM_ ps $ \n -> do
151 b <- tblPing io n
152 reportPingResult reporter tm n b -- XXX: tm is timestamp of original triggering packet, not result
153 join $ atomically $ do
154 tbl <- tblRead io
155 let (replacements, t') = R.updateForPingResult space n b tbl
156 tblWrite io t'
157 ios <- sequence $ concatMap
158 (map (tblTransition io) . transition)
159 replacements
160 return $ sequence_ ios
161
162 return ()
163
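A minimal wiring sketch for the module above (not part of this commit). Here 'NID' and 'NI' stand for an application-defined node-id and node-info type, and 'space', 'niId', 'self', and 'pingNode' are hypothetical stand-ins for the 'KademliaSpace' value, the node-id projection, our own node info, and the transport-level ping:

    import Control.Concurrent.STM (TVar, newTVarIO)
    import Data.Hashable (hashWithSalt)
    import Data.Ord (comparing)
    import Network.Kademlia
    import Network.Kademlia.Routing as R

    startTable :: KademliaSpace NID NI -> (NI -> NID) -> NI -> (NI -> IO Bool)
               -> IO (Kademlia NID NI, TVar (BucketList NI))
    startTable space niId self pingNode = do
        var <- newTVarIO $ R.nullTable (comparing niId)
                                       (\s -> hashWithSalt s . niId)
                                       self
                                       R.defaultBucketCount
        -- Calling 'insertNode' on the returned value keeps the table current
        -- for every node learned from inbound traffic.
        return (Kademlia quietInsertions space (vanillaIO var pingNode), var)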
diff --git a/kad/src/Network/Kademlia/Bootstrap.hs b/kad/src/Network/Kademlia/Bootstrap.hs
new file mode 100644
index 00000000..08ba3318
--- /dev/null
+++ b/kad/src/Network/Kademlia/Bootstrap.hs
@@ -0,0 +1,439 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE ConstraintKinds #-}
3{-# LANGUAGE DeriveFunctor #-}
4{-# LANGUAGE DeriveTraversable #-}
5{-# LANGUAGE FlexibleContexts #-}
6{-# LANGUAGE GADTs #-}
7{-# LANGUAGE KindSignatures #-}
8{-# LANGUAGE LambdaCase #-}
9{-# LANGUAGE NamedFieldPuns #-}
10{-# LANGUAGE RecordWildCards #-}
11{-# LANGUAGE NondecreasingIndentation #-}
12{-# LANGUAGE PartialTypeSignatures #-}
13{-# LANGUAGE PatternSynonyms #-}
14{-# LANGUAGE RankNTypes #-}
15{-# LANGUAGE ScopedTypeVariables #-}
16module Network.Kademlia.Bootstrap where
17
18import Data.Function
19import Data.Maybe
20import qualified Data.Set as Set
21import Data.Time.Clock.POSIX (getPOSIXTime)
22import Network.Kademlia.Routing as R
23#ifdef THREAD_DEBUG
24import Control.Concurrent.Lifted.Instrument
25#else
26import Control.Concurrent.Lifted
27import GHC.Conc (labelThread)
28#endif
29import Control.Concurrent.STM
30import Control.Monad
31import Data.Hashable
32import Data.Time.Clock.POSIX (POSIXTime)
33import Data.Ord
34import System.Entropy
35import System.Timeout
36import DPut
37import DebugTag
38
39import qualified Data.Wrapper.PSQInt as Int
40 ;import Data.Wrapper.PSQInt (pattern (:->))
41import Network.Address (bucketRange)
42import Network.Kademlia.Search
43import Control.Concurrent.Tasks
44import Network.Kademlia
45
46type SensibleNodeId nid ni =
47 ( Show nid
48 , Ord nid
49 , Ord ni
50 , Hashable nid
51 , Hashable ni )
52
53data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher
54 { -- | A staleness threshold (if a bucket goes this long without being
55 -- touched, a refresh will be triggered).
56 refreshInterval :: POSIXTime
57 -- | A TVar with the time-to-refresh schedule for each bucket.
58 --
59 -- To "touch" a bucket and prevent it from being refreshed, reschedule
60 -- its refresh time to some time into the future by modifying its
61 -- priority in this priority search queue.
62 , refreshQueue :: TVar (Int.PSQ POSIXTime)
63 -- | This is the kademlia node search specification.
64 , refreshSearch :: Search nid addr tok ni ni
65 -- | The current kademlia routing table buckets.
66 , refreshBuckets :: TVar (R.BucketList ni)
67 -- | Action to ping a node. This is used only during initial bootstrap
68   -- to get some nodes in our table. A 'True' result is interpreted as a
69   -- pong, whereas 'False' indicates a non-response.
70 , refreshPing :: ni -> IO Bool
71 , -- | Timestamp of last bucket event.
72 refreshLastTouch :: TVar POSIXTime
73 , -- | This variable indicates whether or not we are in bootstrapping mode.
74 bootstrapMode :: TVar Bool
75 , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on
76 -- every finished refresh.
77 bootstrapCountdown :: TVar (Maybe Int)
78 }
79
80newBucketRefresher :: ( Ord addr, Hashable addr
81 , SensibleNodeId nid ni )
82 => TVar (R.BucketList ni)
83 -> Search nid addr tok ni ni
84 -> (ni -> IO Bool)
85 -> STM (BucketRefresher nid ni)
86newBucketRefresher bkts sch ping = do
87 let spc = searchSpace sch
88 nodeId = kademliaLocation spc
89 -- bkts <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) template_ni R.defaultBucketCount
90 sched <- newTVar Int.empty
91 lasttouch <- newTVar 0 -- Would use getPOSIXTime here, or minBound, but alas...
92 bootstrapVar <- newTVar True -- Start in bootstrapping mode.
93 bootstrapCnt <- newTVar Nothing
94 return BucketRefresher
95 { refreshInterval = 15 * 60
96 , refreshQueue = sched
97 , refreshSearch = sch
98 , refreshBuckets = bkts
99 , refreshPing = ping
100 , refreshLastTouch = lasttouch
101 , bootstrapMode = bootstrapVar
102 , bootstrapCountdown = bootstrapCnt
103 }
104
105-- | This was added to avoid the compile error "Record update for
106-- insufficiently polymorphic field" when trying to update the existentially
107-- quantified field 'refreshSearch'.
108updateRefresherIO :: Ord addr
109 => Search nid addr tok ni ni
110 -> (ni -> IO Bool)
111 -> BucketRefresher nid ni -> BucketRefresher nid ni
112updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher
113 { refreshSearch = sch
114 , refreshPing = ping
115 , refreshInterval = refreshInterval
116 , refreshBuckets = refreshBuckets
117 , refreshQueue = refreshQueue
118 , refreshLastTouch = refreshLastTouch
119 , bootstrapMode = bootstrapMode
120 , bootstrapCountdown = bootstrapCountdown
121 }
122
123-- | Fork a refresh loop. Kill the returned thread to terminate it.
124forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId
125forkPollForRefresh r@BucketRefresher{ refreshInterval
126 , refreshQueue
127 , refreshBuckets
128 , refreshSearch } = fork $ do
129 myThreadId >>= flip labelThread "pollForRefresh"
130 fix $ \again -> do
131 join $ atomically $ do
132 nextup <- Int.findMin <$> readTVar refreshQueue
133 maybe retry (return . go again) nextup
134 where
135 refresh :: Int -> IO Int
136 refresh n = do
137 -- dput XRefresh $ "Refresh time! "++ show n
138 refreshBucket r n
139
140 go again ( bktnum :-> refresh_time ) = do
141 now <- getPOSIXTime
142 case fromEnum (refresh_time - now) of
143 x | x <= 0 -> do -- Refresh time!
144 -- Move it to the back of the refresh queue.
145 atomically $ do
146 interval <- effectiveRefreshInterval r bktnum
147 modifyTVar' refreshQueue
148 $ Int.insert bktnum (now + interval)
149 -- Now fork the refresh operation.
150                     -- TODO: We should probably propagate the kill signal to this thread.
151 fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum)
152 _ <- refresh bktnum
153 return ()
154 return ()
155 picoseconds -> do
156 -- dput XRefresh $ show (picoseconds `div` 10^12) ++ " seconds until refresh " ++ show bktnum
157 threadDelay ( picoseconds `div` 10^6 )
158 again
159
160
161-- | This is a helper to 'refreshBucket' which does some bookkeeping to decide
162-- whether or not a bucket is sufficiently refreshed. It returns
163-- 'False' when we can terminate a node search.
164checkBucketFull :: Ord ni => KademliaSpace nid ni -- ^ Obtain a node id from a node.
165 -> TVar (BucketList ni) -- ^ The current routing table.
166 -> TVar (Set.Set ni) -- ^ In-range nodes found so far.
167 -> TVar Bool -- ^ The result will also be written here.
168 -> Int -- ^ The bucket number of interest.
169 -> ni -- ^ A newly found node.
170 -> STM Bool
171checkBucketFull space var resultCounter fin n found_node = do
172 let fullcount = R.defaultBucketSize
173 saveit True = writeTVar fin True >> return True
174 saveit _ = return False
175 tbl <- readTVar var
176 let counts = R.shape tbl
177 nid = kademliaLocation space found_node
178 -- Update the result set with every found node that is in the
179 -- bucket of interest.
180 when (n == R.bucketNumber space nid tbl)
181 $ modifyTVar' resultCounter (Set.insert found_node)
182 resultCount <- readTVar resultCounter
183 saveit $ case drop (n - 1) counts of
184 (cnt:_) | cnt < fullcount -> True -- bucket not full, keep going
185 _ | Set.size resultCount < fullcount -> True -- we haven't got many results, keep going
186 _ -> False -- okay, good enough, let's quit.
187
188-- | Called from 'refreshBucket' with the current time when a refresh of the
189-- supplied bucket number finishes.
190onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ())
191onFinishedRefresh BucketRefresher { bootstrapCountdown
192 , bootstrapMode
193 , refreshQueue
194 , refreshBuckets } num now = do
195 bootstrapping <- readTVar bootstrapMode
196  if not bootstrapping then return $ return () -- dput XRefresh $ "Finished non-bootstrapping refresh: "++show num
197 else do
198 tbl <- readTVar refreshBuckets
199 action <-
200 if num /= R.bktCount tbl - 1
201 then do modifyTVar' bootstrapCountdown (fmap pred)
202 return $ return () -- dput XRefresh $ "BOOTSTRAP decrement"
203 else do
204 -- The last bucket finished.
205 cnt <- readTVar bootstrapCountdown
206 case cnt of
207 Nothing -> do
208 let fullsize = R.defaultBucketSize
209 notfull (n,len) | n==num = False
210 | len>=fullsize = False
211 | otherwise = True
212 unfull = case filter notfull $ zip [0..] (R.shape tbl) of
213 [] -> [(0,0)] -- Schedule at least 1 more refresh.
214 xs -> xs
215 forM_ unfull $ \(n,_) -> do
216 -- Schedule immediate refresh for unfull buckets (other than this one).
217 modifyTVar' refreshQueue $ Int.insert n (now - 1)
218 writeTVar bootstrapCountdown $! Just $! length unfull
219 return $ return () -- dput XRefresh $ "BOOTSTRAP scheduling: "++show unfull
220 Just n -> do writeTVar bootstrapCountdown $! Just $! pred n
221 return $ return () -- dput XRefresh "BOOTSTRAP decrement (last bucket)"
222 cnt <- readTVar bootstrapCountdown
223 if (cnt == Just 0)
224 then do
225            -- Bootstrap finished!
226 writeTVar bootstrapMode False
227 writeTVar bootstrapCountdown Nothing
228 return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")."
229 else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt)
230
231refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) =>
232 BucketRefresher nid ni -> Int -> IO Int
233refreshBucket r@BucketRefresher{ refreshSearch = sch
234 , refreshBuckets = var }
235 n = do
236 tbl <- atomically (readTVar var)
237 let count = bktCount tbl
238 nid = kademliaLocation (searchSpace sch) (thisNode tbl)
239 sample <- if n+1 >= count -- Is this the last bucket?
240 then return nid -- Yes? Search our own id.
241 else kademliaSample (searchSpace sch) -- No? Generate a random id.
242 getEntropy
243 nid
244 (bucketRange n (n + 1 < count))
245 fin <- atomically $ newTVar False
246 resultCounter <- atomically $ newTVar Set.empty
247
248 dput XRefresh $ "Start refresh " ++ show (n,sample)
249
250 -- Set 15 minute timeout in order to avoid overlapping refreshes.
251 s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
252 then const $ return True -- Never short-circuit the last bucket.
253 else checkBucketFull (searchSpace sch) var resultCounter fin n
254 _ <- timeout (15*60*1000000) $ do
255 atomically $ searchIsFinished s >>= check
256 atomically $ searchCancel s
257 dput XDHT $ "Finish refresh " ++ show (n,sample)
258 now <- getPOSIXTime
259 join $ atomically $ onFinishedRefresh r n now
260 rcount <- atomically $ do
261 c <- Set.size <$> readTVar resultCounter
262 b <- readTVar fin
263 return $ if b then 1 else c
264 return rcount
265
266refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ()
267refreshLastBucket r@BucketRefresher { refreshBuckets
268 , refreshQueue } = do
269
270 now <- getPOSIXTime
271 atomically $ do
272 cnt <- bktCount <$> readTVar refreshBuckets
273 -- Schedule immediate refresh.
274 modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1)
275
276restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) =>
277 BucketRefresher nid ni -> STM (IO ())
278restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do
279 unchanged <- readTVar bootstrapMode
280 writeTVar bootstrapMode True
281 writeTVar bootstrapCountdown Nothing
282 if not unchanged then return $ do
283 dput XRefresh "BOOTSTRAP entered bootstrap mode"
284 refreshLastBucket r
285 else return $ dput XRefresh "BOOTSTRAP already bootstrapping"
286
287bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) =>
288 BucketRefresher nid ni
289 -> t1 ni -- ^ Nodes to bootstrap from.
290 -> t ni -- ^ Fallback nodes; used only if the others are unresponsive.
291 -> IO ()
292bootstrap r@BucketRefresher { refreshSearch = sch
293 , refreshBuckets = var
294 , refreshPing = ping
295 , bootstrapMode } ns ns0 = do
296 gotPing <- atomically $ newTVar False
297
298 -- First, ping the given nodes so that they are added to
299 -- our routing table.
300 withTaskGroup "bootstrap.resume" 20 $ \g -> do
301 forM_ ns $ \n -> do
302 let lbl = show $ kademliaLocation (searchSpace sch) n
303 forkTask g lbl $ do
304 b <- ping n
305 when b $ atomically $ writeTVar gotPing True
306
307 -- We resort to the hardcoded fallback nodes only when we got no
308  -- responses. This is to lessen the burden on well-known bootstrap
309 -- nodes.
310 fallback <- atomically (readTVar gotPing) >>= return . when . not
311 fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do
312 forM_ ns0 $ \n -> do
313 forkTask g (show $ kademliaLocation (searchSpace sch) n)
314 (void $ ping n)
315 dput XDHT "Finished bootstrap pings."
316 -- Now search our own Id by entering bootstrap mode from non-bootstrap mode.
317 join $ atomically $ do
318 writeTVar bootstrapMode False
319 restartBootstrap r
320 --
321 -- Hopefully 'forkPollForRefresh' was invoked and can take over
322 -- maintenance.
323
324
325effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime
326effectiveRefreshInterval BucketRefresher{ refreshInterval
327 , refreshBuckets
328 , bootstrapMode } num = do
329 tbl <- readTVar refreshBuckets
330 bootstrapping <- readTVar bootstrapMode
331 case bootstrapping of
332 False -> return refreshInterval
333 True -> do
334 -- When bootstrapping, refresh interval for non-full buckets is only 15 seconds.
335 let fullcount = R.defaultBucketSize
336 count = fromMaybe fullcount $ listToMaybe $ drop (num - 1) $ R.shape tbl
337 if count == fullcount
338 then return refreshInterval
339 else return 15 -- seconds
340
341
342
343-- | Reschedule a bucket's refresh-time. It should be called whenever a bucket
344-- changes. This will typically be invoked from 'tblTransition'.
345--
346-- From BEP 05:
347--
348-- > Each bucket should maintain a "last changed" property to indicate how
349-- > "fresh" the contents are.
350--
351-- We will use a "time to next refresh" property instead and store it in
352-- a priority search queue.
353--
354-- In detail using an expository (not actually implemented) type
355-- 'BucketTouchEvent'...
356--
357-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
358-- >>> bucketEvents =
359-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
360-- >>>
361-- >>> , Stranger :--> Accepted -- or a node is added to a bucket,
362-- >>>
363-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced
364-- >>> , Applicant :--> Accepted -- with another node,
365-- >>> ]
366--
367-- the bucket's last changed property should be updated. Buckets that have not
368-- been changed in 15 minutes (see 'refreshInterval') should be "refreshed."
369-- This is done by picking a random ID in the range of the bucket and
370-- performing a find_nodes search on it.
371--
372-- The only other possible BucketTouchEvents are as follows:
373--
374-- >>> not_handled =
375-- >>>    , Stranger  :--> Applicant -- A ping is pending, its result is covered:
376-- >>> -- (Applicant :--> Stranger)
377-- >>> -- (Applicant :--> Accepted)
378-- >>> , Accepted :--> Applicant -- Never happens
379-- >>> ]
380--
381-- Because this BucketTouchEvent type is not actually implemented and we only
382-- receive notifications of a node's new state, it suffices to reschedule the
383-- bucket refresh 'touchBucket' on every transition to a state other than
384-- 'Applicant'.
385--
386-- XXX: Unfortunately, this means redundantly triggering twice upon every node
387-- replacement because we do not currently distinguish between standalone
388-- insertion/deletion events and an insertion/deletion pair constituting
389-- replacement.
390--
391-- It might also be better to pass the timestamp of the transition here and
392-- keep the refresh queue in better sync with the routing table by updating it
393-- within the STM monad.
394--
395-- We embed the result in the STM monad but currently, no STM state changes
396-- occur until the returned IO action is invoked. TODO: simplify?
397touchBucket :: SensibleNodeId nid ni
398 => BucketRefresher nid ni
399 -> RoutingTransition ni -- ^ What happened to the bucket?
400 -> STM (IO ())
401touchBucket r@BucketRefresher{ refreshSearch
402 , refreshInterval
403 , refreshBuckets
404 , refreshQueue
405 , refreshLastTouch
406 , bootstrapMode
407 , bootstrapCountdown }
408 RoutingTransition{ transitionedTo
409 , transitioningNode }
410 = case transitionedTo of
411 Applicant -> return $ return () -- Ignore transition to applicant.
412 _ -> return $ do -- Reschedule for any other transition.
413 now <- getPOSIXTime
414 join $ atomically $ do
415 let space = searchSpace refreshSearch
416 nid = kademliaLocation space transitioningNode
417 tbl <- readTVar refreshBuckets
418 let num = R.bucketNumber space nid tbl
419 stamp <- readTVar refreshLastTouch
420 action <- case stamp /= 0 && (now - stamp > 60) of
421 True -> do
422                -- It has been one minute since any bucket was touched; re-enter bootstrap mode.
423 restartBootstrap r
424 False -> return $ return ()
425 interval <- effectiveRefreshInterval r num
426 modifyTVar' refreshQueue $ Int.insert num (now + interval)
427 writeTVar refreshLastTouch now
428 return action
429
430refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni -> Kademlia nid ni
431refreshKademlia r@BucketRefresher { refreshSearch = sch
432 , refreshPing = ping
433 , refreshBuckets = bkts
434 }
435 = Kademlia quietInsertions (searchSpace sch) (vanillaIO bkts ping)
436 { tblTransition = \tr -> do
437 io <- touchBucket r tr
438 return io
439 }
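A wiring sketch for the refresher above (assumptions: 'bkts' is the same 'TVar (BucketList ni)' the rest of the application inserts into, 'sch' is the node search, 'pingNode' is the transport ping, and 'saved'/'fallbacks' are node lists, e.g. obtained via Network.Kademlia.Persistence):

    startRefresher bkts sch pingNode saved fallbacks = do
        r <- atomically $ newBucketRefresher bkts sch pingNode
        -- Insertions made through this Kademlia value reschedule bucket refreshes.
        let kad = refreshKademlia r
        _ <- forkPollForRefresh r    -- background refresh loop
        bootstrap r saved fallbacks  -- ping the known nodes, then search our own id
        return (kad, r)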
diff --git a/kad/src/Network/Kademlia/CommonAPI.hs b/kad/src/Network/Kademlia/CommonAPI.hs
new file mode 100644
index 00000000..601be5d8
--- /dev/null
+++ b/kad/src/Network/Kademlia/CommonAPI.hs
@@ -0,0 +1,84 @@
1{-# LANGUAGE ExistentialQuantification #-}
2module Network.Kademlia.CommonAPI where
3
4
5import Control.Concurrent
6import Control.Concurrent.STM
7import Data.Aeson as J (FromJSON, ToJSON)
8import Data.Hashable
9import qualified Data.Map as Map
10import Data.Serialize as S
11import qualified Data.Set as Set
12import Data.Time.Clock.POSIX
13import Data.Typeable
14
15import Network.Kademlia.Search
16import Network.Kademlia.Routing as R
17import Crypto.Tox (SecretKey,PublicKey)
18
19data DHT = forall nid ni. ( Show ni
20 , Read ni
21 , ToJSON ni
22 , FromJSON ni
23 , Ord ni
24 , Hashable ni
25 , Show nid
26 , Ord nid
27 , Hashable nid
28 , Typeable ni
29 , S.Serialize nid
30 ) =>
31 DHT
32 { dhtBuckets :: TVar (BucketList ni)
33 , dhtSecretKey :: STM (Maybe SecretKey)
34 , dhtPing :: Map.Map String (DHTPing ni)
35 , dhtQuery :: Map.Map String (DHTQuery nid ni)
36 , dhtAnnouncables :: Map.Map String (DHTAnnouncable nid)
37 , dhtParseId :: String -> Either String nid
38 , dhtSearches :: TVar (Map.Map (String,nid) (DHTSearch nid ni))
39 , dhtFallbackNodes :: IO [ni]
40 , dhtBootstrap :: [ni] -> [ni] -> IO ()
41 }
42
43data DHTQuery nid ni = forall addr r tok.
44 ( Ord addr
45 , Typeable r
46 , Typeable tok
47 , Typeable ni
48 ) => DHTQuery
49 { qsearch :: Search nid addr tok ni r
50 , qhandler :: ni -> nid -> IO ([ni], [r], Maybe tok) -- ^ Invoked on local node, when there is no query destination.
51 , qshowR :: r -> String
52 , qshowTok :: tok -> Maybe String
53 }
54
55data DHTAnnouncable nid = forall dta tok ni r.
56 ( Show r
57 , Typeable dta -- information being announced
58 , Typeable tok -- token
59 , Typeable r -- search result
60 , Typeable ni -- node
61 ) => DHTAnnouncable
62 { announceParseData :: String -> Either String dta
63 , announceParseToken :: dta -> String -> Either String tok
64 , announceParseAddress :: String -> Either String ni
65 , announceSendData :: Either ( String {- search name -}
66 , String -> Either String r
67 , PublicKey {- me -} -> dta -> r -> IO ())
68 (dta -> tok -> Maybe ni -> IO (Maybe r))
69 , announceInterval :: POSIXTime
70 , announceTarget :: dta -> nid
71 }
72
73data DHTSearch nid ni = forall addr tok r. DHTSearch
74 { searchThread :: ThreadId
75 , searchState :: SearchState nid addr tok ni r
76 , searchShowTok :: tok -> Maybe String
77 , searchResults :: TVar (Set.Set String)
78 }
79
80data DHTPing ni = forall r. DHTPing
81 { pingQuery :: [String] -> ni -> IO (Maybe r)
82 , pingShowResult :: r -> String
83 }
84
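For illustration, a minimal 'DHTPing' value (a sketch only; 'transportPing' is a hypothetical network-level ping and the option strings are ignored):

    examplePing :: (ni -> IO Bool) -> DHTPing ni
    examplePing transportPing = DHTPing
        { pingQuery = \_opts ni -> do
            ok <- transportPing ni
            return $ if ok then Just () else Nothing
        , pingShowResult = \() -> "pong"
        }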
diff --git a/kad/src/Network/Kademlia/Persistence.hs b/kad/src/Network/Kademlia/Persistence.hs
new file mode 100644
index 00000000..32ec169d
--- /dev/null
+++ b/kad/src/Network/Kademlia/Persistence.hs
@@ -0,0 +1,52 @@
1{-# LANGUAGE NamedFieldPuns #-}
2{-# LANGUAGE OverloadedStrings #-}
3module Network.Kademlia.Persistence where
4
5import Network.Kademlia.CommonAPI
6import Network.Kademlia.Routing as R
7
8import Control.Concurrent.STM
9import qualified Data.Aeson as J
10 ;import Data.Aeson as J (FromJSON)
11import qualified Data.ByteString.Lazy as L
12import qualified Data.HashMap.Strict as HashMap
13import Data.List
14import qualified Data.Vector as V
15import System.IO.Error
16
17saveNodes :: String -> DHT -> IO ()
18saveNodes netname DHT{dhtBuckets} = do
19 bkts <- atomically $ readTVar dhtBuckets
20 let ns = map fst $ concat $ R.toList bkts
21 bs = J.encode ns
22 fname = nodesFileName netname
23 L.writeFile fname bs
24
25loadNodes :: FromJSON ni => String -> IO [ni]
26loadNodes netname = do
27 let fname = nodesFileName netname
28 attempt <- tryIOError $ do
29 J.decode <$> L.readFile fname
30 >>= maybe (ioError $ userError "Nothing") return
31 either (const $ fallbackLoad fname) return attempt
32
33nodesFileName :: String -> String
34nodesFileName netname = netname ++ "-nodes.json"
35
36fallbackLoad :: FromJSON t => FilePath -> IO [t]
37fallbackLoad fname = do
38 attempt <- tryIOError $ do
39 J.decode <$> L.readFile fname
40 >>= maybe (ioError $ userError "Nothing") return
41 let go r = do
42 let m = HashMap.lookup "nodes" (r :: J.Object)
43 ns0 = case m of Just (J.Array v) -> V.toList v
44 Nothing -> []
45 ns1 = zip (map J.fromJSON ns0) ns0
46 issuc (J.Error _,_) = False
47 issuc _ = True
48 (ss,fs) = partition issuc ns1
49 ns = map (\(J.Success n,_) -> n) ss
50 mapM_ print (map snd fs) >> return ns
51 either (const $ return []) go attempt
52
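A resume sketch combining this module with Network.Kademlia.CommonAPI: reload the nodes written by 'saveNodes' and hand them to the DHT's bootstrap action, keeping the hard-coded fallbacks for a first run (requires NamedFieldPuns; the function name is illustrative):

    resumeDHT :: String -> DHT -> IO ()
    resumeDHT netname DHT{ dhtBootstrap, dhtFallbackNodes } = do
        saved     <- loadNodes netname   -- [] when nothing has been persisted yet
        fallbacks <- dhtFallbackNodes
        dhtBootstrap saved fallbacks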
diff --git a/kad/src/Network/Kademlia/Routing.hs b/kad/src/Network/Kademlia/Routing.hs
new file mode 100644
index 00000000..c7fdf028
--- /dev/null
+++ b/kad/src/Network/Kademlia/Routing.hs
@@ -0,0 +1,809 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- (c) Joe Crayne 2017
4-- License : BSD3
5-- Maintainer : pxqr.sta@gmail.com
6-- Stability : experimental
7-- Portability : portable
8--
9-- Every node maintains a routing table of known good nodes. The
10-- nodes in the routing table are used as starting points for
11-- queries in the DHT. Nodes from the routing table are returned in
12-- response to queries from other nodes.
13--
14-- For more info see:
15-- <http://www.bittorrent.org/beps/bep_0005.html#routing-table>
16--
17{-# LANGUAGE CPP #-}
18{-# LANGUAGE RecordWildCards #-}
19{-# LANGUAGE BangPatterns #-}
20{-# LANGUAGE RankNTypes #-}
21{-# LANGUAGE ViewPatterns #-}
22{-# LANGUAGE TypeOperators #-}
23{-# LANGUAGE DeriveGeneric #-}
24{-# LANGUAGE DeriveFunctor #-}
25{-# LANGUAGE GADTs #-}
26{-# LANGUAGE ScopedTypeVariables #-}
27{-# LANGUAGE TupleSections #-}
28{-# LANGUAGE OverloadedStrings #-}
29{-# LANGUAGE StandaloneDeriving, FlexibleContexts, MultiParamTypeClasses, FlexibleInstances #-}
30{-# OPTIONS_GHC -fno-warn-orphans #-}
31module Network.Kademlia.Routing
32 {-
33 ( -- * BucketList
34 BucketList
35 , Info(..)
36
37 -- * Attributes
38 , BucketCount
39 , defaultBucketCount
40 , BucketSize
41 , defaultBucketSize
42 , NodeCount
43
44 -- * Query
45 , Network.Kademlia.Routing.null
46 , Network.Kademlia.Routing.full
47 , thisId
48 , shape
49 , Network.Kademlia.Routing.size
50 , Network.Kademlia.Routing.depth
51 , compatibleNodeId
52
53 -- * Lookup
54 , K
55 , defaultK
56 , TableKey (..)
57 , kclosest
58
59 -- * Construction
60 , Network.Kademlia.Routing.nullTable
61 , Event(..)
62 , CheckPing(..)
63 , Network.Kademlia.Routing.insert
64
65 -- * Conversion
66 , Network.Kademlia.Routing.TableEntry
67 , Network.Kademlia.Routing.toList
68
69 -- * Routing
70 , Timestamp
71 , getTimestamp
72 ) -} where
73
74import Control.Applicative as A
75import Control.Arrow
76import Control.Monad
77import Data.Function
78import Data.Functor.Contravariant
79import Data.Functor.Identity
80import Data.List as L hiding (insert)
81import Data.Maybe
82import Data.Monoid
83import Data.Wrapper.PSQ as PSQ
84import Data.Serialize as S hiding (Result, Done)
85import qualified Data.Sequence as Seq
86import Data.Time
87import Data.Time.Clock.POSIX
88import Data.Word
89import GHC.Generics
90import Text.PrettyPrint as PP hiding ((<>))
91import Text.PrettyPrint.HughesPJClass (pPrint,Pretty)
92import qualified Data.ByteString as BS
93import Data.Bits
94import Data.Ord
95import Data.Reflection
96import Network.Address
97import Data.Typeable
98import Data.Coerce
99import Data.Hashable
100
101
102-- | Last time the node was responding to our queries.
103--
104-- Not all nodes that we learn about are equal. Some are \"good\" and
105-- some are not. Many nodes using the DHT are able to send queries
106-- and receive responses, but are not able to respond to queries
107-- from other nodes. It is important that each node's routing table
108-- must contain only known good nodes. A good node is a node that has
109-- responded to one of our queries within the last 15 minutes. A
110-- node is also good if it has ever responded to one of our queries
111-- and has sent us a query within the last 15 minutes. After 15
112-- minutes of inactivity, a node becomes questionable. Nodes become
113-- bad when they fail to respond to multiple queries in a row. Nodes
114-- that we know are good are given priority over nodes with unknown
115-- status.
116--
117type Timestamp = POSIXTime
118
119getTimestamp :: IO Timestamp
120getTimestamp = do
121 utcTime <- getCurrentTime
122 return $ utcTimeToPOSIXSeconds utcTime
123
124
125
126{-----------------------------------------------------------------------
127 Bucket
128-----------------------------------------------------------------------}
129--
130-- When a k-bucket is full and a new node is discovered for that
131-- k-bucket, the least recently seen node in the k-bucket is
132-- PINGed. If the node is found to be still alive, the new node is
133-- place in a secondary list, a replacement cache. The replacement
134-- cache is used only if a node in the k-bucket stops responding. In
135-- other words: new nodes are used only when older nodes disappear.
136
137-- | Timestamp - last time this node was pinged.
138type NodeEntry ni = Binding ni Timestamp
139
140
141-- | Maximum number of 'NodeInfo's stored in a bucket. Most clients
142-- use this value.
143defaultBucketSize :: Int
144defaultBucketSize = 8
145
146data QueueMethods m elem fifo = QueueMethods
147 { pushBack :: elem -> fifo -> m fifo
148 , popFront :: fifo -> m (Maybe elem, fifo)
149 , emptyQueue :: m fifo
150 }
151
152{-
153fromQ :: Functor m =>
154 ( a -> b )
155 -> ( b -> a )
156 -> QueueMethods m elem a
157 -> QueueMethods m elem b
158fromQ embed project QueueMethods{..} =
159 QueueMethods { pushBack = \e -> fmap embed . pushBack e . project
160 , popFront = fmap (second embed) . popFront . project
161 , emptyQueue = fmap embed emptyQueue
162 }
163-}
164
165seqQ :: QueueMethods Identity ni (Seq.Seq ni)
166seqQ = QueueMethods
167 { pushBack = \e fifo -> pure (fifo Seq.|> e)
168 , popFront = \fifo -> case Seq.viewl fifo of
169 e Seq.:< fifo' -> pure (Just e, fifo')
170 Seq.EmptyL -> pure (Nothing, Seq.empty)
171 , emptyQueue = pure Seq.empty
172 }
173
174type BucketQueue ni = Seq.Seq ni
175
176bucketQ :: QueueMethods Identity ni (BucketQueue ni)
177bucketQ = seqQ
178
179
180data Compare a = Compare (a -> a -> Ordering) (Int -> a -> Int)
181
182contramapC :: (b -> a) -> Compare a -> Compare b
183contramapC f (Compare cmp hsh) = Compare (\a b -> cmp (f a) (f b))
184 (\s x -> hsh s (f x))
185
186newtype Ordered' s a = Ordered a
187 deriving (Show)
188
189-- | Hack to avoid UndecidableInstances
190newtype Shrink a = Shrink a
191 deriving (Show)
192
193type Ordered s a = Ordered' s (Shrink a)
194
195instance Reifies s (Compare a) => Eq (Ordered' s (Shrink a)) where
196 a == b = (compare a b == EQ)
197
198instance Reifies s (Compare a) => Ord (Ordered' s (Shrink a)) where
199 compare a b = cmp (coerce a) (coerce b)
200 where Compare cmp _ = reflect (Proxy :: Proxy s)
201
202instance Reifies s (Compare a) => Hashable (Ordered' s (Shrink a)) where
203 hashWithSalt salt x = hash salt (coerce x)
204 where Compare _ hash = reflect (Proxy :: Proxy s)
205
206-- | A bucket is also limited in its length, thus it's called a k-bucket.
207-- When a bucket becomes full, we should split it into two lists by the
208-- current span bit. The span bit is defined by the depth in the routing
209-- table tree. The size of the bucket should be chosen such that it's
210-- very unlikely that all nodes in a bucket fail within an hour of
211-- each other.
212data Bucket s ni = Bucket
213 { bktNodes :: !(PSQ (Ordered s ni) Timestamp) -- current routing nodes
214 , bktQ :: !(BucketQueue (Timestamp,ni)) -- replacements pending time-outs
215 } deriving (Generic)
216
217#define CAN_SHOW_BUCKET 0
218
219#if CAN_SHOW_BUCKET
220deriving instance Show ni => Show (Bucket s ni)
221#endif
222
223bucketCompare :: forall p ni s. Reifies s (Compare ni) => p (Bucket s ni) -> Compare ni
224bucketCompare _ = reflect (Proxy :: Proxy s)
225
226mapBucket :: ( Reifies s (Compare a)
227 , Reifies t (Compare ni)
228 ) => (a -> ni) -> Bucket s a -> Bucket t ni
229mapBucket f (Bucket ns q) = Bucket (PSQ.fromList $ map (\(ni :-> tm) -> (f' ni :-> tm)) $ PSQ.toList ns)
230 (fmap (second f) q)
231 where f' = coerce . f . coerce
232
233
234#if 0
235
236{-
237getGenericNode :: ( Serialize (NodeId)
238 , Serialize ip
239 , Serialize u
240 ) => Get (NodeInfo)
241getGenericNode = do
242 nid <- get
243 naddr <- get
244 u <- get
245 return NodeInfo
246 { nodeId = nid
247 , nodeAddr = naddr
248 , nodeAnnotation = u
249 }
250
251putGenericNode :: ( Serialize (NodeId)
252 , Serialize ip
253 , Serialize u
254 ) => NodeInfo -> Put
255putGenericNode (NodeInfo nid naddr u) = do
256 put nid
257 put naddr
258 put u
259
260instance (Eq ip, Ord (NodeId), Serialize (NodeId), Serialize ip, Serialize u) => Serialize (Bucket) where
261 get = Bucket . psqFromPairList <$> getListOf ( (,) <$> getGenericNode <*> get ) <*> pure (runIdentity $ emptyQueue bucketQ)
262 put = putListOf (\(ni,stamp) -> putGenericNode ni >> put stamp) . psqToPairList . bktNodes
263-}
264
265#endif
266
267psqFromPairList :: (Ord p, PSQKey k) => [(k, p)] -> PSQ k p
268psqFromPairList xs = PSQ.fromList $ map (\(a,b) -> a :-> b) xs
269
270psqToPairList :: ( PSQKey t, Ord t1 ) => PSQ t t1 -> [(t, t1)]
271psqToPairList psq = map (\(a :-> b) -> (a,b)) $ PSQ.toList psq
272
273-- | Update interval, in seconds.
274delta :: NominalDiffTime
275delta = 15 * 60
276
277-- | Should maintain a set of stable long running nodes.
278--
279-- Note: pings are triggered only when a bucket is full.
280updateBucketForInbound :: ( Coercible t1 t
281 , Alternative f
282 , Reifies s (Compare t1)
283 ) => NominalDiffTime -> t1 -> Bucket s t1 -> f ([t], Bucket s t1)
284updateBucketForInbound curTime info bucket
285 -- Just update timestamp if a node is already in bucket.
286 --
287 -- Note PingResult events should only occur for nodes we requested a ping for,
288 -- and those will always already be in the routing queue and will get their
289 -- timestamp updated here, since 'TryInsert' is called on every inbound packet,
290 -- including ping results.
291 | already_have
292 = pure ( [], map_ns $ PSQ.insertWith max (coerce info) curTime )
293 -- bucket is good, but not full => we can insert a new node
294 | PSQ.size (bktNodes bucket) < defaultBucketSize
295 = pure ( [], map_ns $ PSQ.insert (coerce info) curTime )
296  -- If there are any questionable nodes in the bucket that have not been
297 -- seen in the last 15 minutes, the least recently seen node is
298 -- pinged. If any nodes in the bucket are known to have become bad,
299 -- then one is replaced by the new node in the next insertBucket
300 -- iteration.
301 | not (L.null stales)
302 = pure ( stales
303 , bucket { -- Update timestamps so that we don't redundantly ping.
304 bktNodes = updateStamps curTime (coerce stales) $ bktNodes bucket
305 -- Update queue with the pending NodeInfo in case of ping fail.
306 , bktQ = runIdentity $ pushBack bucketQ (curTime,info) $ bktQ bucket } )
307 -- When the bucket is full of good nodes, the new node is simply discarded.
308 -- We must return 'A.empty' here to ensure that bucket splitting happens
309 -- inside 'modifyBucket'.
310 | otherwise = A.empty
311 where
312 -- We (take 1) to keep a 1-to-1 correspondence between pending pings and
313 -- waiting nodes in the bktQ. This way, we don't have to worry about what
314  -- to do with failed pings for which there is no ready replacement.
315 stales = -- One stale:
316 do (n :-> t) <- maybeToList $ PSQ.findMin (bktNodes bucket)
317 guard (t < curTime - delta)
318 return $ coerce n
319 -- All stale:
320 -- map key \$ PSQ.atMost (curTime - delta) $ bktNodes bucket
321
322 already_have = maybe False (const True) $ PSQ.lookup (coerce info) (bktNodes bucket)
323
324 map_ns f = bucket { bktNodes = f (bktNodes bucket) }
325 -- map_q f = bucket { bktQ = runIdentity \$ f (bktQ bucket) }
326
327updateBucketForPingResult :: (Applicative f, Reifies s (Compare a)) =>
328 a -> Bool -> Bucket s a -> f ([(a, Maybe (Timestamp, a))], Bucket s a)
329updateBucketForPingResult bad_node got_response bucket
330 = pure ( map (,Nothing) forgotten
331 ++ map (second Just) replacements
332 , Bucket (foldr replace
333 (bktNodes bucket)
334 replacements)
335 popped
336 )
337 where
338 (top, popped) = runIdentity $ popFront bucketQ (bktQ bucket)
339
340 -- Dropped from accepted, replaced by pending.
341 replacements | got_response = [] -- Timestamp was already updated by TryInsert.
342 | Just info <- top = do
343 -- Insert only if there's a removal.
344 _ <- maybeToList $ PSQ.lookup (coerce bad_node) (bktNodes bucket)
345 return (bad_node, info)
346 | otherwise = []
347
348 -- Dropped from the pending queue without replacing.
349 forgotten | got_response = maybeToList $ fmap snd top
350 | otherwise = []
351
352
353 replace (bad_node, (tm, info)) =
354 PSQ.insert (coerce info) tm
355 . PSQ.delete (coerce bad_node)
356
357
358updateStamps :: PSQKey ni => Timestamp -> [ni] -> PSQ ni Timestamp -> PSQ ni Timestamp
359updateStamps curTime stales nodes = foldl' (\q n -> PSQ.insert n curTime q) nodes stales
360
361type BitIx = Word
362
363partitionQ :: Monad f => QueueMethods f elem b -> (elem -> Bool) -> b -> f (b, b)
364partitionQ imp test q0 = do
365 pass0 <- emptyQueue imp
366 fail0 <- emptyQueue imp
367 let flipfix a b f = fix f a b
368 flipfix q0 (pass0,fail0) $ \rec q qs -> do
369 (mb,q') <- popFront imp q
370 case mb of
371 Nothing -> return qs
372 Just e -> do qs' <- select (pushBack imp e) qs
373 rec q' qs'
374 where
375 select :: Functor f => (b -> f b) -> (b, b) -> f (b, b)
376 select f = if test e then \(a,b) -> flip (,) b <$> f a
377 else \(a,b) -> (,) a <$> f b
378
379
380
381split :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) =>
382 forall ni s. ( Reifies s (Compare ni) ) =>
383 (ni -> Word -> Bool)
384 -> BitIx -> Bucket s ni -> (Bucket s ni, Bucket s ni)
385split testNodeIdBit i b = (Bucket ns qs, Bucket ms rs)
386 where
387 (ns,ms) = (PSQ.fromList *** PSQ.fromList) . partition (spanBit . coerce . key) . PSQ.toList $ bktNodes b
388 (qs,rs) = runIdentity $ partitionQ bucketQ (spanBit . snd) $ bktQ b
389
390 spanBit :: ni -> Bool
391 spanBit entry = testNodeIdBit entry i
392
393
394{-----------------------------------------------------------------------
395-- BucketList
396-----------------------------------------------------------------------}
397
398defaultBucketCount :: Int
399defaultBucketCount = 20
400
401defaultMaxBucketCount :: Word
402defaultMaxBucketCount = 24
403
404data Info ni nid = Info
405 { myBuckets :: BucketList ni
406 , myNodeId :: nid
407 , myAddress :: SockAddr
408 }
409 deriving Generic
410
411deriving instance (Eq ni, Eq nid) => Eq (Info ni nid)
412deriving instance (Show ni, Show nid) => Show (Info ni nid)
413
414-- instance (Eq ip, Serialize ip) => Serialize (Info ip)
415
416-- | The routing table covers the entire 'NodeId' space from 0 to 2 ^
417-- 160. The routing table is subdivided into 'Bucket's that each cover
418-- a portion of the space. An empty table has one bucket with an ID
419-- space range of @min = 0, max = 2 ^ 160@. When a node with ID \"N\"
420-- is inserted into the table, it is placed within the bucket that has
421-- @min <= N < max@. An empty table has only one bucket so any node
422-- must fit within it. Each bucket can only hold 'K' nodes, currently
423-- eight, before becoming 'Full'. When a bucket is full of known good
424-- nodes, no more nodes may be added unless our own 'NodeId' falls
425-- within the range of the 'Bucket'. In that case, the bucket is
426-- replaced by two new buckets each with half the range of the old
427-- bucket and the nodes from the old bucket are distributed among the
428-- two new ones. For a new table with only one bucket, the full bucket
429-- is always split into two new buckets covering the ranges @0..2 ^
430-- 159@ and @2 ^ 159..2 ^ 160@.
431--
432data BucketList ni = forall s. Reifies s (Compare ni) =>
433 BucketList { thisNode :: !ni
434 -- | Non-empty list of buckets.
435 , buckets :: [Bucket s ni]
436 }
437
438mapTable :: (b -> t) -> (t -> b) -> BucketList t -> BucketList b
439mapTable g f tbl@(BucketList self bkts) = reify (contramapC g $ bucketCompare bkts)
440 $ \p -> BucketList
441 { thisNode = f self
442 , buckets = map (resolve p . mapBucket f) bkts
443 }
444 where
445 resolve :: Proxy s -> Bucket s ni -> Bucket s ni
446 resolve = const id
447
448instance (Eq ni) => Eq (BucketList ni) where
449 (==) = (==) `on` Network.Kademlia.Routing.toList
450
451#if 0
452
453instance Serialize NominalDiffTime where
454 put = putWord32be . fromIntegral . fromEnum
455 get = (toEnum . fromIntegral) <$> getWord32be
456
457#endif
458
459#if CAN_SHOW_BUCKET
460deriving instance (Show ni) => Show (BucketList ni)
461#else
462instance Show ni => Show (BucketList ni) where
463 showsPrec d (BucketList self bkts) =
464 mappend "BucketList "
465 . showsPrec (d+1) self
466 . mappend " (fromList "
467 . showsPrec (d+1) (L.map (L.map tableEntry . PSQ.toList . bktNodes) $ bkts)
468 . mappend ") "
469#endif
470
471#if 0
472
473-- | Normally, routing table should be saved between invocations of
474-- the client software. Note that you don't need to store /this/
475-- 'NodeId' since it is already included in routing table.
476instance (Eq ip, Serialize ip, Ord (NodeId), Serialize (NodeId), Serialize u) => Serialize (BucketList)
477
478#endif
479
480-- | Shape of the table.
481instance Pretty (BucketList ni) where
482 pPrint t
483 | bucketCount < 6 = hcat $ punctuate ", " $ L.map PP.int ss
484 | otherwise = brackets $
485 PP.int (L.sum ss) <> " nodes, " <>
486 PP.int bucketCount <> " buckets"
487 where
488 bucketCount = L.length ss
489 ss = shape t
490
491-- | Empty table with specified /spine/ node id.
492--
493-- XXX: The comparison function argument is awkward here.
494nullTable :: (ni -> ni -> Ordering) -> (Int -> ni -> Int) -> ni -> Int -> BucketList ni
495nullTable cmp hsh ni n =
496 reify (Compare cmp hsh)
497 $ \p -> BucketList
498 ni
499 [Bucket (empty p) (runIdentity $ emptyQueue bucketQ)]
500 where
501 empty :: Reifies s (Compare ni) => Proxy s -> PSQ (Ordered s ni) Timestamp
502 empty = const $ PSQ.empty
503
504#if 0
505
506-- | Test if table is empty. In this case DHT should start
507-- bootstrapping process until table becomes 'full'.
508null :: BucketList -> Bool
509null (Tip _ _ b) = PSQ.null $ bktNodes b
510null _ = False
511
512-- | Test if table have maximum number of nodes. No more nodes can be
513-- 'insert'ed, except old ones becomes bad.
514full :: BucketList -> Bool
515full (Tip _ n _) = n == 0
516full (Zero t b) = PSQ.size (bktNodes b) == defaultBucketSize && full t
517full (One b t) = PSQ.size (bktNodes b) == defaultBucketSize && full t
518
519-- | Get the /spine/ node id.
520thisId :: BucketList -> NodeId
521thisId (Tip nid _ _) = nid
522thisId (Zero table _) = thisId table
523thisId (One _ table) = thisId table
524
525-- | Number of nodes in a bucket or a table.
526type NodeCount = Int
527
528#endif
529
530-- | Internally, the routing table is similar to a list of buckets or a
531-- /matrix/ of nodes. This function returns the shape of the matrix.
532shape :: BucketList ni -> [Int]
533shape (BucketList _ tbl) = map (PSQ.size . bktNodes) tbl
534
535#if 0
536
537-- | Get number of nodes in the table.
538size :: BucketList -> NodeCount
539size = L.sum . shape
540
541-- | Get number of buckets in the table.
542depth :: BucketList -> BucketCount
543depth = L.length . shape
544
545#endif
546
547lookupBucket :: forall ni nid x.
548 ( -- FiniteBits nid
549 Ord nid
550 ) => KademliaSpace nid ni -> nid -> (forall s. Reifies s (Compare ni) => [Bucket s ni] -> x) -> BucketList ni -> x
551lookupBucket space nid kont (BucketList self bkts) = kont $ go 0 [] bkts
552 where
553 d = kademliaXor space nid (kademliaLocation space self)
554
555 go :: Word -> [Bucket s ni] -> [Bucket s ni] -> [Bucket s ni]
556 go i bs (bucket : buckets)
557 | kademliaTestBit space d i = bucket : buckets ++ bs
558 | otherwise = go (succ i) (bucket:bs) buckets
559 go _ bs [] = bs
560
561bucketNumber :: forall ni nid.
562 KademliaSpace nid ni -> nid -> BucketList ni -> Int
563bucketNumber space nid (BucketList self bkts) = fromIntegral $ go 0 bkts
564 where
565 d = kademliaXor space nid (kademliaLocation space self)
566
567 go :: Word -> [Bucket s ni] -> Word
568 go i (bucket : buckets)
569 | kademliaTestBit space d i = i
570 | otherwise = go (succ i) buckets
571 go i [] = i
572
573
574compatibleNodeId :: forall ni nid.
575 ( Serialize nid, FiniteBits nid) =>
576 (ni -> nid) -> BucketList ni -> IO nid
577compatibleNodeId nodeId tbl = genBucketSample prefix br
578 where
579 br = bucketRange (L.length (shape tbl) - 1) True
580 nodeIdSize = finiteBitSize (undefined :: nid) `div` 8
581 bs = BS.pack $ take nodeIdSize $ tablePrefix (testIdBit . nodeId) tbl ++ repeat 0
582 prefix = either error id $ S.decode bs
583
584tablePrefix :: (ni -> Word -> Bool) -> BucketList ni -> [Word8]
585tablePrefix testbit = map (packByte . take 8 . (++repeat False))
586 . chunksOf 8
587 . tableBits testbit
588 where
589 packByte = foldl1' (.|.) . zipWith bitmask [7,6 .. 0]
590 bitmask ix True = bit ix
591 bitmask _ _ = 0
592
593tableBits :: (ni -> Word -> Bool) -> BucketList ni -> [Bool]
594tableBits testbit (BucketList self bkts) =
595 zipWith const (map (testbit self) [0..])
596 bkts
597
598selfNode :: BucketList ni -> ni
599selfNode (BucketList self _) = self
600
601chunksOf :: Int -> [e] -> [[e]]
602chunksOf i ls = map (take i) (build (splitter ls)) where
603 splitter :: [e] -> ([e] -> a -> a) -> a -> a
604 splitter [] _ n = n
605 splitter l c n = l `c` splitter (drop i l) c n
606
607build :: ((a -> [a] -> [a]) -> [a] -> [a]) -> [a]
608build g = g (:) []
609
610
611
612-- | Count of closest nodes in find_node reply.
613type K = Int
614
615-- | Default 'K' is equal to 'defaultBucketSize'.
616defaultK :: K
617defaultK = 8
618
619#if 0
620class TableKey dht k where
621 toNodeId :: k -> NodeId
622
623instance TableKey dht (NodeId) where
624 toNodeId = id
625
626#endif
627
628-- | In Kademlia, the distance metric is XOR and the result is
629-- interpreted as an unsigned integer.
630newtype NodeDistance nodeid = NodeDistance nodeid
631 deriving (Eq, Ord)
632
633-- | distance(A,B) = |A xor B| Smaller values are closer.
634distance :: Bits nid => nid -> nid -> NodeDistance nid
635distance a b = NodeDistance $ xor a b
636
637-- | Order by closeness: nearest nodes first.
638rank :: ( Ord nid
639 ) => KademliaSpace nid ni -> nid -> [ni] -> [ni]
640rank space nid = L.sortBy (comparing (kademliaXor space nid . kademliaLocation space))
641
642
643-- | Get a list of /K/ closest nodes using XOR metric. Used in
644-- 'find_node' and 'get_peers' queries.
645kclosest :: ( -- FiniteBits nid
646 Ord nid
647 ) =>
648 KademliaSpace nid ni -> Int -> nid -> BucketList ni -> [ni]
649kclosest space k nid tbl = take k $ rank space nid (L.concat bucket)
650 ++ rank space nid (L.concat everyone)
651 where
652 (bucket,everyone) =
653 L.splitAt 1
654 . lookupBucket space nid (L.map (coerce . L.map PSQ.key . PSQ.toList . bktNodes))
655 $ tbl
656
657
658
659{-----------------------------------------------------------------------
660-- Routing
661-----------------------------------------------------------------------}
662
663splitTip :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) =>
664 ( Reifies s (Compare ni) ) =>
665 (ni -> Word -> Bool)
666 -> ni -> BitIx -> Bucket s ni -> [ Bucket s ni ]
667splitTip testNodeBit ni i bucket
668 | testNodeBit ni i = [zeros , ones ]
669 | otherwise = [ones , zeros ]
670 where
671 (ones, zeros) = split testNodeBit i bucket
672
673-- | Used in each query.
674--
675-- TODO: Kademlia non-empty subtrees should split if they have less than
676-- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia
677-- paper. The rule requiring additional splits is in section 2.4.
678modifyBucket
679 :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) =>
680 forall ni nid xs.
681 KademliaSpace nid ni
682 -> nid -> (forall s. Reifies s (Compare ni) => Bucket s ni -> Maybe (xs, Bucket s ni)) -> BucketList ni -> Maybe (xs,BucketList ni)
683modifyBucket space nid f (BucketList self bkts)
684 = second (BucketList self) <$> go (0 :: BitIx) bkts
685 where
686 d = kademliaXor space nid (kademliaLocation space self)
687
688 -- go :: BitIx -> [Bucket s ni] -> Maybe (xs, [Bucket s ni])
689
690 go !i (bucket : buckets@(_:_))
691 | kademliaTestBit space d i = second (: buckets) <$> f bucket
692 | otherwise = second (bucket :) <$> go (succ i) buckets
693
694 go !i [bucket] = second (: []) <$> f bucket <|> gosplit
695 where
696 gosplit | i < defaultMaxBucketCount = go i (splitTip ( kademliaTestBit space
697 . kademliaLocation space )
698 self
699 i
700 bucket)
701 | otherwise = Nothing -- Limit the number of buckets.
702
703
704bktCount :: BucketList ni -> Int
705bktCount (BucketList _ bkts) = L.length bkts
706
707-- | Triggering event for atomic table update
708data Event ni = TryInsert { foreignNode :: ni }
709 | PingResult { foreignNode :: ni , ponged :: Bool }
710
711#if 0
712deriving instance Eq (NodeId) => Eq (Event)
713deriving instance ( Show ip
714 , Show (NodeId)
715 , Show u
716 ) => Show (Event)
717
718#endif
719
720eventId :: (ni -> nid) -> Event ni -> nid
721eventId nodeId (TryInsert ni) = nodeId ni
722eventId nodeId (PingResult ni _) = nodeId ni
723
724
725-- | Actions requested by atomic table update
726data CheckPing ni = CheckPing [ni]
727
728#if 0
729
730deriving instance Eq (NodeId) => Eq (CheckPing)
731deriving instance ( Show ip
732 , Show (NodeId)
733 , Show u
734 ) => Show (CheckPing)
735
736#endif
737
738
739-- | Call on every inbound packet (including requested ping results).
740-- Returns a triple (was_inserted, to_ping, tbl') where
741--
742-- [ /was_inserted/ ] True if the node was added to the routing table.
743--
744-- [ /to_ping/ ] A list of nodes to ping and then run 'updateForPingResult'.
745--     This will be empty if /was_inserted/ is True, but a non-inserted
746--     node may be added to a replacement queue and will be inserted if
747--     one of the items in this list times out.
748--
749-- [ /tbl'/ ] The updated routing 'BucketList'.
750--
751updateForInbound ::
752 KademliaSpace nid ni
753 -> Timestamp -> ni -> BucketList ni -> (Bool, [ni], BucketList ni)
754updateForInbound space tm ni tbl@(BucketList _ bkts) =
755 maybe (False, [],tbl) (\(ps,tbl') -> (True, ps, tbl'))
756 $ modifyBucket space
757 (kademliaLocation space ni)
758 (updateBucketForInbound tm ni)
759 tbl
760
761-- | Update the routing table with the results of a ping.
762--
763-- Each @(a, Just (tm,b))@ in the returned list indicates that the node /a/ was deleted
764-- from the routing table and the node /b/, with timestamp /tm/, has taken its place.
765updateForPingResult ::
766 KademliaSpace nid ni
767 -> ni -- ^ The pinged node.
768 -> Bool -- ^ True if we got a reply, False if it timed out.
769 -> BucketList ni -- ^ The routing table.
770 -> ( [(ni,Maybe (Timestamp, ni))], BucketList ni )
771updateForPingResult space ni got_reply tbl =
772 fromMaybe ([],tbl)
773 $ modifyBucket space
774 (kademliaLocation space ni)
775 (updateBucketForPingResult ni got_reply)
776 tbl
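-- A minimal caller sketch tying the two updates together. Here 'pingNode',
-- 'getTimestamp', and the 'TVar' holding the table are assumptions supplied
-- by the caller, not part of this module:
--
-- > onInbound space pingNode getTimestamp tblVar ni = do
-- >     tm <- getTimestamp
-- >     to_ping <- atomically $ do
-- >         tbl <- readTVar tblVar
-- >         let (_, ps, tbl') = updateForInbound space tm ni tbl
-- >         writeTVar tblVar tbl'
-- >         return ps
-- >     forM_ to_ping $ \n -> do
-- >         ponged <- pingNode n
-- >         atomically $ modifyTVar tblVar
-- >             (snd . updateForPingResult space n ponged)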
777
778
779{-----------------------------------------------------------------------
780-- Conversion
781-----------------------------------------------------------------------}
782
783type TableEntry ni = (ni, Timestamp)
784
785tableEntry :: NodeEntry ni -> TableEntry ni
786tableEntry (a :-> b) = (a, b)
787
788toList :: BucketList ni -> [[TableEntry ni]]
789toList (BucketList _ bkts) = coerce $ L.map (L.map tableEntry . PSQ.toList . bktNodes) bkts
790
791data KademliaSpace nid ni = KademliaSpace
792 { -- | Given a node record (probably including IP address), yields a
793 -- kademlia xor-metric location.
794 kademliaLocation :: ni -> nid
795 -- | Used when comparing locations. This is similar to
796 -- 'Data.Bits.testBit' except that the ordering of bits is reversed, so
797 -- that 0 is the most significant bit.
798 , kademliaTestBit :: nid -> Word -> Bool
799 -- | The Kademlia xor-metric.
800 , kademliaXor :: nid -> nid -> nid
801
802 , kademliaSample :: forall m. Applicative m => (Int -> m BS.ByteString) -> nid -> (Int,Word8,Word8) -> m nid
803 }
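-- An illustrative (hypothetical) instantiation for a toy node type with a
-- 'Word64' identifier; a real DHT would use 160- or 256-bit ids, and
-- 'kademliaSample' here is a degenerate placeholder:
--
-- > data ToyNode = ToyNode { toyId :: Word64, toyAddr :: SockAddr }
-- >
-- > toySpace :: KademliaSpace Word64 ToyNode
-- > toySpace = KademliaSpace
-- >     { kademliaLocation = toyId
-- >     , kademliaTestBit  = \nid i -> testBit nid (63 - fromIntegral i)
-- >     , kademliaXor      = xor
-- >     , kademliaSample   = \_ nid _ -> pure nid
-- >     }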
804
805instance Contravariant (KademliaSpace nid) where
806 contramap f ks = ks
807 { kademliaLocation = kademliaLocation ks . f
808 }
809
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs
new file mode 100644
index 00000000..1be1afc1
--- /dev/null
+++ b/kad/src/Network/Kademlia/Search.hs
@@ -0,0 +1,236 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE PatternSynonyms #-}
3{-# LANGUAGE RecordWildCards #-}
4{-# LANGUAGE ScopedTypeVariables #-}
5{-# LANGUAGE FlexibleContexts #-}
6{-# LANGUAGE LambdaCase #-}
7module Network.Kademlia.Search where
8
9import Control.Concurrent.Tasks
10import Control.Concurrent.STM
11import Control.Monad
12import Data.Function
13import Data.Maybe
14import qualified Data.Set as Set
15 ;import Data.Set (Set)
16import Data.Hashable (Hashable(..)) -- for type sigs
17import System.IO.Error
18
19import qualified Data.MinMaxPSQ as MM
20 ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ')
21import qualified Data.Wrapper.PSQ as PSQ
22 ;import Data.Wrapper.PSQ (pattern (:->), Binding, pattern Binding, Binding', PSQKey)
23import Network.Kademlia.Routing as R
24#ifdef THREAD_DEBUG
25import Control.Concurrent.Lifted.Instrument
26#else
27import Control.Concurrent.Lifted
28import GHC.Conc (labelThread)
29#endif
30
31data Search nid addr tok ni r = Search
32 { searchSpace :: KademliaSpace nid ni
33 , searchNodeAddress :: ni -> addr
34 , searchQuery :: Either (nid -> ni -> IO (Maybe ([ni], [r], Maybe tok)))
35 (nid -> ni -> (Maybe ([ni],[r],Maybe tok) -> IO ()) -> IO ())
36 , searchAlpha :: Int -- α = 8
37 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on
38 -- how fast the queries are. For Tox's much slower onion-routed queries, we
39 -- need to ensure that closer non-responding queries don't completely push out
40 -- farther away queries.
41 --
42 -- For BitTorrent, setting them both to 8 was not an issue, but that is no longer
43 -- supported because the number of remembered informants is now the
44 -- difference between these two numbers. So, if searchK = 16 and searchAlpha =
45 -- 4, then the number of remembered query responses is 12.
46 , searchK :: Int -- K = 16
47 }
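-- A hedged construction sketch. 'toySpace', 'toyAddr', and 'toyFindNode'
-- (:: Word64 -> ToyNode -> IO (Maybe [ToyNode])) are assumptions for
-- illustration only, not part of this module:
--
-- > toySearch :: Search Word64 SockAddr () ToyNode ToyNode
-- > toySearch = Search
-- >     { searchSpace       = toySpace
-- >     , searchNodeAddress = toyAddr
-- >     , searchQuery       = Left $ \nid ni ->
-- >           fmap (\ns -> (ns, ns, Nothing)) <$> toyFindNode nid ni
-- >     , searchAlpha       = 4
-- >     , searchK           = 16
-- >     }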
48
49data SearchState nid addr tok ni r = SearchState
50 { -- | The number of pending queries. Incremented before any query is sent
51 -- and decremented when we get a reply.
52 searchPendingCount :: TVar Int
53 -- | Nodes scheduled to be queried (roughly at most K).
54 , searchQueued :: TVar (MinMaxPSQ ni nid)
55 -- | The nearest (K - α) nodes that issued a reply.
56 --
57 -- α is the maximum number of simultaneous queries.
58 , searchInformant :: TVar (MinMaxPSQ' ni nid (Maybe tok))
59 -- | This tracks already-queried addresses so we avoid bothering them
60 -- again. XXX: We could probably keep only the pending queries in this
61 -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha
62 -- should limit the number of outstanding queries.
63 , searchVisited :: TVar (Set addr)
64 , searchSpec :: Search nid addr tok ni r
65 }
66
67
68newSearch :: ( Ord addr
69 , PSQKey nid
70 , PSQKey ni
71 ) =>
72 {-
73 KademliaSpace nid ni
74 -> (ni -> addr)
75 -> (ni -> IO ([ni], [r])) -- the query action.
76 -> (r -> STM Bool) -- receives search results.
77 -> nid -- target of search
78 -}
79 Search nid addr tok ni r
80 -> nid
81 -> [ni] -- Initial nodes to query.
82 -> STM (SearchState nid addr tok ni r)
83newSearch s@(Search space nAddr qry _ _) target ns = do
84 c <- newTVar 0
85 q <- newTVar $ MM.fromList
86 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n))
87 $ ns
88 i <- newTVar MM.empty
89 v <- newTVar Set.empty
90 return -- (Search space nAddr qry) , r , target
91 ( SearchState c q i v s )
92
93-- | Discard a value from a key-priority-value tuple. This is useful for
94-- moving items from a "MinMaxPSQ'" to a "MinMaxPSQ".
95stripValue :: Binding' k p v -> Binding k p
96stripValue (Binding ni _ nid) = (ni :-> nid)
97
98-- | Reset a 'SearchState' object to ready it for a repeated search.
99reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) =>
100 (nid -> STM [ni])
101 -> Search nid addr1 tok1 ni r1
102 -> nid
103 -> SearchState nid addr tok ni r
104 -> STM (SearchState nid addr tok ni r)
105reset nearestNodes qsearch target st = do
106 searchIsFinished st >>= check -- Wait for a search to finish before resetting.
107 bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni)
108 <$> nearestNodes target
109 priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st)
110 writeTVar (searchQueued st) $ MM.fromList $ priorInformants ++ bktNodes
111 writeTVar (searchInformant st) MM.empty
112 writeTVar (searchVisited st) Set.empty
113 writeTVar (searchPendingCount st) 0
114 return st
115
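-- | Issue a single query to node /ni/ (at distance /d/ from the target).
-- When the reply arrives (or the blocking query fails), the pending count is
-- decremented, newly learned unvisited nodes are merged into 'searchQueued',
-- /ni/ is recorded in 'searchInformant', and each result is fed to the
-- callback; a 'False' from the callback cancels the search.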
116sendAsyncQuery :: forall addr nid tok ni r.
117 ( Ord addr
118 , PSQKey nid
119 , PSQKey ni
120 , Show nid
121 ) =>
122 Search nid addr tok ni r
123 -> nid
124 -> (r -> STM Bool) -- ^ return False to terminate the search.
125 -> SearchState nid addr tok ni r
126 -> Binding ni nid
127 -> TaskGroup
128 -> IO ()
129sendAsyncQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) g =
130 case searchQuery of
131 Left blockingQuery ->
132 forkTask g "searchQuery" $ do
133 myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget)
134 reply <- blockingQuery searchTarget ni `catchIOError` const (return Nothing)
135 atomically $ do
136 modifyTVar searchPendingCount pred
137 maybe (return ()) go reply
138 Right nonblockingQuery -> do
139 nonblockingQuery searchTarget ni $ \reply ->
140 atomically $ do
141 modifyTVar searchPendingCount pred
142 maybe (return ()) go reply
143 where
144 go (ns,rs,tok) = do
145 vs <- readTVar searchVisited
146 -- We only queue a node if it is not yet visited
147 let insertFoundNode :: Int
148 -> ni
149 -> MinMaxPSQ ni nid
150 -> MinMaxPSQ ni nid
151 insertFoundNode k n q
152 | searchNodeAddress n `Set.member` vs
153 = q
154 | otherwise = MM.insertTake k n ( kademliaXor searchSpace searchTarget
155 $ kademliaLocation searchSpace n )
156 q
157
158 qsize0 <- MM.size <$> readTVar searchQueued
159 let qsize = if qsize0 < searchK then searchK else qsize0 -- Allow searchQueued to grow
160                                                                     -- only when there are fewer than
161 -- K elements.
162 modifyTVar searchQueued $ \q -> foldr (insertFoundNode qsize) q ns
163 modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d
164 flip fix rs $ \loop -> \case
165 r:rs' -> do
166 wanting <- searchResult r
167 if wanting then loop rs'
168 else searchCancel sch
169 [] -> return ()
170
171
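-- | True when the search has settled: no queries are pending, and either the
-- queue is empty or the informant set is full (/k - α/ entries) and every
-- queued node is at least as far from the target as the farthest informant.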
172searchIsFinished :: ( PSQKey nid
173 , PSQKey ni
174 ) => SearchState nid addr tok ni r -> STM Bool
175searchIsFinished SearchState{..} = do
176 q <- readTVar searchQueued
177 cnt <- readTVar searchPendingCount
178 informants <- readTVar searchInformant
179 return $ cnt == 0
180 && ( MM.null q
181 || ( MM.size informants >= (searchK searchSpec - searchAlpha searchSpec)
182 && ( PSQ.prio (fromJust $ MM.findMax informants)
183 <= PSQ.prio (fromJust $ MM.findMin q))))
184
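-- | Stop an in-progress search: zero the pending count and drop all queued
-- nodes, so that 'searchIsFinished' becomes true.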
185searchCancel :: SearchState nid addr tok ni r -> STM ()
186searchCancel SearchState{..} = do
187 writeTVar searchPendingCount 0
188 writeTVar searchQueued MM.empty
189
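-- | Seed a search with the closest contacts from the routing table and fork a
-- 'searchLoop' for the target, returning the live 'SearchState'.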
190search ::
191 ( Ord r
192 , Ord addr
193 , PSQKey nid
194 , PSQKey ni
195 , Show nid
196 ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r)
197search sch buckets target result = do
198 let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets
199 st <- atomically $ newSearch sch target ns
200 forkIO $ searchLoop sch target result st
201 return st
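-- A usage sketch (illustration only): collect up to 'searchK' results into a
-- 'TVar' and block until the search settles. 'runToySearch' is a hypothetical
-- wrapper, not part of this module:
--
-- > runToySearch sch buckets target = do
-- >     found <- atomically $ newTVar []
-- >     st    <- search sch buckets target $ \r -> do
-- >                  rs <- readTVar found
-- >                  writeTVar found (r:rs)
-- >                  return (length rs + 1 < searchK sch)
-- >     atomically $ searchIsFinished st >>= check
-- >     atomically $ readTVar found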
202
203searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni )
204 => Search nid addr tok ni r -- ^ Query and distance methods.
205 -> nid -- ^ The target we are searching for.
206 -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching.
207 -> SearchState nid addr tok ni r -- ^ Search-related state.
208 -> IO ()
209searchLoop sch@Search{..} target result s@SearchState{..} = do
210 myThreadId >>= flip labelThread ("search."++show target)
211 withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do
212 join $ atomically $ do
213 cnt <- readTVar $ searchPendingCount
214 check (cnt <= 8) -- Only 8 pending queries at a time.
215 informants <- readTVar searchInformant
216 found <- MM.minView <$> readTVar searchQueued
217 case found of
218 Just (ni :-> d, q)
219             | -- If there are fewer than /k - α/ informants and there is any
220               -- node we haven't yet received a response from.
221 (MM.size informants < searchK - searchAlpha) && (cnt > 0 || not (MM.null q))
222               -- Or there are no informants at all yet.
223 || MM.null informants
224               -- Or if the closest scheduled node is nearer than the
225               -- farthest informant collected so far.
226 || (d < PSQ.prio (fromJust $ MM.findMax informants))
227 -> -- Then the search continues, send a query.
228 do writeTVar searchQueued q
229 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni)
230 modifyTVar searchPendingCount succ
231 return $ do
232 sendAsyncQuery sch target result s (ni :-> d) g
233 again
234 _ -> -- Otherwise, we are finished.
235 do check (cnt == 0)
236 return $ return ()