summaryrefslogtreecommitdiff
path: root/src/Network/DHT.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-06-15 20:08:08 -0400
committerjoe <joe@jerkface.net>2017-06-15 20:08:08 -0400
commitb9a2c3dd44f5dd59157676fb386584a148d854cb (patch)
treef9521b0e4e46cadfbc18722f3e077640e51e374b /src/Network/DHT.hs
parent362d36e4e31da0d3e3f78cd0aa6dd99cddeaba49 (diff)
Refactored insertNode.
Diffstat (limited to 'src/Network/DHT.hs')
-rw-r--r--src/Network/DHT.hs125
1 files changed, 125 insertions, 0 deletions
diff --git a/src/Network/DHT.hs b/src/Network/DHT.hs
new file mode 100644
index 00000000..0dab29cd
--- /dev/null
+++ b/src/Network/DHT.hs
@@ -0,0 +1,125 @@
1{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
2module Network.DHT
3 ( -- makeTableKeeper
4 -- , TableKeeper(..)
5 module Network.DHT -- for now
6 , module Network.DHT.Types
7 ) where
8
9import Data.Bits
10import Data.Maybe
11import Data.Monoid
12import Network.Address
13import Network.DHT.Types
14import Network.DatagramServer.Types
15import Network.DHT.Routing
16import Control.Concurrent.STM
17#ifdef THREAD_DEBUG
18import Control.Concurrent.Lifted.Instrument
19#else
20import GHC.Conc (labelThread)
21import Control.Concurrent.Lifted
22#endif
23import Text.PrettyPrint as PP hiding ((<>), ($$))
24import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
25
26import Control.Monad
27import Data.Time.Clock (getCurrentTime)
28import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
29
30data TableKeeper msg ip u = TableKeeper
31 { routingInfo :: TVar (Maybe (Info msg ip u))
32 , grokNode :: NodeInfo msg ip u -> Maybe (ReflectedIP) -> IO ()
33 , grokAddress :: Maybe SockAddr -> ReflectedIP -> IO ()
34 }
35
36makeTableKeeper :: forall msg ip u.
37 ( Address ip
38 , Show u
39 , Show (NodeId msg)
40 , Ord (NodeId msg)
41 , FiniteBits (NodeId msg)
42 ) => TableParameters msg ip u -> IO (TableKeeper msg ip u)
43makeTableKeeper param@TableParameters{..} = do
44 error "TODO makeTableKeeper" -- kick off table-updating thread
45 ri <- atomically (newTVar Nothing)
46 let tk = TableKeeper{ routingInfo = ri
47 , grokNode = insertNode param tk
48 , grokAddress = error "todo"
49 }
50 return tk
51
52atomicInsert :: ( Eq ip, Address ip, Ord (NodeId msg), FiniteBits (NodeId msg)
53 ) => TableParameters msg ip u -> TableKeeper msg ip u -> Timestamp -> Event msg ip u -> Maybe ReflectedIP -> STM (IO [CheckPing msg ip u])
54atomicInsert param@TableParameters{..} state tm arrival witnessed_ip = do
55 minfo <- readTVar (routingInfo state)
56 case minfo of
57 Just inf -> do
58 (ps,t') <- insert tm arrival $ myBuckets inf
59 writeTVar (routingInfo state) $ Just $ inf { myBuckets = t' }
60 return $ do
61 case witnessed_ip of
62 Just (ReflectedIP ip)
63 | ip /= myAddress inf
64 -> logMessage 'I' $ unwords
65 $ [ "Possible NAT?"
66 , show (toSockAddr $ nodeAddr $ foreignNode arrival)
67 , "reports my address:"
68 , show ip ]
69 -- TODO: Let routing table vote on my IP/NodeId.
70 _ -> return ()
71 return ps
72 Nothing ->
73 let dropped = return $ do
74 -- Ignore non-witnessing nodes until somebody tells
75 -- us our ip address.
76 logMessage 'W' ("Dropped " ++ show (toSockAddr $ nodeAddr $ foreignNode arrival))
77 return []
78 in fromMaybe dropped $ do
79 ReflectedIP ip <- witnessed_ip
80 let nil = nullTable (adjustID ip arrival) maxBuckets
81 return $ do
82 (ps,t') <- insert tm arrival nil
83 let new_info = Info t' (adjustID ip arrival) ip
84 writeTVar (routingInfo state) $ Just new_info
85 return $ do
86 logMessage 'I' $ unwords
87 [ "External IP address:"
88 , show ip
89 , "(reported by"
90 , show (toSockAddr $ nodeAddr $ foreignNode arrival)
91 ++ ")"
92 ]
93 return ps
94
95-- | This operation do not block but acquire exclusive access to
96-- routing table.
97insertNode :: forall msg ip u.
98 ( Address ip
99 , Show u
100 , Show (NodeId msg)
101 , Ord (NodeId msg)
102 , FiniteBits (NodeId msg)
103 ) => TableParameters msg ip u -> TableKeeper msg ip u -> NodeInfo msg ip u -> Maybe ReflectedIP -> IO ()
104insertNode param@TableParameters{..} state info witnessed_ip0 = do
105 tm <- utcTimeToPOSIXSeconds <$> getCurrentTime -- Network.DHT.Routing.TimeStamp = POSIXTime
106 let showTable = do
107 t <- atomically $ fmap myBuckets <$> readTVar (routingInfo state)
108 let logMsg = "Routing table: " <> pPrint t
109 logMessage 'D' (render logMsg)
110 let arrival = TryInsert info
111 logMessage 'D' $ show ( TryInsert (mapAddress fromAddr info) :: Event _ (Maybe IPv4) _ )
112 ps <- join $ atomically $ atomicInsert param state tm arrival witnessed_ip0
113 showTable
114 _ <- fork $ do
115 myThreadId >>= flip labelThread "DHT.insertNode.pingResults"
116 forM_ ps $ \(CheckPing ns)-> do
117 forM_ ns $ \n -> do
118 (b,mip) <- pingProbe (nodeAddr n)
119 let alive = PingResult n b
120 logMessage 'D' $ "PingResult "++show (nodeId n,b)
121 _ <- join $ atomically $ atomicInsert param state tm alive mip
122 showTable
123 return ()
124 return ()
125