summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-06-15 20:08:08 -0400
committerjoe <joe@jerkface.net>2017-06-15 20:08:08 -0400
commitb9a2c3dd44f5dd59157676fb386584a148d854cb (patch)
treef9521b0e4e46cadfbc18722f3e077640e51e374b /src
parent362d36e4e31da0d3e3f78cd0aa6dd99cddeaba49 (diff)
Refactored insertNode.
Diffstat (limited to 'src')
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs103
-rw-r--r--src/Network/DHT.hs125
-rw-r--r--src/Network/DHT/Types.hs13
3 files changed, 172 insertions, 69 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
index 9162abdc..68c67900 100644
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -86,6 +86,7 @@ import Network.KRPC.Method as KRPC
86import Network.DatagramServer.Mainline (ReflectedIP(..)) 86import Network.DatagramServer.Mainline (ReflectedIP(..))
87import Network.DatagramServer (QueryFailure(..)) 87import Network.DatagramServer (QueryFailure(..))
88import Data.Torrent 88import Data.Torrent
89import qualified Network.DHT as DHT
89import Network.DHT.Mainline 90import Network.DHT.Mainline
90import Network.DHT.Routing as R 91import Network.DHT.Routing as R
91import Network.BitTorrent.DHT.Session 92import Network.BitTorrent.DHT.Session
@@ -100,6 +101,7 @@ import Network.DatagramServer.Tox
100#endif 101#endif
101import Network.Address hiding (NodeId) 102import Network.Address hiding (NodeId)
102import Network.DatagramServer.Types as RPC hiding (Query,Response) 103import Network.DatagramServer.Types as RPC hiding (Query,Response)
104import Control.Monad.Trans.Control
103 105
104{----------------------------------------------------------------------- 106{-----------------------------------------------------------------------
105-- Handlers 107-- Handlers
@@ -312,79 +314,42 @@ refreshNodes nid = do
312 return () 314 return ()
313 return () -- \$ L.concat nss 315 return () -- \$ L.concat nss
314 316
317logc :: Char -> String -> DHT ip ()
318logc 'D' = $(logDebugS) "insertNode" . T.pack
319logc 'W' = $(logWarnS) "insertNode" . T.pack
320logc 'I' = $(logInfoS) "insertNode" . T.pack
321logc c = $(logInfoS) "insertNode" . T.pack . (c :) . (':' :)
322
315-- | This operation do not block but acquire exclusive access to 323-- | This operation do not block but acquire exclusive access to
316-- routing table. 324-- routing table.
317insertNode :: forall ip. Address ip => NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> DHT ip () 325insertNode :: forall ip. Address ip => NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> DHT ip ()
318insertNode info witnessed_ip0 = do 326insertNode info witnessed_ip0 = do
319 var <- asks routingInfo 327 bc <- optBucketCount <$> asks options
320 tm <- getTimestamp 328 nid <- asks tentativeNodeId
321 let showTable = do 329 logm0 <- embed_ (uncurry logc)
322 t <- getTable 330 let logm c = logm0 . (c,)
323 let logMsg = "Routing table: " <> pPrint t 331 dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state.
324 $(logDebugS) "insertNode" (T.pack (render logMsg)) 332 probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP)
325 let arrival0 = TryInsert info 333 let probe n = probe0 n >>= runDHT dht_node_state . restoreM
326 arrival4 = TryInsert (mapAddress fromAddr info) :: Event _ (Maybe IPv4) _ 334 changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive
327 $(logDebugS) "insertNode" $ T.pack (show arrival4) 335 ip <- fromSockAddr ip0 :: Maybe ip
328 maxbuckets <- asks (optBucketCount . options) 336 listToMaybe
329 fallbackid <- asks tentativeNodeId 337 $ rank id (nodeId $ foreignNode arrival)
330 let atomicInsert arrival witnessed_ip = do 338 $ bep42s ip (DHT.fallbackID params) -- warning: recursive
331 minfo <- readTVar var 339 params = DHT.TableParameters
332 let change ip0 = fromMaybe fallbackid $ do 340 { maxBuckets = bc :: Int
333 ip <- fromSockAddr ip0 :: Maybe ip 341 , fallbackID = nid :: NodeId KMessageOf
334 listToMaybe 342 , adjustID = changeip :: SockAddr -> Event KMessageOf ip () -> NodeId KMessageOf
335 $ rank id (nodeId $ foreignNode arrival) 343 , logMessage = logm :: Char -> String -> IO ()
336 $ bep42s ip fallbackid 344 , pingProbe = probe :: NodeAddr ip -> IO (Bool, Maybe ReflectedIP)
337 case minfo of 345 }
338 Just inf -> do 346 tbl <- asks routingInfo
339 (ps,t') <- R.insert tm arrival $ myBuckets inf 347 let state = DHT.TableKeeper
340 writeTVar var $ Just $ inf { myBuckets = t' } 348 { routingInfo = tbl
341 return $ do 349 , grokNode = DHT.insertNode params state
342 case witnessed_ip of 350 , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO ()
343 Just (ReflectedIP ip) 351 }
344 | ip /= myAddress inf 352 liftIO $ DHT.insertNode params state info witnessed_ip0
345 -> $(logInfo) ( T.pack $ L.unwords
346 $ [ "Possible NAT?"
347 , show (toSockAddr $ nodeAddr $ foreignNode arrival)
348 , "reports my address:"
349 , show ip ] )
350 -- TODO: Let routing table vote on my IP/NodeId.
351 _ -> return ()
352 return ps
353 Nothing ->
354 let dropped = return $ do
355 -- Ignore non-witnessing nodes until somebody tells
356 -- us our ip address.
357 $(logWarnS) "insertNode" ("Dropped "
358 <> T.pack (show (toSockAddr $ nodeAddr $ foreignNode arrival)))
359 return []
360 in fromMaybe dropped $ do
361 ReflectedIP ip <- witnessed_ip
362 let nil = nullTable (change ip) maxbuckets
363 return $ do
364 (ps,t') <- R.insert tm arrival nil
365 let new_info = R.Info t' (change ip) ip
366 writeTVar var $ Just new_info
367 return $ do
368 $(logInfo) ( T.pack $ L.unwords
369 [ "External IP address:"
370 , show ip
371 , "(reported by"
372 , show (toSockAddr $ nodeAddr $ foreignNode arrival)
373 <> ")"
374 ] )
375 return ps
376 ps <- join $ liftIO $ atomically $ atomicInsert arrival0 witnessed_ip0
377 showTable
378 _ <- fork $ do
379 myThreadId >>= liftIO . flip labelThread "DHT.insertNode.pingResults"
380 forM_ ps $ \(CheckPing ns)-> do
381 forM_ ns $ \n -> do
382 (b,mip) <- probeNode (nodeAddr n)
383 let alive = PingResult n b
384 $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b))
385 _ <- join $ liftIO $ atomically $ atomicInsert alive mip
386 showTable
387 return ()
388 353
389-- | Throws exception if node is not responding. 354-- | Throws exception if node is not responding.
390queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) 355queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b)
diff --git a/src/Network/DHT.hs b/src/Network/DHT.hs
new file mode 100644
index 00000000..0dab29cd
--- /dev/null
+++ b/src/Network/DHT.hs
@@ -0,0 +1,125 @@
1{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
2module Network.DHT
3 ( -- makeTableKeeper
4 -- , TableKeeper(..)
5 module Network.DHT -- for now
6 , module Network.DHT.Types
7 ) where
8
9import Data.Bits
10import Data.Maybe
11import Data.Monoid
12import Network.Address
13import Network.DHT.Types
14import Network.DatagramServer.Types
15import Network.DHT.Routing
16import Control.Concurrent.STM
17#ifdef THREAD_DEBUG
18import Control.Concurrent.Lifted.Instrument
19#else
20import GHC.Conc (labelThread)
21import Control.Concurrent.Lifted
22#endif
23import Text.PrettyPrint as PP hiding ((<>), ($$))
24import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
25
26import Control.Monad
27import Data.Time.Clock (getCurrentTime)
28import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
29
30data TableKeeper msg ip u = TableKeeper
31 { routingInfo :: TVar (Maybe (Info msg ip u))
32 , grokNode :: NodeInfo msg ip u -> Maybe (ReflectedIP) -> IO ()
33 , grokAddress :: Maybe SockAddr -> ReflectedIP -> IO ()
34 }
35
36makeTableKeeper :: forall msg ip u.
37 ( Address ip
38 , Show u
39 , Show (NodeId msg)
40 , Ord (NodeId msg)
41 , FiniteBits (NodeId msg)
42 ) => TableParameters msg ip u -> IO (TableKeeper msg ip u)
43makeTableKeeper param@TableParameters{..} = do
44 error "TODO makeTableKeeper" -- kick off table-updating thread
45 ri <- atomically (newTVar Nothing)
46 let tk = TableKeeper{ routingInfo = ri
47 , grokNode = insertNode param tk
48 , grokAddress = error "todo"
49 }
50 return tk
51
52atomicInsert :: ( Eq ip, Address ip, Ord (NodeId msg), FiniteBits (NodeId msg)
53 ) => TableParameters msg ip u -> TableKeeper msg ip u -> Timestamp -> Event msg ip u -> Maybe ReflectedIP -> STM (IO [CheckPing msg ip u])
54atomicInsert param@TableParameters{..} state tm arrival witnessed_ip = do
55 minfo <- readTVar (routingInfo state)
56 case minfo of
57 Just inf -> do
58 (ps,t') <- insert tm arrival $ myBuckets inf
59 writeTVar (routingInfo state) $ Just $ inf { myBuckets = t' }
60 return $ do
61 case witnessed_ip of
62 Just (ReflectedIP ip)
63 | ip /= myAddress inf
64 -> logMessage 'I' $ unwords
65 $ [ "Possible NAT?"
66 , show (toSockAddr $ nodeAddr $ foreignNode arrival)
67 , "reports my address:"
68 , show ip ]
69 -- TODO: Let routing table vote on my IP/NodeId.
70 _ -> return ()
71 return ps
72 Nothing ->
73 let dropped = return $ do
74 -- Ignore non-witnessing nodes until somebody tells
75 -- us our ip address.
76 logMessage 'W' ("Dropped " ++ show (toSockAddr $ nodeAddr $ foreignNode arrival))
77 return []
78 in fromMaybe dropped $ do
79 ReflectedIP ip <- witnessed_ip
80 let nil = nullTable (adjustID ip arrival) maxBuckets
81 return $ do
82 (ps,t') <- insert tm arrival nil
83 let new_info = Info t' (adjustID ip arrival) ip
84 writeTVar (routingInfo state) $ Just new_info
85 return $ do
86 logMessage 'I' $ unwords
87 [ "External IP address:"
88 , show ip
89 , "(reported by"
90 , show (toSockAddr $ nodeAddr $ foreignNode arrival)
91 ++ ")"
92 ]
93 return ps
94
95-- | This operation do not block but acquire exclusive access to
96-- routing table.
97insertNode :: forall msg ip u.
98 ( Address ip
99 , Show u
100 , Show (NodeId msg)
101 , Ord (NodeId msg)
102 , FiniteBits (NodeId msg)
103 ) => TableParameters msg ip u -> TableKeeper msg ip u -> NodeInfo msg ip u -> Maybe ReflectedIP -> IO ()
104insertNode param@TableParameters{..} state info witnessed_ip0 = do
105 tm <- utcTimeToPOSIXSeconds <$> getCurrentTime -- Network.DHT.Routing.TimeStamp = POSIXTime
106 let showTable = do
107 t <- atomically $ fmap myBuckets <$> readTVar (routingInfo state)
108 let logMsg = "Routing table: " <> pPrint t
109 logMessage 'D' (render logMsg)
110 let arrival = TryInsert info
111 logMessage 'D' $ show ( TryInsert (mapAddress fromAddr info) :: Event _ (Maybe IPv4) _ )
112 ps <- join $ atomically $ atomicInsert param state tm arrival witnessed_ip0
113 showTable
114 _ <- fork $ do
115 myThreadId >>= flip labelThread "DHT.insertNode.pingResults"
116 forM_ ps $ \(CheckPing ns)-> do
117 forM_ ns $ \n -> do
118 (b,mip) <- pingProbe (nodeAddr n)
119 let alive = PingResult n b
120 logMessage 'D' $ "PingResult "++show (nodeId n,b)
121 _ <- join $ atomically $ atomicInsert param state tm alive mip
122 showTable
123 return ()
124 return ()
125
diff --git a/src/Network/DHT/Types.hs b/src/Network/DHT/Types.hs
new file mode 100644
index 00000000..ed2dc175
--- /dev/null
+++ b/src/Network/DHT/Types.hs
@@ -0,0 +1,13 @@
1module Network.DHT.Types where
2
3import Network.Socket (SockAddr)
4import Network.DatagramServer.Types
5import Network.DHT.Routing
6
7data TableParameters msg ip u = TableParameters
8 { maxBuckets :: Int
9 , fallbackID :: NodeId msg
10 , pingProbe :: NodeAddr ip -> IO (Bool, Maybe ReflectedIP)
11 , logMessage :: Char -> String -> IO ()
12 , adjustID :: SockAddr -> Event msg ip u -> NodeId msg
13 }