summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/DHT/Query.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/DHT/Query.hs')
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs34
1 files changed, 13 insertions, 21 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
index d1fa36e5..e067ab52 100644
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -49,7 +49,6 @@ module Network.BitTorrent.DHT.Query
49 , (<@>) 49 , (<@>)
50 ) where 50 ) where
51 51
52import Control.Applicative
53import Control.Concurrent.Lifted hiding (yield) 52import Control.Concurrent.Lifted hiding (yield)
54import Control.Exception.Lifted hiding (Handler) 53import Control.Exception.Lifted hiding (Handler)
55import Control.Monad.Reader 54import Control.Monad.Reader
@@ -168,7 +167,7 @@ announceQ ih p NodeInfo {..} = do
168 Left ns 167 Left ns
169 | False -> undefined -- TODO check if we can announce 168 | False -> undefined -- TODO check if we can announce
170 | otherwise -> return (Left ns) 169 | otherwise -> return (Left ns)
171 Right ps -> do -- TODO *probably* add to peer cache 170 Right _ -> do -- TODO *probably* add to peer cache
172 Announced <- Announce False ih p grantedToken <@> nodeAddr 171 Announced <- Announce False ih p grantedToken <@> nodeAddr
173 return (Right [nodeAddr]) 172 return (Right [nodeAddr])
174 173
@@ -179,7 +178,7 @@ announceQ ih p NodeInfo {..} = do
179type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] 178type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip]
180 179
181-- TODO: use reorder and filter (Traversal option) leftovers 180-- TODO: use reorder and filter (Traversal option) leftovers
182search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o 181search :: k -> Iteration ip o -> Search ip o
183search _ action = do 182search _ action = do
184 awaitForever $ \ batch -> unless (L.null batch) $ do 183 awaitForever $ \ batch -> unless (L.null batch) $ do
185 $(logWarnS) "search" "start query" 184 $(logWarnS) "search" "start query"
@@ -196,11 +195,6 @@ publish ih p = do
196 _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r 195 _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r
197 return () 196 return ()
198 197
199republish :: DHT ip ThreadId
200republish = fork $ do
201 i <- asks (optReannounce . options)
202 error "DHT.republish: not implemented"
203
204getTimestamp :: DHT ip Timestamp 198getTimestamp :: DHT ip Timestamp
205getTimestamp = do 199getTimestamp = do
206 utcTime <- liftIO $ getCurrentTime 200 utcTime <- liftIO $ getCurrentTime
@@ -229,7 +223,7 @@ refreshNodes nid = do
229 -- nss <- sourceList [[addr]] $= search nid (findNodeQ nid) $$ C.consume 223 -- nss <- sourceList [[addr]] $= search nid (findNodeQ nid) $$ C.consume
230 nss <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume 224 nss <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume
231 $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes." 225 $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes."
232 queryParallel $ flip L.map (L.concat nss) $ \n -> do 226 _ <- queryParallel $ flip L.map (L.concat nss) $ \n -> do
233 $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) 227 $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n))
234 pingQ (nodeAddr n) 228 pingQ (nodeAddr n)
235 -- pingQ takes care of inserting the node. 229 -- pingQ takes care of inserting the node.
@@ -239,15 +233,14 @@ refreshNodes nid = do
239-- | This operation do not block but acquire exclusive access to 233-- | This operation do not block but acquire exclusive access to
240-- routing table. 234-- routing table.
241insertNode :: Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId 235insertNode :: Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId
242insertNode info witnessed_ip = fork $ do 236insertNode info witnessed_ip0 = fork $ do
243 var <- asks routingInfo 237 var <- asks routingInfo
244 tm <- getTimestamp 238 tm <- getTimestamp
245 let showTable = do 239 let showTable = do
246 t <- getTable 240 t <- getTable
247 let logMsg = "Routing table: " <> pPrint t 241 let logMsg = "Routing table: " <> pPrint t
248 $(logDebugS) "insertNode" (T.pack (render logMsg)) 242 $(logDebugS) "insertNode" (T.pack (render logMsg))
249 t <- liftIO $ atomically $ readTVar var 243 let arrival0 = TryInsert info
250 let arrival = TryInsert info
251 arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4) 244 arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4)
252 $(logDebugS) "insertNode" $ T.pack (show arrival4) 245 $(logDebugS) "insertNode" $ T.pack (show arrival4)
253 maxbuckets <- asks (optBucketCount . options) 246 maxbuckets <- asks (optBucketCount . options)
@@ -259,13 +252,13 @@ insertNode info witnessed_ip = fork $ do
259 $ rank id (nodeId $ foreignNode arrival) 252 $ rank id (nodeId $ foreignNode arrival)
260 $ bep42s ip fallbackid 253 $ bep42s ip fallbackid
261 case minfo of 254 case minfo of
262 Just info -> do 255 Just inf -> do
263 (ps,t') <- R.insert tm arrival $ myBuckets info 256 (ps,t') <- R.insert tm arrival $ myBuckets inf
264 writeTVar var $ Just $ info { myBuckets = t' } 257 writeTVar var $ Just $ inf { myBuckets = t' }
265 return $ do 258 return $ do
266 case witnessed_ip of 259 case witnessed_ip of
267 Just (ReflectedIP ip0) 260 Just (ReflectedIP ip0)
268 | fromSockAddr ip0 /= Just (myAddress info) 261 | fromSockAddr ip0 /= Just (myAddress inf)
269 -> $(logInfo) ( T.pack $ L.unwords 262 -> $(logInfo) ( T.pack $ L.unwords
270 $ [ "Possible NAT?" 263 $ [ "Possible NAT?"
271 , show (toSockAddr $ nodeAddr $ foreignNode arrival) 264 , show (toSockAddr $ nodeAddr $ foreignNode arrival)
@@ -298,15 +291,14 @@ insertNode info witnessed_ip = fork $ do
298 <> ")" 291 <> ")"
299 ] ) 292 ] )
300 return ps 293 return ps
301 ps <- join $ liftIO $ atomically $ atomicInsert arrival witnessed_ip 294 ps <- join $ liftIO $ atomically $ atomicInsert arrival0 witnessed_ip0
302 showTable 295 showTable
303 fork $ forM_ ps $ \(CheckPing ns)-> do 296 _ <- fork $ forM_ ps $ \(CheckPing ns)-> do
304 forM_ ns $ \n -> do 297 forM_ ns $ \n -> do
305 (b,mip) <- probeNode (nodeAddr n) 298 (b,mip) <- probeNode (nodeAddr n)
306 let alive = PingResult n b 299 let alive = PingResult n b
307 $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) 300 $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b))
308 tm <- getTimestamp 301 _ <- join $ liftIO $ atomically $ atomicInsert alive mip
309 join $ liftIO $ atomically $ atomicInsert alive mip
310 showTable 302 showTable
311 return () 303 return ()
312 304
@@ -323,7 +315,7 @@ queryNode' addr q = do
323 (Response remoteId r, witnessed_ip) <- query' (toSockAddr addr) (Query nid read_only q) 315 (Response remoteId r, witnessed_ip) <- query' (toSockAddr addr) (Query nid read_only q)
324 -- $(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) 316 -- $(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip)
325 -- <> " by " <> T.pack (show (toSockAddr addr)) 317 -- <> " by " <> T.pack (show (toSockAddr addr))
326 insertNode (NodeInfo remoteId addr) witnessed_ip 318 _ <- insertNode (NodeInfo remoteId addr) witnessed_ip
327 return (remoteId, r, witnessed_ip) 319 return (remoteId, r, witnessed_ip)
328 320
329-- | Infix version of 'queryNode' function. 321-- | Infix version of 'queryNode' function.