1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
module Network.DHT
( -- makeTableKeeper
-- , TableKeeper(..)
module Network.DHT -- for now
, module Network.DHT.Types
) where
import Data.Bits
import Data.Maybe
import Data.Monoid
import Network.Address
import Network.DHT.Types
import Network.DatagramServer.Types
import Network.DHT.Routing
import Control.Concurrent.STM
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import GHC.Conc (labelThread)
import Control.Concurrent.Lifted
#endif
import Text.PrettyPrint as PP hiding ((<>), ($$))
import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
import Control.Monad
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
data TableKeeper msg ip u = TableKeeper
{ routingInfo :: TVar (Maybe (Info msg ip u))
, grokNode :: NodeInfo msg ip u -> Maybe (ReflectedIP) -> IO ()
, grokAddress :: Maybe SockAddr -> ReflectedIP -> IO ()
}
makeTableKeeper :: forall msg ip u.
( Address ip
, Show u
, Show (NodeId msg)
, Ord (NodeId msg)
, FiniteBits (NodeId msg)
) => TableParameters msg ip u -> IO (TableKeeper msg ip u)
makeTableKeeper param@TableParameters{..} = do
error "TODO makeTableKeeper" -- kick off table-updating thread
ri <- atomically (newTVar Nothing)
let tk = TableKeeper{ routingInfo = ri
, grokNode = insertNode param tk
, grokAddress = error "todo"
}
return tk
atomicInsert :: ( Eq ip, Address ip, Ord (NodeId msg), FiniteBits (NodeId msg)
) => TableParameters msg ip u -> TableKeeper msg ip u -> Timestamp -> Event msg ip u -> Maybe ReflectedIP -> STM (IO [CheckPing msg ip u])
atomicInsert param@TableParameters{..} state tm arrival witnessed_ip = do
minfo <- readTVar (routingInfo state)
case minfo of
Just inf -> do
(ps,t') <- insert tm arrival $ myBuckets inf
writeTVar (routingInfo state) $ Just $ inf { myBuckets = t' }
return $ do
case witnessed_ip of
Just (ReflectedIP ip)
| ip /= myAddress inf
-> logMessage 'I' $ unwords
$ [ "Possible NAT?"
, show (toSockAddr $ nodeAddr $ foreignNode arrival)
, "reports my address:"
, show ip ]
-- TODO: Let routing table vote on my IP/NodeId.
_ -> return ()
return ps
Nothing ->
let dropped = return $ do
-- Ignore non-witnessing nodes until somebody tells
-- us our ip address.
logMessage 'W' ("Dropped " ++ show (toSockAddr $ nodeAddr $ foreignNode arrival))
return []
in fromMaybe dropped $ do
ReflectedIP ip <- witnessed_ip
let nil = nullTable (adjustID ip arrival) maxBuckets
return $ do
(ps,t') <- insert tm arrival nil
let new_info = Info t' (adjustID ip arrival) ip
writeTVar (routingInfo state) $ Just new_info
return $ do
logMessage 'I' $ unwords
[ "External IP address:"
, show ip
, "(reported by"
, show (toSockAddr $ nodeAddr $ foreignNode arrival)
++ ")"
]
return ps
-- | This operation do not block but acquire exclusive access to
-- routing table.
insertNode :: forall msg ip u.
( Address ip
, Show u
, Show (NodeId msg)
, Ord (NodeId msg)
, FiniteBits (NodeId msg)
) => TableParameters msg ip u -> TableKeeper msg ip u -> NodeInfo msg ip u -> Maybe ReflectedIP -> IO ()
insertNode param@TableParameters{..} state info witnessed_ip0 = do
tm <- utcTimeToPOSIXSeconds <$> getCurrentTime -- Network.DHT.Routing.TimeStamp = POSIXTime
let showTable = do
t <- atomically $ fmap myBuckets <$> readTVar (routingInfo state)
let logMsg = "Routing table: " <> pPrint t
logMessage 'D' (render logMsg)
let arrival = TryInsert info
logMessage 'D' $ show ( TryInsert (mapAddress fromAddr info) :: Event _ (Maybe IPv4) _ )
ps <- join $ atomically $ atomicInsert param state tm arrival witnessed_ip0
showTable
_ <- fork $ do
myThreadId >>= flip labelThread "DHT.insertNode.pingResults"
forM_ ps $ \(CheckPing ns)-> do
forM_ ns $ \n -> do
(b,mip) <- pingProbe n
let alive = PingResult n b
logMessage 'D' $ "PingResult "++show (nodeId n,b)
_ <- join $ atomically $ atomicInsert param state tm alive mip
showTable
return ()
return ()
|