diff options
Diffstat (limited to 'src/Network/BitTorrent/DHT')
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 391 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Search.hs | 52 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 114 |
3 files changed, 398 insertions, 159 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 254b347c..e5d9bd5f 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs | |||
@@ -41,11 +41,13 @@ module Network.BitTorrent.DHT.Query | |||
41 | -- concatenate its responses, optionally yielding result and | 41 | -- concatenate its responses, optionally yielding result and |
42 | -- continue to the next iteration. | 42 | -- continue to the next iteration. |
43 | , Search | 43 | , Search |
44 | , search | 44 | -- , search |
45 | , publish | 45 | , publish |
46 | , ioFindNode | 46 | , ioFindNode |
47 | , ioFindNodes | ||
47 | , ioGetPeers | 48 | , ioGetPeers |
48 | , isearch | 49 | , isearch |
50 | , bgsearch | ||
49 | 51 | ||
50 | -- ** Routing table | 52 | -- ** Routing table |
51 | , insertNode | 53 | , insertNode |
@@ -57,6 +59,8 @@ module Network.BitTorrent.DHT.Query | |||
57 | , (<@>) | 59 | , (<@>) |
58 | ) where | 60 | ) where |
59 | 61 | ||
62 | import Data.Bits | ||
63 | import Data.Default | ||
60 | #ifdef THREAD_DEBUG | 64 | #ifdef THREAD_DEBUG |
61 | import Control.Concurrent.Lifted.Instrument hiding (yield) | 65 | import Control.Concurrent.Lifted.Instrument hiding (yield) |
62 | #else | 66 | #else |
@@ -102,30 +106,43 @@ import Network.DatagramServer.Tox | |||
102 | #endif | 106 | #endif |
103 | import Network.Address hiding (NodeId) | 107 | import Network.Address hiding (NodeId) |
104 | import Network.DatagramServer.Types as RPC hiding (Query,Response) | 108 | import Network.DatagramServer.Types as RPC hiding (Query,Response) |
109 | import Network.DHT.Types | ||
105 | import Control.Monad.Trans.Control | 110 | import Control.Monad.Trans.Control |
111 | import Data.Typeable | ||
112 | import Data.Serialize | ||
113 | import System.IO.Unsafe (unsafeInterleaveIO) | ||
114 | import Data.String | ||
106 | 115 | ||
107 | {----------------------------------------------------------------------- | 116 | {----------------------------------------------------------------------- |
108 | -- Handlers | 117 | -- Handlers |
109 | -----------------------------------------------------------------------} | 118 | -----------------------------------------------------------------------} |
110 | 119 | ||
120 | {- | ||
111 | nodeHandler :: ( Address ip | 121 | nodeHandler :: ( Address ip |
112 | , KRPC (Query a) (Response b) | 122 | , KRPC (Query KMessageOf a) (Response KMessageOf b) |
113 | ) | 123 | ) |
114 | => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler | 124 | => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler |
125 | -} | ||
126 | nodeHandler :: | ||
127 | (Address addr, WireFormat raw msg, Pretty (NodeInfo dht addr u), | ||
128 | Default u, | ||
129 | IsString t, Functor msg, | ||
130 | SerializableTo raw (Response dht r), | ||
131 | SerializableTo raw (Query dht q)) => | ||
132 | (NodeInfo dht addr u -> Maybe ReflectedIP -> IO ()) | ||
133 | -> (NodeAddr addr -> IO (NodeId dht)) | ||
134 | -> (Char -> t -> Text -> IO ()) | ||
135 | -> QueryMethod msg | ||
136 | -> (NodeAddr addr -> q -> IO r) | ||
137 | -> Handler IO msg raw | ||
115 | nodeHandler insertNode myNodeIdAccordingTo logm method action = handler method $ \ sockAddr qry -> do | 138 | nodeHandler insertNode myNodeIdAccordingTo logm method action = handler method $ \ sockAddr qry -> do |
116 | #ifdef VERSION_bencoding | ||
117 | let remoteId = queringNodeId qry | 139 | let remoteId = queringNodeId qry |
118 | read_only = queryIsReadOnly qry | 140 | read_only = queryIsReadOnly qry |
119 | q = queryParams qry | 141 | q = queryParams qry |
120 | #else | ||
121 | let remoteId = msgClient qry | ||
122 | read_only = False | ||
123 | q = msgPayload qry | ||
124 | #endif | ||
125 | case fromSockAddr sockAddr of | 142 | case fromSockAddr sockAddr of |
126 | Nothing -> throwIO BadAddress | 143 | Nothing -> throwIO BadAddress |
127 | Just naddr -> do | 144 | Just naddr -> do |
128 | let ni = NodeInfo remoteId naddr () | 145 | let ni = NodeInfo remoteId naddr def |
129 | -- Do not route read-only nodes. (bep 43) | 146 | -- Do not route read-only nodes. (bep 43) |
130 | if read_only | 147 | if read_only |
131 | then logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) | 148 | then logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) |
@@ -135,13 +152,13 @@ nodeHandler insertNode myNodeIdAccordingTo logm method action = handler method $ | |||
135 | <*> action naddr q | 152 | <*> action naddr q |
136 | 153 | ||
137 | -- | Default 'Ping' handler. | 154 | -- | Default 'Ping' handler. |
138 | pingH :: NodeAddr ip -> Ping -> IO Ping | 155 | pingH :: DHT.Kademlia dht => Proxy dht -> NodeAddr ip -> Ping dht -> IO (Ping dht) |
139 | pingH _ Ping = return Ping | 156 | pingH dht _ _ = return (DHT.pongMessage dht) |
140 | -- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True } | 157 | -- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True } |
141 | 158 | ||
142 | -- | Default 'FindNode' handler. | 159 | -- | Default 'FindNode' handler. |
143 | findNodeH :: (NodeId KMessageOf -> IO [NodeInfo KMessageOf ip ()]) -> NodeAddr ip -> FindNode ip -> IO (NodeFound ip) | 160 | findNodeH :: Kademlia dht => (NodeId dht -> IO [NodeInfo dht ip u]) -> NodeAddr ip -> FindNode dht ip -> IO (NodeFound dht ip) |
144 | findNodeH getclosest _ (FindNode nid) = NodeFound <$> getclosest nid | 161 | findNodeH getclosest _ msg = foundNodesMessage . L.map (fmap (const ())) <$> getclosest (findWho msg) |
145 | 162 | ||
146 | -- | Default 'GetPeers' handler. | 163 | -- | Default 'GetPeers' handler. |
147 | getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip) | 164 | getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip) |
@@ -162,51 +179,100 @@ announceH peers toks naddr @ NodeAddr {..} (Announce {..}) = do | |||
162 | insertPeer peers topic announcedName peerAddr | 179 | insertPeer peers topic announcedName peerAddr |
163 | return Announced | 180 | return Announced |
164 | 181 | ||
182 | -- | Includes all Kademlia-related handlers. | ||
183 | kademliaHandlers :: forall raw dht u ip. (Eq ip, Ord ip, Address ip | ||
184 | , Ord (TransactionID dht) | ||
185 | , Ord (NodeId dht) | ||
186 | , Show u | ||
187 | , SerializableTo raw (Response dht (Ping dht)) | ||
188 | , SerializableTo raw (Query dht (Ping dht)) | ||
189 | , Show (QueryMethod dht) | ||
190 | , Show (NodeId dht) | ||
191 | , FiniteBits (NodeId dht) | ||
192 | , Default u | ||
193 | , Serialize (TransactionID dht) | ||
194 | , WireFormat raw dht | ||
195 | , Kademlia dht | ||
196 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
197 | , Functor dht | ||
198 | , Pretty (NodeInfo dht ip u) | ||
199 | , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
200 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
201 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
202 | ) => LogFun -> DHT raw dht u ip [Handler IO dht raw] | ||
203 | -- kademliaHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler] | ||
204 | kademliaHandlers logger = do | ||
205 | groknode <- insertNode1 | ||
206 | mynid <- myNodeIdAccordingTo1 | ||
207 | let handler :: ( KRPC (Query dht a) (Response dht b) | ||
208 | , SerializableTo raw (Response dht b) | ||
209 | , SerializableTo raw (Query dht a) | ||
210 | ) => QueryMethod dht -> (NodeAddr ip -> a -> IO b) -> Handler IO dht raw | ||
211 | handler = nodeHandler groknode mynid (logt logger) | ||
212 | dht = Proxy :: Proxy dht | ||
213 | getclosest <- getClosest1 | ||
214 | return [ handler (namePing dht) $ pingH dht | ||
215 | , handler (nameFindNodes dht) $ findNodeH getclosest | ||
216 | ] | ||
217 | |||
218 | |||
165 | -- | Includes all default query handlers. | 219 | -- | Includes all default query handlers. |
166 | defaultHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT ip [NodeHandler] | 220 | defaultHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler] |
167 | defaultHandlers logger = do | 221 | defaultHandlers logger = do |
168 | groknode <- insertNode1 | 222 | groknode <- insertNode1 |
169 | toks <- asks sessionTokens | ||
170 | getclosest <- getClosest1 | ||
171 | mynid <- myNodeIdAccordingTo1 | 223 | mynid <- myNodeIdAccordingTo1 |
224 | let handler :: KRPC (Query KMessageOf a) (Response KMessageOf b) => QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler | ||
225 | handler = nodeHandler groknode mynid (logt logger) | ||
226 | toks <- asks sessionTokens | ||
172 | peers <- asks contactInfo | 227 | peers <- asks contactInfo |
173 | getpeers <- getPeerList1 | 228 | getpeers <- getPeerList1 |
174 | let handler :: KRPC (Query a) (Response b) => QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler | 229 | hs <- kademliaHandlers logger |
175 | handler = nodeHandler groknode mynid (logt logger) | 230 | return $ hs ++ [ handler "get_peers" $ getPeersH getpeers toks |
176 | return [ handler "ping" $ pingH | 231 | , handler "announce_peer" $ announceH peers toks ] |
177 | , handler "find-nodes" $ findNodeH getclosest | ||
178 | , handler "get_peers" $ getPeersH getpeers toks | ||
179 | , handler "announce_peer" $ announceH peers toks ] | ||
180 | 232 | ||
181 | {----------------------------------------------------------------------- | 233 | {----------------------------------------------------------------------- |
182 | -- Basic queries | 234 | -- Basic queries |
183 | -----------------------------------------------------------------------} | 235 | -----------------------------------------------------------------------} |
184 | 236 | ||
185 | type Iteration ip o = NodeInfo KMessageOf ip () -> DHT ip (Either [NodeInfo KMessageOf ip ()] [o ip]) | 237 | type Iteration raw dht u ip o = NodeInfo dht ip u -> DHT raw dht u ip (Either [NodeInfo dht ip u] [o ip]) |
186 | 238 | ||
187 | -- | The most basic query. May be used to check if the given node is | 239 | -- | The most basic query. May be used to check if the given node is |
188 | -- alive or get its 'NodeId'. | 240 | -- alive or get its 'NodeId'. |
189 | pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo KMessageOf ip (), Maybe ReflectedIP) | 241 | pingQ :: forall raw dht u ip. |
242 | ( DHT.Kademlia dht | ||
243 | , Address ip | ||
244 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
245 | , Default u | ||
246 | , Show u | ||
247 | , Ord (TransactionID dht) | ||
248 | , Serialize (TransactionID dht) | ||
249 | , WireFormat raw dht | ||
250 | , SerializableTo raw (Response dht (Ping dht)) | ||
251 | , SerializableTo raw (Query dht (Ping dht)) | ||
252 | , Ord (NodeId dht) | ||
253 | , FiniteBits (NodeId dht) | ||
254 | , Show (NodeId dht) | ||
255 | , Show (QueryMethod dht) | ||
256 | ) => NodeAddr ip -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP) | ||
190 | pingQ addr = do | 257 | pingQ addr = do |
191 | #ifdef VERSION_bencoding | 258 | let ping = DHT.pingMessage (Proxy :: Proxy dht) |
192 | (nid, Ping, mip) <- queryNode' addr Ping | 259 | (nid, pong, mip) <- queryNode' addr ping |
193 | #else | 260 | let _ = pong `asTypeOf` ping |
194 | (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} | 261 | -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} |
195 | #endif | 262 | return (NodeInfo nid addr def, mip) |
196 | return (NodeInfo nid addr (), mip) | ||
197 | 263 | ||
198 | -- TODO [robustness] match range of returned node ids with the | 264 | -- TODO [robustness] match range of returned node ids with the |
199 | -- expected range and either filter bad nodes or discard response at | 265 | -- expected range and either filter bad nodes or discard response at |
200 | -- all throwing an exception | 266 | -- all throwing an exception |
201 | -- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo | 267 | -- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo |
202 | findNodeQ key NodeInfo {..} = do | 268 | findNodeQ proxy key NodeInfo {..} = do |
203 | NodeFound closest <- FindNode (toNodeId key) <@> nodeAddr | 269 | closest <- fmap DHT.foundNodes $ DHT.findNodeMessage proxy key <@> nodeAddr |
204 | $(logInfoS) "findNodeQ" $ "NodeFound\n" | 270 | $(logInfoS) "findNodeQ" $ "NodeFound\n" |
205 | <> T.pack (L.unlines $ L.map ((' ' :) . show . pPrint) closest) | 271 | <> T.pack (L.unlines $ L.map ((' ' :) . show . pPrint) closest) |
206 | return $ Right closest | 272 | return $ Right closest |
207 | 273 | ||
208 | #ifdef VERSION_bencoding | 274 | #ifdef VERSION_bencoding |
209 | getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr | 275 | getPeersQ :: Address ip => InfoHash -> Iteration BValue KMessageOf () ip PeerAddr |
210 | getPeersQ topic NodeInfo {..} = do | 276 | getPeersQ topic NodeInfo {..} = do |
211 | GotPeers {..} <- GetPeers topic <@> nodeAddr | 277 | GotPeers {..} <- GetPeers topic <@> nodeAddr |
212 | let dist = distance (toNodeId topic) nodeId | 278 | let dist = distance (toNodeId topic) nodeId |
@@ -215,7 +281,7 @@ getPeersQ topic NodeInfo {..} = do | |||
215 | <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" } | 281 | <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" } |
216 | return peers | 282 | return peers |
217 | 283 | ||
218 | announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr | 284 | announceQ :: Address ip => InfoHash -> PortNumber -> Iteration BValue KMessageOf () ip NodeAddr |
219 | announceQ ih p NodeInfo {..} = do | 285 | announceQ ih p NodeInfo {..} = do |
220 | GotPeers {..} <- GetPeers ih <@> nodeAddr | 286 | GotPeers {..} <- GetPeers ih <@> nodeAddr |
221 | case peers of | 287 | case peers of |
@@ -232,7 +298,7 @@ announceQ ih p NodeInfo {..} = do | |||
232 | -----------------------------------------------------------------------} | 298 | -----------------------------------------------------------------------} |
233 | 299 | ||
234 | 300 | ||
235 | ioGetPeers :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip])) | 301 | ioGetPeers :: Address ip => InfoHash -> DHT BValue KMessageOf () ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip])) |
236 | ioGetPeers ih = do | 302 | ioGetPeers ih = do |
237 | session <- ask | 303 | session <- ask |
238 | return $ \ni -> runDHT session $ do | 304 | return $ \ni -> runDHT session $ do |
@@ -241,17 +307,71 @@ ioGetPeers ih = do | |||
241 | Right e -> return $ either (,[]) ([],) e | 307 | Right e -> return $ either (,[]) ([],) e |
242 | Left e -> let _ = e :: QueryFailure in return ([],[]) | 308 | Left e -> let _ = e :: QueryFailure in return ([],[]) |
243 | 309 | ||
244 | ioFindNode :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [NodeInfo KMessageOf ip ()])) | 310 | ioFindNode :: ( DHT.Kademlia dht |
311 | , WireFormat raw dht | ||
312 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
313 | , Address ip | ||
314 | , Default u | ||
315 | , Show u | ||
316 | , Show (QueryMethod dht) | ||
317 | , TableKey dht infohash | ||
318 | , Eq (NodeId dht) | ||
319 | , Ord (NodeId dht) | ||
320 | , FiniteBits (NodeId dht) | ||
321 | , Show (NodeId dht) | ||
322 | , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
323 | , Ord (TransactionID dht) | ||
324 | , Serialize (TransactionID dht) | ||
325 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
326 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
327 | , SerializableTo raw (Response dht (Ping dht)) | ||
328 | , SerializableTo raw (Query dht (Ping dht)) | ||
329 | ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u])) | ||
245 | ioFindNode ih = do | 330 | ioFindNode ih = do |
246 | session <- ask | 331 | session <- ask |
247 | return $ \ni -> runDHT session $ do | 332 | return $ \ni -> runDHT session $ do |
248 | NodeFound ns <- FindNode (toNodeId ih) <@> nodeAddr ni | 333 | ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> nodeAddr ni |
249 | return $ L.partition (\n -> nodeId n /= toNodeId ih) ns | 334 | let ns' = L.map (fmap (const def)) ns |
250 | 335 | return $ L.partition (\n -> nodeId n /= toNodeId ih) ns' | |
251 | isearch :: (Ord r, Ord ip) => | 336 | |
252 | (InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [r]))) | 337 | |
253 | -> InfoHash | 338 | -- | Like ioFindNode, but considers all found nodes to be 'Right' results. |
254 | -> DHT ip (ThreadId, Search.IterativeSearch ip r) | 339 | ioFindNodes :: ( DHT.Kademlia dht |
340 | , WireFormat raw dht | ||
341 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
342 | , Address ip | ||
343 | , Default u | ||
344 | , Show u | ||
345 | , Show (QueryMethod dht) | ||
346 | , TableKey dht infohash | ||
347 | , Eq (NodeId dht) | ||
348 | , Ord (NodeId dht) | ||
349 | , FiniteBits (NodeId dht) | ||
350 | , Show (NodeId dht) | ||
351 | , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
352 | , Ord (TransactionID dht) | ||
353 | , Serialize (TransactionID dht) | ||
354 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
355 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
356 | , SerializableTo raw (Response dht (Ping dht)) | ||
357 | , SerializableTo raw (Query dht (Ping dht)) | ||
358 | ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u])) | ||
359 | ioFindNodes ih = do | ||
360 | session <- ask | ||
361 | return $ \ni -> runDHT session $ do | ||
362 | ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> nodeAddr ni | ||
363 | let ns' = L.map (fmap (const def)) ns | ||
364 | return ([], ns') | ||
365 | |||
366 | isearch :: ( Ord r | ||
367 | , Ord ip | ||
368 | , Ord (NodeId dht) | ||
369 | , FiniteBits (NodeId dht) | ||
370 | , TableKey dht ih | ||
371 | , Show ih) => | ||
372 | (ih -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r]))) | ||
373 | -> ih | ||
374 | -> DHT raw dht u ip (ThreadId, Search.IterativeSearch dht u ip r) | ||
255 | isearch f ih = do | 375 | isearch f ih = do |
256 | qry <- f ih | 376 | qry <- f ih |
257 | ns <- kclosest 8 ih <$> getTable | 377 | ns <- kclosest 8 ih <$> getTable |
@@ -263,8 +383,25 @@ isearch f ih = do | |||
263 | -- atomically \$ readTVar (Search.searchResults s) | 383 | -- atomically \$ readTVar (Search.searchResults s) |
264 | return (a, s) | 384 | return (a, s) |
265 | 385 | ||
266 | 386 | -- | Background search: fill a lazy list using a background thread. | |
267 | type Search ip o = Conduit [NodeInfo KMessageOf ip ()] (DHT ip) [o KMessageOf ip ()] | 387 | bgsearch f ih = do |
388 | (tid, s) <- isearch f ih | ||
389 | let again shown = do | ||
390 | (chk,fin) <- atomically $ do | ||
391 | r <- (Set.\\ shown) <$> readTVar (Search.searchResults s) | ||
392 | if not $ Set.null r | ||
393 | then (,) r <$> Search.searchIsFinished s | ||
394 | else Search.searchIsFinished s >>= check >> return (Set.empty,True) | ||
395 | let ps = Set.toList chk | ||
396 | if fin then return ps | ||
397 | else do | ||
398 | xs <- unsafeInterleaveIO $ again (shown `Set.union` chk) | ||
399 | return $ ps ++ xs | ||
400 | liftIO $ again Set.empty | ||
401 | |||
402 | type Search raw dht u ip o = Conduit [NodeInfo dht ip u] (DHT raw dht u ip) [o dht ip u] | ||
403 | |||
404 | #if 0 | ||
268 | 405 | ||
269 | -- TODO: use reorder and filter (Traversal option) leftovers | 406 | -- TODO: use reorder and filter (Traversal option) leftovers |
270 | -- search :: k -> IterationI ip o -> Search ip o | 407 | -- search :: k -> IterationI ip o -> Search ip o |
@@ -275,17 +412,36 @@ search _ action = do | |||
275 | let (nodes, results) = partitionEithers responses | 412 | let (nodes, results) = partitionEithers responses |
276 | $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results))) | 413 | $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results))) |
277 | leftover $ L.concat nodes | 414 | leftover $ L.concat nodes |
278 | mapM_ yield results | 415 | let r = mapM_ yield results |
279 | 416 | _ = (action,r) :: (a -> DHT raw dht u ip (Either [a] o), ConduitM [a] o (DHT raw dht u ip) ()) | |
280 | publish :: Address ip => InfoHash -> PortNumber -> DHT ip () | 417 | r |
281 | publish ih p = do | ||
282 | nodes <- getClosest ih | ||
283 | r <- asks (optReplication . options) | ||
284 | _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r | ||
285 | return () | ||
286 | 418 | ||
419 | #endif | ||
287 | 420 | ||
288 | probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) | 421 | publish = error "todo" |
422 | -- publish :: Address ip => InfoHash -> PortNumber -> DHT BValue KMessageOf () ip () | ||
423 | -- publish ih p = do | ||
424 | -- nodes <- getClosest ih | ||
425 | -- r <- asks (optReplication . options) | ||
426 | -- _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r | ||
427 | -- return () | ||
428 | |||
429 | |||
430 | probeNode :: ( Default u | ||
431 | , Show u | ||
432 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
433 | , DHT.Kademlia dht | ||
434 | , Address ip | ||
435 | , Ord (TransactionID dht) | ||
436 | , Serialize (TransactionID dht) | ||
437 | , WireFormat raw dht | ||
438 | , SerializableTo raw (Response dht (Ping dht)) | ||
439 | , SerializableTo raw (Query dht (Ping dht)) | ||
440 | , Ord (NodeId dht) | ||
441 | , FiniteBits (NodeId dht) | ||
442 | , Show (NodeId dht) | ||
443 | , Show (QueryMethod dht) | ||
444 | ) => NodeAddr ip -> DHT raw dht u ip (Bool , Maybe ReflectedIP) | ||
289 | probeNode addr = do | 445 | probeNode addr = do |
290 | $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr))) | 446 | $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr))) |
291 | result <- try $ pingQ addr | 447 | result <- try $ pingQ addr |
@@ -293,8 +449,16 @@ probeNode addr = do | |||
293 | return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result | 449 | return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result |
294 | 450 | ||
295 | 451 | ||
452 | refreshNodes :: forall raw dht u ip. | ||
453 | ( Address ip | ||
454 | , Ord (NodeId dht) | ||
455 | , Default u | ||
456 | , FiniteBits (NodeId dht) | ||
457 | , Pretty (NodeId dht) | ||
458 | , DHT.Kademlia dht ) => NodeId dht -> DHT raw dht u ip () -- [NodeInfo KMessageOf ip ()] | ||
459 | refreshNodes _ = return () -- TODO | ||
460 | #if 0 | ||
296 | -- FIXME do not use getClosest sinse we should /refresh/ them | 461 | -- FIXME do not use getClosest sinse we should /refresh/ them |
297 | refreshNodes :: Address ip => NodeId KMessageOf -> DHT ip () -- [NodeInfo KMessageOf ip ()] | ||
298 | refreshNodes nid = do | 462 | refreshNodes nid = do |
299 | $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid))) | 463 | $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid))) |
300 | nodes <- getClosest nid | 464 | nodes <- getClosest nid |
@@ -304,7 +468,7 @@ refreshNodes nid = do | |||
304 | -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) () | 468 | -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) () |
305 | -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) () | 469 | -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) () |
306 | -- nss <- sourceList [[addr]] \$= search nid (findNodeQ nid) $$ C.consume | 470 | -- nss <- sourceList [[addr]] \$= search nid (findNodeQ nid) $$ C.consume |
307 | nss <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume | 471 | nss <- sourceList [nodes] $= search nid (findNodeQ (Proxy :: Proxy dht) nid) $$ C.consume |
308 | $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes." | 472 | $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes." |
309 | _ <- queryParallel $ flip L.map (L.concat nss) $ \n -> do | 473 | _ <- queryParallel $ flip L.map (L.concat nss) $ \n -> do |
310 | $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) | 474 | $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) |
@@ -312,8 +476,9 @@ refreshNodes nid = do | |||
312 | -- pingQ takes care of inserting the node. | 476 | -- pingQ takes care of inserting the node. |
313 | return () | 477 | return () |
314 | return () -- \$ L.concat nss | 478 | return () -- \$ L.concat nss |
479 | #endif | ||
315 | 480 | ||
316 | logc :: Char -> String -> DHT ip () | 481 | logc :: Char -> String -> DHT raw dht u ip () |
317 | logc 'D' = $(logDebugS) "insertNode" . T.pack | 482 | logc 'D' = $(logDebugS) "insertNode" . T.pack |
318 | logc 'W' = $(logWarnS) "insertNode" . T.pack | 483 | logc 'W' = $(logWarnS) "insertNode" . T.pack |
319 | logc 'I' = $(logInfoS) "insertNode" . T.pack | 484 | logc 'I' = $(logInfoS) "insertNode" . T.pack |
@@ -321,12 +486,46 @@ logc c = $(logInfoS) "insertNode" . T.pack . (c :) . (':' :) | |||
321 | 486 | ||
322 | -- | This operation do not block but acquire exclusive access to | 487 | -- | This operation do not block but acquire exclusive access to |
323 | -- routing table. | 488 | -- routing table. |
324 | insertNode :: forall ip. Address ip => NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> DHT ip () | 489 | insertNode :: forall raw dht u ip. |
490 | ( Address ip | ||
491 | , Ord (NodeId dht) | ||
492 | , FiniteBits (NodeId dht) | ||
493 | , Show (NodeId dht) | ||
494 | , Default u | ||
495 | , Show u | ||
496 | , DHT.Kademlia dht | ||
497 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
498 | , Ord (TransactionID dht) | ||
499 | , WireFormat raw dht | ||
500 | , Serialize (TransactionID dht) | ||
501 | , SerializableTo raw (Response dht (Ping dht)) | ||
502 | , SerializableTo raw (Query dht (Ping dht)) | ||
503 | , Ord (NodeId dht) | ||
504 | , Show (NodeId dht) | ||
505 | , Show (QueryMethod dht) | ||
506 | ) => NodeInfo dht ip u -> Maybe ReflectedIP -> DHT raw dht u ip () | ||
325 | insertNode info witnessed_ip0 = do | 507 | insertNode info witnessed_ip0 = do |
326 | f <- insertNode1 | 508 | f <- insertNode1 |
327 | liftIO $ f info witnessed_ip0 | 509 | liftIO $ f info witnessed_ip0 |
328 | 510 | ||
329 | insertNode1 :: forall ip. Address ip => DHT ip (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) | 511 | insertNode1 :: forall raw dht u ip. |
512 | ( Address ip | ||
513 | , Default u | ||
514 | , Show u | ||
515 | , Ord (NodeId dht) | ||
516 | , FiniteBits (NodeId dht) | ||
517 | , Show (NodeId dht) | ||
518 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
519 | , DHT.Kademlia dht | ||
520 | , Ord (TransactionID dht) | ||
521 | , WireFormat raw dht | ||
522 | , Serialize (TransactionID dht) | ||
523 | , SerializableTo raw (Response dht (Ping dht)) | ||
524 | , SerializableTo raw (Query dht (Ping dht)) | ||
525 | , Ord (NodeId dht) | ||
526 | , Show (NodeId dht) | ||
527 | , Show (QueryMethod dht) | ||
528 | ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ()) | ||
330 | insertNode1 = do | 529 | insertNode1 = do |
331 | bc <- optBucketCount <$> asks options | 530 | bc <- optBucketCount <$> asks options |
332 | nid <- asks tentativeNodeId | 531 | nid <- asks tentativeNodeId |
@@ -335,15 +534,17 @@ insertNode1 = do | |||
335 | dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state. | 534 | dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state. |
336 | probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) | 535 | probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) |
337 | let probe n = probe0 n >>= runDHT dht_node_state . restoreM | 536 | let probe n = probe0 n >>= runDHT dht_node_state . restoreM |
537 | {- | ||
338 | changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive | 538 | changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive |
339 | ip <- fromSockAddr ip0 :: Maybe ip | 539 | ip <- fromSockAddr ip0 :: Maybe ip |
340 | listToMaybe | 540 | listToMaybe |
341 | $ rank id (nodeId $ foreignNode arrival) | 541 | $ rank id (nodeId $ foreignNode arrival) |
342 | $ bep42s ip (DHT.fallbackID params) -- warning: recursive | 542 | $ bep42s ip (DHT.fallbackID params) -- warning: recursive |
543 | -} | ||
343 | params = DHT.TableParameters | 544 | params = DHT.TableParameters |
344 | { maxBuckets = bc :: Int | 545 | { maxBuckets = bc :: Int |
345 | , fallbackID = nid :: NodeId KMessageOf | 546 | , fallbackID = nid :: NodeId dht |
346 | , adjustID = changeip :: SockAddr -> Event KMessageOf ip () -> NodeId KMessageOf | 547 | , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht |
347 | , logMessage = logm :: Char -> String -> IO () | 548 | , logMessage = logm :: Char -> String -> IO () |
348 | , pingProbe = probe :: NodeAddr ip -> IO (Bool, Maybe ReflectedIP) | 549 | , pingProbe = probe :: NodeAddr ip -> IO (Bool, Maybe ReflectedIP) |
349 | } | 550 | } |
@@ -356,25 +557,75 @@ insertNode1 = do | |||
356 | return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0 | 557 | return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0 |
357 | 558 | ||
358 | -- | Throws exception if node is not responding. | 559 | -- | Throws exception if node is not responding. |
359 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) | 560 | queryNode :: forall raw dht u a b ip. |
360 | => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b) | 561 | ( Address ip |
562 | , KRPC (Query dht a) (Response dht b) | ||
563 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
564 | , Default u | ||
565 | , Show u | ||
566 | , DHT.Kademlia dht | ||
567 | , Ord (TransactionID dht) | ||
568 | , Serialize (TransactionID dht) | ||
569 | , WireFormat raw dht | ||
570 | , SerializableTo raw (Response dht b) | ||
571 | , SerializableTo raw (Query dht a) | ||
572 | , Ord (NodeId dht) | ||
573 | , FiniteBits (NodeId dht) | ||
574 | , Show (NodeId dht) | ||
575 | , Show (QueryMethod dht) | ||
576 | , SerializableTo raw (Response dht (Ping dht)) | ||
577 | , SerializableTo raw (Query dht (Ping dht)) | ||
578 | ) => NodeAddr ip -> a -> DHT raw dht u ip (NodeId dht, b) | ||
361 | queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q | 579 | queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q |
362 | 580 | ||
363 | queryNode' :: forall a b ip. Address ip => KRPC (Query a) (Response b) | 581 | queryNode' :: forall raw dht u a b ip. |
364 | => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b, Maybe ReflectedIP) | 582 | ( Address ip |
583 | , Default u | ||
584 | , Show u | ||
585 | , DHT.Kademlia dht | ||
586 | , KRPC (Query dht a) (Response dht b) | ||
587 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
588 | , Ord (TransactionID dht) | ||
589 | , Serialize (TransactionID dht) | ||
590 | , WireFormat raw dht | ||
591 | , SerializableTo raw (Response dht b) | ||
592 | , SerializableTo raw (Query dht a) | ||
593 | , Ord (NodeId dht) | ||
594 | , FiniteBits (NodeId dht) | ||
595 | , Show (NodeId dht) | ||
596 | , Show (QueryMethod dht) | ||
597 | , SerializableTo raw (Response dht (Ping dht)) | ||
598 | , SerializableTo raw (Query dht (Ping dht)) | ||
599 | ) => NodeAddr ip -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP) | ||
365 | queryNode' addr q = do | 600 | queryNode' addr q = do |
366 | nid <- myNodeIdAccordingTo addr | 601 | nid <- myNodeIdAccordingTo addr |
367 | let read_only = False -- TODO: check for NAT issues. (BEP 43) | 602 | let read_only = False -- TODO: check for NAT issues. (BEP 43) |
368 | let KRPC.Method name = KRPC.method :: KRPC.Method (Query a) (Response b) | 603 | let KRPC.Method name = KRPC.method :: KRPC.Method (Query dht a) (Response dht b) |
369 | mgr <- asks manager | 604 | mgr <- asks manager |
370 | (Response remoteId r, witnessed_ip) <- liftIO $ query' mgr name (toSockAddr addr) (Query nid read_only q) | 605 | (Response remoteId r, witnessed_ip) <- liftIO $ query' mgr (error "TODO: name") (toSockAddr addr) (Query nid read_only q) |
371 | -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) | 606 | -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) |
372 | -- <> " by " <> T.pack (show (toSockAddr addr)) | 607 | -- <> " by " <> T.pack (show (toSockAddr addr)) |
373 | _ <- insertNode (NodeInfo remoteId addr ()) witnessed_ip | 608 | _ <- insertNode (NodeInfo remoteId addr def) witnessed_ip |
374 | return (remoteId, r, witnessed_ip) | 609 | return (remoteId, r, witnessed_ip) |
375 | 610 | ||
376 | -- | Infix version of 'queryNode' function. | 611 | -- | Infix version of 'queryNode' function. |
377 | (<@>) :: Address ip => KRPC (Query a) (Response b) | 612 | (<@>) :: ( Address ip |
378 | => a -> NodeAddr ip -> DHT ip b | 613 | , KRPC (Query dht a) (Response dht b) |
614 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
615 | , Default u | ||
616 | , Show u | ||
617 | , Show (QueryMethod dht) | ||
618 | , Ord (NodeId dht) | ||
619 | , FiniteBits (NodeId dht) | ||
620 | , Show (NodeId dht) | ||
621 | , Ord (TransactionID dht) | ||
622 | , Serialize (TransactionID dht) | ||
623 | , SerializableTo raw (Response dht b) | ||
624 | , SerializableTo raw (Query dht a) | ||
625 | , SerializableTo raw (Response dht (Ping dht)) | ||
626 | , SerializableTo raw (Query dht (Ping dht)) | ||
627 | , WireFormat raw dht | ||
628 | , Kademlia dht | ||
629 | ) => a -> NodeAddr ip -> DHT raw dht u ip b | ||
379 | q <@> addr = snd <$> queryNode addr q | 630 | q <@> addr = snd <$> queryNode addr q |
380 | {-# INLINE (<@>) #-} | 631 | {-# INLINE (<@>) #-} |
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs index f5cd7834..356f6fd9 100644 --- a/src/Network/BitTorrent/DHT/Search.hs +++ b/src/Network/BitTorrent/DHT/Search.hs | |||
@@ -2,6 +2,7 @@ | |||
2 | {-# LANGUAGE PatternSynonyms #-} | 2 | {-# LANGUAGE PatternSynonyms #-} |
3 | {-# LANGUAGE RecordWildCards #-} | 3 | {-# LANGUAGE RecordWildCards #-} |
4 | {-# LANGUAGE ScopedTypeVariables #-} | 4 | {-# LANGUAGE ScopedTypeVariables #-} |
5 | {-# LANGUAGE FlexibleContexts #-} | ||
5 | module Network.BitTorrent.DHT.Search where | 6 | module Network.BitTorrent.DHT.Search where |
6 | 7 | ||
7 | import Control.Concurrent | 8 | import Control.Concurrent |
@@ -25,27 +26,23 @@ import qualified Data.Wrapper.PSQ as PSQ | |||
25 | ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ) | 26 | ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ) |
26 | import Network.Address hiding (NodeId) | 27 | import Network.Address hiding (NodeId) |
27 | import Network.DatagramServer.Types | 28 | import Network.DatagramServer.Types |
28 | #ifdef VERSION_bencoding | 29 | import Data.Bits |
29 | import Network.DatagramServer.Mainline (KMessageOf) | ||
30 | type Ann = () | ||
31 | #else | ||
32 | import Network.DatagramServer.Tox as Tox | ||
33 | type KMessageOf = Tox.Message | ||
34 | type Ann = Bool | ||
35 | #endif | ||
36 | 30 | ||
37 | data IterativeSearch ip r = IterativeSearch | 31 | data IterativeSearch dht u ip r = IterativeSearch |
38 | { searchTarget :: NodeId KMessageOf | 32 | { searchTarget :: NodeId dht |
39 | , searchQuery :: NodeInfo KMessageOf ip Ann -> IO ([NodeInfo KMessageOf ip Ann], [r]) | 33 | , searchQuery :: NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r]) |
40 | , searchPendingCount :: TVar Int | 34 | , searchPendingCount :: TVar Int |
41 | , searchQueued :: TVar (MinMaxPSQ (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf))) | 35 | , searchQueued :: TVar (MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht))) |
42 | , searchInformant :: TVar (MinMaxPSQ (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf))) | 36 | , searchInformant :: TVar (MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht))) |
43 | , searchVisited :: TVar (Set (NodeAddr ip)) | 37 | , searchVisited :: TVar (Set (NodeAddr ip)) |
44 | , searchResults :: TVar (Set r) | 38 | , searchResults :: TVar (Set r) |
45 | } | 39 | } |
46 | 40 | ||
47 | newSearch :: Eq ip => (NodeInfo KMessageOf ip Ann -> IO ([NodeInfo KMessageOf ip Ann], [r])) | 41 | newSearch :: ( Eq ip |
48 | -> NodeId KMessageOf -> [NodeInfo KMessageOf ip Ann] -> IO (IterativeSearch ip r) | 42 | , Ord (NodeId dht) |
43 | , FiniteBits (NodeId dht) | ||
44 | ) => (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r])) | ||
45 | -> NodeId dht -> [NodeInfo dht ip u] -> IO (IterativeSearch dht u ip r) | ||
49 | newSearch qry target ns = atomically $ do | 46 | newSearch qry target ns = atomically $ do |
50 | c <- newTVar 0 | 47 | c <- newTVar 0 |
51 | q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns | 48 | q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns |
@@ -60,9 +57,14 @@ searchAlpha = 3 | |||
60 | searchK :: Int | 57 | searchK :: Int |
61 | searchK = 8 | 58 | searchK = 8 |
62 | 59 | ||
63 | sendQuery :: forall a ip. (Ord a, Ord ip) => | 60 | sendQuery :: forall a ip dht u. |
64 | IterativeSearch ip a | 61 | ( Ord a |
65 | -> Binding (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf)) | 62 | , Ord ip |
63 | , Ord (NodeId dht) | ||
64 | , FiniteBits (NodeId dht) | ||
65 | ) => | ||
66 | IterativeSearch dht u ip a | ||
67 | -> Binding (NodeInfo dht ip u) (NodeDistance (NodeId dht)) | ||
66 | -> IO () | 68 | -> IO () |
67 | sendQuery IterativeSearch{..} (ni :-> d) = do | 69 | sendQuery IterativeSearch{..} (ni :-> d) = do |
68 | (ns,rs) <- handle (\(SomeException e) -> return ([],[])) | 70 | (ns,rs) <- handle (\(SomeException e) -> return ([],[])) |
@@ -71,9 +73,9 @@ sendQuery IterativeSearch{..} (ni :-> d) = do | |||
71 | modifyTVar searchPendingCount pred | 73 | modifyTVar searchPendingCount pred |
72 | vs <- readTVar searchVisited | 74 | vs <- readTVar searchVisited |
73 | -- We only queue a node if it is not yet visited | 75 | -- We only queue a node if it is not yet visited |
74 | let insertFoundNode :: NodeInfo KMessageOf ip u | 76 | let insertFoundNode :: NodeInfo dht ip u |
75 | -> MinMaxPSQ (NodeInfo KMessageOf ip u) (NodeDistance (NodeId KMessageOf)) | 77 | -> MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht)) |
76 | -> MinMaxPSQ (NodeInfo KMessageOf ip u) (NodeDistance (NodeId KMessageOf)) | 78 | -> MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht)) |
77 | insertFoundNode n q | 79 | insertFoundNode n q |
78 | | nodeAddr n `Set.member` vs = q | 80 | | nodeAddr n `Set.member` vs = q |
79 | | otherwise = MM.insertTake searchK n (distance searchTarget $ nodeId n) q | 81 | | otherwise = MM.insertTake searchK n (distance searchTarget $ nodeId n) q |
@@ -82,7 +84,9 @@ sendQuery IterativeSearch{..} (ni :-> d) = do | |||
82 | modifyTVar searchResults $ \s -> foldr Set.insert s rs | 84 | modifyTVar searchResults $ \s -> foldr Set.insert s rs |
83 | 85 | ||
84 | 86 | ||
85 | searchIsFinished :: Ord ip => IterativeSearch ip r -> STM Bool | 87 | searchIsFinished :: ( Ord ip |
88 | , Ord (NodeId dht) | ||
89 | ) => IterativeSearch dht u ip r -> STM Bool | ||
86 | searchIsFinished IterativeSearch{..} = do | 90 | searchIsFinished IterativeSearch{..} = do |
87 | q <- readTVar searchQueued | 91 | q <- readTVar searchQueued |
88 | cnt <- readTVar searchPendingCount | 92 | cnt <- readTVar searchPendingCount |
@@ -94,8 +98,8 @@ searchIsFinished IterativeSearch{..} = do | |||
94 | <= PSQ.prio (fromJust $ MM.findMin q)))) | 98 | <= PSQ.prio (fromJust $ MM.findMin q)))) |
95 | 99 | ||
96 | search :: | 100 | search :: |
97 | (Ord r, Ord ip) => | 101 | (Ord r, Ord ip, Ord (NodeId dht), FiniteBits (NodeId dht)) => |
98 | IterativeSearch ip r -> IO () | 102 | IterativeSearch dht u ip r -> IO () |
99 | search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do | 103 | search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do |
100 | fix $ \again -> do | 104 | fix $ \again -> do |
101 | join $ atomically $ do | 105 | join $ atomically $ do |
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index d4794038..f96ba707 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -96,6 +96,7 @@ import Control.Monad.Trans.Control | |||
96 | import Control.Monad.Trans.Resource | 96 | import Control.Monad.Trans.Resource |
97 | import Data.Typeable | 97 | import Data.Typeable |
98 | import Data.String | 98 | import Data.String |
99 | import Data.Bits | ||
99 | import Data.ByteString | 100 | import Data.ByteString |
100 | import Data.Conduit.Lazy | 101 | import Data.Conduit.Lazy |
101 | import Data.Default | 102 | import Data.Default |
@@ -265,26 +266,17 @@ type AnnounceSet = Set (InfoHash, PortNumber) | |||
265 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () | 266 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () |
266 | 267 | ||
267 | -- | DHT session keep track state of /this/ node. | 268 | -- | DHT session keep track state of /this/ node. |
268 | data Node ip = Node | 269 | data Node raw dht u ip = Node |
269 | { -- | Session configuration; | 270 | { -- | Session configuration; |
270 | options :: !Options | 271 | options :: !Options |
271 | 272 | ||
272 | -- | Pseudo-unique self-assigned session identifier. This value is | 273 | -- | Pseudo-unique self-assigned session identifier. This value is |
273 | -- constant during DHT session and (optionally) between sessions. | 274 | -- constant during DHT session and (optionally) between sessions. |
274 | #ifdef VERSION_bencoding | 275 | , tentativeNodeId :: !(NodeId dht) |
275 | , tentativeNodeId :: !(NodeId KMessageOf) | ||
276 | #else | ||
277 | , tentativeNodeId :: !(NodeId Tox.Message) | ||
278 | #endif | ||
279 | 276 | ||
280 | , resources :: !InternalState | 277 | , resources :: !InternalState |
281 | #ifdef VERSION_bencoding | 278 | , manager :: !(Manager raw dht) -- ^ RPC manager; |
282 | , manager :: !(Manager IO BValue KMessageOf) -- ^ RPC manager; | 279 | , routingInfo :: !(TVar (Maybe (R.Info dht ip u))) -- ^ search table; |
283 | , routingInfo :: !(TVar (Maybe (R.Info KMessageOf ip ()))) -- ^ search table; | ||
284 | #else | ||
285 | , manager :: !(Manager (DHT ip) ByteString Tox.Message) -- ^ RPC manager; | ||
286 | , routingInfo :: !(TVar (Maybe (R.Info Tox.Message ip Bool))) -- ^ search table; | ||
287 | #endif | ||
288 | , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; | 280 | , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; |
289 | , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; | 281 | , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; |
290 | , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. | 282 | , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. |
@@ -293,23 +285,23 @@ data Node ip = Node | |||
293 | 285 | ||
294 | -- | DHT keep track current session and proper resource allocation for | 286 | -- | DHT keep track current session and proper resource allocation for |
295 | -- safe multithreading. | 287 | -- safe multithreading. |
296 | newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) IO a } | 288 | newtype DHT raw dht u ip a = DHT { unDHT :: ReaderT (Node raw dht u ip) IO a } |
297 | deriving ( Functor, Applicative, Monad, MonadIO | 289 | deriving ( Functor, Applicative, Monad, MonadIO |
298 | , MonadBase IO, MonadReader (Node ip), MonadThrow | 290 | , MonadBase IO, MonadReader (Node raw dht u ip), MonadThrow |
299 | ) | 291 | ) |
300 | 292 | ||
301 | #if MIN_VERSION_monad_control(1,0,0) | 293 | #if MIN_VERSION_monad_control(1,0,0) |
302 | newtype DHTStM ip a = StM { | 294 | newtype DHTStM raw dht u ip a = StM { |
303 | unSt :: StM (ReaderT (Node ip) IO) a | 295 | unSt :: StM (ReaderT (Node raw dht u ip) IO) a |
304 | } | 296 | } |
305 | #endif | 297 | #endif |
306 | 298 | ||
307 | instance MonadBaseControl IO (DHT ip) where | 299 | instance MonadBaseControl IO (DHT raw dht u ip) where |
308 | #if MIN_VERSION_monad_control(1,0,0) | 300 | #if MIN_VERSION_monad_control(1,0,0) |
309 | type StM (DHT ip) a = DHTStM ip a | 301 | type StM (DHT raw dht u ip) a = DHTStM raw dht u ip a |
310 | #else | 302 | #else |
311 | newtype StM (DHT ip) a = StM { | 303 | newtype StM (DHT raw dht u ip) a = StM { |
312 | unSt :: StM (ReaderT (Node ip) IO) a | 304 | unSt :: StM (ReaderT (Node raw dht u ip) IO) a |
313 | } | 305 | } |
314 | #endif | 306 | #endif |
315 | liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> | 307 | liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> |
@@ -321,22 +313,22 @@ instance MonadBaseControl IO (DHT ip) where | |||
321 | 313 | ||
322 | -- | Check is it is possible to run 'queryNode' or handle pending | 314 | -- | Check is it is possible to run 'queryNode' or handle pending |
323 | -- query from remote node. | 315 | -- query from remote node. |
324 | instance MonadActive (DHT ip) where | 316 | instance MonadActive (DHT raw dht u ip) where |
325 | monadActive = getManager >>= liftIO . isActive | 317 | monadActive = getManager >>= liftIO . isActive |
326 | {-# INLINE monadActive #-} | 318 | {-# INLINE monadActive #-} |
327 | 319 | ||
328 | -- | All allocated resources will be closed at 'closeNode'. | 320 | -- | All allocated resources will be closed at 'closeNode'. |
329 | instance MonadResource (DHT ip) where | 321 | instance MonadResource (DHT raw dht u ip) where |
330 | liftResourceT m = do | 322 | liftResourceT m = do |
331 | s <- asks resources | 323 | s <- asks resources |
332 | liftIO $ runInternalState m s | 324 | liftIO $ runInternalState m s |
333 | 325 | ||
334 | -- instance MonadKRPC (DHT ip) (DHT ip) BValue KMessageOf where | 326 | -- instance MonadKRPC (DHT raw dht u ip) (DHT raw dht u ip) BValue KMessageOf where |
335 | 327 | ||
336 | getManager :: DHT ip (Manager IO BValue KMessageOf) | 328 | getManager :: DHT raw dht u ip (Manager raw dht) |
337 | getManager = asks manager | 329 | getManager = asks manager |
338 | 330 | ||
339 | instance MonadLogger (DHT ip) where | 331 | instance MonadLogger (DHT raw dht u ip) where |
340 | monadLoggerLog loc src lvl msg = do | 332 | monadLoggerLog loc src lvl msg = do |
341 | logger <- asks loggerFun | 333 | logger <- asks loggerFun |
342 | liftIO $ logger loc src lvl (toLogStr msg) | 334 | liftIO $ logger loc src lvl (toLogStr msg) |
@@ -344,7 +336,7 @@ instance MonadLogger (DHT ip) where | |||
344 | #ifdef VERSION_bencoding | 336 | #ifdef VERSION_bencoding |
345 | type NodeHandler = Handler IO KMessageOf BValue | 337 | type NodeHandler = Handler IO KMessageOf BValue |
346 | #else | 338 | #else |
347 | type NodeHandler ip = Handler (DHT ip) Tox.Message ByteString | 339 | type NodeHandler ip = Handler (DHT raw dht u ip) Tox.Message ByteString |
348 | #endif | 340 | #endif |
349 | 341 | ||
350 | logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO () | 342 | logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO () |
@@ -376,17 +368,16 @@ locFromCS cs = case getCallStack cs of | |||
376 | -- | Run DHT session. You /must/ properly close session using | 368 | -- | Run DHT session. You /must/ properly close session using |
377 | -- 'closeNode' function, otherwise socket or other scarce resources may | 369 | -- 'closeNode' function, otherwise socket or other scarce resources may |
378 | -- leak. | 370 | -- leak. |
379 | newNode :: Address ip | 371 | newNode :: ( Address ip |
372 | , FiniteBits (NodeId dht) | ||
373 | , Serialize (NodeId dht) | ||
374 | ) | ||
380 | => -- [NodeHandler] -- ^ handlers to run on accepted queries; | 375 | => -- [NodeHandler] -- ^ handlers to run on accepted queries; |
381 | Options -- ^ various dht options; | 376 | Options -- ^ various dht options; |
382 | -> NodeAddr ip -- ^ node address to bind; | 377 | -> NodeAddr ip -- ^ node address to bind; |
383 | -> LogFun -- ^ invoked on log messages; | 378 | -> LogFun -- ^ invoked on log messages; |
384 | #ifdef VERSION_bencoding | 379 | -> Maybe (NodeId dht) -- ^ use this NodeId, if not given a new one is generated. |
385 | -> Maybe (NodeId KMessageOf) -- ^ use this NodeId, if not given a new one is generated. | 380 | -> IO (Node raw dht u ip) -- ^ a new DHT node running at given address. |
386 | #else | ||
387 | -> Maybe (NodeId Tox.Message) -- ^ use this NodeId, if not given a new one is generated. | ||
388 | #endif | ||
389 | -> IO (Node ip) -- ^ a new DHT node running at given address. | ||
390 | newNode opts naddr logger mbid = do | 381 | newNode opts naddr logger mbid = do |
391 | s <- createInternalState | 382 | s <- createInternalState |
392 | runInternalState initNode s | 383 | runInternalState initNode s |
@@ -409,11 +400,11 @@ newNode opts naddr logger mbid = do | |||
409 | 400 | ||
410 | -- | Some resources like listener thread may live for | 401 | -- | Some resources like listener thread may live for |
411 | -- some short period of time right after this DHT session closed. | 402 | -- some short period of time right after this DHT session closed. |
412 | closeNode :: Node ip -> IO () | 403 | closeNode :: Node raw dht u ip -> IO () |
413 | closeNode Node {..} = closeInternalState resources | 404 | closeNode Node {..} = closeInternalState resources |
414 | 405 | ||
415 | -- | Run DHT operation on the given session. | 406 | -- | Run DHT operation on the given session. |
416 | runDHT :: Node ip -> DHT ip a -> IO a | 407 | runDHT :: Node raw dht u ip -> DHT raw dht u ip a -> IO a |
417 | runDHT node action = runReaderT (unDHT action) node | 408 | runDHT node action = runReaderT (unDHT action) node |
418 | {-# INLINE runDHT #-} | 409 | {-# INLINE runDHT #-} |
419 | 410 | ||
@@ -453,24 +444,20 @@ checkToken sessionTokens addr questionableToken = do | |||
453 | -----------------------------------------------------------------------} | 444 | -----------------------------------------------------------------------} |
454 | 445 | ||
455 | -- | This nodes externally routable address reported by remote peers. | 446 | -- | This nodes externally routable address reported by remote peers. |
456 | routableAddress :: DHT ip (Maybe SockAddr) | 447 | routableAddress :: DHT raw dht u ip (Maybe SockAddr) |
457 | routableAddress = do | 448 | routableAddress = do |
458 | info <- asks routingInfo >>= liftIO . atomically . readTVar | 449 | info <- asks routingInfo >>= liftIO . atomically . readTVar |
459 | return $ myAddress <$> info | 450 | return $ myAddress <$> info |
460 | 451 | ||
461 | -- | The current NodeId that the given remote node should know us by. | 452 | -- | The current NodeId that the given remote node should know us by. |
462 | #ifdef VERSION_bencoding | 453 | myNodeIdAccordingTo :: NodeAddr ip -> DHT raw dht u ip (NodeId dht) |
463 | myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId KMessageOf) | ||
464 | #else | ||
465 | myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId Tox.Message) | ||
466 | #endif | ||
467 | myNodeIdAccordingTo _ = do | 454 | myNodeIdAccordingTo _ = do |
468 | info <- asks routingInfo >>= liftIO . atomically . readTVar | 455 | info <- asks routingInfo >>= liftIO . atomically . readTVar |
469 | maybe (asks tentativeNodeId) | 456 | maybe (asks tentativeNodeId) |
470 | (return . myNodeId) | 457 | (return . myNodeId) |
471 | info | 458 | info |
472 | 459 | ||
473 | myNodeIdAccordingTo1 :: DHT ip ( NodeAddr ip -> IO (NodeId KMessageOf) ) | 460 | myNodeIdAccordingTo1 :: DHT raw dht u ip ( NodeAddr ip -> IO (NodeId dht) ) |
474 | myNodeIdAccordingTo1 = do | 461 | myNodeIdAccordingTo1 = do |
475 | var <- asks routingInfo | 462 | var <- asks routingInfo |
476 | tid <- asks tentativeNodeId | 463 | tid <- asks tentativeNodeId |
@@ -480,11 +467,7 @@ myNodeIdAccordingTo1 = do | |||
480 | 467 | ||
481 | -- | Get current routing table. Normally you don't need to use this | 468 | -- | Get current routing table. Normally you don't need to use this |
482 | -- function, but it can be usefull for debugging and profiling purposes. | 469 | -- function, but it can be usefull for debugging and profiling purposes. |
483 | #ifdef VERSION_bencoding | 470 | getTable :: Eq ip => DHT raw dht u ip (Table dht ip u) |
484 | getTable :: Eq ip => DHT ip (Table KMessageOf ip ()) | ||
485 | #else | ||
486 | getTable :: Eq ip => DHT ip (Table Tox.Message ip Bool) | ||
487 | #endif | ||
488 | getTable = do | 471 | getTable = do |
489 | Node { tentativeNodeId = myId | 472 | Node { tentativeNodeId = myId |
490 | , routingInfo = var | 473 | , routingInfo = var |
@@ -492,18 +475,18 @@ getTable = do | |||
492 | let nil = nullTable myId (optBucketCount opts) | 475 | let nil = nullTable myId (optBucketCount opts) |
493 | liftIO (maybe nil R.myBuckets <$> atomically (readTVar var)) | 476 | liftIO (maybe nil R.myBuckets <$> atomically (readTVar var)) |
494 | 477 | ||
495 | getSwarms :: Ord ip => DHT ip [ (InfoHash, Int, Maybe ByteString) ] | 478 | getSwarms :: Ord ip => DHT raw dht u ip [ (InfoHash, Int, Maybe ByteString) ] |
496 | getSwarms = do | 479 | getSwarms = do |
497 | store <- asks contactInfo >>= liftIO . atomically . readTVar | 480 | store <- asks contactInfo >>= liftIO . atomically . readTVar |
498 | return $ P.knownSwarms store | 481 | return $ P.knownSwarms store |
499 | 482 | ||
500 | savePeerStore :: (Ord ip, Address ip) => DHT ip ByteString | 483 | savePeerStore :: (Ord ip, Address ip) => DHT raw dht u ip ByteString |
501 | savePeerStore = do | 484 | savePeerStore = do |
502 | var <- asks contactInfo | 485 | var <- asks contactInfo |
503 | peers <- liftIO $ atomically $ readTVar var | 486 | peers <- liftIO $ atomically $ readTVar var |
504 | return $ S.encode peers | 487 | return $ S.encode peers |
505 | 488 | ||
506 | mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT ip () | 489 | mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw dht u ip () |
507 | mergeSavedPeers bs = do | 490 | mergeSavedPeers bs = do |
508 | var <- asks contactInfo | 491 | var <- asks contactInfo |
509 | case S.decode bs of | 492 | case S.decode bs of |
@@ -511,7 +494,7 @@ mergeSavedPeers bs = do | |||
511 | Left _ -> return () | 494 | Left _ -> return () |
512 | 495 | ||
513 | 496 | ||
514 | allPeers :: Ord ip => InfoHash -> DHT ip [ PeerAddr ip ] | 497 | allPeers :: Ord ip => InfoHash -> DHT raw dht u ip [ PeerAddr ip ] |
515 | allPeers ih = do | 498 | allPeers ih = do |
516 | store <- asks contactInfo >>= liftIO . atomically . readTVar | 499 | store <- asks contactInfo >>= liftIO . atomically . readTVar |
517 | return $ P.lookup ih store | 500 | return $ P.lookup ih store |
@@ -521,18 +504,20 @@ allPeers ih = do | |||
521 | -- | 504 | -- |
522 | -- This operation used for 'find_nodes' query. | 505 | -- This operation used for 'find_nodes' query. |
523 | -- | 506 | -- |
524 | #ifdef VERSION_bencoding | 507 | getClosest :: ( Eq ip |
525 | getClosest :: Eq ip => TableKey KMessageOf k => k -> DHT ip [NodeInfo KMessageOf ip ()] | 508 | , Ord (NodeId dht) |
526 | #else | 509 | , FiniteBits (NodeId dht) |
527 | getClosest :: Eq ip => TableKey Tox.Message k => k -> DHT ip [NodeInfo Tox.Message ip Bool] | 510 | , TableKey dht k ) => |
528 | #endif | 511 | k -> DHT raw dht u ip [NodeInfo dht ip u] |
529 | getClosest node = do | 512 | getClosest node = do |
530 | k <- asks (optK . options) | 513 | k <- asks (optK . options) |
531 | kclosest k node <$> getTable | 514 | kclosest k node <$> getTable |
532 | 515 | ||
533 | getClosest1 :: ( Eq ip | 516 | getClosest1 :: ( Eq ip |
534 | , TableKey KMessageOf k | 517 | , Ord (NodeId dht) |
535 | ) => DHT ip (k -> IO [NodeInfo KMessageOf ip ()]) | 518 | , FiniteBits (NodeId dht) |
519 | , TableKey dht k | ||
520 | ) => DHT raw dht u ip (k -> IO [NodeInfo dht ip u]) | ||
536 | getClosest1 = do | 521 | getClosest1 = do |
537 | k <- asks (optK . options) | 522 | k <- asks (optK . options) |
538 | nobkts <- asks (optBucketCount . options) | 523 | nobkts <- asks (optBucketCount . options) |
@@ -574,13 +559,12 @@ getTimestamp = do | |||
574 | -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime))) | 559 | -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime))) |
575 | return $ utcTimeToPOSIXSeconds utcTime | 560 | return $ utcTimeToPOSIXSeconds utcTime |
576 | 561 | ||
577 | |||
578 | #ifdef VERSION_bencoding | 562 | #ifdef VERSION_bencoding |
579 | -- | Prepare result for 'get_peers' query. | 563 | -- | Prepare result for 'get_peers' query. |
580 | -- | 564 | -- |
581 | -- This operation use 'getClosest' as failback so it may block. | 565 | -- This operation use 'getClosest' as failback so it may block. |
582 | -- | 566 | -- |
583 | getPeerList :: Ord ip => InfoHash -> DHT ip (PeerList ip) | 567 | getPeerList :: Ord ip => InfoHash -> DHT raw KMessageOf () ip (PeerList ip) |
584 | getPeerList ih = do | 568 | getPeerList ih = do |
585 | var <- asks contactInfo | 569 | var <- asks contactInfo |
586 | ps <- liftIO $ lookupPeers var ih | 570 | ps <- liftIO $ lookupPeers var ih |
@@ -588,7 +572,7 @@ getPeerList ih = do | |||
588 | then Left <$> getClosest ih | 572 | then Left <$> getClosest ih |
589 | else return (Right ps) | 573 | else return (Right ps) |
590 | 574 | ||
591 | getPeerList1 :: Ord ip => DHT ip (InfoHash -> IO (PeerList ip)) | 575 | getPeerList1 :: Ord ip => DHT raw KMessageOf () ip (InfoHash -> IO (PeerList ip)) |
592 | getPeerList1 = do | 576 | getPeerList1 = do |
593 | var <- asks contactInfo | 577 | var <- asks contactInfo |
594 | getclosest <- getClosest1 | 578 | getclosest <- getClosest1 |
@@ -599,12 +583,12 @@ getPeerList1 = do | |||
599 | else return (Right ps) | 583 | else return (Right ps) |
600 | 584 | ||
601 | 585 | ||
602 | insertTopic :: InfoHash -> PortNumber -> DHT ip () | 586 | insertTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () |
603 | insertTopic ih p = do | 587 | insertTopic ih p = do |
604 | var <- asks announceInfo | 588 | var <- asks announceInfo |
605 | liftIO $ atomically $ modifyTVar' var (S.insert (ih, p)) | 589 | liftIO $ atomically $ modifyTVar' var (S.insert (ih, p)) |
606 | 590 | ||
607 | deleteTopic :: InfoHash -> PortNumber -> DHT ip () | 591 | deleteTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () |
608 | deleteTopic ih p = do | 592 | deleteTopic ih p = do |
609 | var <- asks announceInfo | 593 | var <- asks announceInfo |
610 | liftIO $ atomically $ modifyTVar' var (S.delete (ih, p)) | 594 | liftIO $ atomically $ modifyTVar' var (S.delete (ih, p)) |
@@ -616,7 +600,7 @@ deleteTopic ih p = do | |||
616 | -----------------------------------------------------------------------} | 600 | -----------------------------------------------------------------------} |
617 | 601 | ||
618 | -- | Failed queries are ignored. | 602 | -- | Failed queries are ignored. |
619 | queryParallel :: [DHT ip a] -> DHT ip [a] | 603 | queryParallel :: [DHT raw dht u ip a] -> DHT raw dht u ip [a] |
620 | queryParallel queries = do | 604 | queryParallel queries = do |
621 | -- TODO: use alpha | 605 | -- TODO: use alpha |
622 | -- alpha <- asks (optAlpha . options) | 606 | -- alpha <- asks (optAlpha . options) |