summaryrefslogtreecommitdiff
path: root/src/Network/Kademlia/Bootstrap.hs
blob: 87fdc22fef53605531d4b46b4aa763e7b0b2036d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
{-# LANGUAGE CPP                   #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE DeriveFunctor         #-}
{-# LANGUAGE DeriveTraversable     #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE KindSignatures        #-}
{-# LANGUAGE NamedFieldPuns        #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE PatternSynonyms       #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE ScopedTypeVariables   #-}
module Network.Kademlia.Bootstrap where

import Data.Function
import Data.Maybe
import qualified Data.Set as Set
import Data.Time.Clock       (getCurrentTime)
import Data.Time.Clock.POSIX (getPOSIXTime, utcTimeToPOSIXSeconds)
import Network.Kademlia.Routing   as R
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc                  (labelThread)
#endif
import Control.Concurrent.STM
import Control.Monad
import Data.Bits
import Data.Hashable
import Data.IP
import Data.Monoid
import Data.Serialize                 (Serialize)
import Data.Time.Clock.POSIX          (POSIXTime)
import Data.Ord
import System.Entropy
import System.Timeout
import Text.PrettyPrint               as PP hiding (($$), (<>))
import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
import System.IO

import qualified Data.Wrapper.PSQInt  as Int
         ;import Data.Wrapper.PSQInt  (pattern (:->))
import Network.Address                (bucketRange,genBucketSample)
import Network.Kademlia.Search
import Control.Concurrent.Tasks
import Network.Kademlia

type SensibleNodeId nid ni =
                 ( Show nid
                 , Ord nid
                 , Ord ni
                 , Hashable nid
                 , Hashable ni )

data BucketRefresher nid ni = forall tok addr. Ord addr =>  BucketRefresher
    { -- | A staleness threshold (if a bucket goes this long without being
      -- touched, a refresh will be triggered).
      refreshInterval :: POSIXTime
      -- | A TVar with the time-to-refresh schedule for each bucket.
      --
      -- To "touch" a bucket and prevent it from being refreshed, reschedule
      -- it's refresh time to some time into the future by modifying the
      -- 'Int.PSQ' in the TVar.  (See 'touchBucket').
    , refreshQueue    :: TVar (Int.PSQ POSIXTime)
      -- | This is the kademlia node search specification.
    , refreshSearch   :: Search nid addr tok ni ni
      -- | The current kademlia routing table buckets.
    , refreshBuckets  :: TVar (R.BucketList ni)
      -- | Action to ping a node.  This is used only during initial bootstrap
      -- to get some nodes in our table.  A 'True' result is interpreted as a a
      -- pong, where 'False' is a non-response.
    , refreshPing     :: ni -> IO Bool
    }

newBucketRefresher :: (Ord addr, Ord a, Hashable a)
                    => KademliaSpace a ni
                    -> ni
                    -> Search nid addr tok ni ni
                    -> (ni -> IO Bool)
                    -> STM (BucketRefresher nid ni)
newBucketRefresher spc template_ni sch ping = do
    let nodeId = kademliaLocation spc
    bkts <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) template_ni R.defaultBucketCount
    sched <- newTVar Int.empty
    return  BucketRefresher
                    { refreshInterval = 15 * 60
                    , refreshQueue    = sched
                    , refreshSearch   = sch
                    , refreshBuckets  = bkts
                    , refreshPing     = ping
                    }

-- | This was added to avoid the compile error "Record update for
-- insufficiently polymorphic field" when trying to update the existentially
-- quantified field 'refreshSearch'.
updateRefresherIO :: Ord addr
                     => Search nid addr tok ni ni
                     -> (ni -> IO Bool)
                     -> BucketRefresher nid ni ->  BucketRefresher nid ni
updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher
    { refreshSearch   = sch
    , refreshPing     = ping
    , refreshInterval = refreshInterval
    , refreshBuckets  = refreshBuckets
    , refreshQueue    = refreshQueue
    }

-- | Fork a refresh loop.  Kill the returned thread to terminate it.
forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId
forkPollForRefresh BucketRefresher{ refreshInterval
                                  , refreshQueue
                                  , refreshBuckets
                                  , refreshSearch   } = fork $ do
      myThreadId >>= flip labelThread "pollForRefresh"
      fix $ \again -> do
        join $ atomically $ do
            nextup <- Int.findMin <$> readTVar refreshQueue
            maybe retry (return . go again) nextup
 where
    refresh :: Int -> IO Int
    refresh = refreshBucket refreshSearch refreshBuckets

    go again ( bktnum :-> refresh_time ) = do
        now <- getPOSIXTime
        case fromEnum (refresh_time - now) of
            x | x <= 0 -> do -- Refresh time!
                             -- Move it to the back of the refresh queue.
                             atomically $ modifyTVar' refreshQueue
                                        $ Int.insert bktnum (now + refreshInterval)
                             -- Now fork the refresh operation.
                             -- TODO: We should probably propogate the kill signal to this thread.
                             fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum)
                                       _ <- refresh bktnum
                                       return ()
                             return ()
            seconds    -> threadDelay ( seconds * 1000000 )
        again


-- | This is a helper to 'refreshBucket' which does some book keeping to decide
-- whether or not a bucket is sufficiently refreshed or not.  It will return
-- false when we can terminate a node search.
checkBucketFull :: Ord ni => KademliaSpace nid ni   -- ^ Obtain a node id from a node.
                            -> TVar (BucketList ni) -- ^ The current routing table.
                            -> TVar (Set.Set ni)    -- ^ In-range nodes found so far.
                            -> TVar Bool            -- ^ The result will also be written here.
                            -> Int                  -- ^ The bucket number of interest.
                            -> ni                   -- ^ A newly found node.
                            -> STM Bool
checkBucketFull space var resultCounter fin n found_node = do
    let fullcount = R.defaultBucketSize
        saveit True = writeTVar fin True >> return True
        saveit _    = return False
    tbl <- readTVar var
    let counts = R.shape tbl
        nid = kademliaLocation space found_node
    -- Update the result set with every found node that is in the
    -- bucket of interest.
    when (n == R.bucketNumber space nid tbl)
         $ modifyTVar' resultCounter (Set.insert found_node)
    resultCount <- readTVar resultCounter
    saveit $ case drop (n - 1) counts of
        (cnt:_) | cnt < fullcount            -> True  -- bucket not full, keep going
        _ | Set.size resultCount < fullcount -> True  -- we haven't got many results, keep going
        _                                    -> False -- okay, good enough, let's quit.


refreshBucket :: (Hashable a, Hashable t, Ord t, Ord addr, Ord a, Show t) =>
                 Search t addr tok a a -> TVar (BucketList a) -> Int -> IO Int
refreshBucket sch var n = do
    tbl <- atomically (readTVar var)
    let count = bktCount tbl
        nid = kademliaLocation (searchSpace sch) (thisNode tbl)
    sample <- if n+1 >= count                         -- Is this the last bucket?
                then return nid                       -- Yes? Search our own id.
                else kademliaSample (searchSpace sch) -- No? Generate a random id.
                        getEntropy
                        nid
                        (bucketRange n (n + 1 < count))
    fin <- atomically $ newTVar False
    resultCounter <- atomically $ newTVar Set.empty

    hPutStrLn stderr $ "Start refresh " ++ show (n,sample)

    -- Set 15 minute timeout in order to avoid overlapping refreshes.
    s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
                                    then const $ return True -- Never short-circuit the last bucket.
                                    else checkBucketFull (searchSpace sch) var resultCounter fin n
    _ <- timeout (15*60*1000000) $ do
            atomically $ searchIsFinished s >>= check
    atomically $ searchCancel s
    hPutStrLn stderr $ "Finish refresh " ++ show (n,sample)
    rcount <- atomically $ do
        c <- Set.size <$> readTVar resultCounter
        b <- readTVar fin
        return $ if b then 1 else c
    return rcount

bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) =>
             BucketRefresher nid ni
             -> t1 ni -- ^ Nodes to bootstrap from.
             -> t ni  -- ^ Fallback nodes; used only if the others are unresponsive.
             -> IO ()
bootstrap BucketRefresher { refreshSearch  = sch
                          , refreshBuckets = var
                          , refreshPing    = ping } ns ns0 = do
    gotPing <- atomically $ newTVar False

    -- First, ping the given nodes so that they are added to
    -- our routing table.
    withTaskGroup "bootstrap.resume" 20 $ \g -> do
        forM_ ns $ \n -> do
            let lbl = show $ kademliaLocation (searchSpace sch) n
            forkTask g lbl $ do
                b <- ping n
                when b $ atomically $ writeTVar gotPing True

    -- We resort to the hardcoded fallback nodes only when we got no
    -- responses.  This is to lesson the burden on well-known boostrap
    -- nodes.
    fallback <- atomically (readTVar gotPing) >>= return . when . not
    fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do
        forM_ ns0 $ \n -> do
            forkTask g (show $ kademliaLocation (searchSpace sch) n)
                       (void $ ping n)
    hPutStrLn stderr "Finished bootstrap pings."
    -- Now search our own Id by refreshing the last bucket.
    last <- atomically $ bktCount <$> readTVar var
    void $ refreshBucket sch var last
    -- That's it.
    --
    -- Hopefully 'forkPollForRefresh' was invoked and can take over
    -- maintenance.


-- | Reschedule a bucket's refresh-time.  It should be called whenever a bucket
-- changes.  This will typically be invoked from 'tblTransition'.
--
-- From BEP 05:
--
-- > Each bucket should maintain a "last changed" property to indicate how
-- > "fresh" the contents are.
--
-- We will use a "time to next refresh" property instead and store it in
-- a priority search queue.
--
-- In detail using an expository (not actually implemented) type
-- 'BucketTouchEvent'...
--
-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
-- >>> bucketEvents =
-- >>>  [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
-- >>>
-- >>>  , Stranger  :--> Accepted -- or a node is added to a bucket,
-- >>>
-- >>>  , Accepted  :--> Stranger -- or a node in a bucket is replaced
-- >>>  , Applicant :--> Accepted -- with another node,
-- >>>  ]
--
-- the bucket's last changed property should be updated. Buckets that have not
-- been changed in 15 minutes (see 'refreshInterval') should be "refreshed."
-- This is done by picking a random ID in the range of the bucket and
-- performing a find_nodes search on it.
--
-- The only other possible BucketTouchEvents are as follows:
--
-- >>> not_handled =
-- >>>   , Stranger :--> Applicant -- A ping is pending, it's result is covered:
-- >>>                             --     (Applicant :--> Stranger)
-- >>>                             --     (Applicant :--> Accepted)
-- >>>   , Accepted :--> Applicant -- Never happens
-- >>>   ]
--
-- Because this BucketTouchEvent type is not actually implemented and we only
-- receive notifications of a node's new state, it suffices to reschedule the
-- bucket refresh 'touchBucket' on every transition to a state other than
-- 'Applicant'.
--
-- XXX: Unfortunately, this means redundantly triggering twice upon every node
-- replacement because we do not currently distinguish between standalone
-- insertion/deletion events and an insertion/deletion pair constituting
-- replacement.
--
-- It might also be better to pass the timestamp of the transition here and
-- keep the refresh queue in better sync with the routing table by updating it
-- within the STM monad.
--
-- We embed the result in the STM monad but currently, no STM state changes
-- occur until the returned IO action is invoked.  TODO: simplify?
touchBucket :: BucketRefresher nid ni
            -> RoutingTransition ni     -- ^ What happened to the bucket?
            -> STM (IO ())
touchBucket BucketRefresher{ refreshSearch
                           , refreshInterval
                           , refreshBuckets
                           , refreshQueue    }
            RoutingTransition{ transitionedTo
                             , transitioningNode }
    = case transitionedTo of
        Applicant -> return $ return () -- Ignore transition to applicant.
        _         -> return $ do        -- Reschedule for any other transition.
         now <- getPOSIXTime
         atomically $ do
            let space = searchSpace refreshSearch
                nid = kademliaLocation space transitioningNode
            num <- R.bucketNumber space nid <$> readTVar refreshBuckets
            modifyTVar' refreshQueue $ Int.insert num (now + refreshInterval)