1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
|
{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE DeriveFunctor, DeriveTraversable #-}
-- {-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE PatternSynonyms #-}
module Kademlia where
import Data.Function
import Data.Maybe
import Network.DHT.Routing as R
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds, getPOSIXTime)
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import GHC.Conc (labelThread)
import Control.Concurrent.Lifted
#endif
import Data.Bits
import Text.PrettyPrint as PP hiding ((<>), ($$))
import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
import Data.IP
import Control.Concurrent.STM
import Control.Monad
import Data.Monoid
import Data.Time.Clock.POSIX (POSIXTime)
import Data.Wrapper.PSQInt ( pattern (:->) )
import qualified Data.Wrapper.PSQInt as Int
import Network.BitTorrent.DHT.Search
import Network.DatagramServer.Types (genBucketSample)
import Network.Address (bucketRange)
import Data.Serialize (Serialize)
import Data.Hashable
-- | Where a given node stands with respect to a given routing table.
data RoutingStatus
    = Stranger   -- ^ The Kademlia routing table has no record of the node.
    | Applicant  -- ^ The node may be inserted, pending a ping timeout.
    | Accepted   -- ^ The node occupies a slot in one of the Kademlia buckets.
    deriving (Eq, Ord, Enum, Show, Read)
-- | A change occurred in the Kademlia routing table: one node moved to a new
-- 'RoutingStatus'.
data RoutingTransition ni = RoutingTransition
    { transitioningNode :: ni              -- ^ The node whose status changed.
    , transitionedTo    :: !RoutingStatus  -- ^ The status the node now holds.
    }
    deriving (Eq, Ord, Show, Read)
-- | A pair of callbacks through which routing-table insertion activity is
-- reported.  Both receive a caller-supplied timestamp as first argument.
data InsertionReporter ni = InsertionReporter
    { -- | Invoked for every inbound packet.
      reportArrival
          :: POSIXTime
          -> ni     -- ^ Origin of the packet.
          -> [ni]   -- ^ Nodes that will be pinged as a result.
          -> IO ()
      -- | Invoked for every ping probe.
    , reportPingResult
          :: POSIXTime
          -> ni     -- ^ The node that was pinged.
          -> Bool   -- ^ 'True' if it ponged.
          -> IO ()
    }
-- | An 'InsertionReporter' whose callbacks ignore their arguments and do
-- nothing.
quietInsertions :: InsertionReporter ni
quietInsertions = InsertionReporter
    { reportArrival    = \_ _ _ -> return ()
    , reportPingResult = \_ _ _ -> return ()
    }
-- | Adapt a reporter to a different node type.
--
-- Every node handed to the resulting reporter (the packet origin, each node
-- in the to-be-pinged list, and the pinged node) is first converted with @f@
-- and then forwarded to the wrapped reporter @ir@.  Timestamps and ping
-- results are passed through unchanged.
contramapIR :: (a -> b) -> InsertionReporter b -> InsertionReporter a
contramapIR f ir = InsertionReporter
    { reportArrival    = \tm ni nis -> reportArrival ir tm (f ni) (map f nis)
    , reportPingResult = \tm ni b   -> reportPingResult ir tm (f ni) b
    }
-- | All the IO operations necessary to maintain a Kademlia routing table.
data TableStateIO ni = TableStateIO
    { -- | Store the routing table.  Typically 'writeTVar'.
      tblWrite :: R.BucketList ni -> STM ()

      -- | Fetch the routing table.  Typically 'readTVar'.
    , tblRead :: STM (R.BucketList ni)

      -- | Issue a ping to a remote node.  The result is 'True' if the node
      -- responded within an acceptable time and 'False' otherwise.
    , tblPing :: ni -> IO Bool

      -- | Convenience hook that helps keep other state consistent with the
      -- routing table.  It is invoked in the same transaction in which
      -- 'tblRead'\/'tblWrite' occurred, but only when there was an
      -- interesting change.  The IO action it returns is run soon afterward.
      --
      -- Nothing interesting needs to happen here.  This trivial
      -- implementation is fine:
      --
      -- > tblTransition = const $ return $ return ()
    , tblTransition :: RoutingTransition ni -> STM (IO ())
    }
-- | The plain 'TableStateIO': the routing table lives in the given 'TVar',
-- pings are performed by the given action, and transitions are ignored.
vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO ni
vanillaIO var ping = TableStateIO
    { tblWrite      = writeTVar var
    , tblRead       = readTVar var
    , tblPing       = ping
    , tblTransition = \_ -> return (return ())  -- nothing to do on a transition
    }
-- | Everything necessary to maintain a routing table of /ni/ (node
-- information) entries.  The constructor carries, in order: the
-- 'InsertionReporter' to notify, the 'KademliaSpace' relating nodes to ids,
-- and the 'TableStateIO' through which the table is read, written and probed.
data Kademlia nid ni
    = Kademlia (InsertionReporter ni)
               (KademliaSpace nid ni)
               (TableStateIO ni)
-- Helper to 'insertNode'.
--
-- Turn one element of the result of 'updateForPingResult' into a more easily
-- grokked list of transitions.  The first component always becomes a
-- Stranger:
--
--   * paired with @Just _@:   Accepted  --> Stranger
--   * paired with @Nothing@:  Applicant --> Stranger
--
-- When the second component is present it additionally yields the
-- transition built by 'accepted'.
transition :: (ni, Maybe (t, ni)) -> [RoutingTransition ni]
transition (departing, replacement) = demoted : promoted
  where
    demoted  = RoutingTransition departing Stranger
    promoted = maybe [] (\p -> [accepted p]) replacement
-- Helper to 'transition'.
--
-- Node transition: Applicant --> Accepted, for the second component of the
-- pair.  The first component is ignored.
accepted :: (t, ni) -> RoutingTransition ni
accepted (_, newcomer) = RoutingTransition
    { transitioningNode = newcomer
    , transitionedTo    = Accepted
    }
-- | React to traffic from @node@.
--
-- In one transaction the routing table is updated with
-- 'R.updateForInbound' and, if the node was inserted or pings became
-- necessary, 'tblTransition' is consulted.  Afterwards the arrival is
-- reported, the transition's IO action is run, and a thread labelled
-- @pingResults@ is forked that pings each pending node in turn and folds
-- every result back into the table with 'R.updateForPingResult'.
insertNode :: Kademlia nid ni -> ni -> IO ()
insertNode (Kademlia reporter space io) node = do
    tm <- utcTimeToPOSIXSeconds <$> getCurrentTime

    (pending, onArrival) <- atomically $ do
        before <- tblRead io
        let (inserted, toPing, after) = R.updateForInbound space tm node before
        tblWrite io after
        hook <- if inserted
            then announce Accepted           -- Stranger --> Accepted
            else if null toPing
                then return (return ())      -- nothing interesting happened
                else announce Applicant      -- Stranger --> Applicant
        return (toPing, hook)

    reportArrival reporter tm node pending
    onArrival

    -- The thread is forked even when there is nothing to ping.
    void $ fork $ do
        myThreadId >>= flip labelThread "pingResults"
        mapM_ (probe tm) pending
  where
    -- Ask the table's transition hook about a new status for @node@.
    announce = tblTransition io . RoutingTransition node

    -- Ping one node, report the outcome, update the table accordingly and
    -- run the IO actions of every resulting transition.
    probe tm n = do
        ponged <- tblPing io n
        reportPingResult reporter tm n ponged -- XXX: tm is timestamp of original triggering packet, not result
        join $ atomically $ do
            tbl <- tblRead io
            let (replacements, tbl') = R.updateForPingResult space n ponged tbl
            tblWrite io tbl'
            sequence_ <$> mapM (tblTransition io) (concatMap transition replacements)
-- TODO: Bootstrap/Refresh
--
-- From BEP 05:
--
-- Each bucket should maintain a "last changed" property to indicate how
-- "fresh" the contents are.
--
-- Note: We will use a "time to next refresh" property instead and store it in
-- a priority search queue.
--
-- When...
--
-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
-- >>> bucketEvents =
-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
-- >>>
-- >>> , Stranger :--> Accepted -- or a node is added to a bucket,
-- >>>
-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced
-- >>> , Applicant :--> Accepted -- with another node,
-- >>> ]
--
-- the bucket's last changed property should be updated. Buckets
-- that have not been changed in 15 minutes should be "refreshed." This is done
-- by picking a random ID in the range of the bucket and performing a
-- find_nodes search on it.
--
-- The only other possible BucketTouchEvents are as follows:
--
-- >>> not_handled =
-- >>> [ Stranger :--> Applicant -- A ping is pending, its result is covered:
-- >>> -- (Applicant :--> Stranger)
-- >>> -- (Applicant :--> Accepted)
-- >>> , Accepted :--> Applicant -- Never happens
-- >>> ]
--
-- XXX: This will be redundantly triggered twice upon every node replacement
-- because we do not currently distinguish between standalone
-- insertion/deletion events and an insertion/deletion pair constituting
-- replacement.
--
-- It might also be better to pass the timestamp of the transition here and
-- keep the refresh queue in better sync with the routing table by updating it
-- within the STM monad.
-- | Push back the scheduled refresh of the bucket affected by a transition.
--
-- Once its first four arguments are supplied this has the type of
-- 'tblTransition'.  Transitions to 'Applicant' yield a no-op.  For any other
-- transition the returned IO action reads the clock, looks up the bucket
-- number of the transitioning node in the current table, and schedules that
-- bucket in the queue at @now + interval@.
touchBucket :: KademliaSpace nid ni -> POSIXTime -> TVar (BucketList ni) -> TVar (Int.PSQ POSIXTime) -> RoutingTransition ni -> STM (IO ())
touchBucket space interval bkts psq tr =
    case transitionedTo tr of
        Applicant -> return (return ())
        _         -> return reschedule
  where
    reschedule = do
        now <- getPOSIXTime
        atomically $ do
            tbl <- readTVar bkts
            let location = kademliaLocation space (transitioningNode tr)
                num      = R.bucketNumber space location tbl
            modifyTVar' psq (Int.insert num (now + interval))
-- | > pollForRefresh interval queue refresh
--
-- Fork a refresh loop.  Kill the returned thread to terminate it.  The
-- arguments are: a staleness threshold (if a bucket goes this long without
-- being touched, a refresh will be triggered), a TVar with the
-- time-to-refresh schedule for each bucket, and a refresh action to be forked
-- when a bucket exceeds the staleness threshold.
--
-- To \"touch\" a bucket and prevent it from being refreshed, reschedule its
-- refresh time to some time into the future by modifying the 'Int.PSQ' in the
-- TVar.
--
-- While the schedule is empty the loop blocks (via 'retry') until an entry
-- appears.
pollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO ()) -> IO ThreadId
pollForRefresh interval psq refresh = do
    fork $ do
        myThreadId >>= flip labelThread "pollForRefresh"
        fix $ \again -> do
            join $ atomically $ do
                nextup <- Int.findMin <$> readTVar psq
                maybe retry (return . go again) nextup
  where
    -- Handle the earliest scheduled bucket, then continue the loop.
    go again ( bktnum :-> refresh_time ) = do
        now <- getPOSIXTime
        let remaining = refresh_time - now  -- seconds until the bucket is due
        if remaining <= 0
            then do -- Refresh time!
                -- Move it to the back of the refresh queue.
                atomically $ modifyTVar' psq
                           $ Int.insert bktnum (now + interval)
                -- Now fork the refresh operation.
                -- TODO: We should probably propagate the kill signal to this thread.
                _ <- fork $ do
                    myThreadId >>= flip labelThread ("refresh."++show bktnum)
                    refresh bktnum
                return ()
            else
                -- 'threadDelay' takes microseconds.  Convert with RealFrac
                -- arithmetic rather than 'fromEnum': the Enum instance of
                -- 'POSIXTime' counts picoseconds, not seconds.  Rounding up
                -- guarantees a non-zero delay while time remains.
                threadDelay (ceiling (remaining * 1000000))
        again
-- | Refresh bucket number @n@ of the routing table held in @var@.
--
-- A sample id is generated with 'genBucketSample' from our own id @nid@ and
-- the bucket's range, and a 'search' for it is started against a snapshot of
-- the table.  The search is given 'checkBucketFull' (below) as its per-node
-- callback.
refreshBucket :: forall nid ni addr.
    ( FiniteBits nid, Serialize nid, Ord nid, Ord ni, Hashable nid, Hashable ni, Ord addr ) =>
    Search nid addr ni ni -> TVar (BucketList ni) -> nid -> Int -> IO ()
refreshBucket sch var nid n = do
    tbl <- readTVarIO var
    -- The flag passed to 'bucketRange' is True unless @n@ is the last bucket.
    sample <- genBucketSample nid (bucketRange n (n + 1 < bktCount tbl))
    resultCounter <- newTVarIO 0
    let fullcount = R.defaultBucketSize
        space     = searchSpace sch

        -- Count found nodes that fall into bucket @n@ of the current table,
        -- then answer True while the inspected bucket holds fewer than
        -- @fullcount@ nodes or fewer than @fullcount - 3@ such nodes have
        -- been counted, and False otherwise.
        checkBucketFull :: ni -> STM Bool
        checkBucketFull found_node = do
            current <- readTVar var
            let counts = R.shape current
            when (n == R.bucketNumber space (kademliaLocation space found_node) current)
                -- Strict update: the counter is not always read back below,
                -- so a lazy update would pile up thunks in the TVar.
                $ modifyTVar' resultCounter (+ 1)
            resultCount <- readTVar resultCounter
            -- NOTE(review): for n == 0 and n == 1 this inspects the same
            -- element of 'R.shape'; confirm whether @drop n@ was intended.
            case drop (n - 1) counts of
                (cnt:_) | cnt < fullcount       -> return True
                _ | resultCount + 3 < fullcount -> return True -- +3 because maybe duplicates.
                _                               -> return False
    search sch tbl sample checkBucketFull
|