summaryrefslogtreecommitdiff
path: root/src/Network
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-02-01 03:21:52 -0500
committerjoe <joe@jerkface.net>2017-02-01 03:21:52 -0500
commitc51e64666b672637843a04c2f279d7d0c9eed01c (patch)
treed6f50018659ac3c5c3d72ee9bde3824514bd9f6a /src/Network
parent0d1de683de78a70ce9c054b444bb6f19c39d112c (diff)
New improved iterative search algorithm.
Diffstat (limited to 'src/Network')
-rw-r--r--src/Network/BitTorrent/Address.hs12
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs36
-rw-r--r--src/Network/BitTorrent/DHT/Routing.hs31
-rw-r--r--src/Network/BitTorrent/DHT/Search.hs92
4 files changed, 151 insertions, 20 deletions
diff --git a/src/Network/BitTorrent/Address.hs b/src/Network/BitTorrent/Address.hs
index 381ff50b..a8e12b35 100644
--- a/src/Network/BitTorrent/Address.hs
+++ b/src/Network/BitTorrent/Address.hs
@@ -696,10 +696,10 @@ newtype NodeDistance = NodeDistance BS.ByteString
696 deriving (Eq, Ord) 696 deriving (Eq, Ord)
697 697
698instance Pretty NodeDistance where 698instance Pretty NodeDistance where
699 pPrint (NodeDistance bs) = foldMap bitseq $ BS.unpack bs 699 pPrint (NodeDistance bs) = text $ BC.unpack (Base16.encode bs)
700 where 700
701 listBits w = L.map (testBit w) (L.reverse [0..finiteBitSize w - 1]) 701instance Show NodeDistance where
702 bitseq = foldMap (int . fromEnum) . listBits 702 show (NodeDistance bs) = BC.unpack (Base16.encode bs)
703 703
704-- | distance(A,B) = |A xor B| Smaller values are closer. 704-- | distance(A,B) = |A xor B| Smaller values are closer.
705distance :: NodeId -> NodeId -> NodeDistance 705distance :: NodeId -> NodeId -> NodeDistance
@@ -751,7 +751,7 @@ genBucketSample' gen (NodeId self) (q,m,b)
751data NodeAddr a = NodeAddr 751data NodeAddr a = NodeAddr
752 { nodeHost :: !a 752 { nodeHost :: !a
753 , nodePort :: {-# UNPACK #-} !PortNumber 753 , nodePort :: {-# UNPACK #-} !PortNumber
754 } deriving (Eq, Typeable, Functor, Foldable, Traversable) 754 } deriving (Eq, Ord, Typeable, Functor, Foldable, Traversable)
755 755
756instance Show a => Show (NodeAddr a) where 756instance Show a => Show (NodeAddr a) where
757 showsPrec i NodeAddr {..} 757 showsPrec i NodeAddr {..}
@@ -1285,7 +1285,7 @@ getBindAddress listenPortString enabled6 = do
1285 let (x6s,x4s) = partition (\s -> addrFamily s == AF_INET6) xs 1285 let (x6s,x4s) = partition (\s -> addrFamily s == AF_INET6) xs
1286 listenAddr = 1286 listenAddr =
1287 case if enabled6 then x6s++x4s else x4s of 1287 case if enabled6 then x6s++x4s else x4s of
1288 AddrInfo { addrAddress = listenAddr } : _ -> listenAddr 1288 AddrInfo { addrAddress = addr } : _ -> addr
1289 _ -> if enabled6 1289 _ -> if enabled6
1290 then SockAddrInet6 (parsePort listenPortString) 0 iN6ADDR_ANY 0 1290 then SockAddrInet6 (parsePort listenPortString) 0 iN6ADDR_ANY 0
1291 else SockAddrInet (parsePort listenPortString) iNADDR_ANY 1291 else SockAddrInet (parsePort listenPortString) iNADDR_ANY
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
index 8215c95d..39ef9604 100644
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -13,6 +13,7 @@
13{-# LANGUAGE FlexibleContexts #-} 13{-# LANGUAGE FlexibleContexts #-}
14{-# LANGUAGE ScopedTypeVariables #-} 14{-# LANGUAGE ScopedTypeVariables #-}
15{-# LANGUAGE TemplateHaskell #-} 15{-# LANGUAGE TemplateHaskell #-}
16{-# LANGUAGE TupleSections #-}
16module Network.BitTorrent.DHT.Query 17module Network.BitTorrent.DHT.Query
17 ( -- * Handler 18 ( -- * Handler
18 -- | To bind specific set of handlers you need to pass 19 -- | To bind specific set of handlers you need to pass
@@ -40,6 +41,9 @@ module Network.BitTorrent.DHT.Query
40 , Search 41 , Search
41 , search 42 , search
42 , publish 43 , publish
44 , ioFindNode
45 , ioGetPeers
46 , isearch
43 47
44 -- ** Routing table 48 -- ** Routing table
45 , insertNode 49 , insertNode
@@ -67,6 +71,8 @@ import Data.Either
67import Data.List as L 71import Data.List as L
68import Data.Monoid 72import Data.Monoid
69import Data.Text as T 73import Data.Text as T
74import qualified Data.Set as Set
75 ;import Data.Set (Set)
70import Network 76import Network
71import Text.PrettyPrint as PP hiding ((<>), ($$)) 77import Text.PrettyPrint as PP hiding ((<>), ($$))
72import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) 78import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
@@ -82,6 +88,7 @@ import Network.BitTorrent.DHT.Message
82import Network.BitTorrent.DHT.Routing as R 88import Network.BitTorrent.DHT.Routing as R
83import Network.BitTorrent.DHT.Session 89import Network.BitTorrent.DHT.Session
84import Control.Concurrent.STM 90import Control.Concurrent.STM
91import qualified Network.BitTorrent.DHT.Search as Search
85 92
86{----------------------------------------------------------------------- 93{-----------------------------------------------------------------------
87-- Handlers 94-- Handlers
@@ -182,6 +189,35 @@ announceQ ih p NodeInfo {..} = do
182-- Iterative queries 189-- Iterative queries
183-----------------------------------------------------------------------} 190-----------------------------------------------------------------------}
184 191
192
193ioGetPeers :: Address ip => InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [PeerAddr ip]))
194ioGetPeers ih = do
195 session <- ask
196 return $ \ni -> runDHT session $ do
197 r <- try $ getPeersQ ih ni
198 case r of
199 Right e -> return $ either (,[]) ([],) e
200 Left e -> let _ = e :: QueryFailure in return ([],[])
201
202ioFindNode :: Address ip => InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [NodeInfo ip]))
203ioFindNode ih = do
204 session <- ask
205 return $ \ni -> runDHT session $ do
206 NodeFound ns <- FindNode (toNodeId ih) <@> nodeAddr ni
207 return $ L.partition (\n -> nodeId n /= toNodeId ih) ns
208
209isearch :: (Ord r, Ord ip) =>
210 (InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [r])))
211 -> InfoHash
212 -> DHT ip (Set r)
213isearch f ih = do
214 qry <- f ih
215 ns <- kclosest 8 ih <$> getTable
216 liftIO $ do s <- Search.newSearch qry (toNodeId ih) ns
217 Search.search s
218 atomically $ readTVar (Search.searchResults s)
219
220
185type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] 221type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip]
186 222
187-- TODO: use reorder and filter (Traversal option) leftovers 223-- TODO: use reorder and filter (Traversal option) leftovers
diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs
index 5c9788dc..cf4a4de3 100644
--- a/src/Network/BitTorrent/DHT/Routing.hs
+++ b/src/Network/BitTorrent/DHT/Routing.hs
@@ -442,16 +442,16 @@ size = L.sum . shape
442depth :: Table ip -> BucketCount 442depth :: Table ip -> BucketCount
443depth = L.length . shape 443depth = L.length . shape
444 444
445lookupBucket :: NodeId -> Table ip -> Maybe (Bucket ip) 445lookupBucket :: NodeId -> Table ip -> [Bucket ip]
446lookupBucket nid = go 0 446lookupBucket nid = go 0 []
447 where 447 where
448 go i (Zero table bucket) 448 go i bs (Zero table bucket)
449 | testIdBit nid i = pure bucket 449 | testIdBit nid i = bucket : toBucketList table ++ bs
450 | otherwise = go (succ i) table 450 | otherwise = go (succ i) (bucket:bs) table
451 go i (One bucket table) 451 go i bs (One bucket table)
452 | testIdBit nid i = go (succ i) table 452 | testIdBit nid i = go (succ i) (bucket:bs) table
453 | otherwise = pure bucket 453 | otherwise = bucket : toBucketList table ++ bs
454 go _ (Tip _ _ bucket) = pure bucket 454 go _ bs (Tip _ _ bucket) = bucket : bs
455 455
456compatibleNodeId :: Table ip -> IO NodeId 456compatibleNodeId :: Table ip -> IO NodeId
457compatibleNodeId tbl = genBucketSample prefix br 457compatibleNodeId tbl = genBucketSample prefix br
@@ -504,11 +504,14 @@ instance TableKey InfoHash where
504-- | Get a list of /K/ closest nodes using XOR metric. Used in 504-- | Get a list of /K/ closest nodes using XOR metric. Used in
505-- 'find_node' and 'get_peers' queries. 505-- 'find_node' and 'get_peers' queries.
506kclosest :: Eq ip => TableKey a => K -> a -> Table ip -> [NodeInfo ip] 506kclosest :: Eq ip => TableKey a => K -> a -> Table ip -> [NodeInfo ip]
507kclosest k (toNodeId -> nid) 507kclosest k (toNodeId -> nid) tbl = take k $ rank nodeId nid (L.concat bucket)
508 = L.take k . rank nodeId nid 508 ++ rank nodeId nid (L.concat everyone)
509 . L.map PSQ.key . PSQ.toList . fromMaybe PSQ.empty 509 where
510 . fmap bktNodes 510 (bucket,everyone) =
511 . lookupBucket nid 511 L.splitAt 1
512 . L.map (L.map PSQ.key . PSQ.toList . bktNodes)
513 . lookupBucket nid
514 $ tbl
512 515
513{----------------------------------------------------------------------- 516{-----------------------------------------------------------------------
514-- Routing 517-- Routing
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs
new file mode 100644
index 00000000..1fe73c30
--- /dev/null
+++ b/src/Network/BitTorrent/DHT/Search.hs
@@ -0,0 +1,92 @@
1{-# LANGUAGE PatternSynonyms #-}
2{-# LANGUAGE RecordWildCards #-}
3module Network.BitTorrent.DHT.Search where
4
5import Control.Concurrent
6import Control.Concurrent.Async.Pool
7import Control.Concurrent.STM
8import Control.Exception
9import Control.Monad
10import Data.Bool
11import Data.Function
12import Data.List
13import qualified Data.Map.Strict as Map
14 ;import Data.Map.Strict (Map)
15import Data.Maybe
16import qualified Data.Set as Set
17 ;import Data.Set (Set)
18import System.IO
19
20import qualified Data.MinMaxPSQ as MM
21 ;import Data.MinMaxPSQ (MinMaxPSQ)
22import qualified Data.Wrapper.PSQ as PSQ
23 ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ)
24import Network.BitTorrent.Address
25
26data IterativeSearch ip r = IterativeSearch
27 { searchTarget :: NodeId
28 , searchQuery :: NodeInfo ip -> IO ([NodeInfo ip], [r])
29 , searchPendingCount :: TVar Int
30 , searchQueued :: TVar (MinMaxPSQ (NodeInfo ip) NodeDistance)
31 , searchInformant :: TVar (MinMaxPSQ (NodeInfo ip) NodeDistance)
32 , searchVisited :: TVar (Set (NodeAddr ip))
33 , searchResults :: TVar (Set r)
34 }
35
36newSearch :: Eq ip => (NodeInfo ip -> IO ([NodeInfo ip], [r]))
37 -> NodeId -> [NodeInfo ip] -> IO (IterativeSearch ip r)
38newSearch qry target ns = atomically $ do
39 c <- newTVar 0
40 q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns
41 i <- newTVar MM.empty
42 v <- newTVar Set.empty
43 r <- newTVar Set.empty
44 return $ IterativeSearch target qry c q i v r
45
46searchAlpha :: Int
47searchAlpha = 3
48
49searchK :: Int
50searchK = 8
51
52sendQuery :: (Ord a, Ord t) =>
53 IterativeSearch t a
54 -> Binding (NodeInfo t) NodeDistance
55 -> IO ()
56sendQuery IterativeSearch{..} (ni :-> d) = do
57 (ns,rs) <- handle (\(SomeException e) -> return ([],[]))
58 (searchQuery ni)
59 atomically $ do
60 modifyTVar searchPendingCount pred
61 vs <- readTVar searchVisited
62 -- We only queue a node if it is not yet visited
63 let insertFoundNode n q
64 | nodeAddr n `Set.member` vs = q
65 | otherwise = MM.insertTake searchK n (distance searchTarget $ nodeId n) q
66 modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns
67 modifyTVar searchInformant $ MM.insertTake searchK ni d
68 modifyTVar searchResults $ \s -> foldr Set.insert s rs
69
70search ::
71 (Ord r, Ord ip) =>
72 IterativeSearch ip r -> IO ()
73search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do
74 fix $ \again -> do
75 join $ atomically $ do
76 found <- MM.minView <$> readTVar searchQueued
77 cnt <- readTVar $ searchPendingCount
78 case found of
79 Nothing -> retry
80 Just (ni :-> d, q) -> do
81 informants <- readTVar searchInformant
82 if MM.size informants < searchK
83 && (cnt > 0 || not (MM.null q))
84 || PSQ.prio (fromJust $ MM.findMax informants) > d
85 then do
86 writeTVar searchQueued q
87 modifyTVar searchVisited $ Set.insert (nodeAddr ni)
88 modifyTVar searchPendingCount succ
89 return $ withAsync g (sendQuery s (ni :-> d)) (const again)
90 else do
91 check (cnt == 0)
92 return $ return ()