author     joe <joe@jerkface.net>   2017-01-07 20:50:33 -0500
committer  joe <joe@jerkface.net>   2017-01-08 16:35:59 -0500
commit     a18fe8a84025b3f0beb357eba73f37d77244a44a
tree       6cad0091df7d6aaceaa4f88be0a29fd320a8abba /src
parent     bcd860aa8816cf52a01c313aecfdcde21fcd2c16
Use BEP 42 compatible node ids.
Diffstat (limited to 'src')
-rw-r--r--  src/Network/BitTorrent/Address.hs     | 20
-rw-r--r--  src/Network/BitTorrent/DHT.hs         | 61
-rw-r--r--  src/Network/BitTorrent/DHT/Query.hs   | 97
-rw-r--r--  src/Network/BitTorrent/DHT/Routing.hs | 18
-rw-r--r--  src/Network/BitTorrent/DHT/Session.hs | 43

5 files changed, 154 insertions, 85 deletions
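
For context, the rule this commit implements comes from BEP 42: a node id is tied to the node's external IP by masking the IP (0x030f3fff for IPv4), folding the low 3 bits of the id's last byte into the top of the masked value, and requiring the id's first 21 bits to match the first 21 bits of crc32c over that value. Below is a minimal sketch of the IPv4 case, not part of the patch; the crc32c function is passed in by the caller here because the CRC package the project uses is outside this hunk.

    import           Data.Bits (shiftL, (.&.), (.|.))
    import qualified Data.ByteString as BS
    import           Data.Word (Word32, Word8)

    -- | crc32c over the masked IPv4 address; per BEP 42 the first 21 bits of
    --   a node id must equal the first 21 bits of this value, and the id's
    --   last byte must carry the 3-bit selector 'r'.
    bep42Crc :: (BS.ByteString -> Word32)  -- ^ crc32c, supplied by the caller
             -> BS.ByteString              -- ^ 4-byte IPv4 address
             -> Word8                      -- ^ neighborhood selector, 0..7
             -> Word32
    bep42Crc crc32c ip r =
        case BS.zipWith (.&.) ip4mask ip of
          (b:bs) -> crc32c (BS.pack ((b .|. shiftL (r .&. 7) 5) : bs))
          []     -> 0   -- degenerate input; the real bep42 returns Nothing
      where
        ip4mask = BS.pack [0x03, 0x0f, 0x3f, 0xff]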
diff --git a/src/Network/BitTorrent/Address.hs b/src/Network/BitTorrent/Address.hs
index 7ef837db..fea7139d 100644
--- a/src/Network/BitTorrent/Address.hs
+++ b/src/Network/BitTorrent/Address.hs
@@ -65,6 +65,7 @@ module Network.BitTorrent.Address
        , bucketRange
        , genBucketSample
        , bep42
+       , bep42s
 
          -- ** Info
        , NodeAddr (..)
@@ -102,7 +103,7 @@ import Data.Foldable
 import Data.IP
 import Data.List as L
 import Data.List.Split as L
-import Data.Maybe (fromMaybe, catMaybes)
+import Data.Maybe (fromMaybe, catMaybes, mapMaybe)
 import Data.Monoid
 import Data.Hashable
 import Data.Ord
@@ -813,8 +814,8 @@ instance Pretty ip => Pretty [NodeInfo ip] where
   pPrint = PP.vcat . PP.punctuate "," . L.map pPrint
 
 -- | Order by closeness: nearest nodes first.
-rank :: Eq ip => NodeId -> [NodeInfo ip] -> [NodeInfo ip]
-rank nid = L.sortBy (comparing (distance nid . nodeId))
+rank :: (x -> NodeId) -> NodeId -> [x] -> [x]
+rank f nid = L.sortBy (comparing (distance nid . f))
 
 {-----------------------------------------------------------------------
 -- Fingerprint
@@ -1224,6 +1225,15 @@ fingerprint pid = either (const def) id $ runGet getCI (getPeerId pid)
       return $ Version (catMaybes $ L.map decodeShadowVerNr str) []
 
 
+-- | Yields all 8 DHT neighborhoods available to you given a particular ip
+-- address.
+bep42s :: Address a => a -> NodeId -> [NodeId]
+bep42s addr (NodeId r) = mapMaybe (bep42 addr) rs
+ where
+  rs = L.map (NodeId . change3bits r) [0..7]
+
+  change3bits bs n = BS.snoc (BS.init bs) (BS.last bs .&. 0xF8 .|. n)
+
 -- | Modifies a purely random 'NodeId' to one that is related to a given
 -- routable address in accordance with BEP 42.
 bep42 :: Address a => a -> NodeId -> Maybe NodeId
@@ -1236,11 +1246,11 @@ bep42 addr (NodeId r)
  where
   ip4mask = "\x03\x0f\x3f\xff" :: ByteString
   ip6mask = "\x01\x03\x07\x0f\x1f\x3f\x7f\xff" :: ByteString
-  rbyte = BS.last r
+  nbhood_select = BS.last r .&. 7
   retr n = pure $ BS.drop (BS.length r - n) r
   crc = S.encode . crc32c . BS.pack
   masked ip = case BS.zipWith (.&.) msk ip of
-    (b:bs) -> (b .|. shiftL (rbyte .&. 7) 5) : bs
+    (b:bs) -> (b .|. shiftL nbhood_select 5) : bs
    bs -> bs
   where msk | BS.length ip == 4 = ip4mask
             | otherwise = ip6mask
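
A sketch of how the two additions above combine; externalIp, tentativeId, and someNeighbor are assumed values, and rank/nodeId/bep42s are assumed to be exported the same way they are consumed elsewhere in this patch (compare the change helper added to insertNode in DHT/Query.hs further down).

    import Data.IP (IPv4)
    import Data.Maybe (fromMaybe, listToMaybe)
    import Network.BitTorrent.Address (NodeId, NodeInfo, bep42s, nodeId, rank)

    -- | Of the (at most 8) BEP 42 ids valid for our external address, pick
    --   the one nearest to a node we actually talk to; fall back to the
    --   tentative id when no candidate can be generated.
    chooseSecureId :: IPv4 -> NodeId -> NodeInfo IPv4 -> NodeId
    chooseSecureId externalIp tentativeId someNeighbor =
        fromMaybe tentativeId
      . listToMaybe
      . rank id (nodeId someNeighbor)
      $ bep42s externalIp tentativeId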
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs
index 1c62a0e0..3867d182 100644
--- a/src/Network/BitTorrent/DHT.hs
+++ b/src/Network/BitTorrent/DHT.hs
@@ -33,7 +33,6 @@ module Network.BitTorrent.DHT
 
          -- * Initialization
        , snapshot
-       , restore
 
          -- * Operations
        , Network.BitTorrent.DHT.lookup
@@ -162,24 +161,39 @@ resolveHostName NodeAddr {..} = do
 --
 --   This operation do block, use
 --   'Control.Concurrent.Async.Lifted.async' if needed.
-bootstrap :: Address ip => [NodeAddr ip] -> DHT ip ()
-bootstrap startNodes = do
+bootstrap :: Address ip => Maybe BS.ByteString -> [NodeAddr ip] -> DHT ip ()
+bootstrap mbs startNodes = do
+    restored <-
+        case decode <$> mbs of
+            Just (Right tbl) -> return (T.toList tbl)
+            Just (Left e)    -> do $(logWarnS) "restore" (Text.pack e)
+                                   return []
+            Nothing          -> return []
+
    $(logInfoS) "bootstrap" "Start node bootstrapping"
-   nid <- asks thisNodeId
-   let searchAll aliveNodes
-         = C.sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume
+   let searchAll aliveNodes = do
+           nid <- myNodeIdAccordingTo (error "FIXME")
+           C.sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume
+   input_nodes <- (restored ++) . T.toList <$> getTable
    -- Step 1: Use iterative searches to flesh out the table..
-   do knowns <- map (map $ nodeAddr . fst) . T.toList <$> getTable
-      alive_knowns <- queryParallel (pingQ <$> concat knowns)
-      nss <- searchAll alive_knowns
-      -- We only use the supplied bootstrap nodes when we don't know of any
-      -- others to try.
-      when (null nss) $ do
-       -- TODO filter duplicated in startNodes list
-       -- TODO retransmissions for startNodes
-       aliveNodes <- queryParallel (pingQ <$> startNodes)
-       _ <- searchAll aliveNodes
-       return ()
+   do let knowns = map (map $ nodeAddr . fst) input_nodes
+      -- Below, we reverse the nodes since the table serialization puts the
+      -- nearest nodes last and we want to choose a similar node id to bootstrap
+      -- faster.
+      (alive_knowns,_) <- unzip <$> queryParallel (pingQ <$> reverse (concat knowns))
+      b <- isBootstrapped
+      -- If our cached nodes are alive and our IP address did not change, it's possible
+      -- we are already bootsrapped, so no need to do any searches.
+      when (not b) $ do
+       nss <- searchAll $ take 2 alive_knowns
+       -- We only use the supplied bootstrap nodes when we don't know of any
+       -- others to try.
+       when (null nss) $ do
+           -- TODO filter duplicated in startNodes list
+           -- TODO retransmissions for startNodes
+           (aliveNodes,_) <- unzip <$> queryParallel (pingQ <$> startNodes)
+           _ <- searchAll $ take 2 aliveNodes
+           return ()
    -- Step 2: Repeatedly refresh incomplete buckets until the table is full.
    maxbuckets <- asks $ optBucketCount . options
    flip fix 0 $ \loop icnt -> do
@@ -195,6 +209,7 @@ bootstrap startNodes = do
                      p:ps -> p:unfull ps
                      []   -> []
      forM_ us $ \(is_last,(index,_)) -> do
+       nid <- myNodeIdAccordingTo (error "FIXME")
        sample <- liftIO $ genBucketSample nid (bucketRange index is_last)
        $(logDebugS) "bootstrapping"
            $ "BOOTSTRAP sample"
@@ -213,23 +228,13 @@ bootstrap startNodes = do
 --
 --   This operation do not block.
 --
-isBootstrapped :: DHT ip Bool
+isBootstrapped :: Eq ip => DHT ip Bool
 isBootstrapped = T.full <$> getTable
 
 {-----------------------------------------------------------------------
 -- Initialization
 -----------------------------------------------------------------------}
 
--- | Load previous session. (corrupted - exception/ignore ?)
---
---   This is blocking operation, use
---   'Control.Concurrent.Async.Lifted.async' if needed.
-restore :: Address ip => BS.ByteString -> DHT ip ()
-restore bs = do
-  case decode bs of
-    Right tbl -> restoreTable tbl
-    Left e    -> $(logWarnS) "restore" (Text.pack e)
-
 -- | Serialize current DHT session to byte string.
 --
 --   This is blocking operation, use
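
Since restore is gone, callers now hand any previously saved table bytes straight to bootstrap. A rough sketch of the call site follows; readSavedTable and the file path are hypothetical, and Nothing simply means a cold start.

    import qualified Data.ByteString as BS
    import           System.Directory (doesFileExist)

    -- | Read the bytes written by an earlier 'snapshot', if any.
    readSavedTable :: FilePath -> IO (Maybe BS.ByteString)
    readSavedTable path = do
      exists <- doesFileExist path
      if exists then Just <$> BS.readFile path
                else return Nothing

    -- Inside some DHT ip action:
    --   mbs <- liftIO (readSavedTable "dht-state.bin")
    --   bootstrap mbs startNodes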
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
index 99d8cdaf..2ddd51ca 100644
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -41,7 +41,6 @@ module Network.BitTorrent.DHT.Query
        , publish
 
          -- ** Routing table
-       , restoreTable
        , insertNode
        , refreshNodes
 
@@ -55,6 +54,7 @@ import Control.Concurrent.Lifted hiding (yield)
 import Control.Exception.Lifted hiding (Handler)
 import Control.Monad.Reader
 import Control.Monad.Logger
+import Data.Maybe
 import Data.Conduit
 import Data.Conduit.List as C hiding (mapMaybe, mapM_)
 import Data.Either
@@ -68,6 +68,7 @@ import Data.Time
 import Data.Time.Clock.POSIX
 
 import Network.KRPC hiding (Options, def)
+import Network.KRPC.Message (ReflectedIP(..))
 import Data.Torrent
 import Network.BitTorrent.Address
 import Network.BitTorrent.DHT.Message
@@ -89,8 +90,10 @@ nodeHandler action = handler $ \ sockAddr (Query remoteId read_only q) -> do
    -- Do not route read-only nodes. (bep 43)
    if read_only
      then $(logWarnS) "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni)
-     else insertNode ni >> return () -- TODO need to block. why?
-    Response <$> asks thisNodeId <*> action naddr q
+     else insertNode ni Nothing >> return () -- TODO need to block. why?
+    Response
+        <$> myNodeIdAccordingTo naddr
+        <*> action naddr q
 
 -- | Default 'Ping' handler.
 pingH :: Address ip => NodeHandler ip
@@ -134,10 +137,10 @@ type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip])
 
 -- | The most basic query. May be used to check if the given node is
 -- alive or get its 'NodeId'.
-pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip)
+pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip, Maybe ReflectedIP)
 pingQ addr = do
-  (nid, Ping) <- queryNode addr Ping
-  return (NodeInfo nid addr)
+  (nid, Ping, mip) <- queryNode' addr Ping
+  return (NodeInfo nid addr, mip)
 
 -- TODO [robustness] match range of returned node ids with the
 -- expected range and either filter bad nodes or discard response at
@@ -197,9 +200,6 @@ republish = fork $ do
   i <- asks (optReannounce . options)
   error "DHT.republish: not implemented"
 
-routing :: Address ip => Routing ip a -> DHT ip (Maybe a)
-routing = runRouting probeNode refreshNodes getTimestamp
-
 getTimestamp :: DHT ip Timestamp
 getTimestamp = do
   utcTime <- liftIO $ getCurrentTime
@@ -207,12 +207,12 @@ getTimestamp = do
   return $ utcTimeToPOSIXSeconds utcTime
 
 
-probeNode :: Address ip => NodeAddr ip -> DHT ip Bool
+probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP)
 probeNode addr = do
   $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr)))
-  result <- try $ Ping <@> addr
-  let _ = result :: Either SomeException Ping
-  return $ either (const False) (const True) result
+  result <- try $ pingQ addr
+  let _ = fmap (const ()) result :: Either SomeException ()
+  return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result
 
 
 -- FIXME do not use getClosest sinse we should /refresh/ them
@@ -231,58 +231,81 @@ refreshNodes nid = do
      queryParallel $ flip L.map (L.concat nss) $ \n -> do
        $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n))
        pingQ (nodeAddr n)
-       insertNode n
+       -- pingQ takes care of inserting the node.
        return ()
  return () -- $ L.concat nss
 
 -- | This operation do not block but acquire exclusive access to
 -- routing table.
-insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId
-insertNode info = fork $ do
-  var <- asks routingTable
+insertNode :: Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId
+insertNode info witnessed_ip = fork $ do
+  var <- asks routingInfo
  tm <- getTimestamp
  let showTable = do
-        t <- liftIO $ atomically $ readTVar var
+        t <- getTable
        let logMsg = "Routing table: " <> pPrint t
        $(logDebugS) "insertNode" (T.pack (render logMsg))
  t <- liftIO $ atomically $ readTVar var
  let arrival = TryInsert info
      arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4)
  $(logDebugS) "insertNode" $ T.pack (show arrival4)
-  ps <- liftIO $ atomically $ do
-         t <- readTVar var
-         (ps,t') <- R.insert tm arrival t
-         writeTVar var t'
-         return ps
+  maxbuckets <- asks (optBucketCount . options)
+  fallbackid <- asks tentativeNodeId
+  let atomicInsert arrival witnessed_ip = do
+        minfo <- readTVar var
+        let change ip = fromMaybe fallbackid
+                          $ listToMaybe
+                          $ rank id (nodeId $ foreignNode arrival)
+                          $ bep42s ip fallbackid
+        case minfo of
+          Just info -> do
+            (ps,t') <- R.insert tm arrival $ myBuckets info
+            -- TODO: Check witnessed_ip against myAddress.
+            -- If 3 nodes witness a different address, change the table.
+            -- Require these witnesses satisfy bep-42 and that their
+            -- first 3 bits are unique.
+            writeTVar var $ Just $ info { myBuckets = t' }
+            return ps
+          -- Ignore non-witnessing nodes until somebody tells
+          -- us our ip address.
+          Nothing -> fromMaybe (return []) $ do
+            ReflectedIP ip0 <- witnessed_ip
+            ip <- fromSockAddr ip0
+            let nil = nullTable (change ip) maxbuckets
+            return $ do
+              (ps,t') <- R.insert tm arrival nil
+              writeTVar var $ Just $ R.Info t' (change ip) ip
+              return ps
+  ps <- liftIO $ atomically $ atomicInsert arrival witnessed_ip
  showTable
  fork $ forM_ ps $ \(CheckPing ns)-> do
    forM_ ns $ \n -> do
-      alive <- PingResult n <$> probeNode (nodeAddr n)
-      let PingResult _ b = alive
+      (b,mip) <- probeNode (nodeAddr n)
+      let alive = PingResult n b
      $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b))
      tm <- getTimestamp
-      liftIO $ atomically $ do
-        t <- readTVar var
-        (_,t') <- R.insert tm alive t
-        writeTVar var t'
+      liftIO $ atomically $ atomicInsert alive mip
      showTable
  return ()
 
 -- | Throws exception if node is not responding.
 queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b)
          => NodeAddr ip -> a -> DHT ip (NodeId, b)
-queryNode addr q = do
-  nid <- asks thisNodeId
+queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q
+
+queryNode' :: forall a b ip. Address ip => KRPC (Query a) (Response b)
+          => NodeAddr ip -> a -> DHT ip (NodeId, b, Maybe ReflectedIP)
+queryNode' addr q = do
+  nid <- myNodeIdAccordingTo addr
  let read_only = False -- TODO: check for NAT issues. (BEP 43)
-  Response remoteId r <- query (toSockAddr addr) (Query nid read_only q)
-  insertNode (NodeInfo remoteId addr)
-  return (remoteId, r)
+  (Response remoteId r, witnessed_ip) <- query' (toSockAddr addr) (Query nid read_only q)
+  $(logInfoS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip)
+                          <> " by " <> T.pack (show (toSockAddr addr))
+  insertNode (NodeInfo remoteId addr) witnessed_ip
+  return (remoteId, r, witnessed_ip)
 
 -- | Infix version of 'queryNode' function.
 (<@>) :: Address ip => KRPC (Query a) (Response b)
      => a -> NodeAddr ip -> DHT ip b
 q <@> addr = snd <$> queryNode addr q
 {-# INLINE (<@>) #-}
-
-restoreTable :: Address ip => Table ip -> DHT ip ()
-restoreTable tbl = mapM_ (insertNode . fst) $ L.concat $ R.toList tbl
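
One consequence of the new return types worth spelling out: every successful exchange now also reports the address the remote peer saw us at, so witnesses can be gathered in bulk. The helper below is hypothetical and leans only on pingQ and queryParallel as they are used in this patch; it collects the reflected addresses that the TODO inside insertNode would need for a consensus check.

    import Data.Maybe (mapMaybe)
    import Network.KRPC.Message (ReflectedIP)

    witnessedAddresses :: Address ip => [NodeAddr ip] -> DHT ip [ReflectedIP]
    witnessedAddresses addrs = do
      -- each result is (NodeInfo ip, Maybe ReflectedIP)
      results <- queryParallel (pingQ <$> addrs)
      return $ mapMaybe snd results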
diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs
index a4da8445..84e4d4ce 100644
--- a/src/Network/BitTorrent/DHT/Routing.hs
+++ b/src/Network/BitTorrent/DHT/Routing.hs
@@ -22,6 +22,7 @@
 module Network.BitTorrent.DHT.Routing
        ( -- * Table
          Table
+       , Info(..)
 
          -- * Attributes
        , BucketCount
@@ -343,6 +344,15 @@ type BucketCount = Int
 defaultBucketCount :: BucketCount
 defaultBucketCount = 20
 
+data Info ip = Info
+  { myBuckets :: Table ip
+  , myNodeId  :: NodeId
+  , myAddress :: ip
+  }
+  deriving (Eq, Show, Generic)
+
+instance (Eq ip, Serialize ip) => Serialize (Info ip)
+
 -- | The routing table covers the entire 'NodeId' space from 0 to 2 ^
 -- 160. The routing table is subdivided into 'Bucket's that each cover
 -- a portion of the space. An empty table has one bucket with an ID
@@ -497,7 +507,7 @@ instance TableKey InfoHash where
 -- 'find_node' and 'get_peers' queries.
 kclosest :: Eq ip => TableKey a => K -> a -> Table ip -> [NodeInfo ip]
 kclosest k (toNodeId -> nid)
-  = L.take k . rank nid
+  = L.take k . rank nodeId nid
  . L.map PSQ.key . PSQ.toList . fromMaybe PSQ.empty
  . fmap bktNodes
  . lookupBucket nid
@@ -536,8 +546,10 @@ modifyBucket nodeId f = go (0 :: BitIx)
          <|> go i (splitTip nid n i bucket)
 
 -- | Triggering event for atomic table update
-data Event ip = TryInsert (NodeInfo ip)
-              | PingResult (NodeInfo ip) Bool
+data Event ip = TryInsert  { foreignNode :: NodeInfo ip }
+              | PingResult { foreignNode :: NodeInfo ip
+                           , ponged :: Bool
+                           }
  deriving (Eq,Ord,Show)
 
 eventId (TryInsert NodeInfo{..}) = nodeId
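
Because Info carries a Serialize instance, the whole record (buckets, chosen id, and witnessed address) can be persisted in one go. A tiny round-trip sketch, assuming a Serialize instance for the chosen ip type is in scope as the instance head above requires:

    import qualified Data.ByteString as BS
    import qualified Data.Serialize as S
    import           Data.IP (IPv4)
    import           Network.BitTorrent.DHT.Routing (Info (..))

    saveInfo :: Info IPv4 -> BS.ByteString
    saveInfo = S.encode

    loadInfo :: BS.ByteString -> Either String (Info IPv4)
    loadInfo = S.decode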
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index 5a8d64ef..44a5f0e9 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -29,8 +29,10 @@ module Network.BitTorrent.DHT.Session
         -- * Session
       , Node
       , options
-       , thisNodeId
-       , routingTable
+       , tentativeNodeId
+       , myNodeIdAccordingTo
+       , routingInfo
+       , routableAddress
 
         -- ** Initialization
       , LogFun
@@ -239,14 +241,14 @@ data Node ip = Node
 
    -- | Pseudo-unique self-assigned session identifier. This value is
    -- constant during DHT session and (optionally) between sessions.
-  , thisNodeId    :: !NodeId
+  , tentativeNodeId :: !NodeId
 
  , resources     :: !InternalState
-  , manager       :: !(Manager (DHT ip)) -- ^ RPC manager;
-  , routingTable  :: !(TVar (Table ip)) -- ^ search table;
-  , contactInfo   :: !(TVar (PeerStore ip)) -- ^ published by other nodes;
+  , manager       :: !(Manager (DHT ip )) -- ^ RPC manager;
+  , routingInfo   :: !(TVar (Maybe (R.Info ip))) -- ^ search table;
+  , contactInfo   :: !(TVar (PeerStore ip )) -- ^ published by other nodes;
  , announceInfo  :: !(TVar AnnounceSet ) -- ^ to publish by this node;
-  , sessionTokens :: !(TVar SessionTokens) -- ^ query session IDs.
+  , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs.
  , loggerFun     :: !LogFun
  }
 
@@ -323,7 +325,7 @@ newNode hs opts naddr logger mbid = do
    liftIO $ do
      myId <- maybe genNodeId return mbid
      node <- Node opts myId s m
-        <$> atomically (newTVar (nullTable myId (optBucketCount opts)))
+        <$> atomically (newTVar Nothing)
        <*> newTVarIO def
        <*> newTVarIO S.empty
        <*> (newTVarIO =<< nullSessionTokens)
@@ -372,16 +374,33 @@ checkToken addr questionableToken = do
  toks <- asks sessionTokens >>= liftIO . readTVarIO
  return $ T.member addr questionableToken (tokenMap toks)
 
+
 {-----------------------------------------------------------------------
 -- Routing table
 -----------------------------------------------------------------------}
 
+-- | This nodes externally routable address reported by remote peers.
+routableAddress :: DHT ip (Maybe ip)
+routableAddress = do
+    info <- asks routingInfo >>= liftIO . atomically . readTVar
+    return $ myAddress <$> info
+
+-- | The current NodeId that the given remote node should know us by.
+myNodeIdAccordingTo :: NodeAddr ip -> DHT ip NodeId
+myNodeIdAccordingTo _ = do
+    info <- asks routingInfo >>= liftIO . atomically . readTVar
+    fallback <- asks tentativeNodeId
+    return $ maybe fallback myNodeId info
+
 -- | Get current routing table. Normally you don't need to use this
 -- function, but it can be usefull for debugging and profiling purposes.
-getTable :: DHT ip (Table ip)
+getTable :: Eq ip => DHT ip (Table ip)
 getTable = do
-  var <- asks routingTable
-  liftIO (atomically $ readTVar var)
+  Node { tentativeNodeId = myId
+       , routingInfo = var
+       , options = opts } <- ask
+  let nil = nullTable myId (optBucketCount opts)
+  liftIO (maybe nil R.myBuckets <$> atomically (readTVar var))
 
 -- | Find a set of closest nodes from routing table of this node. (in
 -- no particular order)
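
Read together, the renamed accessors make the session's two identities explicit: the tentative id chosen at startup and the BEP 42 id derived once peers have witnessed our address. A small illustrative helper, not part of the patch and assuming the functions exported above are in scope:

    -- | The id we would present to this peer and the external address, if
    --   any, that peers have reflected back to us so far.
    whoAmI :: NodeAddr ip -> DHT ip (NodeId, Maybe ip)
    whoAmI addr = do
      nid <- myNodeIdAccordingTo addr   -- falls back to tentativeNodeId
      mip <- routableAddress            -- Nothing until an address is witnessed
      return (nid, mip)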