summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-07-27 00:09:36 -0400
committerjoe <joe@jerkface.net>2017-07-27 00:09:36 -0400
commit0e20eb6683761362ee282e3188fccdab46b02ee4 (patch)
tree05043c1b75ba331ffd7d645b544badcecad6657c
parentaee5037c333abc77174d4867b75b1ef068fbaf1b (diff)
peer search.
-rw-r--r--Kademlia.hs6
-rw-r--r--Mainline.hs65
-rw-r--r--src/Network/BitTorrent/DHT/Search.hs36
3 files changed, 57 insertions, 50 deletions
diff --git a/Kademlia.hs b/Kademlia.hs
index 57afb7fc..531b533b 100644
--- a/Kademlia.hs
+++ b/Kademlia.hs
@@ -266,9 +266,9 @@ forkPollForRefresh interval psq refresh = do
266 seconds -> threadDelay ( seconds * 1000000 ) 266 seconds -> threadDelay ( seconds * 1000000 )
267 again 267 again
268 268
269refreshBucket :: forall nid ni addr. 269refreshBucket :: forall nid tok ni addr.
270 ( Show nid, FiniteBits nid, Serialize nid, Ord nid, Ord ni, Hashable nid, Hashable ni, Ord addr ) => 270 ( Show nid, FiniteBits nid, Serialize nid, Ord nid, Ord ni, Hashable nid, Hashable ni, Ord addr ) =>
271 Search nid addr ni ni -> TVar (BucketList ni) -> nid -> Int -> IO Int 271 Search nid addr tok ni ni -> TVar (BucketList ni) -> nid -> Int -> IO Int
272refreshBucket sch var nid n = do 272refreshBucket sch var nid n = do
273 tbl <- atomically (readTVar var) 273 tbl <- atomically (readTVar var)
274 let count = bktCount tbl 274 let count = bktCount tbl
@@ -320,7 +320,7 @@ bootstrap ::
320 , Ord nid 320 , Ord nid
321 , Traversable t1 321 , Traversable t1
322 , Traversable t 322 , Traversable t
323 ) => Search nid addr ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO () 323 ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO ()
324bootstrap sch var ping ns ns0 = do 324bootstrap sch var ping ns ns0 = do
325 gotPing <- atomically $ newTVar False 325 gotPing <- atomically $ newTVar False
326 326
diff --git a/Mainline.hs b/Mainline.hs
index 30e18a09..7c54a096 100644
--- a/Mainline.hs
+++ b/Mainline.hs
@@ -945,30 +945,38 @@ announceH (SwarmsDatabase peers toks _) naddr announcement = do
945 945
946isReadonlyClient client = False -- TODO 946isReadonlyClient client = False -- TODO
947 947
948ping :: MainlineClient -> NodeInfo -> IO Bool 948mainlineSend meth unwrap msg client nid addr = do
949ping client addr = fromMaybe False <$> sendQuery client serializer Ping addr 949 reply <- sendQuery client serializer (msg nid) addr
950 -- sendQuery will return (Just (Left _)) on a parse error. We're going to
951 -- blow it away with the join-either sequence.
952 -- TODO: Do something with parse errors.
953 return $ join $ either (const Nothing) Just <$> reply
950 where 954 where
951 serializer = MethodSerializer 955 serializer = MethodSerializer
952 { methodTimeout = 5 956 { methodTimeout = 5
953 , method = Method "ping" 957 , method = meth
954 , wrapQuery = encodeQueryPayload (Method "ping") (isReadonlyClient client) 958 , wrapQuery = encodeQueryPayload meth (isReadonlyClient client)
955 , unwrapResponse = const True 959 , unwrapResponse = (>>= either (Left . Error GenericError . Char8.pack)
960 (Right . unwrap)
961 . BE.fromBEncode)
962 . rspPayload
956 } 963 }
957 964
958-- searchQuery :: ni -> IO ([ni], [r]) 965ping :: MainlineClient -> NodeInfo -> IO Bool
959getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO ([NodeInfo],[NodeInfo]) 966ping client addr =
960getNodes client nid addr = 967 fromMaybe False
961 fromMaybe ([],[]) <$> sendQuery client serializer (FindNode nid (Just Want_Both)) addr 968 <$> mainlineSend (Method "ping") (\Ping -> True) (const Ping) client () addr
969
970-- searchQuery :: ni -> IO (Maybe [ni], [r], tok))
971getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[NodeInfo],()))
972getNodes = mainlineSend (Method "find_node") unwrap $ flip FindNode (Just Want_Both)
962 where 973 where
963 serializer = MethodSerializer 974 unwrap (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6,())
964 { methodTimeout = 5 975
965 , method = Method "find_node" 976getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[PeerAddr],Token))
966 , wrapQuery = encodeQueryPayload (Method "find_node") (isReadonlyClient client) 977getPeers = mainlineSend (Method "get_peers") unwrap $ flip GetPeers (Just Want_Both) . coerce
967 , unwrapResponse = \case 978 where
968 R { rspPayload = Right bval } | Right (NodeFound ns4 ns6) <- BE.fromBEncode bval 979 unwrap (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, tok)
969 -> (ns4++ns6, ns4++ns6)
970 _ -> ([],[])
971 }
972 980
973data TriadSlot = SlotA | SlotB | SlotC 981data TriadSlot = SlotA | SlotB | SlotC
974 deriving (Eq,Ord,Enum,Show,Read) 982 deriving (Eq,Ord,Enum,Show,Read)
@@ -1048,23 +1056,16 @@ delVote triad voter = do
1048 writeTVar (triadSlot slot triad) Nothing 1056 writeTVar (triadSlot slot triad) Nothing
1049 triadCountVotes prior triad 1057 triadCountVotes prior triad
1050 1058
1051nodeSearch client = Search 1059mainlineSearch qry = Search
1052 { searchSpace = mainlineSpace 1060 { searchSpace = mainlineSpace
1053 , searchNodeAddress = nodeIP &&& nodePort 1061 , searchNodeAddress = nodeIP &&& nodePort
1054 , searchQuery = \nid ni -> do 1062 , searchQuery = qry
1055 hPutStrLn stderr $ "findNodes "++show nid++" --> "++show ni
1056 handle (\(SomeException e) -> do
1057 hPutStrLn stderr $ "got "++show e
1058 -- threadDelay 1000000
1059 return ([],[]))
1060 $ do
1061 (xs,y) <- getNodes client nid ni
1062 forM_ xs $ \x -> do
1063 hPutStrLn stderr $ "got "++show x
1064 -- threadDelay 1000000
1065 return (xs,y)
1066 } 1063 }
1067 1064
1065nodeSearch client = mainlineSearch (getNodes client)
1066
1067peerSearch client = mainlineSearch (getPeers client)
1068
1068-- | List of bootstrap nodes maintained by different bittorrent 1069-- | List of bootstrap nodes maintained by different bittorrent
1069-- software authors. 1070-- software authors.
1070bootstrapNodes :: WantIP -> IO [NodeInfo] 1071bootstrapNodes :: WantIP -> IO [NodeInfo]
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs
index 8c441cb0..b263e339 100644
--- a/src/Network/BitTorrent/DHT/Search.hs
+++ b/src/Network/BitTorrent/DHT/Search.hs
@@ -21,7 +21,7 @@ import qualified Data.Set as Set
21import System.IO 21import System.IO
22 22
23import qualified Data.MinMaxPSQ as MM 23import qualified Data.MinMaxPSQ as MM
24 ;import Data.MinMaxPSQ (MinMaxPSQ) 24 ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ')
25import qualified Data.Wrapper.PSQ as PSQ 25import qualified Data.Wrapper.PSQ as PSQ
26 ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey) 26 ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey)
27import Network.Address hiding (NodeId) 27import Network.Address hiding (NodeId)
@@ -34,13 +34,13 @@ import Control.Concurrent.Lifted
34import GHC.Conc (labelThread) 34import GHC.Conc (labelThread)
35#endif 35#endif
36 36
37data Search nid addr ni r = Search 37data Search nid addr tok ni r = Search
38 { searchSpace :: KademliaSpace nid ni 38 { searchSpace :: KademliaSpace nid ni
39 , searchNodeAddress :: ni -> addr 39 , searchNodeAddress :: ni -> addr
40 , searchQuery :: nid -> ni -> IO ([ni], [r]) 40 , searchQuery :: nid -> ni -> IO (Maybe ([ni], [r], tok))
41 } 41 }
42 42
43data SearchState nid addr ni r = SearchState 43data SearchState nid addr tok ni r = SearchState
44 {- 44 {-
45 { searchParams :: Search nid addr ni r 45 { searchParams :: Search nid addr ni r
46 46
@@ -60,7 +60,7 @@ data SearchState nid addr ni r = SearchState
60 -- | Nodes scheduled to be queried. 60 -- | Nodes scheduled to be queried.
61 , searchQueued :: TVar (MinMaxPSQ ni nid) 61 , searchQueued :: TVar (MinMaxPSQ ni nid)
62 -- | The nearest K nodes that issued a reply. 62 -- | The nearest K nodes that issued a reply.
63 , searchInformant :: TVar (MinMaxPSQ ni nid) 63 , searchInformant :: TVar (MinMaxPSQ' ni nid tok)
64 -- | This tracks already-queried addresses so we avoid bothering them 64 -- | This tracks already-queried addresses so we avoid bothering them
65 -- again. XXX: We could probably keep only the pending queries in this 65 -- again. XXX: We could probably keep only the pending queries in this
66 -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha 66 -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha
@@ -79,10 +79,10 @@ newSearch :: ( Ord addr
79 -> (r -> STM Bool) -- receives search results. 79 -> (r -> STM Bool) -- receives search results.
80 -> nid -- target of search 80 -> nid -- target of search
81 -} 81 -}
82 Search nid addr ni r 82 Search nid addr tok ni r
83 -> nid 83 -> nid
84 -> [ni] -- Initial nodes to query. 84 -> [ni] -- Initial nodes to query.
85 -> IO (SearchState nid addr ni r) 85 -> IO (SearchState nid addr tok ni r)
86newSearch (Search space nAddr qry) target ns = atomically $ do 86newSearch (Search space nAddr qry) target ns = atomically $ do
87 c <- newTVar 0 87 c <- newTVar 0
88 q <- newTVar $ MM.fromList 88 q <- newTVar $ MM.fromList
@@ -99,25 +99,31 @@ searchAlpha = 8
99searchK :: Int 99searchK :: Int
100searchK = 8 100searchK = 8
101 101
102sendQuery :: forall addr nid ni r. 102sendQuery :: forall addr nid tok ni r.
103 ( Ord addr 103 ( Ord addr
104 , Ord r 104 , Ord r
105 , PSQKey nid 105 , PSQKey nid
106 , PSQKey ni 106 , PSQKey ni
107 , Show nid 107 , Show nid
108 ) => 108 ) =>
109 Search nid addr ni r 109 Search nid addr tok ni r
110 -> nid 110 -> nid
111 -> (r -> STM Bool) 111 -> (r -> STM Bool)
112 -> SearchState nid addr ni r 112 -> SearchState nid addr tok ni r
113 -> Binding ni nid 113 -> Binding ni nid
114 -> IO () 114 -> IO ()
115sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do 115sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do
116 myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) 116 myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget)
117 (ns,rs) <- handle (\(SomeException e) -> return ([],[])) 117 -- TODO: Should we really be catching ThreadKilled ?
118 reply <- handle (\(SomeException e) -> return Nothing)
118 (searchQuery searchTarget ni) 119 (searchQuery searchTarget ni)
120 -- (ns,rs)
121 let tok = error "TODO: token"
119 atomically $ do 122 atomically $ do
120 modifyTVar searchPendingCount pred 123 modifyTVar searchPendingCount pred
124 maybe (return ()) go reply
125 where
126 go (ns,rs,tok) = do
121 vs <- readTVar searchVisited 127 vs <- readTVar searchVisited
122 -- We only queue a node if it is not yet visited 128 -- We only queue a node if it is not yet visited
123 let insertFoundNode :: ni 129 let insertFoundNode :: ni
@@ -130,7 +136,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) =
130 $ kademliaLocation searchSpace n ) 136 $ kademliaLocation searchSpace n )
131 q 137 q
132 modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns 138 modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns
133 modifyTVar searchInformant $ MM.insertTake searchK ni d 139 modifyTVar searchInformant $ MM.insertTake' searchK ni tok d
134 flip fix rs $ \loop -> \case 140 flip fix rs $ \loop -> \case
135 r:rs' -> do 141 r:rs' -> do
136 wanting <- searchResult r 142 wanting <- searchResult r
@@ -142,7 +148,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) =
142searchIsFinished :: ( Ord addr 148searchIsFinished :: ( Ord addr
143 , PSQKey nid 149 , PSQKey nid
144 , PSQKey ni 150 , PSQKey ni
145 ) => SearchState nid addr ni r -> STM Bool 151 ) => SearchState nid addr tok ni r -> STM Bool
146searchIsFinished SearchState{ ..} = do 152searchIsFinished SearchState{ ..} = do
147 q <- readTVar searchQueued 153 q <- readTVar searchQueued
148 cnt <- readTVar searchPendingCount 154 cnt <- readTVar searchPendingCount
@@ -153,7 +159,7 @@ searchIsFinished SearchState{ ..} = do
153 && ( PSQ.prio (fromJust $ MM.findMax informants) 159 && ( PSQ.prio (fromJust $ MM.findMax informants)
154 <= PSQ.prio (fromJust $ MM.findMin q)))) 160 <= PSQ.prio (fromJust $ MM.findMin q))))
155 161
156searchCancel :: SearchState nid addr ni r -> STM () 162searchCancel :: SearchState nid addr tok ni r -> STM ()
157searchCancel SearchState{..} = do 163searchCancel SearchState{..} = do
158 writeTVar searchPendingCount 0 164 writeTVar searchPendingCount 0
159 writeTVar searchQueued MM.empty 165 writeTVar searchQueued MM.empty
@@ -164,7 +170,7 @@ search ::
164 , PSQKey nid 170 , PSQKey nid
165 , PSQKey ni 171 , PSQKey ni
166 , Show nid 172 , Show nid
167 ) => Search nid addr ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr ni r) 173 ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r)
168search sch@Search{..} buckets target result = do 174search sch@Search{..} buckets target result = do
169 let ns = R.kclosest searchSpace searchK target buckets 175 let ns = R.kclosest searchSpace searchK target buckets
170 st <- newSearch sch target ns 176 st <- newSearch sch target ns