-- Source path: root/kad/src/Network/Kademlia/Bootstrap.hs
-- Blob: c07b3c6c0e45a2e43dab3dcf62ab9d1d2a3575ad
{-# LANGUAGE CPP                   #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE DeriveFunctor         #-}
{-# LANGUAGE DeriveTraversable     #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE KindSignatures        #-}
{-# LANGUAGE LambdaCase            #-}
{-# LANGUAGE NamedFieldPuns        #-}
{-# LANGUAGE RecordWildCards       #-}
{-# LANGUAGE NondecreasingIndentation #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE PatternSynonyms       #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE ScopedTypeVariables   #-}
module Network.Kademlia.Bootstrap where

import Data.Function
import qualified Data.IntMap.Strict as IntMap
         ;import Data.IntMap.Strict (IntMap)
import Data.Maybe
import qualified Data.Set as Set
import Data.Time.Clock.POSIX (getPOSIXTime)
import Network.Kademlia.Routing   as R
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc                  (labelThread)
#endif
import Control.Concurrent.STM
import Control.Monad
import Data.Hashable
import Data.Time.Clock.POSIX          (POSIXTime)
import Data.Ord
import System.Entropy
import System.Timeout
import DPut
import DebugTag

import qualified Data.Wrapper.PSQInt  as Int
         ;import Data.Wrapper.PSQInt  (pattern (:->))
import Network.Address                (bucketRange)
import Network.Kademlia.Search
import Control.Concurrent.Tasks
import Network.Kademlia

-- | Constraint bundle used by the refresher entry points in this module.
--
-- Node ids (@nid@) must be showable, ordered and hashable; node-info values
-- (@ni@) must be ordered and hashable.
type SensibleNodeId nid ni =
                 ( Show nid
                 , Ord nid
                 , Ord ni
                 , Hashable nid
                 , Hashable ni )

-- | Shared state and callbacks used to keep the routing-table buckets fresh.
--
-- The address (@addr@) and token (@tok@) types of the underlying search are
-- existentially quantified, so 'refreshSearch' cannot be changed with ordinary
-- record-update syntax (see 'updateRefresherIO').
data BucketRefresher nid ni = forall tok addr. Ord addr =>  BucketRefresher
    { -- | A staleness threshold (if a bucket goes this long without being
      -- touched, a refresh will be triggered).
      refreshInterval  :: POSIXTime
      -- | A TVar with the time-to-refresh schedule for each bucket, keyed by
      -- bucket number.
      --
      -- To "touch" a bucket and prevent it from being refreshed, reschedule
      -- its refresh time to some time into the future by modifying its
      -- priority in this priority search queue.
    , refreshQueue     :: TVar (Int.PSQ POSIXTime)
      -- | The kademlia node search specification used to perform refreshes.
    , refreshSearch    :: Search nid addr tok ni ni
      -- | The current kademlia routing table buckets.
    , refreshBuckets   :: TVar (R.BucketList ni)
      -- | Action to ping a node.  This is used only during initial bootstrap
      -- to get some nodes in our table.  A 'True' result is interpreted as a
      -- pong, whereas 'False' is a non-response.
    , refreshPing      :: ni -> IO Bool
    , -- | Timestamp of the last bucket event.  A value of 0 means that no
      -- event has been recorded yet.
      refreshLastTouch :: TVar POSIXTime
    , -- | This variable indicates whether or not we are in bootstrapping mode.
      bootstrapMode    :: TVar Bool
    , -- | When this countdown reaches 0, we exit bootstrap mode.  It is
      -- decremented on every finished refresh; 'Nothing' means that no
      -- countdown has been started yet.
      bootstrapCountdown :: TVar (Maybe Int)
      -- | Internal state of background searches, keyed by bucket number.
      -- Exposed for debugging purposes.
    , refreshState     :: TVar (IntMap [BucketSearch nid ni])
    }

-- | Allocate a 'BucketRefresher' for an existing routing table.
--
-- The caller supplies the table, the search specification and the ping
-- action; every other piece of state is created fresh:
--
-- * the refresh schedule starts out empty,
--
-- * the last-touch stamp starts at 0 (we cannot call 'getPOSIXTime' inside
--   STM, so 0 doubles as "never touched"),
--
-- * bootstrap mode starts enabled with no countdown running,
--
-- * no background searches are registered.
--
-- The staleness threshold is fixed at 15 minutes.
newBucketRefresher :: ( Ord addr, Hashable addr
                      , SensibleNodeId nid ni )
                    => TVar (R.BucketList ni)
                    -> Search nid addr tok ni ni
                    -> (ni -> IO Bool)
                    -> STM (BucketRefresher nid ni)
newBucketRefresher bkts sch ping = do
    refreshQueue       <- newTVar Int.empty
    refreshLastTouch   <- newTVar 0
    bootstrapMode      <- newTVar True
    bootstrapCountdown <- newTVar Nothing
    refreshState       <- newTVar IntMap.empty
    return BucketRefresher
        { refreshInterval = 15 * 60
        , refreshSearch   = sch
        , refreshBuckets  = bkts
        , refreshPing     = ping
        , refreshQueue
        , refreshLastTouch
        , bootstrapMode
        , bootstrapCountdown
        , refreshState
        }

-- | Rebuild a 'BucketRefresher' around a new search specification and ping
-- action, carrying every other field over unchanged.
--
-- 'refreshSearch' mentions the existentially quantified @addr@ and @tok@
-- types, so ordinary record-update syntax is rejected with "Record update for
-- insufficiently polymorphic field".  Constructing a fresh value side-steps
-- that error.
updateRefresherIO :: Ord addr
                     => Search nid addr tok ni ni
                     -> (ni -> IO Bool)
                     -> BucketRefresher nid ni ->  BucketRefresher nid ni
updateRefresherIO sch ping BucketRefresher{ refreshInterval
                                          , refreshQueue
                                          , refreshBuckets
                                          , refreshLastTouch
                                          , bootstrapMode
                                          , bootstrapCountdown
                                          , refreshState } =
    BucketRefresher
        { refreshSearch = sch
        , refreshPing   = ping
        , refreshInterval
        , refreshQueue
        , refreshBuckets
        , refreshLastTouch
        , bootstrapMode
        , bootstrapCountdown
        , refreshState
        }

-- | Fork a refresh loop.  Kill the returned thread to terminate it.
--
-- The loop repeatedly takes the earliest entry of the refresh schedule
-- (blocking while the schedule is empty).  If that entry is due, the bucket is
-- rescheduled and a refresh is forked for it; otherwise the loop sleeps until
-- the entry's refresh time and then looks again.
--
-- NOTE(review): the sleep does not watch the schedule, so an entry that is
-- moved to an earlier time while the loop is sleeping is only noticed once the
-- sleep ends.
forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId
forkPollForRefresh r@BucketRefresher{ refreshQueue } = fork $ do
    myThreadId >>= flip labelThread "pollForRefresh"
    forever $ atomically nextDue >>= service
 where
    -- Earliest scheduled entry; retries the transaction while the schedule
    -- is empty.
    nextDue = readTVar refreshQueue >>= maybe retry return . Int.findMin

    service ( bktnum :-> refresh_time ) = do
        now <- getPOSIXTime
        -- 'fromEnum' on the time difference is treated as a picosecond count.
        let picoseconds = fromEnum (refresh_time - now)
        if picoseconds <= 0
          then do
            -- Refresh time!  Move the bucket to the back of the refresh queue
            -- before starting the work.
            atomically $ do
                interval <- effectiveRefreshInterval r bktnum
                modifyTVar' refreshQueue $ Int.insert bktnum (now + interval)
            -- Now fork the refresh operation.
            -- TODO: We should probably propagate the kill signal to this thread.
            void $ fork $ do
                myThreadId >>= flip labelThread ("refresh."++show bktnum)
                void $ refreshBucket r bktnum
          else
            -- Not due yet: sleep (in microseconds) until it is.
            threadDelay ( picoseconds `div` 10^6 )


-- | Book-keeping helper for 'refreshBucket': decides whether a node search
-- should keep going after it has found another node.
--
-- The found node is added to the result set when it falls into the bucket of
-- interest.  The search continues ('True') while either the inspected bucket
-- holds fewer than 'R.defaultBucketSize' nodes or the result set is still
-- smaller than that; otherwise 'False' is returned and the search may stop.
--
-- Whenever the answer is 'True' it is also written to the flag argument; a
-- 'False' answer leaves the flag untouched.
--
-- NOTE(review): the inspected bucket size is taken from @drop (n - 1)@ of the
-- table shape, i.e. index @n - 1@ (index 0 when @n@ is 0).  Other code in this
-- module numbers the shape from 0, so this may be an off-by-one -- confirm
-- against 'R.shape'.
checkBucketFull :: Ord ni => KademliaSpace nid ni   -- ^ Obtain a node id from a node.
                            -> TVar (BucketList ni) -- ^ The current routing table.
                            -> TVar (Set.Set ni)    -- ^ In-range nodes found so far.
                            -> TVar Bool            -- ^ Set to 'True' whenever the result is 'True'.
                            -> Int                  -- ^ The bucket number of interest.
                            -> ni                   -- ^ A newly found node.
                            -> STM Bool
checkBucketFull space var resultCounter fin n found_node = do
    tbl <- readTVar var
    -- Record the node if it belongs to the bucket being refreshed.
    when (R.bucketNumber space (kademliaLocation space found_node) tbl == n) $
        modifyTVar' resultCounter (Set.insert found_node)
    found <- Set.size <$> readTVar resultCounter
    let capacity      = R.defaultBucketSize
        bucketHasRoom = case drop (n - 1) (R.shape tbl) of
                            (cnt:_) -> cnt < capacity
                            []      -> False
        keepGoing     = bucketHasRoom || found < capacity
    when keepGoing $ writeTVar fin True
    return keepGoing

-- | Called from 'refreshBucket' with the current time when a refresh of the
-- supplied bucket number finishes.
--
-- Outside bootstrap mode this does nothing.  In bootstrap mode:
--
-- * a finished refresh of any bucket but the last decrements the countdown
--   (if one is running);
--
-- * a finished refresh of the last bucket either decrements a running
--   countdown or, when none is running, schedules an immediate refresh for
--   every other bucket that is not full (bucket 0 if there is none) and
--   starts the countdown at the number of refreshes scheduled.
--
-- When the countdown reaches 0, bootstrap mode is left and the countdown is
-- cleared.  The returned IO action only emits a debug message.
onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ())
onFinishedRefresh BucketRefresher { bootstrapCountdown
                                  , bootstrapMode
                                  , refreshQueue
                                  , refreshBuckets     } num now =
    readTVar bootstrapMode >>= \case
        False -> return (return ())
        True  -> do
            tbl <- readTVar refreshBuckets
            if num /= R.bktCount tbl - 1
                then modifyTVar' bootstrapCountdown (fmap pred)
                else readTVar bootstrapCountdown >>= \case
                        -- The last bucket finished.
                        Nothing -> scheduleFollowUps tbl
                        Just k  -> writeTVar bootstrapCountdown $! Just $! pred k
            remaining <- readTVar bootstrapCountdown
            if remaining == Just 0
                then do
                    -- Bootstrap finished!
                    writeTVar bootstrapMode False
                    writeTVar bootstrapCountdown Nothing
                    return $ dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")."
                else return $ dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,remaining)
  where
    -- Schedule an immediate refresh for every non-full bucket other than the
    -- one that just finished, and start the countdown at their number.
    scheduleFollowUps tbl = do
        let capacity = R.defaultBucketSize
            needsRefresh (i,len) = i /= num && len < capacity
            targets = case filter needsRefresh (zip [0..] (R.shape tbl)) of
                        [] -> [(0,0)] -- Schedule at least 1 more refresh.
                        xs -> xs
        forM_ targets $ \(i,_) ->
            modifyTVar' refreshQueue $ Int.insert i (now - 1)
        writeTVar bootstrapCountdown $! Just $! length targets

-- | Book-keeping record for one bucket-refresh search, as registered in
-- 'refreshState' by 'refreshBucket'.
data BucketSearch nid ni = forall addr tok. BucketSearch
    { bucketSample  :: nid                 -- ^ The id that the search is looking for.
    , bucketResults :: TVar (Set.Set ni)   -- ^ Nodes found so far that fall into the bucket.
    , bucketFinFlag :: TVar Bool           -- ^ The flag handed to 'checkBucketFull'.
    , bucketState   :: SearchState nid addr tok ni ni -- ^ State of the running search.
    , bucketThread  :: ThreadId            -- ^ Thread returned by 'search'; also used as
                                           -- the identity of this record in 'removeBucketState'.
    }

-- | 'IntMap.alter' callback that drops every search sharing the given
-- search's thread from a bucket's list.  The entry itself is removed
-- ('Nothing') once no searches remain.
removeBucketState :: BucketSearch nid ni -> Maybe [BucketSearch nid ni] -> Maybe [BucketSearch nid ni]
removeBucketState bst entry = do
    searches <- entry
    let survivors = [ b | b <- searches, bucketThread b /= bucketThread bst ]
    if null survivors then Nothing else Just survivors

-- | 'IntMap.alter' callback that prepends a search to a bucket's list,
-- creating the list if the bucket has no entry yet.
insertBucketState :: BucketSearch nid ni -> Maybe [BucketSearch nid ni] -> Maybe [BucketSearch nid ni]
insertBucketState bst = Just . maybe [bst] (bst :)

-- | Refresh one bucket by running a node search and waiting for it to finish
-- (or for a 15 minute timeout).
--
-- For the last bucket the search target is our own node id; for any other
-- bucket it is a sample drawn via 'kademliaSample' for that bucket's range.
-- The search is registered in 'refreshState' while it runs and is removed by a
-- background thread once it reports being finished.  Afterwards
-- 'onFinishedRefresh' is invoked for bootstrap book-keeping.
--
-- The result is 1 if the flag written by 'checkBucketFull' was set, and the
-- number of in-bucket nodes found otherwise.
--
-- NOTE(review): the flag is set as soon as 'checkBucketFull' answers 'True'
-- once and is never cleared, so a result of 1 does not say how many nodes were
-- found -- confirm that callers expect this.
refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) =>
                 BucketRefresher nid ni -> Int -> IO Int
refreshBucket r@BucketRefresher{ refreshSearch  = sch
                               , refreshBuckets = var
                               , refreshState   = rstate }
              n = do
    -- Snapshot of the table; the search below starts from this snapshot.
    tbl <- atomically (readTVar var)
    let count = bktCount tbl
        nid = kademliaLocation (searchSpace sch) (thisNode tbl)
    sample <- if n+1 >= count                         -- Is this the last bucket?
                then return nid                       -- Yes? Search our own id.
                else kademliaSample (searchSpace sch) -- No? Generate a random id.
                        getEntropy
                        nid
                        (bucketRange n (n + 1 < count)) -- The flag is always True in this branch.
    fin <- atomically $ newTVar False
    resultCounter <- atomically $ newTVar Set.empty

    dput XRefresh $ "Start refresh " ++ show (n,sample)

    -- Start the search.  The callback decides after every found node whether
    -- the search should keep going.
    -- NOTE(review): "last bucket" is tested against 'R.defaultBucketCount'
    -- here, but against the table's current bucket count when the sample was
    -- chosen above -- confirm that the difference is intended.
    (s,thread) <- search sch tbl sample $ if n+1 == R.defaultBucketCount
                                            then const $ return True -- Never short-circuit the last bucket.
                                            else checkBucketFull (searchSpace sch) var resultCounter fin n
    let bstate = BucketSearch sample resultCounter fin s thread
    -- Register the search so that it is visible in 'refreshState'.
    atomically $ modifyTVar' rstate $ IntMap.alter (insertBucketState bstate) n
    -- Set 15 minute timeout in order to avoid overlapping refreshes.
    _ <- timeout (15*60*1000000) $ do
            atomically $ searchIsFinished s >>= check
    -- Finished or timed out: cancel the search either way.
    atomically $ searchCancel s
    dput XDHT $ "Finish refresh " ++ show (n,sample)
    -- Unregister the search in the background once it reports being finished.
    bg <- forkIO $ do
        atomically $ do
            searchIsFinished s >>= check
            modifyTVar' rstate $ IntMap.alter (removeBucketState bstate) n

    labelThread bg ("backgrounded." ++ show n ++ "." ++ show sample)
    now <- getPOSIXTime
    -- Bootstrap book-keeping; the returned IO action is run immediately.
    join $ atomically $ onFinishedRefresh r n now
    rcount <- atomically $ do
        c <- Set.size <$> readTVar resultCounter
        b <- readTVar fin
        return $ if b then 1 else c
    return rcount

-- | Schedule an immediate refresh of the last bucket by giving it a refresh
-- time one second in the past.
refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ()
refreshLastBucket BucketRefresher{ refreshBuckets, refreshQueue } = do
    now <- getPOSIXTime
    atomically $ do
        lastBucket <- subtract 1 . bktCount <$> readTVar refreshBuckets
        modifyTVar' refreshQueue $ Int.insert lastBucket (now - 1)

-- | Switch to bootstrap mode and clear the bootstrap countdown.
--
-- The countdown is cleared even when bootstrap mode was already active.  The
-- returned IO action logs what happened and, only when the mode was actually
-- entered by this call, schedules an immediate refresh of the last bucket.
restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) =>
                 BucketRefresher nid ni -> STM (IO ())
restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do
    wasBootstrapping <- readTVar bootstrapMode
    writeTVar bootstrapMode True
    writeTVar bootstrapCountdown Nothing
    return $ if wasBootstrapping
        then dput XRefresh "BOOTSTRAP already bootstrapping"
        else do
            dput XRefresh "BOOTSTRAP entered bootstrap mode"
            refreshLastBucket r

-- | Populate the routing table from known nodes and then enter bootstrap
-- mode.
--
-- The primary nodes are pinged first.  The fallback nodes are pinged only if
-- none of the primary nodes answered.  Finally bootstrap mode is (re-)entered,
-- which schedules a refresh of the last bucket.
bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) =>
             BucketRefresher nid ni
             -> t1 ni -- ^ Nodes to bootstrap from.
             -> t ni  -- ^ Fallback nodes; used only if the others are unresponsive.
             -> IO ()
bootstrap r@BucketRefresher { refreshSearch  = sch
                            , refreshBuckets = var
                            , refreshPing    = ping
                            , bootstrapMode         } ns ns0 = do
    gotPing <- newTVarIO False
    -- Tasks are labelled with the id of the node they ping.
    let taskLabel = show . kademliaLocation (searchSpace sch)

    -- First, ping the given nodes so that they are added to
    -- our routing table.
    withTaskGroup "bootstrap.resume" 20 $ \g ->
        forM_ ns $ \n ->
            forkTask g (taskLabel n) $ do
                responded <- ping n
                when responded $ atomically $ writeTVar gotPing True

    -- We resort to the hardcoded fallback nodes only when we got no
    -- responses.  This is to lessen the burden on well-known bootstrap
    -- nodes.
    anyResponse <- readTVarIO gotPing
    unless anyResponse $ withTaskGroup "bootstrap.ping" 20 $ \g ->
        forM_ ns0 $ \n ->
            forkTask g (taskLabel n) (void $ ping n)
    dput XDHT "Finished bootstrap pings."

    -- Now search our own Id by entering bootstrap mode from non-bootstrap mode.
    join $ atomically $ do
        writeTVar bootstrapMode False
        restartBootstrap r
    --
    -- Hopefully 'forkPollForRefresh' was invoked and can take over
    -- maintenance.


-- | The refresh interval to use when rescheduling the given bucket.
--
-- Normally this is 'refreshInterval'.  While bootstrapping, a bucket whose
-- inspected size differs from 'R.defaultBucketSize' is rescheduled after only
-- 15 seconds.  A bucket with no corresponding shape entry counts as full.
--
-- NOTE(review): the size is taken from @drop (num - 1)@ of the table shape,
-- i.e. index @num - 1@ (index 0 when @num@ is 0).  Other code in this module
-- numbers the shape from 0, so this may be an off-by-one -- confirm against
-- 'R.shape'.
effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime
effectiveRefreshInterval BucketRefresher{ refreshInterval
                                        , refreshBuckets
                                        , bootstrapMode   } num = do
    tbl <- readTVar refreshBuckets
    bootstrapping <- readTVar bootstrapMode
    let capacity  = R.defaultBucketSize
        occupancy = case drop (num - 1) (R.shape tbl) of
                        (c:_) -> c
                        []    -> capacity
    return $ if bootstrapping && occupancy /= capacity
                then 15 -- seconds
                else refreshInterval



-- | Reschedule a bucket's refresh-time.  It should be called whenever a bucket
-- changes.  This will typically be invoked from 'tblTransition'.
--
-- From BEP 05:
--
-- > Each bucket should maintain a "last changed" property to indicate how
-- > "fresh" the contents are.
--
-- We will use a "time to next refresh" property instead and store it in
-- a priority search queue.
--
-- In detail using an expository (not actually implemented) type
-- 'BucketTouchEvent'...
--
-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
-- >>> bucketEvents =
-- >>>  [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
-- >>>
-- >>>  , Stranger  :--> Accepted -- or a node is added to a bucket,
-- >>>
-- >>>  , Accepted  :--> Stranger -- or a node in a bucket is replaced
-- >>>  , Applicant :--> Accepted -- with another node,
-- >>>  ]
--
-- the bucket's last changed property should be updated. Buckets that have not
-- been changed in 15 minutes (see 'refreshInterval') should be "refreshed."
-- This is done by picking a random ID in the range of the bucket and
-- performing a find_nodes search on it.
--
-- The only other possible BucketTouchEvents are as follows:
--
-- >>> not_handled =
-- >>>   [ Stranger :--> Applicant -- A ping is pending; its result is covered by:
-- >>>                             --     (Applicant :--> Stranger)
-- >>>                             --     (Applicant :--> Accepted)
-- >>>   , Accepted :--> Applicant -- Never happens
-- >>>   ]
--
-- Because this BucketTouchEvent type is not actually implemented and we only
-- receive notifications of a node's new state, it suffices to reschedule the
-- bucket refresh 'touchBucket' on every transition to a state other than
-- 'Applicant'.
--
-- XXX: Unfortunately, this means redundantly triggering twice upon every node
-- replacement because we do not currently distinguish between standalone
-- insertion/deletion events and an insertion/deletion pair constituting
-- replacement.
--
-- It might also be better to pass the timestamp of the transition here and
-- keep the refresh queue in better sync with the routing table by updating it
-- within the STM monad.
--
-- We embed the result in the STM monad but currently, no STM state changes
-- occur until the returned IO action is invoked.  TODO: simplify?
touchBucket :: SensibleNodeId nid ni
            => BucketRefresher nid ni
            -> RoutingTransition ni     -- ^ What happened to the bucket?
            -> STM (IO ())
touchBucket r@BucketRefresher{ refreshSearch
                             , refreshBuckets
                             , refreshQueue
                             , refreshLastTouch }
            RoutingTransition{ transitionedTo
                             , transitioningNode }
    = case transitionedTo of
        Applicant -> return (return ()) -- Ignore transition to applicant.
        _         -> return reschedule  -- Reschedule for any other transition.
  where
    space = searchSpace refreshSearch

    reschedule = do
        now <- getPOSIXTime
        join $ atomically $ do
            tbl   <- readTVar refreshBuckets
            stamp <- readTVar refreshLastTouch
            let num = R.bucketNumber space (kademliaLocation space transitioningNode) tbl
                -- More than a minute has passed since any bucket was touched
                -- (a stamp of 0 means nothing has been touched yet).
                longSilence = stamp /= 0 && (now - stamp > 60)
            -- Re-enter bootstrap mode first: it influences the interval
            -- computed below.
            action <- if longSilence
                        then restartBootstrap r
                        else return (return ())
            interval <- effectiveRefreshInterval r num
            modifyTVar' refreshQueue $ Int.insert num (now + interval)
            writeTVar refreshLastTouch now
            return action

-- | Build the 'Kademlia' value for this refresher: the vanilla table IO for
-- the refresher's buckets and ping action, with 'tblTransition' overridden so
-- that every routing transition is passed to 'touchBucket'.
refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni -> Kademlia nid ni
refreshKademlia r@BucketRefresher { refreshSearch
                                  , refreshPing
                                  , refreshBuckets
                                  }
                = Kademlia quietInsertions (searchSpace refreshSearch) tableIO
  where
    tableIO = (vanillaIO refreshBuckets refreshPing)
                { tblTransition = touchBucket r }