diff options
author | joe <joe@jerkface.net> | 2017-07-22 01:15:44 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-07-22 01:15:44 -0400 |
commit | 77f6b96492223e7d7b147dac8d026e0b6f6a651b (patch) | |
tree | 661e2115a814de82ba251bccf0ab21ae4dfd1ff1 /Mainline.hs | |
parent | 7f1eb53d34ea6dda02cae1934b5011e38de248a6 (diff) |
Implemented bucket refresh for Mainline.
Diffstat (limited to 'Mainline.hs')
-rw-r--r-- | Mainline.hs | 51 |
1 files changed, 38 insertions, 13 deletions
diff --git a/Mainline.hs b/Mainline.hs index 8853e920..1e30b718 100644 --- a/Mainline.hs +++ b/Mainline.hs | |||
@@ -38,13 +38,16 @@ import Data.Monoid | |||
38 | import Data.Ord | 38 | import Data.Ord |
39 | import qualified Data.Serialize as S | 39 | import qualified Data.Serialize as S |
40 | import Data.Set (Set) | 40 | import Data.Set (Set) |
41 | import Data.Time.Clock.POSIX (POSIXTime) | ||
41 | import Data.Torrent | 42 | import Data.Torrent |
42 | import Data.Typeable | 43 | import Data.Typeable |
43 | import Data.Word | 44 | import Data.Word |
45 | import qualified Data.Wrapper.PSQInt as Int | ||
44 | import Kademlia | 46 | import Kademlia |
45 | import Network.Address (Address, fromSockAddr, setPort, | 47 | import Network.Address (Address, fromSockAddr, setPort, |
46 | sockAddrPort, testIdBit, toSockAddr) | 48 | sockAddrPort, testIdBit, toSockAddr) |
47 | import Network.BitTorrent.DHT.ContactInfo as Peers | 49 | import Network.BitTorrent.DHT.ContactInfo as Peers |
50 | import Network.BitTorrent.DHT.Search (Search (..)) | ||
48 | import Network.BitTorrent.DHT.Token as Token | 51 | import Network.BitTorrent.DHT.Token as Token |
49 | import qualified Network.DHT.Routing as R | 52 | import qualified Network.DHT.Routing as R |
50 | ;import Network.DHT.Routing (Info, Timestamp, getTimestamp) | 53 | ;import Network.DHT.Routing (Info, Timestamp, getTimestamp) |
@@ -54,6 +57,10 @@ import Network.Socket | |||
54 | newtype NodeId = NodeId ByteString | 57 | newtype NodeId = NodeId ByteString |
55 | deriving (Eq,Ord,Show,ByteArrayAccess, BEncode, Bits, Hashable) | 58 | deriving (Eq,Ord,Show,ByteArrayAccess, BEncode, Bits, Hashable) |
56 | 59 | ||
60 | instance S.Serialize NodeId where | ||
61 | get = NodeId <$> S.getBytes 20 | ||
62 | put (NodeId bs) = S.putByteString bs | ||
63 | |||
57 | instance FiniteBits NodeId where | 64 | instance FiniteBits NodeId where |
58 | finiteBitSize _ = 160 | 65 | finiteBitSize _ = 160 |
59 | 66 | ||
@@ -62,6 +69,7 @@ data NodeInfo = NodeInfo | |||
62 | , nodeIP :: IP | 69 | , nodeIP :: IP |
63 | , nodePort :: PortNumber | 70 | , nodePort :: PortNumber |
64 | } | 71 | } |
72 | deriving (Eq,Ord) | ||
65 | 73 | ||
66 | -- The Hashable instance depends only on the IP address and port number. It is | 74 | -- The Hashable instance depends only on the IP address and port number. It is |
67 | -- used to compute the announce token. | 75 | -- used to compute the announce token. |
@@ -271,10 +279,12 @@ type RoutingInfo = Info NodeInfo NodeId | |||
271 | 279 | ||
272 | data Routing = Routing | 280 | data Routing = Routing |
273 | { tentativeId :: NodeInfo | 281 | { tentativeId :: NodeInfo |
274 | , routing4 :: !( TVar (R.BucketList NodeInfo) ) | 282 | , sched4 :: !( TVar (Int.PSQ POSIXTime) ) |
275 | , committee4 :: TriadCommittee NodeId SockAddr | 283 | , routing4 :: !( TVar (R.BucketList NodeInfo) ) |
276 | , routing6 :: !( TVar (R.BucketList NodeInfo) ) | 284 | , committee4 :: TriadCommittee NodeId SockAddr |
277 | , committee6 :: TriadCommittee NodeId SockAddr | 285 | , sched6 :: !( TVar (Int.PSQ POSIXTime) ) |
286 | , routing6 :: !( TVar (R.BucketList NodeInfo) ) | ||
287 | , committee6 :: TriadCommittee NodeId SockAddr | ||
278 | } | 288 | } |
279 | 289 | ||
280 | type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue) | 290 | type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue) |
@@ -294,7 +304,12 @@ newClient addr = do | |||
294 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tenative_info nobkts | 304 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tenative_info nobkts |
295 | committee4 <- newTriadCommittee (const $ return ()) -- TODO: update tbl4 | 305 | committee4 <- newTriadCommittee (const $ return ()) -- TODO: update tbl4 |
296 | committee6 <- newTriadCommittee (const $ return ()) -- TODO: update tbl6 | 306 | committee6 <- newTriadCommittee (const $ return ()) -- TODO: update tbl6 |
297 | return $ Routing tenative_info tbl4 committee4 tbl6 committee6 | 307 | sched4 <- newTVar Int.empty |
308 | sched6 <- newTVar Int.empty | ||
309 | return $ Routing tenative_info sched4 tbl4 committee4 sched6 tbl6 committee6 | ||
310 | -- TODO: Provide some means of shutting down these two auxillary threads: | ||
311 | refresh_thread4 <- pollForRefresh (15*60) (sched4 routing) (refreshBucket nodeSearch (routing4 routing) (nodeId tenative_info)) | ||
312 | refresh_thread6 <- pollForRefresh (15*60) (sched6 routing) (refreshBucket nodeSearch (routing6 routing) (nodeId tenative_info)) | ||
298 | swarms <- newSwarmsDatabase | 313 | swarms <- newSwarmsDatabase |
299 | map_var <- atomically $ newTVar (0, mempty) | 314 | map_var <- atomically $ newTVar (0, mempty) |
300 | let net = onInbound (updateRouting outgoingClient routing) | 315 | let net = onInbound (updateRouting outgoingClient routing) |
@@ -346,12 +361,16 @@ defaultHandler meth = MethodHandler decodePayload errorPayload returnError | |||
346 | returnError :: NodeInfo -> BValue -> IO Error | 361 | returnError :: NodeInfo -> BValue -> IO Error |
347 | returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth) | 362 | returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth) |
348 | 363 | ||
349 | mainlineKademlia :: MainlineClient -> TriadCommittee NodeId SockAddr -> TVar (R.BucketList NodeInfo) -> Kademlia NodeId NodeInfo | 364 | mainlineKademlia :: MainlineClient -> TriadCommittee NodeId SockAddr -> TVar (R.BucketList NodeInfo) -> TVar (Int.PSQ POSIXTime) -> Kademlia NodeId NodeInfo |
350 | mainlineKademlia client committee var | 365 | mainlineKademlia client committee var sched |
351 | = Kademlia quietInsertions | 366 | = Kademlia quietInsertions |
352 | mainlineSpace | 367 | mainlineSpace |
353 | (vanillaIO var $ ping client) | 368 | (vanillaIO var $ ping client) |
354 | { tblTransition = transitionCommittee committee } | 369 | { tblTransition = \tr -> do |
370 | io1 <- transitionCommittee committee tr | ||
371 | io2 <- touchBucket mainlineSpace (15*60) var sched tr | ||
372 | return $ io1 >> io2 | ||
373 | } | ||
355 | 374 | ||
356 | 375 | ||
357 | mainlineSpace :: R.KademliaSpace NodeId NodeInfo | 376 | mainlineSpace :: R.KademliaSpace NodeId NodeInfo |
@@ -365,19 +384,20 @@ transitionCommittee :: TriadCommittee NodeId SockAddr -> RoutingTransition NodeI | |||
365 | transitionCommittee committee (RoutingTransition ni Stranger) = do | 384 | transitionCommittee committee (RoutingTransition ni Stranger) = do |
366 | delVote committee (nodeId ni) | 385 | delVote committee (nodeId ni) |
367 | return $ return () | 386 | return $ return () |
387 | transitionCommittee committee _ = return $ return () | ||
368 | 388 | ||
369 | updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () | 389 | updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () |
370 | updateRouting client routing naddr msg = do | 390 | updateRouting client routing naddr msg = do |
371 | case prefer4or6 naddr Nothing of | 391 | case prefer4or6 naddr Nothing of |
372 | Want_IP4 -> go (routing4 routing) (committee4 routing) | 392 | Want_IP4 -> go (routing4 routing) (committee4 routing) (sched4 routing) |
373 | Want_IP6 -> go (routing6 routing) (committee6 routing) | 393 | Want_IP6 -> go (routing6 routing) (committee6 routing) (sched6 routing) |
374 | where | 394 | where |
375 | go tbl committee = do | 395 | go tbl committee sched = do |
376 | case msg of | 396 | case msg of |
377 | R { rspReflectedIP = Just sockaddr } | 397 | R { rspReflectedIP = Just sockaddr } |
378 | -> atomically $ addVote committee (nodeId naddr) sockaddr | 398 | -> atomically $ addVote committee (nodeId naddr) sockaddr |
379 | _ -> return () | 399 | _ -> return () |
380 | insertNode (mainlineKademlia client committee tbl) naddr | 400 | insertNode (mainlineKademlia client committee tbl sched) naddr |
381 | 401 | ||
382 | data Ping = Ping deriving Show | 402 | data Ping = Ping deriving Show |
383 | 403 | ||
@@ -478,7 +498,7 @@ findNodeH routing addr (FindNode node iptyp) = do | |||
478 | ks6 <- bool (return []) (go $ routing6 routing) (preferred /= Want_IP4) | 498 | ks6 <- bool (return []) (go $ routing6 routing) (preferred /= Want_IP4) |
479 | return $ NodeFound ks ks6 | 499 | return $ NodeFound ks ks6 |
480 | where | 500 | where |
481 | go var = R.kclosest nodeId k node <$> atomically (readTVar var) | 501 | go var = R.kclosest mainlineSpace k node <$> atomically (readTVar var) |
482 | k = R.defaultK | 502 | k = R.defaultK |
483 | 503 | ||
484 | 504 | ||
@@ -754,3 +774,8 @@ delVote triad voter = do | |||
754 | writeTVar (triadSlot slot triad) Nothing | 774 | writeTVar (triadSlot slot triad) Nothing |
755 | triadCountVotes triad | 775 | triadCountVotes triad |
756 | 776 | ||
777 | nodeSearch = Search | ||
778 | { searchSpace = mainlineSpace | ||
779 | , searchNodeAddress = nodeIP &&& nodePort | ||
780 | , searchQuery = error "searchQuery" | ||
781 | } | ||