diff options
-rw-r--r-- | bittorrent.cabal | 4 | ||||
-rw-r--r-- | examples/dhtd.hs | 14 | ||||
-rw-r--r-- | src/Data/MinMaxPSQ.hs | 64 | ||||
-rw-r--r-- | src/Data/Wrapper/PSQ.hs | 6 | ||||
-rw-r--r-- | src/Network/BitTorrent/Address.hs | 12 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 36 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Routing.hs | 31 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Search.hs | 92 |
8 files changed, 236 insertions, 23 deletions
diff --git a/bittorrent.cabal b/bittorrent.cabal index 2d76e606..f6559752 100644 --- a/bittorrent.cabal +++ b/bittorrent.cabal | |||
@@ -91,6 +91,8 @@ library | |||
91 | Network.SocketLike | 91 | Network.SocketLike |
92 | other-modules: Paths_bittorrent | 92 | other-modules: Paths_bittorrent |
93 | Data.Wrapper.PSQ | 93 | Data.Wrapper.PSQ |
94 | Data.MinMaxPSQ | ||
95 | Network.BitTorrent.DHT.Search | ||
94 | if !flag(dht-only) | 96 | if !flag(dht-only) |
95 | exposed-modules: Network.BitTorrent | 97 | exposed-modules: Network.BitTorrent |
96 | Network.BitTorrent.Client | 98 | Network.BitTorrent.Client |
@@ -143,6 +145,7 @@ library | |||
143 | -- Concurrency | 145 | -- Concurrency |
144 | , SafeSemaphore | 146 | , SafeSemaphore |
145 | , lifted-async | 147 | , lifted-async |
148 | , async-pool | ||
146 | -- , BoundedChan >= 1.0.1.0 | 149 | -- , BoundedChan >= 1.0.1.0 |
147 | , split-channel >= 0.2 | 150 | , split-channel >= 0.2 |
148 | , stm >= 2.4 | 151 | , stm >= 2.4 |
@@ -373,6 +376,7 @@ executable dhtd | |||
373 | , monad-logger | 376 | , monad-logger |
374 | , bittorrent | 377 | , bittorrent |
375 | , unix | 378 | , unix |
379 | , containers | ||
376 | if flag(thread-debug) | 380 | if flag(thread-debug) |
377 | build-depends: time | 381 | build-depends: time |
378 | cpp-options: -DTHREAD_DEBUG | 382 | cpp-options: -DTHREAD_DEBUG |
diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 05a3de26..1b02a37a 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs | |||
@@ -16,6 +16,7 @@ import Data.Default | |||
16 | import Data.List as L | 16 | import Data.List as L |
17 | import Data.Maybe | 17 | import Data.Maybe |
18 | import Data.String | 18 | import Data.String |
19 | import qualified Data.Set as Set | ||
19 | import qualified Data.ByteString as B (ByteString,writeFile,readFile) | 20 | import qualified Data.ByteString as B (ByteString,writeFile,readFile) |
20 | ; import Data.ByteString (ByteString) | 21 | ; import Data.ByteString (ByteString) |
21 | import qualified Data.ByteString.Char8 as B8 | 22 | import qualified Data.ByteString.Char8 as B8 |
@@ -95,8 +96,9 @@ bootstrapNodes = mapMaybe fromAddr | |||
95 | -- ExtendedCaps (Map.singleton | 96 | -- ExtendedCaps (Map.singleton |
96 | 97 | ||
97 | noDebugPrints :: LogSource -> LogLevel -> Bool | 98 | noDebugPrints :: LogSource -> LogLevel -> Bool |
98 | noDebugPrints _ = \case LevelDebug -> False | 99 | noDebugPrints _ = \case LevelDebug -> False |
99 | _ -> True | 100 | LevelOther _ -> False |
101 | _ -> True | ||
100 | 102 | ||
101 | noLogging :: LogSource -> LogLevel -> Bool | 103 | noLogging :: LogSource -> LogLevel -> Bool |
102 | noLogging _ _ = False | 104 | noLogging _ _ = False |
@@ -275,6 +277,14 @@ clientSession st signalQuit sock n h = do | |||
275 | hPutClient h $ showReport rs | 277 | hPutClient h $ showReport rs |
276 | Left er -> return $ hPutClient h er | 278 | Left er -> return $ hPutClient h er |
277 | 279 | ||
280 | ("search-peers", s) -> cmd $ do | ||
281 | case readEither s of | ||
282 | Right ih -> do | ||
283 | ps <- Set.toList <$> isearch ioGetPeers ih | ||
284 | return $ do | ||
285 | hPutClient h $ unlines $ map (show . pPrint) ps | ||
286 | Left er -> return $ hPutClient h er | ||
287 | |||
278 | _ -> cmd0 $ hPutClient h "error." | 288 | _ -> cmd0 $ hPutClient h "error." |
279 | 289 | ||
280 | defaultPort = error "TODO defaultPort" | 290 | defaultPort = error "TODO defaultPort" |
diff --git a/src/Data/MinMaxPSQ.hs b/src/Data/MinMaxPSQ.hs new file mode 100644 index 00000000..96937604 --- /dev/null +++ b/src/Data/MinMaxPSQ.hs | |||
@@ -0,0 +1,64 @@ | |||
1 | {-# LANGUAGE BangPatterns #-} | ||
2 | module Data.MinMaxPSQ where | ||
3 | |||
4 | import Data.Ord | ||
5 | import qualified Data.Wrapper.PSQ as PSQ | ||
6 | ;import Data.Wrapper.PSQ as PSQ hiding (insert, null, size) | ||
7 | import Prelude hiding (null, take) | ||
8 | |||
9 | data MinMaxPSQ k p = MinMaxPSQ !(PSQ k p) !(PSQ k (Down p)) | ||
10 | |||
11 | empty :: MinMaxPSQ k p | ||
12 | empty = MinMaxPSQ PSQ.empty PSQ.empty | ||
13 | |||
14 | null :: MinMaxPSQ k p -> Bool | ||
15 | null (MinMaxPSQ nq xq) = PSQ.null nq | ||
16 | |||
17 | size :: MinMaxPSQ k p -> Int | ||
18 | size (MinMaxPSQ nq xq) = PSQ.size nq | ||
19 | |||
20 | toList :: (Ord k, Ord p) => MinMaxPSQ k p -> [Binding k p] | ||
21 | toList (MinMaxPSQ nq xq) = PSQ.toList nq | ||
22 | |||
23 | fromList :: (Ord k, Ord p) => [Binding k p] -> MinMaxPSQ k p | ||
24 | fromList kps = MinMaxPSQ (PSQ.fromList kps) | ||
25 | (PSQ.fromList $ map (\(k :-> p) -> (k :-> Down p)) kps) | ||
26 | |||
27 | findMin :: (Ord k, Ord p) => MinMaxPSQ k p -> Maybe (Binding k p) | ||
28 | findMin (MinMaxPSQ nq xq) = PSQ.findMin nq | ||
29 | |||
30 | findMax :: (Ord k, Ord p) => MinMaxPSQ k p -> Maybe (Binding k p) | ||
31 | findMax (MinMaxPSQ nq xq) = fmap (\(k :-> Down p) -> k :-> p) $ PSQ.findMin xq | ||
32 | |||
33 | insert :: (Ord k, Ord p) => k -> p -> MinMaxPSQ k p -> MinMaxPSQ k p | ||
34 | insert k p (MinMaxPSQ nq xq) = MinMaxPSQ (PSQ.insert k p nq) | ||
35 | (PSQ.insert k (Down p) xq) | ||
36 | |||
37 | delete :: (Ord k, Ord p) => k -> MinMaxPSQ k p -> MinMaxPSQ k p | ||
38 | delete k (MinMaxPSQ nq xq) = MinMaxPSQ (PSQ.delete k nq) (PSQ.delete k xq) | ||
39 | |||
40 | deleteMin :: (Ord k, Ord p) => MinMaxPSQ k p -> MinMaxPSQ k p | ||
41 | deleteMin (MinMaxPSQ nq xq) = case PSQ.minView nq of | ||
42 | Just (k :-> _, nq') -> MinMaxPSQ nq' (PSQ.delete k xq) | ||
43 | Nothing -> MinMaxPSQ nq xq | ||
44 | |||
45 | deleteMax :: (Ord k, Ord p) => MinMaxPSQ k p -> MinMaxPSQ k p | ||
46 | deleteMax (MinMaxPSQ nq xq) = case PSQ.minView xq of | ||
47 | Just (k :-> _, xq') -> MinMaxPSQ (PSQ.delete k nq) xq' | ||
48 | Nothing -> MinMaxPSQ nq xq | ||
49 | |||
50 | minView :: (Ord k, Ord p) => MinMaxPSQ k p -> Maybe (Binding k p, MinMaxPSQ k p) | ||
51 | minView (MinMaxPSQ nq xq) = fmap (\(k :-> p, nq') -> (k :-> p, MinMaxPSQ nq' (PSQ.delete k xq))) | ||
52 | $ PSQ.minView nq | ||
53 | |||
54 | maxView :: (Ord k, Ord p) => MinMaxPSQ k p -> Maybe (Binding k p, MinMaxPSQ k p) | ||
55 | maxView (MinMaxPSQ nq xq) = fmap (\(k :-> Down p, xq') -> (k :-> p, MinMaxPSQ (PSQ.delete k nq) xq')) | ||
56 | $ PSQ.minView xq | ||
57 | |||
58 | insertTake :: (Ord k, Ord p) => Int -> k -> p -> MinMaxPSQ k p -> MinMaxPSQ k p | ||
59 | insertTake n k p q = take n $ insert k p q | ||
60 | |||
61 | take :: (Ord k, Ord p) => Int -> MinMaxPSQ k p -> MinMaxPSQ k p | ||
62 | take !n !q | (size q <= n) = q | ||
63 | | null q = q | ||
64 | | otherwise = take n $ deleteMax q | ||
diff --git a/src/Data/Wrapper/PSQ.hs b/src/Data/Wrapper/PSQ.hs index e8fa2d98..2c08011b 100644 --- a/src/Data/Wrapper/PSQ.hs +++ b/src/Data/Wrapper/PSQ.hs | |||
@@ -15,10 +15,14 @@ type Binding k p = (k,p,()) | |||
15 | pattern (:->) :: k -> p -> Binding k p | 15 | pattern (:->) :: k -> p -> Binding k p |
16 | pattern k :-> p <- (k,p,()) where k :-> p = (k,p,()) | 16 | pattern k :-> p <- (k,p,()) where k :-> p = (k,p,()) |
17 | 17 | ||
18 | key :: Binding k v -> k | 18 | key :: Binding k p -> k |
19 | key (k,p,v) = k | 19 | key (k,p,v) = k |
20 | {-# INLINE key #-} | 20 | {-# INLINE key #-} |
21 | 21 | ||
22 | prio :: Binding k p -> p | ||
23 | prio (k,p,v) = p | ||
24 | {-# INLINE prio #-} | ||
25 | |||
22 | insert :: (Ord k, Ord p) => k -> p -> PSQ k p -> PSQ k p | 26 | insert :: (Ord k, Ord p) => k -> p -> PSQ k p -> PSQ k p |
23 | insert k p q = OrdPSQ.insert k p () q | 27 | insert k p q = OrdPSQ.insert k p () q |
24 | {-# INLINE insert #-} | 28 | {-# INLINE insert #-} |
diff --git a/src/Network/BitTorrent/Address.hs b/src/Network/BitTorrent/Address.hs index 381ff50b..a8e12b35 100644 --- a/src/Network/BitTorrent/Address.hs +++ b/src/Network/BitTorrent/Address.hs | |||
@@ -696,10 +696,10 @@ newtype NodeDistance = NodeDistance BS.ByteString | |||
696 | deriving (Eq, Ord) | 696 | deriving (Eq, Ord) |
697 | 697 | ||
698 | instance Pretty NodeDistance where | 698 | instance Pretty NodeDistance where |
699 | pPrint (NodeDistance bs) = foldMap bitseq $ BS.unpack bs | 699 | pPrint (NodeDistance bs) = text $ BC.unpack (Base16.encode bs) |
700 | where | 700 | |
701 | listBits w = L.map (testBit w) (L.reverse [0..finiteBitSize w - 1]) | 701 | instance Show NodeDistance where |
702 | bitseq = foldMap (int . fromEnum) . listBits | 702 | show (NodeDistance bs) = BC.unpack (Base16.encode bs) |
703 | 703 | ||
704 | -- | distance(A,B) = |A xor B| Smaller values are closer. | 704 | -- | distance(A,B) = |A xor B| Smaller values are closer. |
705 | distance :: NodeId -> NodeId -> NodeDistance | 705 | distance :: NodeId -> NodeId -> NodeDistance |
@@ -751,7 +751,7 @@ genBucketSample' gen (NodeId self) (q,m,b) | |||
751 | data NodeAddr a = NodeAddr | 751 | data NodeAddr a = NodeAddr |
752 | { nodeHost :: !a | 752 | { nodeHost :: !a |
753 | , nodePort :: {-# UNPACK #-} !PortNumber | 753 | , nodePort :: {-# UNPACK #-} !PortNumber |
754 | } deriving (Eq, Typeable, Functor, Foldable, Traversable) | 754 | } deriving (Eq, Ord, Typeable, Functor, Foldable, Traversable) |
755 | 755 | ||
756 | instance Show a => Show (NodeAddr a) where | 756 | instance Show a => Show (NodeAddr a) where |
757 | showsPrec i NodeAddr {..} | 757 | showsPrec i NodeAddr {..} |
@@ -1285,7 +1285,7 @@ getBindAddress listenPortString enabled6 = do | |||
1285 | let (x6s,x4s) = partition (\s -> addrFamily s == AF_INET6) xs | 1285 | let (x6s,x4s) = partition (\s -> addrFamily s == AF_INET6) xs |
1286 | listenAddr = | 1286 | listenAddr = |
1287 | case if enabled6 then x6s++x4s else x4s of | 1287 | case if enabled6 then x6s++x4s else x4s of |
1288 | AddrInfo { addrAddress = listenAddr } : _ -> listenAddr | 1288 | AddrInfo { addrAddress = addr } : _ -> addr |
1289 | _ -> if enabled6 | 1289 | _ -> if enabled6 |
1290 | then SockAddrInet6 (parsePort listenPortString) 0 iN6ADDR_ANY 0 | 1290 | then SockAddrInet6 (parsePort listenPortString) 0 iN6ADDR_ANY 0 |
1291 | else SockAddrInet (parsePort listenPortString) iNADDR_ANY | 1291 | else SockAddrInet (parsePort listenPortString) iNADDR_ANY |
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 8215c95d..39ef9604 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs | |||
@@ -13,6 +13,7 @@ | |||
13 | {-# LANGUAGE FlexibleContexts #-} | 13 | {-# LANGUAGE FlexibleContexts #-} |
14 | {-# LANGUAGE ScopedTypeVariables #-} | 14 | {-# LANGUAGE ScopedTypeVariables #-} |
15 | {-# LANGUAGE TemplateHaskell #-} | 15 | {-# LANGUAGE TemplateHaskell #-} |
16 | {-# LANGUAGE TupleSections #-} | ||
16 | module Network.BitTorrent.DHT.Query | 17 | module Network.BitTorrent.DHT.Query |
17 | ( -- * Handler | 18 | ( -- * Handler |
18 | -- | To bind specific set of handlers you need to pass | 19 | -- | To bind specific set of handlers you need to pass |
@@ -40,6 +41,9 @@ module Network.BitTorrent.DHT.Query | |||
40 | , Search | 41 | , Search |
41 | , search | 42 | , search |
42 | , publish | 43 | , publish |
44 | , ioFindNode | ||
45 | , ioGetPeers | ||
46 | , isearch | ||
43 | 47 | ||
44 | -- ** Routing table | 48 | -- ** Routing table |
45 | , insertNode | 49 | , insertNode |
@@ -67,6 +71,8 @@ import Data.Either | |||
67 | import Data.List as L | 71 | import Data.List as L |
68 | import Data.Monoid | 72 | import Data.Monoid |
69 | import Data.Text as T | 73 | import Data.Text as T |
74 | import qualified Data.Set as Set | ||
75 | ;import Data.Set (Set) | ||
70 | import Network | 76 | import Network |
71 | import Text.PrettyPrint as PP hiding ((<>), ($$)) | 77 | import Text.PrettyPrint as PP hiding ((<>), ($$)) |
72 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | 78 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) |
@@ -82,6 +88,7 @@ import Network.BitTorrent.DHT.Message | |||
82 | import Network.BitTorrent.DHT.Routing as R | 88 | import Network.BitTorrent.DHT.Routing as R |
83 | import Network.BitTorrent.DHT.Session | 89 | import Network.BitTorrent.DHT.Session |
84 | import Control.Concurrent.STM | 90 | import Control.Concurrent.STM |
91 | import qualified Network.BitTorrent.DHT.Search as Search | ||
85 | 92 | ||
86 | {----------------------------------------------------------------------- | 93 | {----------------------------------------------------------------------- |
87 | -- Handlers | 94 | -- Handlers |
@@ -182,6 +189,35 @@ announceQ ih p NodeInfo {..} = do | |||
182 | -- Iterative queries | 189 | -- Iterative queries |
183 | -----------------------------------------------------------------------} | 190 | -----------------------------------------------------------------------} |
184 | 191 | ||
192 | |||
193 | ioGetPeers :: Address ip => InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [PeerAddr ip])) | ||
194 | ioGetPeers ih = do | ||
195 | session <- ask | ||
196 | return $ \ni -> runDHT session $ do | ||
197 | r <- try $ getPeersQ ih ni | ||
198 | case r of | ||
199 | Right e -> return $ either (,[]) ([],) e | ||
200 | Left e -> let _ = e :: QueryFailure in return ([],[]) | ||
201 | |||
202 | ioFindNode :: Address ip => InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [NodeInfo ip])) | ||
203 | ioFindNode ih = do | ||
204 | session <- ask | ||
205 | return $ \ni -> runDHT session $ do | ||
206 | NodeFound ns <- FindNode (toNodeId ih) <@> nodeAddr ni | ||
207 | return $ L.partition (\n -> nodeId n /= toNodeId ih) ns | ||
208 | |||
209 | isearch :: (Ord r, Ord ip) => | ||
210 | (InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [r]))) | ||
211 | -> InfoHash | ||
212 | -> DHT ip (Set r) | ||
213 | isearch f ih = do | ||
214 | qry <- f ih | ||
215 | ns <- kclosest 8 ih <$> getTable | ||
216 | liftIO $ do s <- Search.newSearch qry (toNodeId ih) ns | ||
217 | Search.search s | ||
218 | atomically $ readTVar (Search.searchResults s) | ||
219 | |||
220 | |||
185 | type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] | 221 | type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] |
186 | 222 | ||
187 | -- TODO: use reorder and filter (Traversal option) leftovers | 223 | -- TODO: use reorder and filter (Traversal option) leftovers |
diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs index 5c9788dc..cf4a4de3 100644 --- a/src/Network/BitTorrent/DHT/Routing.hs +++ b/src/Network/BitTorrent/DHT/Routing.hs | |||
@@ -442,16 +442,16 @@ size = L.sum . shape | |||
442 | depth :: Table ip -> BucketCount | 442 | depth :: Table ip -> BucketCount |
443 | depth = L.length . shape | 443 | depth = L.length . shape |
444 | 444 | ||
445 | lookupBucket :: NodeId -> Table ip -> Maybe (Bucket ip) | 445 | lookupBucket :: NodeId -> Table ip -> [Bucket ip] |
446 | lookupBucket nid = go 0 | 446 | lookupBucket nid = go 0 [] |
447 | where | 447 | where |
448 | go i (Zero table bucket) | 448 | go i bs (Zero table bucket) |
449 | | testIdBit nid i = pure bucket | 449 | | testIdBit nid i = bucket : toBucketList table ++ bs |
450 | | otherwise = go (succ i) table | 450 | | otherwise = go (succ i) (bucket:bs) table |
451 | go i (One bucket table) | 451 | go i bs (One bucket table) |
452 | | testIdBit nid i = go (succ i) table | 452 | | testIdBit nid i = go (succ i) (bucket:bs) table |
453 | | otherwise = pure bucket | 453 | | otherwise = bucket : toBucketList table ++ bs |
454 | go _ (Tip _ _ bucket) = pure bucket | 454 | go _ bs (Tip _ _ bucket) = bucket : bs |
455 | 455 | ||
456 | compatibleNodeId :: Table ip -> IO NodeId | 456 | compatibleNodeId :: Table ip -> IO NodeId |
457 | compatibleNodeId tbl = genBucketSample prefix br | 457 | compatibleNodeId tbl = genBucketSample prefix br |
@@ -504,11 +504,14 @@ instance TableKey InfoHash where | |||
504 | -- | Get a list of /K/ closest nodes using XOR metric. Used in | 504 | -- | Get a list of /K/ closest nodes using XOR metric. Used in |
505 | -- 'find_node' and 'get_peers' queries. | 505 | -- 'find_node' and 'get_peers' queries. |
506 | kclosest :: Eq ip => TableKey a => K -> a -> Table ip -> [NodeInfo ip] | 506 | kclosest :: Eq ip => TableKey a => K -> a -> Table ip -> [NodeInfo ip] |
507 | kclosest k (toNodeId -> nid) | 507 | kclosest k (toNodeId -> nid) tbl = take k $ rank nodeId nid (L.concat bucket) |
508 | = L.take k . rank nodeId nid | 508 | ++ rank nodeId nid (L.concat everyone) |
509 | . L.map PSQ.key . PSQ.toList . fromMaybe PSQ.empty | 509 | where |
510 | . fmap bktNodes | 510 | (bucket,everyone) = |
511 | . lookupBucket nid | 511 | L.splitAt 1 |
512 | . L.map (L.map PSQ.key . PSQ.toList . bktNodes) | ||
513 | . lookupBucket nid | ||
514 | $ tbl | ||
512 | 515 | ||
513 | {----------------------------------------------------------------------- | 516 | {----------------------------------------------------------------------- |
514 | -- Routing | 517 | -- Routing |
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs new file mode 100644 index 00000000..1fe73c30 --- /dev/null +++ b/src/Network/BitTorrent/DHT/Search.hs | |||
@@ -0,0 +1,92 @@ | |||
1 | {-# LANGUAGE PatternSynonyms #-} | ||
2 | {-# LANGUAGE RecordWildCards #-} | ||
3 | module Network.BitTorrent.DHT.Search where | ||
4 | |||
5 | import Control.Concurrent | ||
6 | import Control.Concurrent.Async.Pool | ||
7 | import Control.Concurrent.STM | ||
8 | import Control.Exception | ||
9 | import Control.Monad | ||
10 | import Data.Bool | ||
11 | import Data.Function | ||
12 | import Data.List | ||
13 | import qualified Data.Map.Strict as Map | ||
14 | ;import Data.Map.Strict (Map) | ||
15 | import Data.Maybe | ||
16 | import qualified Data.Set as Set | ||
17 | ;import Data.Set (Set) | ||
18 | import System.IO | ||
19 | |||
20 | import qualified Data.MinMaxPSQ as MM | ||
21 | ;import Data.MinMaxPSQ (MinMaxPSQ) | ||
22 | import qualified Data.Wrapper.PSQ as PSQ | ||
23 | ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ) | ||
24 | import Network.BitTorrent.Address | ||
25 | |||
26 | data IterativeSearch ip r = IterativeSearch | ||
27 | { searchTarget :: NodeId | ||
28 | , searchQuery :: NodeInfo ip -> IO ([NodeInfo ip], [r]) | ||
29 | , searchPendingCount :: TVar Int | ||
30 | , searchQueued :: TVar (MinMaxPSQ (NodeInfo ip) NodeDistance) | ||
31 | , searchInformant :: TVar (MinMaxPSQ (NodeInfo ip) NodeDistance) | ||
32 | , searchVisited :: TVar (Set (NodeAddr ip)) | ||
33 | , searchResults :: TVar (Set r) | ||
34 | } | ||
35 | |||
36 | newSearch :: Eq ip => (NodeInfo ip -> IO ([NodeInfo ip], [r])) | ||
37 | -> NodeId -> [NodeInfo ip] -> IO (IterativeSearch ip r) | ||
38 | newSearch qry target ns = atomically $ do | ||
39 | c <- newTVar 0 | ||
40 | q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns | ||
41 | i <- newTVar MM.empty | ||
42 | v <- newTVar Set.empty | ||
43 | r <- newTVar Set.empty | ||
44 | return $ IterativeSearch target qry c q i v r | ||
45 | |||
46 | searchAlpha :: Int | ||
47 | searchAlpha = 3 | ||
48 | |||
49 | searchK :: Int | ||
50 | searchK = 8 | ||
51 | |||
52 | sendQuery :: (Ord a, Ord t) => | ||
53 | IterativeSearch t a | ||
54 | -> Binding (NodeInfo t) NodeDistance | ||
55 | -> IO () | ||
56 | sendQuery IterativeSearch{..} (ni :-> d) = do | ||
57 | (ns,rs) <- handle (\(SomeException e) -> return ([],[])) | ||
58 | (searchQuery ni) | ||
59 | atomically $ do | ||
60 | modifyTVar searchPendingCount pred | ||
61 | vs <- readTVar searchVisited | ||
62 | -- We only queue a node if it is not yet visited | ||
63 | let insertFoundNode n q | ||
64 | | nodeAddr n `Set.member` vs = q | ||
65 | | otherwise = MM.insertTake searchK n (distance searchTarget $ nodeId n) q | ||
66 | modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns | ||
67 | modifyTVar searchInformant $ MM.insertTake searchK ni d | ||
68 | modifyTVar searchResults $ \s -> foldr Set.insert s rs | ||
69 | |||
70 | search :: | ||
71 | (Ord r, Ord ip) => | ||
72 | IterativeSearch ip r -> IO () | ||
73 | search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do | ||
74 | fix $ \again -> do | ||
75 | join $ atomically $ do | ||
76 | found <- MM.minView <$> readTVar searchQueued | ||
77 | cnt <- readTVar $ searchPendingCount | ||
78 | case found of | ||
79 | Nothing -> retry | ||
80 | Just (ni :-> d, q) -> do | ||
81 | informants <- readTVar searchInformant | ||
82 | if MM.size informants < searchK | ||
83 | && (cnt > 0 || not (MM.null q)) | ||
84 | || PSQ.prio (fromJust $ MM.findMax informants) > d | ||
85 | then do | ||
86 | writeTVar searchQueued q | ||
87 | modifyTVar searchVisited $ Set.insert (nodeAddr ni) | ||
88 | modifyTVar searchPendingCount succ | ||
89 | return $ withAsync g (sendQuery s (ni :-> d)) (const again) | ||
90 | else do | ||
91 | check (cnt == 0) | ||
92 | return $ return () | ||