summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bittorrent.cabal4
-rw-r--r--examples/dhtd.hs14
-rw-r--r--src/Data/MinMaxPSQ.hs64
-rw-r--r--src/Data/Wrapper/PSQ.hs6
-rw-r--r--src/Network/BitTorrent/Address.hs12
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs36
-rw-r--r--src/Network/BitTorrent/DHT/Routing.hs31
-rw-r--r--src/Network/BitTorrent/DHT/Search.hs92
8 files changed, 236 insertions, 23 deletions
diff --git a/bittorrent.cabal b/bittorrent.cabal
index 2d76e606..f6559752 100644
--- a/bittorrent.cabal
+++ b/bittorrent.cabal
@@ -91,6 +91,8 @@ library
91 Network.SocketLike 91 Network.SocketLike
92 other-modules: Paths_bittorrent 92 other-modules: Paths_bittorrent
93 Data.Wrapper.PSQ 93 Data.Wrapper.PSQ
94 Data.MinMaxPSQ
95 Network.BitTorrent.DHT.Search
94 if !flag(dht-only) 96 if !flag(dht-only)
95 exposed-modules: Network.BitTorrent 97 exposed-modules: Network.BitTorrent
96 Network.BitTorrent.Client 98 Network.BitTorrent.Client
@@ -143,6 +145,7 @@ library
143 -- Concurrency 145 -- Concurrency
144 , SafeSemaphore 146 , SafeSemaphore
145 , lifted-async 147 , lifted-async
148 , async-pool
146-- , BoundedChan >= 1.0.1.0 149-- , BoundedChan >= 1.0.1.0
147 , split-channel >= 0.2 150 , split-channel >= 0.2
148 , stm >= 2.4 151 , stm >= 2.4
@@ -373,6 +376,7 @@ executable dhtd
373 , monad-logger 376 , monad-logger
374 , bittorrent 377 , bittorrent
375 , unix 378 , unix
379 , containers
376 if flag(thread-debug) 380 if flag(thread-debug)
377 build-depends: time 381 build-depends: time
378 cpp-options: -DTHREAD_DEBUG 382 cpp-options: -DTHREAD_DEBUG
diff --git a/examples/dhtd.hs b/examples/dhtd.hs
index 05a3de26..1b02a37a 100644
--- a/examples/dhtd.hs
+++ b/examples/dhtd.hs
@@ -16,6 +16,7 @@ import Data.Default
16import Data.List as L 16import Data.List as L
17import Data.Maybe 17import Data.Maybe
18import Data.String 18import Data.String
19import qualified Data.Set as Set
19import qualified Data.ByteString as B (ByteString,writeFile,readFile) 20import qualified Data.ByteString as B (ByteString,writeFile,readFile)
20 ; import Data.ByteString (ByteString) 21 ; import Data.ByteString (ByteString)
21import qualified Data.ByteString.Char8 as B8 22import qualified Data.ByteString.Char8 as B8
@@ -95,8 +96,9 @@ bootstrapNodes = mapMaybe fromAddr
95-- ExtendedCaps (Map.singleton 96-- ExtendedCaps (Map.singleton
96 97
97noDebugPrints :: LogSource -> LogLevel -> Bool 98noDebugPrints :: LogSource -> LogLevel -> Bool
98noDebugPrints _ = \case LevelDebug -> False 99noDebugPrints _ = \case LevelDebug -> False
99 _ -> True 100 LevelOther _ -> False
101 _ -> True
100 102
101noLogging :: LogSource -> LogLevel -> Bool 103noLogging :: LogSource -> LogLevel -> Bool
102noLogging _ _ = False 104noLogging _ _ = False
@@ -275,6 +277,14 @@ clientSession st signalQuit sock n h = do
275 hPutClient h $ showReport rs 277 hPutClient h $ showReport rs
276 Left er -> return $ hPutClient h er 278 Left er -> return $ hPutClient h er
277 279
280 ("search-peers", s) -> cmd $ do
281 case readEither s of
282 Right ih -> do
283 ps <- Set.toList <$> isearch ioGetPeers ih
284 return $ do
285 hPutClient h $ unlines $ map (show . pPrint) ps
286 Left er -> return $ hPutClient h er
287
278 _ -> cmd0 $ hPutClient h "error." 288 _ -> cmd0 $ hPutClient h "error."
279 289
280defaultPort = error "TODO defaultPort" 290defaultPort = error "TODO defaultPort"
diff --git a/src/Data/MinMaxPSQ.hs b/src/Data/MinMaxPSQ.hs
new file mode 100644
index 00000000..96937604
--- /dev/null
+++ b/src/Data/MinMaxPSQ.hs
@@ -0,0 +1,64 @@
1{-# LANGUAGE BangPatterns #-}
2module Data.MinMaxPSQ where
3
4import Data.Ord
5import qualified Data.Wrapper.PSQ as PSQ
6 ;import Data.Wrapper.PSQ as PSQ hiding (insert, null, size)
7import Prelude hiding (null, take)
8
9data MinMaxPSQ k p = MinMaxPSQ !(PSQ k p) !(PSQ k (Down p))
10
11empty :: MinMaxPSQ k p
12empty = MinMaxPSQ PSQ.empty PSQ.empty
13
14null :: MinMaxPSQ k p -> Bool
15null (MinMaxPSQ nq xq) = PSQ.null nq
16
17size :: MinMaxPSQ k p -> Int
18size (MinMaxPSQ nq xq) = PSQ.size nq
19
20toList :: (Ord k, Ord p) => MinMaxPSQ k p -> [Binding k p]
21toList (MinMaxPSQ nq xq) = PSQ.toList nq
22
23fromList :: (Ord k, Ord p) => [Binding k p] -> MinMaxPSQ k p
24fromList kps = MinMaxPSQ (PSQ.fromList kps)
25 (PSQ.fromList $ map (\(k :-> p) -> (k :-> Down p)) kps)
26
27findMin :: (Ord k, Ord p) => MinMaxPSQ k p -> Maybe (Binding k p)
28findMin (MinMaxPSQ nq xq) = PSQ.findMin nq
29
30findMax :: (Ord k, Ord p) => MinMaxPSQ k p -> Maybe (Binding k p)
31findMax (MinMaxPSQ nq xq) = fmap (\(k :-> Down p) -> k :-> p) $ PSQ.findMin xq
32
33insert :: (Ord k, Ord p) => k -> p -> MinMaxPSQ k p -> MinMaxPSQ k p
34insert k p (MinMaxPSQ nq xq) = MinMaxPSQ (PSQ.insert k p nq)
35 (PSQ.insert k (Down p) xq)
36
37delete :: (Ord k, Ord p) => k -> MinMaxPSQ k p -> MinMaxPSQ k p
38delete k (MinMaxPSQ nq xq) = MinMaxPSQ (PSQ.delete k nq) (PSQ.delete k xq)
39
40deleteMin :: (Ord k, Ord p) => MinMaxPSQ k p -> MinMaxPSQ k p
41deleteMin (MinMaxPSQ nq xq) = case PSQ.minView nq of
42 Just (k :-> _, nq') -> MinMaxPSQ nq' (PSQ.delete k xq)
43 Nothing -> MinMaxPSQ nq xq
44
45deleteMax :: (Ord k, Ord p) => MinMaxPSQ k p -> MinMaxPSQ k p
46deleteMax (MinMaxPSQ nq xq) = case PSQ.minView xq of
47 Just (k :-> _, xq') -> MinMaxPSQ (PSQ.delete k nq) xq'
48 Nothing -> MinMaxPSQ nq xq
49
50minView :: (Ord k, Ord p) => MinMaxPSQ k p -> Maybe (Binding k p, MinMaxPSQ k p)
51minView (MinMaxPSQ nq xq) = fmap (\(k :-> p, nq') -> (k :-> p, MinMaxPSQ nq' (PSQ.delete k xq)))
52 $ PSQ.minView nq
53
54maxView :: (Ord k, Ord p) => MinMaxPSQ k p -> Maybe (Binding k p, MinMaxPSQ k p)
55maxView (MinMaxPSQ nq xq) = fmap (\(k :-> Down p, xq') -> (k :-> p, MinMaxPSQ (PSQ.delete k nq) xq'))
56 $ PSQ.minView xq
57
58insertTake :: (Ord k, Ord p) => Int -> k -> p -> MinMaxPSQ k p -> MinMaxPSQ k p
59insertTake n k p q = take n $ insert k p q
60
61take :: (Ord k, Ord p) => Int -> MinMaxPSQ k p -> MinMaxPSQ k p
62take !n !q | (size q <= n) = q
63 | null q = q
64 | otherwise = take n $ deleteMax q
diff --git a/src/Data/Wrapper/PSQ.hs b/src/Data/Wrapper/PSQ.hs
index e8fa2d98..2c08011b 100644
--- a/src/Data/Wrapper/PSQ.hs
+++ b/src/Data/Wrapper/PSQ.hs
@@ -15,10 +15,14 @@ type Binding k p = (k,p,())
15pattern (:->) :: k -> p -> Binding k p 15pattern (:->) :: k -> p -> Binding k p
16pattern k :-> p <- (k,p,()) where k :-> p = (k,p,()) 16pattern k :-> p <- (k,p,()) where k :-> p = (k,p,())
17 17
18key :: Binding k v -> k 18key :: Binding k p -> k
19key (k,p,v) = k 19key (k,p,v) = k
20{-# INLINE key #-} 20{-# INLINE key #-}
21 21
22prio :: Binding k p -> p
23prio (k,p,v) = p
24{-# INLINE prio #-}
25
22insert :: (Ord k, Ord p) => k -> p -> PSQ k p -> PSQ k p 26insert :: (Ord k, Ord p) => k -> p -> PSQ k p -> PSQ k p
23insert k p q = OrdPSQ.insert k p () q 27insert k p q = OrdPSQ.insert k p () q
24{-# INLINE insert #-} 28{-# INLINE insert #-}
diff --git a/src/Network/BitTorrent/Address.hs b/src/Network/BitTorrent/Address.hs
index 381ff50b..a8e12b35 100644
--- a/src/Network/BitTorrent/Address.hs
+++ b/src/Network/BitTorrent/Address.hs
@@ -696,10 +696,10 @@ newtype NodeDistance = NodeDistance BS.ByteString
696 deriving (Eq, Ord) 696 deriving (Eq, Ord)
697 697
698instance Pretty NodeDistance where 698instance Pretty NodeDistance where
699 pPrint (NodeDistance bs) = foldMap bitseq $ BS.unpack bs 699 pPrint (NodeDistance bs) = text $ BC.unpack (Base16.encode bs)
700 where 700
701 listBits w = L.map (testBit w) (L.reverse [0..finiteBitSize w - 1]) 701instance Show NodeDistance where
702 bitseq = foldMap (int . fromEnum) . listBits 702 show (NodeDistance bs) = BC.unpack (Base16.encode bs)
703 703
704-- | distance(A,B) = |A xor B| Smaller values are closer. 704-- | distance(A,B) = |A xor B| Smaller values are closer.
705distance :: NodeId -> NodeId -> NodeDistance 705distance :: NodeId -> NodeId -> NodeDistance
@@ -751,7 +751,7 @@ genBucketSample' gen (NodeId self) (q,m,b)
751data NodeAddr a = NodeAddr 751data NodeAddr a = NodeAddr
752 { nodeHost :: !a 752 { nodeHost :: !a
753 , nodePort :: {-# UNPACK #-} !PortNumber 753 , nodePort :: {-# UNPACK #-} !PortNumber
754 } deriving (Eq, Typeable, Functor, Foldable, Traversable) 754 } deriving (Eq, Ord, Typeable, Functor, Foldable, Traversable)
755 755
756instance Show a => Show (NodeAddr a) where 756instance Show a => Show (NodeAddr a) where
757 showsPrec i NodeAddr {..} 757 showsPrec i NodeAddr {..}
@@ -1285,7 +1285,7 @@ getBindAddress listenPortString enabled6 = do
1285 let (x6s,x4s) = partition (\s -> addrFamily s == AF_INET6) xs 1285 let (x6s,x4s) = partition (\s -> addrFamily s == AF_INET6) xs
1286 listenAddr = 1286 listenAddr =
1287 case if enabled6 then x6s++x4s else x4s of 1287 case if enabled6 then x6s++x4s else x4s of
1288 AddrInfo { addrAddress = listenAddr } : _ -> listenAddr 1288 AddrInfo { addrAddress = addr } : _ -> addr
1289 _ -> if enabled6 1289 _ -> if enabled6
1290 then SockAddrInet6 (parsePort listenPortString) 0 iN6ADDR_ANY 0 1290 then SockAddrInet6 (parsePort listenPortString) 0 iN6ADDR_ANY 0
1291 else SockAddrInet (parsePort listenPortString) iNADDR_ANY 1291 else SockAddrInet (parsePort listenPortString) iNADDR_ANY
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
index 8215c95d..39ef9604 100644
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -13,6 +13,7 @@
13{-# LANGUAGE FlexibleContexts #-} 13{-# LANGUAGE FlexibleContexts #-}
14{-# LANGUAGE ScopedTypeVariables #-} 14{-# LANGUAGE ScopedTypeVariables #-}
15{-# LANGUAGE TemplateHaskell #-} 15{-# LANGUAGE TemplateHaskell #-}
16{-# LANGUAGE TupleSections #-}
16module Network.BitTorrent.DHT.Query 17module Network.BitTorrent.DHT.Query
17 ( -- * Handler 18 ( -- * Handler
18 -- | To bind specific set of handlers you need to pass 19 -- | To bind specific set of handlers you need to pass
@@ -40,6 +41,9 @@ module Network.BitTorrent.DHT.Query
40 , Search 41 , Search
41 , search 42 , search
42 , publish 43 , publish
44 , ioFindNode
45 , ioGetPeers
46 , isearch
43 47
44 -- ** Routing table 48 -- ** Routing table
45 , insertNode 49 , insertNode
@@ -67,6 +71,8 @@ import Data.Either
67import Data.List as L 71import Data.List as L
68import Data.Monoid 72import Data.Monoid
69import Data.Text as T 73import Data.Text as T
74import qualified Data.Set as Set
75 ;import Data.Set (Set)
70import Network 76import Network
71import Text.PrettyPrint as PP hiding ((<>), ($$)) 77import Text.PrettyPrint as PP hiding ((<>), ($$))
72import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) 78import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
@@ -82,6 +88,7 @@ import Network.BitTorrent.DHT.Message
82import Network.BitTorrent.DHT.Routing as R 88import Network.BitTorrent.DHT.Routing as R
83import Network.BitTorrent.DHT.Session 89import Network.BitTorrent.DHT.Session
84import Control.Concurrent.STM 90import Control.Concurrent.STM
91import qualified Network.BitTorrent.DHT.Search as Search
85 92
86{----------------------------------------------------------------------- 93{-----------------------------------------------------------------------
87-- Handlers 94-- Handlers
@@ -182,6 +189,35 @@ announceQ ih p NodeInfo {..} = do
182-- Iterative queries 189-- Iterative queries
183-----------------------------------------------------------------------} 190-----------------------------------------------------------------------}
184 191
192
193ioGetPeers :: Address ip => InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [PeerAddr ip]))
194ioGetPeers ih = do
195 session <- ask
196 return $ \ni -> runDHT session $ do
197 r <- try $ getPeersQ ih ni
198 case r of
199 Right e -> return $ either (,[]) ([],) e
200 Left e -> let _ = e :: QueryFailure in return ([],[])
201
202ioFindNode :: Address ip => InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [NodeInfo ip]))
203ioFindNode ih = do
204 session <- ask
205 return $ \ni -> runDHT session $ do
206 NodeFound ns <- FindNode (toNodeId ih) <@> nodeAddr ni
207 return $ L.partition (\n -> nodeId n /= toNodeId ih) ns
208
209isearch :: (Ord r, Ord ip) =>
210 (InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [r])))
211 -> InfoHash
212 -> DHT ip (Set r)
213isearch f ih = do
214 qry <- f ih
215 ns <- kclosest 8 ih <$> getTable
216 liftIO $ do s <- Search.newSearch qry (toNodeId ih) ns
217 Search.search s
218 atomically $ readTVar (Search.searchResults s)
219
220
185type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] 221type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip]
186 222
187-- TODO: use reorder and filter (Traversal option) leftovers 223-- TODO: use reorder and filter (Traversal option) leftovers
diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs
index 5c9788dc..cf4a4de3 100644
--- a/src/Network/BitTorrent/DHT/Routing.hs
+++ b/src/Network/BitTorrent/DHT/Routing.hs
@@ -442,16 +442,16 @@ size = L.sum . shape
442depth :: Table ip -> BucketCount 442depth :: Table ip -> BucketCount
443depth = L.length . shape 443depth = L.length . shape
444 444
445lookupBucket :: NodeId -> Table ip -> Maybe (Bucket ip) 445lookupBucket :: NodeId -> Table ip -> [Bucket ip]
446lookupBucket nid = go 0 446lookupBucket nid = go 0 []
447 where 447 where
448 go i (Zero table bucket) 448 go i bs (Zero table bucket)
449 | testIdBit nid i = pure bucket 449 | testIdBit nid i = bucket : toBucketList table ++ bs
450 | otherwise = go (succ i) table 450 | otherwise = go (succ i) (bucket:bs) table
451 go i (One bucket table) 451 go i bs (One bucket table)
452 | testIdBit nid i = go (succ i) table 452 | testIdBit nid i = go (succ i) (bucket:bs) table
453 | otherwise = pure bucket 453 | otherwise = bucket : toBucketList table ++ bs
454 go _ (Tip _ _ bucket) = pure bucket 454 go _ bs (Tip _ _ bucket) = bucket : bs
455 455
456compatibleNodeId :: Table ip -> IO NodeId 456compatibleNodeId :: Table ip -> IO NodeId
457compatibleNodeId tbl = genBucketSample prefix br 457compatibleNodeId tbl = genBucketSample prefix br
@@ -504,11 +504,14 @@ instance TableKey InfoHash where
504-- | Get a list of /K/ closest nodes using XOR metric. Used in 504-- | Get a list of /K/ closest nodes using XOR metric. Used in
505-- 'find_node' and 'get_peers' queries. 505-- 'find_node' and 'get_peers' queries.
506kclosest :: Eq ip => TableKey a => K -> a -> Table ip -> [NodeInfo ip] 506kclosest :: Eq ip => TableKey a => K -> a -> Table ip -> [NodeInfo ip]
507kclosest k (toNodeId -> nid) 507kclosest k (toNodeId -> nid) tbl = take k $ rank nodeId nid (L.concat bucket)
508 = L.take k . rank nodeId nid 508 ++ rank nodeId nid (L.concat everyone)
509 . L.map PSQ.key . PSQ.toList . fromMaybe PSQ.empty 509 where
510 . fmap bktNodes 510 (bucket,everyone) =
511 . lookupBucket nid 511 L.splitAt 1
512 . L.map (L.map PSQ.key . PSQ.toList . bktNodes)
513 . lookupBucket nid
514 $ tbl
512 515
513{----------------------------------------------------------------------- 516{-----------------------------------------------------------------------
514-- Routing 517-- Routing
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs
new file mode 100644
index 00000000..1fe73c30
--- /dev/null
+++ b/src/Network/BitTorrent/DHT/Search.hs
@@ -0,0 +1,92 @@
1{-# LANGUAGE PatternSynonyms #-}
2{-# LANGUAGE RecordWildCards #-}
3module Network.BitTorrent.DHT.Search where
4
5import Control.Concurrent
6import Control.Concurrent.Async.Pool
7import Control.Concurrent.STM
8import Control.Exception
9import Control.Monad
10import Data.Bool
11import Data.Function
12import Data.List
13import qualified Data.Map.Strict as Map
14 ;import Data.Map.Strict (Map)
15import Data.Maybe
16import qualified Data.Set as Set
17 ;import Data.Set (Set)
18import System.IO
19
20import qualified Data.MinMaxPSQ as MM
21 ;import Data.MinMaxPSQ (MinMaxPSQ)
22import qualified Data.Wrapper.PSQ as PSQ
23 ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ)
24import Network.BitTorrent.Address
25
26data IterativeSearch ip r = IterativeSearch
27 { searchTarget :: NodeId
28 , searchQuery :: NodeInfo ip -> IO ([NodeInfo ip], [r])
29 , searchPendingCount :: TVar Int
30 , searchQueued :: TVar (MinMaxPSQ (NodeInfo ip) NodeDistance)
31 , searchInformant :: TVar (MinMaxPSQ (NodeInfo ip) NodeDistance)
32 , searchVisited :: TVar (Set (NodeAddr ip))
33 , searchResults :: TVar (Set r)
34 }
35
36newSearch :: Eq ip => (NodeInfo ip -> IO ([NodeInfo ip], [r]))
37 -> NodeId -> [NodeInfo ip] -> IO (IterativeSearch ip r)
38newSearch qry target ns = atomically $ do
39 c <- newTVar 0
40 q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns
41 i <- newTVar MM.empty
42 v <- newTVar Set.empty
43 r <- newTVar Set.empty
44 return $ IterativeSearch target qry c q i v r
45
46searchAlpha :: Int
47searchAlpha = 3
48
49searchK :: Int
50searchK = 8
51
52sendQuery :: (Ord a, Ord t) =>
53 IterativeSearch t a
54 -> Binding (NodeInfo t) NodeDistance
55 -> IO ()
56sendQuery IterativeSearch{..} (ni :-> d) = do
57 (ns,rs) <- handle (\(SomeException e) -> return ([],[]))
58 (searchQuery ni)
59 atomically $ do
60 modifyTVar searchPendingCount pred
61 vs <- readTVar searchVisited
62 -- We only queue a node if it is not yet visited
63 let insertFoundNode n q
64 | nodeAddr n `Set.member` vs = q
65 | otherwise = MM.insertTake searchK n (distance searchTarget $ nodeId n) q
66 modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns
67 modifyTVar searchInformant $ MM.insertTake searchK ni d
68 modifyTVar searchResults $ \s -> foldr Set.insert s rs
69
70search ::
71 (Ord r, Ord ip) =>
72 IterativeSearch ip r -> IO ()
73search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do
74 fix $ \again -> do
75 join $ atomically $ do
76 found <- MM.minView <$> readTVar searchQueued
77 cnt <- readTVar $ searchPendingCount
78 case found of
79 Nothing -> retry
80 Just (ni :-> d, q) -> do
81 informants <- readTVar searchInformant
82 if MM.size informants < searchK
83 && (cnt > 0 || not (MM.null q))
84 || PSQ.prio (fromJust $ MM.findMax informants) > d
85 then do
86 writeTVar searchQueued q
87 modifyTVar searchVisited $ Set.insert (nodeAddr ni)
88 modifyTVar searchPendingCount succ
89 return $ withAsync g (sendQuery s (ni :-> d)) (const again)
90 else do
91 check (cnt == 0)
92 return $ return ()