summaryrefslogtreecommitdiff
path: root/src/Network
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-01-05 12:18:43 -0500
committerjoe <joe@jerkface.net>2017-01-05 12:18:43 -0500
commit5e2f43d967aa2d07368b7d5552f65a69b3979ab5 (patch)
tree7c84933442cdcd4b6e52c644842e64e34ae906b0 /src/Network
parent990296703f511efe2bc2899d514dbe2a20247c88 (diff)
Routing Table : use STM and per-bucket ping queues
Diffstat (limited to 'src/Network')
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs54
-rw-r--r--src/Network/BitTorrent/DHT/Routing.hs182
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs6
3 files changed, 163 insertions, 79 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
index 7f20ad6d..99d8cdaf 100644
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -73,6 +73,7 @@ import Network.BitTorrent.Address
73import Network.BitTorrent.DHT.Message 73import Network.BitTorrent.DHT.Message
74import Network.BitTorrent.DHT.Routing as R 74import Network.BitTorrent.DHT.Routing as R
75import Network.BitTorrent.DHT.Session 75import Network.BitTorrent.DHT.Session
76import Control.Concurrent.STM
76 77
77{----------------------------------------------------------------------- 78{-----------------------------------------------------------------------
78-- Handlers 79-- Handlers
@@ -104,7 +105,10 @@ findNodeH = nodeHandler $ \ _ (FindNode nid) -> do
104-- | Default 'GetPeers' handler. 105-- | Default 'GetPeers' handler.
105getPeersH :: Address ip => NodeHandler ip 106getPeersH :: Address ip => NodeHandler ip
106getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do 107getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do
107 GotPeers <$> getPeerList ih <*> grantToken naddr 108 ps <- getPeerList ih
109 tok <- grantToken naddr
110 $(logDebugS) "getPeersH" $ "INFO-HASH " <> T.pack (show (ih,fmap fromAddr naddr :: NodeAddr (Maybe IP)))
111 return $ GotPeers ps tok
108 112
109-- | Default 'Announce' handler. 113-- | Default 'Announce' handler.
110announceH :: Address ip => NodeHandler ip 114announceH :: Address ip => NodeHandler ip
@@ -236,20 +240,33 @@ refreshNodes nid = do
236insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId 240insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId
237insertNode info = fork $ do 241insertNode info = fork $ do
238 var <- asks routingTable 242 var <- asks routingTable
239 t <- takeMVar var 243 tm <- getTimestamp
240 t' <- do -- modifyMVar_ var $ \ t -> do 244 let showTable = do
241 result <- routing (R.insert info t) 245 t <- liftIO $ atomically $ readTVar var
242 case result of 246 let logMsg = "Routing table: " <> pPrint t
243 Nothing -> do 247 $(logDebugS) "insertNode" (T.pack (render logMsg))
244 $(logDebugS) "insertNode" $ "Routing table is full: " 248 t <- liftIO $ atomically $ readTVar var
245 <> T.pack (show (pPrint t)) 249 let arrival = TryInsert info
246 return t 250 arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4)
247 Just t' -> do 251 $(logDebugS) "insertNode" $ T.pack (show arrival4)
248 let logMsg = "Routing table updated: " 252 ps <- liftIO $ atomically $ do
249 <> pPrint t <> " -> " <> pPrint t' 253 t <- readTVar var
250 $(logDebugS) "insertNode" (T.pack (render logMsg)) 254 (ps,t') <- R.insert tm arrival t
251 return t' 255 writeTVar var t'
252 putMVar var t' 256 return ps
257 showTable
258 fork $ forM_ ps $ \(CheckPing ns)-> do
259 forM_ ns $ \n -> do
260 alive <- PingResult n <$> probeNode (nodeAddr n)
261 let PingResult _ b = alive
262 $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b))
263 tm <- getTimestamp
264 liftIO $ atomically $ do
265 t <- readTVar var
266 (_,t') <- R.insert tm alive t
267 writeTVar var t'
268 showTable
269 return ()
253 270
254-- | Throws exception if node is not responding. 271-- | Throws exception if node is not responding.
255queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) 272queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b)
@@ -268,9 +285,4 @@ q <@> addr = snd <$> queryNode addr q
268{-# INLINE (<@>) #-} 285{-# INLINE (<@>) #-}
269 286
270restoreTable :: Address ip => Table ip -> DHT ip () 287restoreTable :: Address ip => Table ip -> DHT ip ()
271restoreTable tbl = do 288restoreTable tbl = mapM_ (insertNode . fst) $ L.concat $ R.toList tbl
272 tblvar <- asks routingTable
273 tbl0 <- liftIO $ takeMVar tblvar
274 mb <- routing $ merge tbl tbl0
275 maybe (return ()) (liftIO . putMVar tblvar) mb
276
diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs
index 68edef56..14aec612 100644
--- a/src/Network/BitTorrent/DHT/Routing.hs
+++ b/src/Network/BitTorrent/DHT/Routing.hs
@@ -17,6 +17,7 @@
17{-# LANGUAGE ViewPatterns #-} 17{-# LANGUAGE ViewPatterns #-}
18{-# LANGUAGE TypeOperators #-} 18{-# LANGUAGE TypeOperators #-}
19{-# LANGUAGE DeriveGeneric #-} 19{-# LANGUAGE DeriveGeneric #-}
20{-# LANGUAGE ScopedTypeVariables #-}
20{-# OPTIONS_GHC -fno-warn-orphans #-} 21{-# OPTIONS_GHC -fno-warn-orphans #-}
21module Network.BitTorrent.DHT.Routing 22module Network.BitTorrent.DHT.Routing
22 ( -- * Table 23 ( -- * Table
@@ -45,8 +46,9 @@ module Network.BitTorrent.DHT.Routing
45 46
46 -- * Construction 47 -- * Construction
47 , Network.BitTorrent.DHT.Routing.nullTable 48 , Network.BitTorrent.DHT.Routing.nullTable
49 , Event(..)
50 , CheckPing(..)
48 , Network.BitTorrent.DHT.Routing.insert 51 , Network.BitTorrent.DHT.Routing.insert
49 , Network.BitTorrent.DHT.Routing.merge
50 52
51 -- * Conversion 53 -- * Conversion
52 , Network.BitTorrent.DHT.Routing.TableEntry 54 , Network.BitTorrent.DHT.Routing.TableEntry
@@ -62,17 +64,19 @@ import Control.Applicative as A
62import Control.Arrow 64import Control.Arrow
63import Control.Monad 65import Control.Monad
64import Data.Function 66import Data.Function
67import Data.Functor.Identity
65import Data.List as L hiding (insert) 68import Data.List as L hiding (insert)
66import Data.Maybe 69import Data.Maybe
67import Data.Monoid 70import Data.Monoid
68import Data.PSQueue as PSQ 71import Data.PSQueue as PSQ
69import Data.Serialize as S hiding (Result, Done) 72import Data.Serialize as S hiding (Result, Done)
73import qualified Data.Sequence as Seq
70import Data.Time 74import Data.Time
71import Data.Time.Clock.POSIX 75import Data.Time.Clock.POSIX
72import Data.Word 76import Data.Word
73import GHC.Generics 77import GHC.Generics
74import Text.PrettyPrint as PP hiding ((<>)) 78import Text.PrettyPrint as PP hiding ((<>))
75import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) 79import Text.PrettyPrint.HughesPJClass (pPrint,Pretty)
76 80
77import Data.Torrent 81import Data.Torrent
78import Network.BitTorrent.Address 82import Network.BitTorrent.Address
@@ -199,6 +203,37 @@ type BucketSize = Int
199defaultBucketSize :: BucketSize 203defaultBucketSize :: BucketSize
200defaultBucketSize = 8 204defaultBucketSize = 8
201 205
206data QueueMethods m elem fifo = QueueMethods
207 { pushBack :: elem -> fifo -> m fifo
208 , popFront :: fifo -> m (Maybe elem, fifo)
209 , emptyQueue :: m fifo
210 }
211
212fromQ :: Functor m =>
213 ( a -> b )
214 -> ( b -> a )
215 -> QueueMethods m elem a
216 -> QueueMethods m elem b
217fromQ embed project QueueMethods{..} =
218 QueueMethods { pushBack = \e -> fmap embed . pushBack e . project
219 , popFront = fmap (second embed) . popFront . project
220 , emptyQueue = fmap embed emptyQueue
221 }
222
223seqQ :: QueueMethods Identity (NodeInfo ip) (Seq.Seq (NodeInfo ip))
224seqQ = QueueMethods
225 { pushBack = \e fifo -> pure (fifo Seq.|> e)
226 , popFront = \fifo -> case Seq.viewl fifo of
227 e Seq.:< fifo' -> pure (Just e, fifo')
228 Seq.EmptyL -> pure (Nothing, Seq.empty)
229 , emptyQueue = pure Seq.empty
230 }
231
232type BucketQueue ip = Seq.Seq (NodeInfo ip)
233
234bucketQ :: QueueMethods Identity (NodeInfo ip) (BucketQueue ip)
235bucketQ = seqQ
236
202-- | Bucket is also limited in its length — thus it's called k-bucket. 237-- | Bucket is also limited in its length — thus it's called k-bucket.
203-- When bucket becomes full, we should split it in two lists by 238-- When bucket becomes full, we should split it in two lists by
204-- current span bit. Span bit is defined by depth in the routing 239-- current span bit. Span bit is defined by depth in the routing
@@ -206,7 +241,11 @@ defaultBucketSize = 8
206-- very unlikely that all nodes in bucket fail within an hour of 241-- very unlikely that all nodes in bucket fail within an hour of
207-- each other. 242-- each other.
208-- 243--
209type Bucket ip = PSQ (NodeInfo ip) Timestamp 244data Bucket ip = Bucket { bktNodes :: PSQ (NodeInfo ip) Timestamp
245 , bktQ :: BucketQueue ip
246 } deriving (Show,Generic)
247
248instance (Eq ip, Serialize ip) => Serialize (Bucket ip)
210 249
211instance (Serialize k, Serialize v, Ord k, Ord v) 250instance (Serialize k, Serialize v, Ord k, Ord v)
212 => Serialize (PSQ k v) where 251 => Serialize (PSQ k v) where
@@ -219,59 +258,77 @@ lastChanged bucket
219 | L.null timestamps = Nothing 258 | L.null timestamps = Nothing
220 | otherwise = Just (L.maximumBy (compare `on` prio) timestamps) 259 | otherwise = Just (L.maximumBy (compare `on` prio) timestamps)
221 where 260 where
222 timestamps = PSQ.toList bucket 261 timestamps = PSQ.toList $ bktNodes bucket
223 262
224leastRecently :: Eq ip => Bucket ip -> Maybe (NodeEntry ip, Bucket ip) 263leastRecently :: Eq ip => Bucket ip -> Maybe (NodeEntry ip, Bucket ip)
225leastRecently = minView 264leastRecently b = fmap (\(e,ns) -> (e, b { bktNodes = ns })) $ minView $ bktNodes b
226 265
227-- | Update interval, in seconds. 266-- | Update interval, in seconds.
228delta :: NominalDiffTime 267delta :: NominalDiffTime
229delta = 15 * 60 268delta = 15 * 60
230 269
231-- | Should maintain a set of stable long running nodes. 270-- | Should maintain a set of stable long running nodes.
232insertBucket :: Eq ip => Timestamp -> NodeInfo ip -> Bucket ip 271--
233 -> ip `Routing` Bucket ip 272-- Note: pings are triggerd only when a bucket is full.
234insertBucket curTime info bucket 273insertBucket :: (Eq ip, Alternative f) => Timestamp -> Event ip -> Bucket ip
274 -> f ([CheckPing ip], Bucket ip)
275insertBucket curTime (TryInsert info) bucket
235 -- just update timestamp if a node is already in bucket 276 -- just update timestamp if a node is already in bucket
236 | Just _ <- PSQ.lookup info bucket = do 277 | already_have
237 return $ PSQ.insertWith max info curTime bucket 278 = pure ( [], map_ns $ PSQ.insertWith max info curTime )
238 279 -- bucket is good, but not full => we can insert a new node
239 -- Buckets that have not been changed in 15 minutes should be "refreshed." 280 | PSQ.size (bktNodes bucket) < defaultBucketSize
240 | Just (NodeInfo {..} :-> lastSeen) <- lastChanged bucket 281 = pure ( [], map_ns $ PSQ.insert info curTime )
241 , curTime - lastSeen > delta = do
242 refresh nodeId
243 insertBucket curTime info bucket
244
245 -- If there are any questionable nodes in the bucket have not been 282 -- If there are any questionable nodes in the bucket have not been
246 -- seen in the last 15 minutes, the least recently seen node is 283 -- seen in the last 15 minutes, the least recently seen node is
247 -- pinged. If any nodes in the bucket are known to have become bad, 284 -- pinged. If any nodes in the bucket are known to have become bad,
248 -- then one is replaced by the new node in the next insertBucket 285 -- then one is replaced by the new node in the next insertBucket
249 -- iteration. 286 -- iteration.
250 | Just ((old @ NodeInfo {..} :-> leastSeen), rest) <- leastRecently bucket 287 | not (L.null stales)
251 , curTime - leastSeen > delta = do 288 = pure ( [CheckPing stales], map_q $ pushBack bucketQ info )
252 pong <- needPing nodeAddr 289 -- When the bucket is full of good nodes, the new node is simply discarded.
253 pongTime <- getTime 290 -- We must return 'A.empty' here to ensure that bucket splitting happens
254 let newBucket = if pong then PSQ.insert old pongTime bucket else rest 291 -- inside 'modifyBucket'.
255 insertBucket pongTime info newBucket 292 | otherwise = A.empty
293 where
294 stales = map key $ PSQ.atMost (curTime - delta) $ bktNodes bucket
256 295
257 -- bucket is good, but not full => we can insert a new node 296 already_have = maybe False (const True) $ PSQ.lookup info (bktNodes bucket)
258 | PSQ.size bucket < defaultBucketSize = do
259 return $ PSQ.insert info curTime bucket
260 297
261 -- When the bucket is full of good nodes, the new node is simply discarded. 298 map_ns f = bucket { bktNodes = f (bktNodes bucket) }
262 | otherwise = Full 299 map_q f = bucket { bktQ = runIdentity $ f (bktQ bucket) }
263 300
264insertNode :: Eq ip => NodeInfo ip -> Bucket ip -> ip `Routing` Bucket ip 301insertBucket curTime (PingResult bad_node got_response) bucket
265insertNode info bucket = do 302 = pure ([], Bucket (update $ bktNodes bucket) popped)
266 curTime <- getTime 303 where
267 insertBucket curTime info bucket 304 (top, popped) = runIdentity $ popFront bucketQ (bktQ bucket)
305 update | got_response = id
306 | Just info <- top = PSQ.insert info curTime . PSQ.delete bad_node
307 | otherwise = id
268 308
269type BitIx = Word 309type BitIx = Word
270 310
311partitionQ imp pred q = do
312 pass <- emptyQueue imp
313 fail <- emptyQueue imp
314 let flipfix a b f = fix f a b
315 flipfix q (pass,fail) $ \loop q qs -> do
316 (mb,q') <- popFront imp q
317 case mb of
318 Nothing -> return qs
319 Just e -> do qs' <- select (pushBack imp e) qs
320 loop q' qs'
321 where
322 select :: Functor f => (b -> f b) -> (b, b) -> f (b, b)
323 select f = if pred e then \(a,b) -> flip (,) b <$> f a
324 else \(a,b) -> (,) a <$> f b
325
271split :: Eq ip => BitIx -> Bucket ip -> (Bucket ip, Bucket ip) 326split :: Eq ip => BitIx -> Bucket ip -> (Bucket ip, Bucket ip)
272split i = (PSQ.fromList *** PSQ.fromList) . partition spanBit . PSQ.toList 327split i b = (Bucket ns qs, Bucket ms rs)
273 where 328 where
274 spanBit entry = testIdBit (nodeId (key entry)) i 329 (ns,ms) = (PSQ.fromList *** PSQ.fromList) . partition (spanBit . key) . PSQ.toList $ bktNodes b
330 (qs,rs) = runIdentity $ partitionQ bucketQ spanBit $ bktQ b
331 spanBit entry = testIdBit (nodeId entry) i
275 332
276{----------------------------------------------------------------------- 333{-----------------------------------------------------------------------
277-- Table 334-- Table
@@ -335,22 +392,22 @@ instance Pretty (Table ip) where
335 392
336-- | Empty table with specified /spine/ node id. 393-- | Empty table with specified /spine/ node id.
337nullTable :: Eq ip => NodeId -> BucketCount -> Table ip 394nullTable :: Eq ip => NodeId -> BucketCount -> Table ip
338nullTable nid n = Tip nid (bucketCount (pred n)) PSQ.empty 395nullTable nid n = Tip nid (bucketCount (pred n)) (Bucket PSQ.empty (runIdentity $ emptyQueue bucketQ))
339 where 396 where
340 bucketCount x = max 0 (min 159 x) 397 bucketCount x = max 0 (min 159 x)
341 398
342-- | Test if table is empty. In this case DHT should start 399-- | Test if table is empty. In this case DHT should start
343-- bootstrapping process until table becomes 'full'. 400-- bootstrapping process until table becomes 'full'.
344null :: Table ip -> Bool 401null :: Table ip -> Bool
345null (Tip _ _ b) = PSQ.null b 402null (Tip _ _ b) = PSQ.null $ bktNodes b
346null _ = False 403null _ = False
347 404
348-- | Test if table have maximum number of nodes. No more nodes can be 405-- | Test if table have maximum number of nodes. No more nodes can be
349-- 'insert'ed, except old ones becomes bad. 406-- 'insert'ed, except old ones becomes bad.
350full :: Table ip -> Bool 407full :: Table ip -> Bool
351full (Tip _ n _) = n == 0 408full (Tip _ n _) = n == 0
352full (Zero t b) = PSQ.size b == defaultBucketSize && full t 409full (Zero t b) = PSQ.size (bktNodes b) == defaultBucketSize && full t
353full (One b t) = PSQ.size b == defaultBucketSize && full t 410full (One b t) = PSQ.size (bktNodes b) == defaultBucketSize && full t
354 411
355-- | Get the /spine/ node id. 412-- | Get the /spine/ node id.
356thisId :: Table ip -> NodeId 413thisId :: Table ip -> NodeId
@@ -364,7 +421,7 @@ type NodeCount = Int
364-- | Internally, routing table is similar to list of buckets or a 421-- | Internally, routing table is similar to list of buckets or a
365-- /matrix/ of nodes. This function returns the shape of the matrix. 422-- /matrix/ of nodes. This function returns the shape of the matrix.
366shape :: Table ip -> [BucketSize] 423shape :: Table ip -> [BucketSize]
367shape = map PSQ.size . toBucketList 424shape = map (PSQ.size . bktNodes) . toBucketList
368 425
369-- | Get number of nodes in the table. 426-- | Get number of nodes in the table.
370size :: Table ip -> NodeCount 427size :: Table ip -> NodeCount
@@ -409,6 +466,7 @@ kclosest :: Eq ip => TableKey a => K -> a -> Table ip -> [NodeInfo ip]
409kclosest k (toNodeId -> nid) 466kclosest k (toNodeId -> nid)
410 = L.take k . rank nid 467 = L.take k . rank nid
411 . L.map PSQ.key . PSQ.toList . fromMaybe PSQ.empty 468 . L.map PSQ.key . PSQ.toList . fromMaybe PSQ.empty
469 . fmap bktNodes
412 . lookupBucket nid 470 . lookupBucket nid
413 471
414{----------------------------------------------------------------------- 472{-----------------------------------------------------------------------
@@ -427,20 +485,41 @@ splitTip nid n i bucket
427-- TODO: Kademlia non-empty subtrees should should split if they have less than 485-- TODO: Kademlia non-empty subtrees should should split if they have less than
428-- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia 486-- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia
429-- paper. The rule requiring additional splits is in section 2.4. 487-- paper. The rule requiring additional splits is in section 2.4.
430insert :: Eq ip => NodeInfo ip -> Table ip -> ip `Routing` Table ip 488modifyBucket
431insert info @ NodeInfo {..} = go (0 :: BitIx) 489 :: forall f ip xs. (Alternative f, Eq ip, Monoid xs) =>
490 NodeId -> (Bucket ip -> f (xs, Bucket ip)) -> Table ip -> f (xs,Table ip)
491modifyBucket nodeId f = go (0 :: BitIx)
432 where 492 where
493 go :: BitIx -> Table ip -> f (xs, Table ip)
433 go i (Zero table bucket) 494 go i (Zero table bucket)
434 | testIdBit nodeId i = Zero table <$> insertNode info bucket 495 | testIdBit nodeId i = second (Zero table) <$> f bucket
435 | otherwise = (`Zero` bucket) <$> go (succ i) table 496 | otherwise = second (`Zero` bucket) <$> go (succ i) table
436 go i (One bucket table ) 497 go i (One bucket table )
437 | testIdBit nodeId i = One bucket <$> go (succ i) table 498 | testIdBit nodeId i = second (One bucket) <$> go (succ i) table
438 | otherwise = (`One` table) <$> insertNode info bucket 499 | otherwise = second (`One` table) <$> f bucket
439 go i (Tip nid n bucket) 500 go i (Tip nid n bucket)
440 | n == 0 = Tip nid n <$> insertNode info bucket 501 | n == 0 = second (Tip nid n) <$> f bucket
441 | otherwise = Tip nid n <$> insertNode info bucket 502 | otherwise = second (Tip nid n) <$> f bucket
442 <|> go i (splitTip nid n i bucket) 503 <|> go i (splitTip nid n i bucket)
443 504
505-- | Triggering event for atomic table update
506data Event ip = TryInsert (NodeInfo ip)
507 | PingResult (NodeInfo ip) Bool
508 deriving (Eq,Ord,Show)
509
510eventId (TryInsert NodeInfo{..}) = nodeId
511eventId (PingResult NodeInfo{..} _) = nodeId
512
513-- | Actions requested by atomic table update
514data CheckPing ip = CheckPing [NodeInfo ip]
515 deriving (Eq,Ord,Show)
516
517
518-- | Atomic 'Table' update
519insert :: (Alternative m, Eq ip) => Timestamp -> Event ip -> Table ip -> m ([CheckPing ip], Table ip)
520insert tm event = modifyBucket (eventId event) (insertBucket tm event)
521
522
444{----------------------------------------------------------------------- 523{-----------------------------------------------------------------------
445-- Conversion 524-- Conversion
446-----------------------------------------------------------------------} 525-----------------------------------------------------------------------}
@@ -457,11 +536,4 @@ toBucketList (Zero t b) = b : toBucketList t
457toBucketList (One b t) = b : toBucketList t 536toBucketList (One b t) = b : toBucketList t
458 537
459toList :: Eq ip => Table ip -> [[TableEntry ip]] 538toList :: Eq ip => Table ip -> [[TableEntry ip]]
460toList = L.map (L.map tableEntry . PSQ.toList) . toBucketList 539toList = L.map (L.map tableEntry . PSQ.toList . bktNodes) . toBucketList
461
462merge :: Eq ip => Table ip -> Table ip -> Routing ip (Table ip)
463merge a b = do
464 let ns = concatMap PSQ.toList $ toBucketList a
465 -- TODO: merge timestamps as well and let refresh take care of ping.
466 as <- filterM (needPing . nodeAddr . PSQ.key) ns
467 foldM (flip $ Network.BitTorrent.DHT.Routing.insert) b $ map PSQ.key as
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index 2bb3ce85..5a8d64ef 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -243,7 +243,7 @@ data Node ip = Node
243 243
244 , resources :: !InternalState 244 , resources :: !InternalState
245 , manager :: !(Manager (DHT ip)) -- ^ RPC manager; 245 , manager :: !(Manager (DHT ip)) -- ^ RPC manager;
246 , routingTable :: !(MVar (Table ip)) -- ^ search table; 246 , routingTable :: !(TVar (Table ip)) -- ^ search table;
247 , contactInfo :: !(TVar (PeerStore ip)) -- ^ published by other nodes; 247 , contactInfo :: !(TVar (PeerStore ip)) -- ^ published by other nodes;
248 , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; 248 , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node;
249 , sessionTokens :: !(TVar SessionTokens) -- ^ query session IDs. 249 , sessionTokens :: !(TVar SessionTokens) -- ^ query session IDs.
@@ -323,7 +323,7 @@ newNode hs opts naddr logger mbid = do
323 liftIO $ do 323 liftIO $ do
324 myId <- maybe genNodeId return mbid 324 myId <- maybe genNodeId return mbid
325 node <- Node opts myId s m 325 node <- Node opts myId s m
326 <$> newMVar (nullTable myId (optBucketCount opts)) 326 <$> atomically (newTVar (nullTable myId (optBucketCount opts)))
327 <*> newTVarIO def 327 <*> newTVarIO def
328 <*> newTVarIO S.empty 328 <*> newTVarIO S.empty
329 <*> (newTVarIO =<< nullSessionTokens) 329 <*> (newTVarIO =<< nullSessionTokens)
@@ -381,7 +381,7 @@ checkToken addr questionableToken = do
381getTable :: DHT ip (Table ip) 381getTable :: DHT ip (Table ip)
382getTable = do 382getTable = do
383 var <- asks routingTable 383 var <- asks routingTable
384 liftIO (readMVar var) 384 liftIO (atomically $ readTVar var)
385 385
386-- | Find a set of closest nodes from routing table of this node. (in 386-- | Find a set of closest nodes from routing table of this node. (in
387-- no particular order) 387-- no particular order)