summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/DHT/Query.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/DHT/Query.hs')
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs97
1 files changed, 60 insertions, 37 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
index 99d8cdaf..2ddd51ca 100644
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -41,7 +41,6 @@ module Network.BitTorrent.DHT.Query
41 , publish 41 , publish
42 42
43 -- ** Routing table 43 -- ** Routing table
44 , restoreTable
45 , insertNode 44 , insertNode
46 , refreshNodes 45 , refreshNodes
47 46
@@ -55,6 +54,7 @@ import Control.Concurrent.Lifted hiding (yield)
55import Control.Exception.Lifted hiding (Handler) 54import Control.Exception.Lifted hiding (Handler)
56import Control.Monad.Reader 55import Control.Monad.Reader
57import Control.Monad.Logger 56import Control.Monad.Logger
57import Data.Maybe
58import Data.Conduit 58import Data.Conduit
59import Data.Conduit.List as C hiding (mapMaybe, mapM_) 59import Data.Conduit.List as C hiding (mapMaybe, mapM_)
60import Data.Either 60import Data.Either
@@ -68,6 +68,7 @@ import Data.Time
68import Data.Time.Clock.POSIX 68import Data.Time.Clock.POSIX
69 69
70import Network.KRPC hiding (Options, def) 70import Network.KRPC hiding (Options, def)
71import Network.KRPC.Message (ReflectedIP(..))
71import Data.Torrent 72import Data.Torrent
72import Network.BitTorrent.Address 73import Network.BitTorrent.Address
73import Network.BitTorrent.DHT.Message 74import Network.BitTorrent.DHT.Message
@@ -89,8 +90,10 @@ nodeHandler action = handler $ \ sockAddr (Query remoteId read_only q) -> do
89 -- Do not route read-only nodes. (bep 43) 90 -- Do not route read-only nodes. (bep 43)
90 if read_only 91 if read_only
91 then $(logWarnS) "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) 92 then $(logWarnS) "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni)
92 else insertNode ni >> return () -- TODO need to block. why? 93 else insertNode ni Nothing >> return () -- TODO need to block. why?
93 Response <$> asks thisNodeId <*> action naddr q 94 Response
95 <$> myNodeIdAccordingTo naddr
96 <*> action naddr q
94 97
95-- | Default 'Ping' handler. 98-- | Default 'Ping' handler.
96pingH :: Address ip => NodeHandler ip 99pingH :: Address ip => NodeHandler ip
@@ -134,10 +137,10 @@ type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip])
134 137
135-- | The most basic query. May be used to check if the given node is 138-- | The most basic query. May be used to check if the given node is
136-- alive or get its 'NodeId'. 139-- alive or get its 'NodeId'.
137pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip) 140pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip, Maybe ReflectedIP)
138pingQ addr = do 141pingQ addr = do
139 (nid, Ping) <- queryNode addr Ping 142 (nid, Ping, mip) <- queryNode' addr Ping
140 return (NodeInfo nid addr) 143 return (NodeInfo nid addr, mip)
141 144
142-- TODO [robustness] match range of returned node ids with the 145-- TODO [robustness] match range of returned node ids with the
143-- expected range and either filter bad nodes or discard response at 146-- expected range and either filter bad nodes or discard response at
@@ -197,9 +200,6 @@ republish = fork $ do
197 i <- asks (optReannounce . options) 200 i <- asks (optReannounce . options)
198 error "DHT.republish: not implemented" 201 error "DHT.republish: not implemented"
199 202
200routing :: Address ip => Routing ip a -> DHT ip (Maybe a)
201routing = runRouting probeNode refreshNodes getTimestamp
202
203getTimestamp :: DHT ip Timestamp 203getTimestamp :: DHT ip Timestamp
204getTimestamp = do 204getTimestamp = do
205 utcTime <- liftIO $ getCurrentTime 205 utcTime <- liftIO $ getCurrentTime
@@ -207,12 +207,12 @@ getTimestamp = do
207 return $ utcTimeToPOSIXSeconds utcTime 207 return $ utcTimeToPOSIXSeconds utcTime
208 208
209 209
210probeNode :: Address ip => NodeAddr ip -> DHT ip Bool 210probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP)
211probeNode addr = do 211probeNode addr = do
212 $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr))) 212 $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr)))
213 result <- try $ Ping <@> addr 213 result <- try $ pingQ addr
214 let _ = result :: Either SomeException Ping 214 let _ = fmap (const ()) result :: Either SomeException ()
215 return $ either (const False) (const True) result 215 return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result
216 216
217 217
218-- FIXME do not use getClosest sinse we should /refresh/ them 218-- FIXME do not use getClosest sinse we should /refresh/ them
@@ -231,58 +231,81 @@ refreshNodes nid = do
231 queryParallel $ flip L.map (L.concat nss) $ \n -> do 231 queryParallel $ flip L.map (L.concat nss) $ \n -> do
232 $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) 232 $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n))
233 pingQ (nodeAddr n) 233 pingQ (nodeAddr n)
234 insertNode n 234 -- pingQ takes care of inserting the node.
235 return () 235 return ()
236 return () -- $ L.concat nss 236 return () -- $ L.concat nss
237 237
238-- | This operation do not block but acquire exclusive access to 238-- | This operation do not block but acquire exclusive access to
239-- routing table. 239-- routing table.
240insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId 240insertNode :: Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId
241insertNode info = fork $ do 241insertNode info witnessed_ip = fork $ do
242 var <- asks routingTable 242 var <- asks routingInfo
243 tm <- getTimestamp 243 tm <- getTimestamp
244 let showTable = do 244 let showTable = do
245 t <- liftIO $ atomically $ readTVar var 245 t <- getTable
246 let logMsg = "Routing table: " <> pPrint t 246 let logMsg = "Routing table: " <> pPrint t
247 $(logDebugS) "insertNode" (T.pack (render logMsg)) 247 $(logDebugS) "insertNode" (T.pack (render logMsg))
248 t <- liftIO $ atomically $ readTVar var 248 t <- liftIO $ atomically $ readTVar var
249 let arrival = TryInsert info 249 let arrival = TryInsert info
250 arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4) 250 arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4)
251 $(logDebugS) "insertNode" $ T.pack (show arrival4) 251 $(logDebugS) "insertNode" $ T.pack (show arrival4)
252 ps <- liftIO $ atomically $ do 252 maxbuckets <- asks (optBucketCount . options)
253 t <- readTVar var 253 fallbackid <- asks tentativeNodeId
254 (ps,t') <- R.insert tm arrival t 254 let atomicInsert arrival witnessed_ip = do
255 writeTVar var t' 255 minfo <- readTVar var
256 return ps 256 let change ip = fromMaybe fallbackid
257 $ listToMaybe
258 $ rank id (nodeId $ foreignNode arrival)
259 $ bep42s ip fallbackid
260 case minfo of
261 Just info -> do
262 (ps,t') <- R.insert tm arrival $ myBuckets info
263 -- TODO: Check witnessed_ip against myAddress.
264 -- If 3 nodes witness a different address, change the table.
265 -- Require these witnesses satisfy bep-42 and that their
266 -- first 3 bits are unique.
267 writeTVar var $ Just $ info { myBuckets = t' }
268 return ps
269 -- Ignore non-witnessing nodes until somebody tells
270 -- us our ip address.
271 Nothing -> fromMaybe (return []) $ do
272 ReflectedIP ip0 <- witnessed_ip
273 ip <- fromSockAddr ip0
274 let nil = nullTable (change ip) maxbuckets
275 return $ do
276 (ps,t') <- R.insert tm arrival nil
277 writeTVar var $ Just $ R.Info t' (change ip) ip
278 return ps
279 ps <- liftIO $ atomically $ atomicInsert arrival witnessed_ip
257 showTable 280 showTable
258 fork $ forM_ ps $ \(CheckPing ns)-> do 281 fork $ forM_ ps $ \(CheckPing ns)-> do
259 forM_ ns $ \n -> do 282 forM_ ns $ \n -> do
260 alive <- PingResult n <$> probeNode (nodeAddr n) 283 (b,mip) <- probeNode (nodeAddr n)
261 let PingResult _ b = alive 284 let alive = PingResult n b
262 $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) 285 $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b))
263 tm <- getTimestamp 286 tm <- getTimestamp
264 liftIO $ atomically $ do 287 liftIO $ atomically $ atomicInsert alive mip
265 t <- readTVar var
266 (_,t') <- R.insert tm alive t
267 writeTVar var t'
268 showTable 288 showTable
269 return () 289 return ()
270 290
271-- | Throws exception if node is not responding. 291-- | Throws exception if node is not responding.
272queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) 292queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b)
273 => NodeAddr ip -> a -> DHT ip (NodeId, b) 293 => NodeAddr ip -> a -> DHT ip (NodeId, b)
274queryNode addr q = do 294queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q
275 nid <- asks thisNodeId 295
296queryNode' :: forall a b ip. Address ip => KRPC (Query a) (Response b)
297 => NodeAddr ip -> a -> DHT ip (NodeId, b, Maybe ReflectedIP)
298queryNode' addr q = do
299 nid <- myNodeIdAccordingTo addr
276 let read_only = False -- TODO: check for NAT issues. (BEP 43) 300 let read_only = False -- TODO: check for NAT issues. (BEP 43)
277 Response remoteId r <- query (toSockAddr addr) (Query nid read_only q) 301 (Response remoteId r, witnessed_ip) <- query' (toSockAddr addr) (Query nid read_only q)
278 insertNode (NodeInfo remoteId addr) 302 $(logInfoS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip)
279 return (remoteId, r) 303 <> " by " <> T.pack (show (toSockAddr addr))
304 insertNode (NodeInfo remoteId addr) witnessed_ip
305 return (remoteId, r, witnessed_ip)
280 306
281-- | Infix version of 'queryNode' function. 307-- | Infix version of 'queryNode' function.
282(<@>) :: Address ip => KRPC (Query a) (Response b) 308(<@>) :: Address ip => KRPC (Query a) (Response b)
283 => a -> NodeAddr ip -> DHT ip b 309 => a -> NodeAddr ip -> DHT ip b
284q <@> addr = snd <$> queryNode addr q 310q <@> addr = snd <$> queryNode addr q
285{-# INLINE (<@>) #-} 311{-# INLINE (<@>) #-}
286
287restoreTable :: Address ip => Table ip -> DHT ip ()
288restoreTable tbl = mapM_ (insertNode . fst) $ L.concat $ R.toList tbl