diff options
author | joe <joe@jerkface.net> | 2017-06-15 20:08:08 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-06-15 20:08:08 -0400 |
commit | b9a2c3dd44f5dd59157676fb386584a148d854cb (patch) | |
tree | f9521b0e4e46cadfbc18722f3e077640e51e374b /src | |
parent | 362d36e4e31da0d3e3f78cd0aa6dd99cddeaba49 (diff) |
Refactored insertNode.
Diffstat (limited to 'src')
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 103 | ||||
-rw-r--r-- | src/Network/DHT.hs | 125 | ||||
-rw-r--r-- | src/Network/DHT/Types.hs | 13 |
3 files changed, 172 insertions, 69 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 9162abdc..68c67900 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs | |||
@@ -86,6 +86,7 @@ import Network.KRPC.Method as KRPC | |||
86 | import Network.DatagramServer.Mainline (ReflectedIP(..)) | 86 | import Network.DatagramServer.Mainline (ReflectedIP(..)) |
87 | import Network.DatagramServer (QueryFailure(..)) | 87 | import Network.DatagramServer (QueryFailure(..)) |
88 | import Data.Torrent | 88 | import Data.Torrent |
89 | import qualified Network.DHT as DHT | ||
89 | import Network.DHT.Mainline | 90 | import Network.DHT.Mainline |
90 | import Network.DHT.Routing as R | 91 | import Network.DHT.Routing as R |
91 | import Network.BitTorrent.DHT.Session | 92 | import Network.BitTorrent.DHT.Session |
@@ -100,6 +101,7 @@ import Network.DatagramServer.Tox | |||
100 | #endif | 101 | #endif |
101 | import Network.Address hiding (NodeId) | 102 | import Network.Address hiding (NodeId) |
102 | import Network.DatagramServer.Types as RPC hiding (Query,Response) | 103 | import Network.DatagramServer.Types as RPC hiding (Query,Response) |
104 | import Control.Monad.Trans.Control | ||
103 | 105 | ||
104 | {----------------------------------------------------------------------- | 106 | {----------------------------------------------------------------------- |
105 | -- Handlers | 107 | -- Handlers |
@@ -312,79 +314,42 @@ refreshNodes nid = do | |||
312 | return () | 314 | return () |
313 | return () -- \$ L.concat nss | 315 | return () -- \$ L.concat nss |
314 | 316 | ||
317 | logc :: Char -> String -> DHT ip () | ||
318 | logc 'D' = $(logDebugS) "insertNode" . T.pack | ||
319 | logc 'W' = $(logWarnS) "insertNode" . T.pack | ||
320 | logc 'I' = $(logInfoS) "insertNode" . T.pack | ||
321 | logc c = $(logInfoS) "insertNode" . T.pack . (c :) . (':' :) | ||
322 | |||
315 | -- | This operation do not block but acquire exclusive access to | 323 | -- | This operation do not block but acquire exclusive access to |
316 | -- routing table. | 324 | -- routing table. |
317 | insertNode :: forall ip. Address ip => NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> DHT ip () | 325 | insertNode :: forall ip. Address ip => NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> DHT ip () |
318 | insertNode info witnessed_ip0 = do | 326 | insertNode info witnessed_ip0 = do |
319 | var <- asks routingInfo | 327 | bc <- optBucketCount <$> asks options |
320 | tm <- getTimestamp | 328 | nid <- asks tentativeNodeId |
321 | let showTable = do | 329 | logm0 <- embed_ (uncurry logc) |
322 | t <- getTable | 330 | let logm c = logm0 . (c,) |
323 | let logMsg = "Routing table: " <> pPrint t | 331 | dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state. |
324 | $(logDebugS) "insertNode" (T.pack (render logMsg)) | 332 | probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) |
325 | let arrival0 = TryInsert info | 333 | let probe n = probe0 n >>= runDHT dht_node_state . restoreM |
326 | arrival4 = TryInsert (mapAddress fromAddr info) :: Event _ (Maybe IPv4) _ | 334 | changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive |
327 | $(logDebugS) "insertNode" $ T.pack (show arrival4) | 335 | ip <- fromSockAddr ip0 :: Maybe ip |
328 | maxbuckets <- asks (optBucketCount . options) | 336 | listToMaybe |
329 | fallbackid <- asks tentativeNodeId | 337 | $ rank id (nodeId $ foreignNode arrival) |
330 | let atomicInsert arrival witnessed_ip = do | 338 | $ bep42s ip (DHT.fallbackID params) -- warning: recursive |
331 | minfo <- readTVar var | 339 | params = DHT.TableParameters |
332 | let change ip0 = fromMaybe fallbackid $ do | 340 | { maxBuckets = bc :: Int |
333 | ip <- fromSockAddr ip0 :: Maybe ip | 341 | , fallbackID = nid :: NodeId KMessageOf |
334 | listToMaybe | 342 | , adjustID = changeip :: SockAddr -> Event KMessageOf ip () -> NodeId KMessageOf |
335 | $ rank id (nodeId $ foreignNode arrival) | 343 | , logMessage = logm :: Char -> String -> IO () |
336 | $ bep42s ip fallbackid | 344 | , pingProbe = probe :: NodeAddr ip -> IO (Bool, Maybe ReflectedIP) |
337 | case minfo of | 345 | } |
338 | Just inf -> do | 346 | tbl <- asks routingInfo |
339 | (ps,t') <- R.insert tm arrival $ myBuckets inf | 347 | let state = DHT.TableKeeper |
340 | writeTVar var $ Just $ inf { myBuckets = t' } | 348 | { routingInfo = tbl |
341 | return $ do | 349 | , grokNode = DHT.insertNode params state |
342 | case witnessed_ip of | 350 | , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO () |
343 | Just (ReflectedIP ip) | 351 | } |
344 | | ip /= myAddress inf | 352 | liftIO $ DHT.insertNode params state info witnessed_ip0 |
345 | -> $(logInfo) ( T.pack $ L.unwords | ||
346 | $ [ "Possible NAT?" | ||
347 | , show (toSockAddr $ nodeAddr $ foreignNode arrival) | ||
348 | , "reports my address:" | ||
349 | , show ip ] ) | ||
350 | -- TODO: Let routing table vote on my IP/NodeId. | ||
351 | _ -> return () | ||
352 | return ps | ||
353 | Nothing -> | ||
354 | let dropped = return $ do | ||
355 | -- Ignore non-witnessing nodes until somebody tells | ||
356 | -- us our ip address. | ||
357 | $(logWarnS) "insertNode" ("Dropped " | ||
358 | <> T.pack (show (toSockAddr $ nodeAddr $ foreignNode arrival))) | ||
359 | return [] | ||
360 | in fromMaybe dropped $ do | ||
361 | ReflectedIP ip <- witnessed_ip | ||
362 | let nil = nullTable (change ip) maxbuckets | ||
363 | return $ do | ||
364 | (ps,t') <- R.insert tm arrival nil | ||
365 | let new_info = R.Info t' (change ip) ip | ||
366 | writeTVar var $ Just new_info | ||
367 | return $ do | ||
368 | $(logInfo) ( T.pack $ L.unwords | ||
369 | [ "External IP address:" | ||
370 | , show ip | ||
371 | , "(reported by" | ||
372 | , show (toSockAddr $ nodeAddr $ foreignNode arrival) | ||
373 | <> ")" | ||
374 | ] ) | ||
375 | return ps | ||
376 | ps <- join $ liftIO $ atomically $ atomicInsert arrival0 witnessed_ip0 | ||
377 | showTable | ||
378 | _ <- fork $ do | ||
379 | myThreadId >>= liftIO . flip labelThread "DHT.insertNode.pingResults" | ||
380 | forM_ ps $ \(CheckPing ns)-> do | ||
381 | forM_ ns $ \n -> do | ||
382 | (b,mip) <- probeNode (nodeAddr n) | ||
383 | let alive = PingResult n b | ||
384 | $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) | ||
385 | _ <- join $ liftIO $ atomically $ atomicInsert alive mip | ||
386 | showTable | ||
387 | return () | ||
388 | 353 | ||
389 | -- | Throws exception if node is not responding. | 354 | -- | Throws exception if node is not responding. |
390 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) | 355 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) |
diff --git a/src/Network/DHT.hs b/src/Network/DHT.hs new file mode 100644 index 00000000..0dab29cd --- /dev/null +++ b/src/Network/DHT.hs | |||
@@ -0,0 +1,125 @@ | |||
1 | {-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-} | ||
2 | module Network.DHT | ||
3 | ( -- makeTableKeeper | ||
4 | -- , TableKeeper(..) | ||
5 | module Network.DHT -- for now | ||
6 | , module Network.DHT.Types | ||
7 | ) where | ||
8 | |||
9 | import Data.Bits | ||
10 | import Data.Maybe | ||
11 | import Data.Monoid | ||
12 | import Network.Address | ||
13 | import Network.DHT.Types | ||
14 | import Network.DatagramServer.Types | ||
15 | import Network.DHT.Routing | ||
16 | import Control.Concurrent.STM | ||
17 | #ifdef THREAD_DEBUG | ||
18 | import Control.Concurrent.Lifted.Instrument | ||
19 | #else | ||
20 | import GHC.Conc (labelThread) | ||
21 | import Control.Concurrent.Lifted | ||
22 | #endif | ||
23 | import Text.PrettyPrint as PP hiding ((<>), ($$)) | ||
24 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | ||
25 | |||
26 | import Control.Monad | ||
27 | import Data.Time.Clock (getCurrentTime) | ||
28 | import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) | ||
29 | |||
30 | data TableKeeper msg ip u = TableKeeper | ||
31 | { routingInfo :: TVar (Maybe (Info msg ip u)) | ||
32 | , grokNode :: NodeInfo msg ip u -> Maybe (ReflectedIP) -> IO () | ||
33 | , grokAddress :: Maybe SockAddr -> ReflectedIP -> IO () | ||
34 | } | ||
35 | |||
36 | makeTableKeeper :: forall msg ip u. | ||
37 | ( Address ip | ||
38 | , Show u | ||
39 | , Show (NodeId msg) | ||
40 | , Ord (NodeId msg) | ||
41 | , FiniteBits (NodeId msg) | ||
42 | ) => TableParameters msg ip u -> IO (TableKeeper msg ip u) | ||
43 | makeTableKeeper param@TableParameters{..} = do | ||
44 | error "TODO makeTableKeeper" -- kick off table-updating thread | ||
45 | ri <- atomically (newTVar Nothing) | ||
46 | let tk = TableKeeper{ routingInfo = ri | ||
47 | , grokNode = insertNode param tk | ||
48 | , grokAddress = error "todo" | ||
49 | } | ||
50 | return tk | ||
51 | |||
52 | atomicInsert :: ( Eq ip, Address ip, Ord (NodeId msg), FiniteBits (NodeId msg) | ||
53 | ) => TableParameters msg ip u -> TableKeeper msg ip u -> Timestamp -> Event msg ip u -> Maybe ReflectedIP -> STM (IO [CheckPing msg ip u]) | ||
54 | atomicInsert param@TableParameters{..} state tm arrival witnessed_ip = do | ||
55 | minfo <- readTVar (routingInfo state) | ||
56 | case minfo of | ||
57 | Just inf -> do | ||
58 | (ps,t') <- insert tm arrival $ myBuckets inf | ||
59 | writeTVar (routingInfo state) $ Just $ inf { myBuckets = t' } | ||
60 | return $ do | ||
61 | case witnessed_ip of | ||
62 | Just (ReflectedIP ip) | ||
63 | | ip /= myAddress inf | ||
64 | -> logMessage 'I' $ unwords | ||
65 | $ [ "Possible NAT?" | ||
66 | , show (toSockAddr $ nodeAddr $ foreignNode arrival) | ||
67 | , "reports my address:" | ||
68 | , show ip ] | ||
69 | -- TODO: Let routing table vote on my IP/NodeId. | ||
70 | _ -> return () | ||
71 | return ps | ||
72 | Nothing -> | ||
73 | let dropped = return $ do | ||
74 | -- Ignore non-witnessing nodes until somebody tells | ||
75 | -- us our ip address. | ||
76 | logMessage 'W' ("Dropped " ++ show (toSockAddr $ nodeAddr $ foreignNode arrival)) | ||
77 | return [] | ||
78 | in fromMaybe dropped $ do | ||
79 | ReflectedIP ip <- witnessed_ip | ||
80 | let nil = nullTable (adjustID ip arrival) maxBuckets | ||
81 | return $ do | ||
82 | (ps,t') <- insert tm arrival nil | ||
83 | let new_info = Info t' (adjustID ip arrival) ip | ||
84 | writeTVar (routingInfo state) $ Just new_info | ||
85 | return $ do | ||
86 | logMessage 'I' $ unwords | ||
87 | [ "External IP address:" | ||
88 | , show ip | ||
89 | , "(reported by" | ||
90 | , show (toSockAddr $ nodeAddr $ foreignNode arrival) | ||
91 | ++ ")" | ||
92 | ] | ||
93 | return ps | ||
94 | |||
95 | -- | This operation do not block but acquire exclusive access to | ||
96 | -- routing table. | ||
97 | insertNode :: forall msg ip u. | ||
98 | ( Address ip | ||
99 | , Show u | ||
100 | , Show (NodeId msg) | ||
101 | , Ord (NodeId msg) | ||
102 | , FiniteBits (NodeId msg) | ||
103 | ) => TableParameters msg ip u -> TableKeeper msg ip u -> NodeInfo msg ip u -> Maybe ReflectedIP -> IO () | ||
104 | insertNode param@TableParameters{..} state info witnessed_ip0 = do | ||
105 | tm <- utcTimeToPOSIXSeconds <$> getCurrentTime -- Network.DHT.Routing.TimeStamp = POSIXTime | ||
106 | let showTable = do | ||
107 | t <- atomically $ fmap myBuckets <$> readTVar (routingInfo state) | ||
108 | let logMsg = "Routing table: " <> pPrint t | ||
109 | logMessage 'D' (render logMsg) | ||
110 | let arrival = TryInsert info | ||
111 | logMessage 'D' $ show ( TryInsert (mapAddress fromAddr info) :: Event _ (Maybe IPv4) _ ) | ||
112 | ps <- join $ atomically $ atomicInsert param state tm arrival witnessed_ip0 | ||
113 | showTable | ||
114 | _ <- fork $ do | ||
115 | myThreadId >>= flip labelThread "DHT.insertNode.pingResults" | ||
116 | forM_ ps $ \(CheckPing ns)-> do | ||
117 | forM_ ns $ \n -> do | ||
118 | (b,mip) <- pingProbe (nodeAddr n) | ||
119 | let alive = PingResult n b | ||
120 | logMessage 'D' $ "PingResult "++show (nodeId n,b) | ||
121 | _ <- join $ atomically $ atomicInsert param state tm alive mip | ||
122 | showTable | ||
123 | return () | ||
124 | return () | ||
125 | |||
diff --git a/src/Network/DHT/Types.hs b/src/Network/DHT/Types.hs new file mode 100644 index 00000000..ed2dc175 --- /dev/null +++ b/src/Network/DHT/Types.hs | |||
@@ -0,0 +1,13 @@ | |||
1 | module Network.DHT.Types where | ||
2 | |||
3 | import Network.Socket (SockAddr) | ||
4 | import Network.DatagramServer.Types | ||
5 | import Network.DHT.Routing | ||
6 | |||
7 | data TableParameters msg ip u = TableParameters | ||
8 | { maxBuckets :: Int | ||
9 | , fallbackID :: NodeId msg | ||
10 | , pingProbe :: NodeAddr ip -> IO (Bool, Maybe ReflectedIP) | ||
11 | , logMessage :: Char -> String -> IO () | ||
12 | , adjustID :: SockAddr -> Event msg ip u -> NodeId msg | ||
13 | } | ||