diff options
Diffstat (limited to 'src/Network/BitTorrent/DHT')
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 36 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Routing.hs | 31 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Search.hs | 92 |
3 files changed, 145 insertions, 14 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 8215c95d..39ef9604 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs | |||
@@ -13,6 +13,7 @@ | |||
13 | {-# LANGUAGE FlexibleContexts #-} | 13 | {-# LANGUAGE FlexibleContexts #-} |
14 | {-# LANGUAGE ScopedTypeVariables #-} | 14 | {-# LANGUAGE ScopedTypeVariables #-} |
15 | {-# LANGUAGE TemplateHaskell #-} | 15 | {-# LANGUAGE TemplateHaskell #-} |
16 | {-# LANGUAGE TupleSections #-} | ||
16 | module Network.BitTorrent.DHT.Query | 17 | module Network.BitTorrent.DHT.Query |
17 | ( -- * Handler | 18 | ( -- * Handler |
18 | -- | To bind specific set of handlers you need to pass | 19 | -- | To bind specific set of handlers you need to pass |
@@ -40,6 +41,9 @@ module Network.BitTorrent.DHT.Query | |||
40 | , Search | 41 | , Search |
41 | , search | 42 | , search |
42 | , publish | 43 | , publish |
44 | , ioFindNode | ||
45 | , ioGetPeers | ||
46 | , isearch | ||
43 | 47 | ||
44 | -- ** Routing table | 48 | -- ** Routing table |
45 | , insertNode | 49 | , insertNode |
@@ -67,6 +71,8 @@ import Data.Either | |||
67 | import Data.List as L | 71 | import Data.List as L |
68 | import Data.Monoid | 72 | import Data.Monoid |
69 | import Data.Text as T | 73 | import Data.Text as T |
74 | import qualified Data.Set as Set | ||
75 | ;import Data.Set (Set) | ||
70 | import Network | 76 | import Network |
71 | import Text.PrettyPrint as PP hiding ((<>), ($$)) | 77 | import Text.PrettyPrint as PP hiding ((<>), ($$)) |
72 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | 78 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) |
@@ -82,6 +88,7 @@ import Network.BitTorrent.DHT.Message | |||
82 | import Network.BitTorrent.DHT.Routing as R | 88 | import Network.BitTorrent.DHT.Routing as R |
83 | import Network.BitTorrent.DHT.Session | 89 | import Network.BitTorrent.DHT.Session |
84 | import Control.Concurrent.STM | 90 | import Control.Concurrent.STM |
91 | import qualified Network.BitTorrent.DHT.Search as Search | ||
85 | 92 | ||
86 | {----------------------------------------------------------------------- | 93 | {----------------------------------------------------------------------- |
87 | -- Handlers | 94 | -- Handlers |
@@ -182,6 +189,35 @@ announceQ ih p NodeInfo {..} = do | |||
182 | -- Iterative queries | 189 | -- Iterative queries |
183 | -----------------------------------------------------------------------} | 190 | -----------------------------------------------------------------------} |
184 | 191 | ||
192 | |||
193 | ioGetPeers :: Address ip => InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [PeerAddr ip])) | ||
194 | ioGetPeers ih = do | ||
195 | session <- ask | ||
196 | return $ \ni -> runDHT session $ do | ||
197 | r <- try $ getPeersQ ih ni | ||
198 | case r of | ||
199 | Right e -> return $ either (,[]) ([],) e | ||
200 | Left e -> let _ = e :: QueryFailure in return ([],[]) | ||
201 | |||
202 | ioFindNode :: Address ip => InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [NodeInfo ip])) | ||
203 | ioFindNode ih = do | ||
204 | session <- ask | ||
205 | return $ \ni -> runDHT session $ do | ||
206 | NodeFound ns <- FindNode (toNodeId ih) <@> nodeAddr ni | ||
207 | return $ L.partition (\n -> nodeId n /= toNodeId ih) ns | ||
208 | |||
209 | isearch :: (Ord r, Ord ip) => | ||
210 | (InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [r]))) | ||
211 | -> InfoHash | ||
212 | -> DHT ip (Set r) | ||
213 | isearch f ih = do | ||
214 | qry <- f ih | ||
215 | ns <- kclosest 8 ih <$> getTable | ||
216 | liftIO $ do s <- Search.newSearch qry (toNodeId ih) ns | ||
217 | Search.search s | ||
218 | atomically $ readTVar (Search.searchResults s) | ||
219 | |||
220 | |||
185 | type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] | 221 | type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] |
186 | 222 | ||
187 | -- TODO: use reorder and filter (Traversal option) leftovers | 223 | -- TODO: use reorder and filter (Traversal option) leftovers |
diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs index 5c9788dc..cf4a4de3 100644 --- a/src/Network/BitTorrent/DHT/Routing.hs +++ b/src/Network/BitTorrent/DHT/Routing.hs | |||
@@ -442,16 +442,16 @@ size = L.sum . shape | |||
442 | depth :: Table ip -> BucketCount | 442 | depth :: Table ip -> BucketCount |
443 | depth = L.length . shape | 443 | depth = L.length . shape |
444 | 444 | ||
445 | lookupBucket :: NodeId -> Table ip -> Maybe (Bucket ip) | 445 | lookupBucket :: NodeId -> Table ip -> [Bucket ip] |
446 | lookupBucket nid = go 0 | 446 | lookupBucket nid = go 0 [] |
447 | where | 447 | where |
448 | go i (Zero table bucket) | 448 | go i bs (Zero table bucket) |
449 | | testIdBit nid i = pure bucket | 449 | | testIdBit nid i = bucket : toBucketList table ++ bs |
450 | | otherwise = go (succ i) table | 450 | | otherwise = go (succ i) (bucket:bs) table |
451 | go i (One bucket table) | 451 | go i bs (One bucket table) |
452 | | testIdBit nid i = go (succ i) table | 452 | | testIdBit nid i = go (succ i) (bucket:bs) table |
453 | | otherwise = pure bucket | 453 | | otherwise = bucket : toBucketList table ++ bs |
454 | go _ (Tip _ _ bucket) = pure bucket | 454 | go _ bs (Tip _ _ bucket) = bucket : bs |
455 | 455 | ||
456 | compatibleNodeId :: Table ip -> IO NodeId | 456 | compatibleNodeId :: Table ip -> IO NodeId |
457 | compatibleNodeId tbl = genBucketSample prefix br | 457 | compatibleNodeId tbl = genBucketSample prefix br |
@@ -504,11 +504,14 @@ instance TableKey InfoHash where | |||
504 | -- | Get a list of /K/ closest nodes using XOR metric. Used in | 504 | -- | Get a list of /K/ closest nodes using XOR metric. Used in |
505 | -- 'find_node' and 'get_peers' queries. | 505 | -- 'find_node' and 'get_peers' queries. |
506 | kclosest :: Eq ip => TableKey a => K -> a -> Table ip -> [NodeInfo ip] | 506 | kclosest :: Eq ip => TableKey a => K -> a -> Table ip -> [NodeInfo ip] |
507 | kclosest k (toNodeId -> nid) | 507 | kclosest k (toNodeId -> nid) tbl = take k $ rank nodeId nid (L.concat bucket) |
508 | = L.take k . rank nodeId nid | 508 | ++ rank nodeId nid (L.concat everyone) |
509 | . L.map PSQ.key . PSQ.toList . fromMaybe PSQ.empty | 509 | where |
510 | . fmap bktNodes | 510 | (bucket,everyone) = |
511 | . lookupBucket nid | 511 | L.splitAt 1 |
512 | . L.map (L.map PSQ.key . PSQ.toList . bktNodes) | ||
513 | . lookupBucket nid | ||
514 | $ tbl | ||
512 | 515 | ||
513 | {----------------------------------------------------------------------- | 516 | {----------------------------------------------------------------------- |
514 | -- Routing | 517 | -- Routing |
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs new file mode 100644 index 00000000..1fe73c30 --- /dev/null +++ b/src/Network/BitTorrent/DHT/Search.hs | |||
@@ -0,0 +1,92 @@ | |||
1 | {-# LANGUAGE PatternSynonyms #-} | ||
2 | {-# LANGUAGE RecordWildCards #-} | ||
3 | module Network.BitTorrent.DHT.Search where | ||
4 | |||
5 | import Control.Concurrent | ||
6 | import Control.Concurrent.Async.Pool | ||
7 | import Control.Concurrent.STM | ||
8 | import Control.Exception | ||
9 | import Control.Monad | ||
10 | import Data.Bool | ||
11 | import Data.Function | ||
12 | import Data.List | ||
13 | import qualified Data.Map.Strict as Map | ||
14 | ;import Data.Map.Strict (Map) | ||
15 | import Data.Maybe | ||
16 | import qualified Data.Set as Set | ||
17 | ;import Data.Set (Set) | ||
18 | import System.IO | ||
19 | |||
20 | import qualified Data.MinMaxPSQ as MM | ||
21 | ;import Data.MinMaxPSQ (MinMaxPSQ) | ||
22 | import qualified Data.Wrapper.PSQ as PSQ | ||
23 | ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ) | ||
24 | import Network.BitTorrent.Address | ||
25 | |||
26 | data IterativeSearch ip r = IterativeSearch | ||
27 | { searchTarget :: NodeId | ||
28 | , searchQuery :: NodeInfo ip -> IO ([NodeInfo ip], [r]) | ||
29 | , searchPendingCount :: TVar Int | ||
30 | , searchQueued :: TVar (MinMaxPSQ (NodeInfo ip) NodeDistance) | ||
31 | , searchInformant :: TVar (MinMaxPSQ (NodeInfo ip) NodeDistance) | ||
32 | , searchVisited :: TVar (Set (NodeAddr ip)) | ||
33 | , searchResults :: TVar (Set r) | ||
34 | } | ||
35 | |||
36 | newSearch :: Eq ip => (NodeInfo ip -> IO ([NodeInfo ip], [r])) | ||
37 | -> NodeId -> [NodeInfo ip] -> IO (IterativeSearch ip r) | ||
38 | newSearch qry target ns = atomically $ do | ||
39 | c <- newTVar 0 | ||
40 | q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns | ||
41 | i <- newTVar MM.empty | ||
42 | v <- newTVar Set.empty | ||
43 | r <- newTVar Set.empty | ||
44 | return $ IterativeSearch target qry c q i v r | ||
45 | |||
46 | searchAlpha :: Int | ||
47 | searchAlpha = 3 | ||
48 | |||
49 | searchK :: Int | ||
50 | searchK = 8 | ||
51 | |||
52 | sendQuery :: (Ord a, Ord t) => | ||
53 | IterativeSearch t a | ||
54 | -> Binding (NodeInfo t) NodeDistance | ||
55 | -> IO () | ||
56 | sendQuery IterativeSearch{..} (ni :-> d) = do | ||
57 | (ns,rs) <- handle (\(SomeException e) -> return ([],[])) | ||
58 | (searchQuery ni) | ||
59 | atomically $ do | ||
60 | modifyTVar searchPendingCount pred | ||
61 | vs <- readTVar searchVisited | ||
62 | -- We only queue a node if it is not yet visited | ||
63 | let insertFoundNode n q | ||
64 | | nodeAddr n `Set.member` vs = q | ||
65 | | otherwise = MM.insertTake searchK n (distance searchTarget $ nodeId n) q | ||
66 | modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns | ||
67 | modifyTVar searchInformant $ MM.insertTake searchK ni d | ||
68 | modifyTVar searchResults $ \s -> foldr Set.insert s rs | ||
69 | |||
70 | search :: | ||
71 | (Ord r, Ord ip) => | ||
72 | IterativeSearch ip r -> IO () | ||
73 | search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do | ||
74 | fix $ \again -> do | ||
75 | join $ atomically $ do | ||
76 | found <- MM.minView <$> readTVar searchQueued | ||
77 | cnt <- readTVar $ searchPendingCount | ||
78 | case found of | ||
79 | Nothing -> retry | ||
80 | Just (ni :-> d, q) -> do | ||
81 | informants <- readTVar searchInformant | ||
82 | if MM.size informants < searchK | ||
83 | && (cnt > 0 || not (MM.null q)) | ||
84 | || PSQ.prio (fromJust $ MM.findMax informants) > d | ||
85 | then do | ||
86 | writeTVar searchQueued q | ||
87 | modifyTVar searchVisited $ Set.insert (nodeAddr ni) | ||
88 | modifyTVar searchPendingCount succ | ||
89 | return $ withAsync g (sendQuery s (ni :-> d)) (const again) | ||
90 | else do | ||
91 | check (cnt == 0) | ||
92 | return $ return () | ||