1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
|
{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE DeriveFunctor, DeriveTraversable #-}
-- {-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE PatternSynonyms #-}
module Kademlia where
import Data.Function
import Data.Maybe
import qualified Data.Set as Set
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.POSIX (getPOSIXTime, utcTimeToPOSIXSeconds)
import Network.DHT.Routing as R
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc (labelThread)
#endif
import Control.Concurrent.STM
import Control.Monad
import Data.Bits
import Data.Hashable
import Data.IP
import Data.Monoid
import Data.Serialize (Serialize)
import Data.Time.Clock.POSIX (POSIXTime)
import qualified Data.Wrapper.PSQInt as Int
;import Data.Wrapper.PSQInt (pattern (:->))
import Network.Address (bucketRange,genBucketSample)
import Network.BitTorrent.DHT.Search
import System.Timeout
import Text.PrettyPrint as PP hiding (($$), (<>))
import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
import System.IO
import Tasks
-- | How a given node currently stands with respect to a given routing table.
--
-- The constructors are listed in the order used by the derived 'Ord' and
-- 'Enum' instances.
data RoutingStatus
    = Stranger  -- ^ The Kademlia routing table has no knowledge of the node.
    | Applicant -- ^ The node may be inserted, pending a ping timeout.
    | Accepted  -- ^ The node holds a slot in one of the Kademlia buckets.
    deriving (Eq, Ord, Enum, Show, Read)
-- | Describes a change that occurred in the Kademlia routing table: a node
-- paired with the status it has moved to.
data RoutingTransition ni = RoutingTransition
    { transitioningNode :: ni             -- ^ The node whose status changed.
    , transitionedTo    :: !RoutingStatus -- ^ The status the node moved to.
    }
    deriving (Eq, Ord, Show, Read)
-- | A pair of callbacks through which routing-table insertion activity is
-- reported.
data InsertionReporter ni = InsertionReporter
    { -- | Invoked for every inbound packet.
      reportArrival :: POSIXTime
                    -> ni    -- ^ The node the packet originated from.
                    -> [ni]  -- ^ The nodes that will be pinged as a result.
                    -> IO ()
      -- | Invoked for every ping probe.
    , reportPingResult :: POSIXTime
                       -> ni    -- ^ The node that was pinged.
                       -> Bool  -- ^ 'True' if the node ponged.
                       -> IO ()
    }
-- | An 'InsertionReporter' whose callbacks discard their arguments and do
-- nothing.
quietInsertions :: InsertionReporter ni
quietInsertions = InsertionReporter ignoreArrival ignorePingResult
  where
    -- Fields are given positionally: 'reportArrival', then 'reportPingResult'.
    ignoreArrival    _ _ _ = return ()
    ignorePingResult _ _ _ = return ()
-- | Adapt an 'InsertionReporter' to a different node type.  Every node handed
-- to the resulting reporter is converted with @f@ before being passed on to
-- the wrapped reporter; timestamps and ping outcomes are forwarded unchanged.
contramapIR :: (t -> ni) -> InsertionReporter ni -> InsertionReporter t
contramapIR f ir = InsertionReporter arrival pingResult
  where
    arrival stamp origin toPing  = reportArrival ir stamp (f origin) (f <$> toPing)
    pingResult stamp target pong = reportPingResult ir stamp (f target) pong
-- | The collection of IO operations necessary to maintain a Kademlia routing
-- table.
data TableStateIO ni = TableStateIO
    { -- | Store the routing table.  Typically 'writeTVar'.
      tblWrite :: R.BucketList ni -> STM ()

      -- | Fetch the routing table.  Typically 'readTVar'.
    , tblRead :: STM (R.BucketList ni)

      -- | Send a ping to a remote node.  The result is 'True' if the node
      -- responded within an acceptable time and 'False' otherwise.
    , tblPing :: ni -> IO Bool

      -- | A convenience hook to help keep other state consistent with the
      -- routing table.  It is invoked within the same transaction in which
      -- 'tblRead'\/'tblWrite' occurred, but only when there was an
      -- interesting change.  The IO action it returns is triggered soon
      -- afterward.
      --
      -- Nothing interesting needs to happen here.  This trivial
      -- implementation is fine:
      --
      -- > tblTransition = const $ return $ return ()
    , tblTransition :: RoutingTransition ni -> STM (IO ())
    }
-- | A plain 'TableStateIO': the routing table is kept in the given 'TVar',
-- pings are issued with the given action, and transitions are ignored (the
-- hook returns an IO action that does nothing).
vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO ni
vanillaIO var ping = TableStateIO
    { tblWrite      = writeTVar var
    , tblRead       = readTVar var
    , tblPing       = ping
    , tblTransition = \_ -> return (return ())
    }
-- | Bundles everything necessary to maintain a routing table of /ni/ (node
-- information) entries: the reporter callbacks, the 'KademliaSpace' and the
-- table's IO operations.
data Kademlia nid ni
    = Kademlia (InsertionReporter ni)
               (KademliaSpace nid ni)
               (TableStateIO ni)
-- Helper to 'insertNode'.
--
-- Converts one element of the value returned by 'updateForPingResult' into a
-- list of transitions that is easier to work with.
--
-- The first component always becomes a Stranger:
--
--   * with a replacement present, that is  Accepted  --> Stranger
--   * with no replacement, that is         Applicant --> Stranger
--
-- If a replacement is present it contributes a second transition (see
-- 'accepted').
transition :: (ni,Maybe (t,ni)) -> [RoutingTransition ni]
transition (departing, replacement) = departure : arrival
  where
    departure = RoutingTransition departing Stranger
    arrival   = maybe [] (\r -> [accepted r]) replacement
-- Helper to 'transition'.
--
-- Builds the transition for the second component of the pair:
-- Applicant --> Accepted.  The first component is ignored.
accepted :: (t,ni) -> RoutingTransition ni
accepted (_, newcomer) = RoutingTransition
    { transitioningNode = newcomer
    , transitionedTo    = Accepted
    }
-- | Process the arrival of a packet from @node@.
--
-- In a single transaction the routing table is read, passed through
-- 'R.updateForInbound' and written back.  That call also yields the list of
-- nodes to ping.  The 'tblTransition' hook is invoked in the same transaction
-- when the node was inserted outright (Stranger --> Accepted) or when pings
-- are pending (Stranger --> Applicant).  The IO action the hook returns is run
-- immediately after 'reportArrival'.
--
-- If there are nodes to ping, a thread labelled @pingResults@ is forked.  For
-- each node in turn it issues 'tblPing', reports the outcome through
-- 'reportPingResult', applies 'R.updateForPingResult' to the table and runs
-- the transition hooks for any resulting replacements.  This function does
-- not wait for that thread.
insertNode :: Kademlia nid ni -> ni -> IO ()
insertNode (Kademlia reporter space io) node = do
    tm <- getPOSIXTime
    (ps, reaction) <- atomically $ do
        tbl <- tblRead io
        let (inserted, ps, t') = R.updateForInbound space tm node tbl
        tblWrite io t'
        reaction <- case ps of
            _ | inserted -> -- Node transition: Stranger --> Accepted
                            tblTransition io $ RoutingTransition node Accepted
            (_:_)        -> -- Node transition: Stranger --> Applicant
                            tblTransition io $ RoutingTransition node Applicant
            _            -> return $ return ()
        return (ps, reaction)
    reportArrival reporter tm node ps
    reaction
    -- Only spawn the ping thread when there is actually something to ping.
    unless (null ps) $ void $ fork $ do
        myThreadId >>= flip labelThread "pingResults"
        forM_ ps $ \n -> do
            b <- tblPing io n
            -- Stamp the result with the time the ping completed rather than
            -- the time of the packet that triggered it.
            pingTime <- getPOSIXTime
            reportPingResult reporter pingTime n b
            join $ atomically $ do
                tbl <- tblRead io
                let (replacements, t') = R.updateForPingResult space n b tbl
                tblWrite io t'
                ios <- sequence $ concatMap
                            (map (tblTransition io) . transition)
                            replacements
                return $ sequence_ ios
-- TODO: Bootstrap/Refresh
--
-- From BEP 05:
--
--   Each bucket should maintain a "last changed" property to indicate how
--   "fresh" the contents are.
--
-- Note: Instead of "last changed" we use a "time to next refresh" property
-- and store it in a priority search queue.
--
--   When...
--
-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
-- >>> bucketEvents =
-- >>>    [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
-- >>>
-- >>>    , Stranger  :--> Accepted -- or a node is added to a bucket,
-- >>>
-- >>>    , Accepted  :--> Stranger -- or a node in a bucket is replaced
-- >>>    , Applicant :--> Accepted -- with another node,
-- >>>    ]
--
--   the bucket's last changed property should be updated.  Buckets that have
--   not been changed in 15 minutes should be "refreshed."  This is done by
--   picking a random ID in the range of the bucket and performing a
--   find_nodes search on it.
--
-- The only other possible BucketTouchEvents are as follows:
--
-- >>> not_handled =
-- >>>    [ Stranger :--> Applicant -- A ping is pending, its result is covered:
-- >>>                              --   (Applicant :--> Stranger)
-- >>>                              --   (Applicant :--> Accepted)
-- >>>    , Accepted :--> Applicant -- Never happens
-- >>>    ]
--
-- XXX: This will be redundantly triggered twice upon every node replacement
-- because we do not currently distinguish between standalone
-- insertion/deletion events and an insertion/deletion pair constituting a
-- replacement.
--
-- It might also be better to pass the timestamp of the transition here and
-- keep the refresh queue in better sync with the routing table by updating it
-- within the STM monad.

-- | React to a routing transition by pushing back the refresh time of the
-- bucket the transitioning node belongs to.  The result has the same shape
-- as the 'tblTransition' hook.
--
-- A transition to 'Applicant' yields an action that does nothing.  Any other
-- transition yields an IO action that reads the current time and then, in
-- its own transaction, looks up the node's bucket number in the bucket list
-- and sets that bucket's entry in the queue to the current time plus
-- @interval@.
touchBucket :: KademliaSpace nid ni
            -> POSIXTime
            -> TVar (BucketList ni)
            -> TVar (Int.PSQ POSIXTime)
            -> RoutingTransition ni
            -> STM (IO ())
touchBucket space interval bkts psq tr =
    case transitionedTo tr of
        Applicant -> return (return ())
        _         -> return reschedule
  where
    reschedule = do
        now <- getPOSIXTime
        atomically $ do
            tbl <- readTVar bkts
            let nid = kademliaLocation space (transitioningNode tr)
                num = R.bucketNumber space nid tbl
            modifyTVar' psq (Int.insert num (now + interval))
-- | > forkPollForRefresh interval queue refresh
--
-- Fork a refresh loop.  Kill the returned thread to terminate it.  The
-- arguments are: a staleness threshold (if a bucket goes this long without
-- being touched, a refresh will be triggered), a TVar with the
-- time-to-refresh schedule for each bucket, and a refresh action to be
-- forked when a bucket exceeds the staleness threshold.
--
-- To "touch" a bucket and prevent it from being refreshed, reschedule its
-- refresh time to some time into the future by modifying the 'Int.PSQ' in
-- the TVar.
--
-- The loop blocks (via 'retry') while the queue is empty.  Otherwise it takes
-- the entry with the earliest refresh time: if that time has passed, the
-- entry is rescheduled to @now + interval@ and the refresh action is forked
-- in a thread labelled @refresh.N@; if not, the loop sleeps until the
-- scheduled time and then re-examines the queue.
forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId
forkPollForRefresh interval psq refresh = do
    fork $ do
        myThreadId >>= flip labelThread "pollForRefresh"
        fix $ \again -> do
            join $ atomically $ do
                nextup <- Int.findMin <$> readTVar psq
                maybe retry (return . go again) nextup
  where
    go again ( bktnum :-> refresh_time ) = do
        now <- getPOSIXTime
        let remaining = refresh_time - now
        if remaining <= 0
            then do -- Refresh time!
                -- Move it to the back of the refresh queue.
                atomically $ modifyTVar' psq
                           $ Int.insert bktnum (now + interval)
                -- Now fork the refresh operation.
                -- TODO: We should probably propagate the kill signal to this thread.
                void $ fork $ do
                    myThreadId >>= flip labelThread ("refresh."++show bktnum)
                    _ <- refresh bktnum
                    return ()
            else threadDelay (toMicroseconds remaining)
        again

    -- Convert a positive duration to whole microseconds for 'threadDelay'.
    --
    -- 'fromEnum' must not be used for this: on a 'POSIXTime' it yields
    -- picoseconds, not seconds.  Rounding up guarantees a positive duration
    -- never becomes a zero-length delay, and the clamp keeps very long
    -- durations from overflowing 'Int'.
    toMicroseconds :: POSIXTime -> Int
    toMicroseconds d =
        fromInteger $ min (toInteger (maxBound :: Int)) (ceiling (d * 1000000))
-- | Refresh bucket number @n@ of the routing table held in @var@ by running
-- a search.
--
-- The search target is this node's own id when @n@ is the last bucket of the
-- current table, and otherwise a sample generated by 'genBucketSample' for
-- the bucket's range.  The function waits up to 15 minutes for the search to
-- finish and then cancels it in every case.
--
-- While the search runs, every found node that maps to bucket @n@ is
-- recorded.  The result is 1 if the per-node check ever answered 'True', and
-- otherwise the number of distinct recorded nodes.
refreshBucket :: forall nid tok ni addr.
    ( Show nid, FiniteBits nid, Serialize nid, Ord nid, Ord ni, Hashable nid, Hashable ni, Ord addr ) =>
    Search nid addr tok ni ni -> TVar (BucketList ni) -> Int -> IO Int
refreshBucket sch var n = do
    tbl <- atomically (readTVar var)
    let count = bktCount tbl
        nid   = kademliaLocation (searchSpace sch) (thisNode tbl)
    sample <- if n+1 >= count          -- Is this the last bucket?
                then return nid        -- Yes?  Search our own id.
                else genBucketSample nid -- No?  Generate a random id.
                        (bucketRange n (n + 1 < count))
    fin <- atomically $ newTVar False
    resultCounter <- atomically $ newTVar Set.empty
    let fullcount = R.defaultBucketSize

        -- Remember (in @fin@) that the check answered 'True' at least once.
        saveit True = writeTVar fin True >> return True
        saveit _    = return False

        -- Per-result callback handed to 'search'.
        -- NOTE(review): the last-bucket case below passes @const (return
        -- True)@ with the remark "never short-circuit", so 'True' presumably
        -- means "keep searching" - confirm against 'search'.
        checkBucketFull :: ni -> STM Bool
        checkBucketFull found_node = do
            tbl <- readTVar var
            let counts = R.shape tbl
            when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl)
                $ modifyTVar' resultCounter (Set.insert found_node)
            resultCount <- readTVar resultCounter
            -- NOTE(review): 'bootstrap' pairs @R.shape tbl@ with @[0 ..]@ and
            -- passes that index as @n@, so @drop (n - 1)@ looks like it
            -- inspects bucket @n - 1@ rather than @n@ - TODO confirm.
            saveit $ case drop (n - 1) counts of
                (cnt:_) | cnt < fullcount                  -> True
                _       | Set.size resultCount < fullcount -> True
                _                                          -> False

    hPutStrLn stderr $ "Start refresh " ++ show (n,sample)

    s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
                                    then const $ return True -- Never short-circuit the last bucket.
                                    else checkBucketFull
    -- Wait at most 15 minutes for the search, in order to avoid overlapping
    -- refreshes.
    _ <- timeout (15*60*1000000) $
            atomically $ searchIsFinished s >>= check
    -- Cancel outside the timed wait so that a search which outlives the
    -- timeout is stopped too, not merely abandoned.
    atomically $ searchCancel s
    hPutStrLn stderr $ "Finish refresh " ++ show (n,sample)
    atomically $ do
        c <- Set.size <$> readTVar resultCounter
        b <- readTVar fin
        return $ if b then 1 else c
-- | Populate the routing table held in @var@.
--
-- First every node in @ns@ is pinged (in a task group named
-- @bootstrap.resume@).  Only if none of them answered are the nodes in @ns0@
-- pinged as well (task group @bootstrap.ping@).  Afterwards 'refreshBucket'
-- is run repeatedly until no bucket is below 'R.defaultBucketSize' and the
-- table has at least 'R.defaultBucketCount' buckets.
bootstrap ::
    ( Show nid
    , Serialize nid
    , FiniteBits nid
    , Hashable ni
    , Hashable nid
    , Ord ni
    , Ord addr
    , Ord nid
    , Traversable t1
    , Traversable t
    ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO ()
bootstrap sch var ping ns ns0 = do
    gotPing <- atomically $ newTVar False

    -- Start by pinging the supplied nodes, which gets them added to our
    -- routing table.
    withTaskGroup "bootstrap.resume" 20 $ \g ->
        forM_ ns $ \n ->
            forkTask g (label n) $ do
                responded <- ping n
                when responded $ atomically $ writeTVar gotPing True

    -- The hardcoded fallback nodes are used only when nobody responded.
    -- This is to lessen the burden on well-known bootstrap nodes.
    anyResponse <- atomically (readTVar gotPing)
    unless anyResponse $
        withTaskGroup "bootstrap.ping" 20 $ \g ->
            forM_ ns0 $ \n ->
                forkTask g (label n) (void $ ping n)
    hPutStrLn stderr "Finished bootstrap pings."

    -- Now run searches until all the buckets are full.  On a small network,
    -- this may never quit.
    --
    -- TODO: For small networks, we should give up on filling a nearby bucket
    -- at some point and move on to one farther away.
    fillBuckets 1
  where
    -- Task label for a node: its location, shown.
    label n = show $ kademliaLocation (searchSpace sch) n

    lastBucket = R.defaultBucketCount - 1

    -- The argument is the result count of the previous refresh (1 on the
    -- first pass, so that no delay is forced).
    fillBuckets :: Int -> IO ()
    fillBuckets prev = do
        when (prev == 0) $ do
            -- Force a delay in case the search returns too quickly
            hPutStrLn stderr $ "Zero results, forcing 1 minute delay"
            threadDelay (60 * 1000000)
        tbl <- atomically $ readTVar var
        let shp        = zip (R.shape tbl) [0 .. ]
            unfull     = [ entry | entry@(sz, _) <- shp, sz < R.defaultBucketSize ]
            candidates = dropWhile (\(_, num) -> num > lastBucket) unfull
        case candidates of
            (size, num) : _ -> do
                -- If we don't yet have enough buckets, we need to search our
                -- own id.  We indicate that by setting the bucket number to
                -- the target.
                let num' = if bktCount tbl < R.defaultBucketCount
                               then lastBucket
                               else num
                hPutStrLn stderr $ "Bucket too small, refresh "++ show (num',(size,num),shp)
                refreshBucket sch var num' >>= fillBuckets
            [] ->
                when (length shp < R.defaultBucketCount) $ do
                    -- Not enough buckets, keep trying.
                    hPutStrLn stderr
                        $ "Not enough buckets, refresh " ++ show lastBucket
                    refreshBucket sch var lastBucket >>= fillBuckets
|