diff options
Diffstat (limited to 'examples/dhtd.hs')
-rw-r--r-- | examples/dhtd.hs | 171 |
1 files changed, 157 insertions, 14 deletions
diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 6477fac4..33fd30c9 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs | |||
@@ -20,9 +20,13 @@ import Control.DeepSeq | |||
20 | import Control.Exception | 20 | import Control.Exception |
21 | import Control.Monad | 21 | import Control.Monad |
22 | import Data.Char | 22 | import Data.Char |
23 | import Data.Hashable | ||
23 | import Data.List | 24 | import Data.List |
24 | import qualified Data.Map as Map | 25 | import qualified Data.Map as Map |
26 | import Data.Maybe | ||
27 | import qualified Data.Set as Set | ||
25 | import Data.Time.Clock | 28 | import Data.Time.Clock |
29 | import GHC.Conc (threadStatus,ThreadStatus(..)) | ||
26 | import GHC.Stats | 30 | import GHC.Stats |
27 | import Network.Socket | 31 | import Network.Socket |
28 | import System.Environment | 32 | import System.Environment |
@@ -53,6 +57,7 @@ import Control.Concurrent.Async.Pool | |||
53 | import System.IO.Error | 57 | import System.IO.Error |
54 | import qualified Data.Serialize as S | 58 | import qualified Data.Serialize as S |
55 | import Network.BitTorrent.DHT.ContactInfo as Peers | 59 | import Network.BitTorrent.DHT.ContactInfo as Peers |
60 | import qualified Data.MinMaxPSQ as MM | ||
56 | 61 | ||
57 | showReport :: [(String,String)] -> String | 62 | showReport :: [(String,String)] -> String |
58 | showReport kvs = do | 63 | showReport kvs = do |
@@ -72,23 +77,36 @@ hPutClient h s = hPutStr h ('.' : marshalForClient s) | |||
72 | hPutClientChunk :: Handle -> String -> IO () | 77 | hPutClientChunk :: Handle -> String -> IO () |
73 | hPutClientChunk h s = hPutStr h (' ' : marshalForClient s) | 78 | hPutClientChunk h s = hPutStr h (' ' : marshalForClient s) |
74 | 79 | ||
75 | data DHTQuery nid ni = forall addr r tok. DHTQuery | 80 | data DHTQuery nid ni = forall addr r tok. Ord addr => DHTQuery |
76 | { qsearch :: Search nid addr tok ni r | 81 | { qsearch :: Search nid addr tok ni r |
77 | , qhandler :: ni -> nid -> IO ([ni], [r], tok) | 82 | , qhandler :: ni -> nid -> IO ([ni], [r], tok) |
78 | , qshowR :: r -> String | 83 | , qshowR :: r -> String |
79 | , qshowTok :: tok -> Maybe String | 84 | , qshowTok :: tok -> Maybe String |
80 | } | 85 | } |
81 | 86 | ||
87 | data DHTSearch nid ni = forall addr tok r. DHTSearch | ||
88 | { searchThread :: ThreadId | ||
89 | , searchState :: SearchState nid addr tok ni r | ||
90 | , searchShowTok :: tok -> Maybe String | ||
91 | , searchResults :: TVar (Set.Set String) | ||
92 | } | ||
93 | |||
82 | data DHT = forall nid ni. ( Show ni | 94 | data DHT = forall nid ni. ( Show ni |
83 | , Read ni | 95 | , Read ni |
84 | , ToJSON ni | 96 | , ToJSON ni |
85 | , FromJSON ni | 97 | , FromJSON ni |
98 | , Ord ni | ||
99 | , Hashable ni | ||
100 | , Show nid | ||
101 | , Ord nid | ||
102 | , Hashable nid | ||
86 | ) => | 103 | ) => |
87 | DHT | 104 | DHT |
88 | { dhtBuckets :: TVar (BucketList ni) | 105 | { dhtBuckets :: TVar (BucketList ni) |
89 | , dhtPing :: ni -> IO Bool | 106 | , dhtPing :: ni -> IO Bool |
90 | , dhtQuery :: Map.Map String (DHTQuery nid ni) | 107 | , dhtQuery :: Map.Map String (DHTQuery nid ni) |
91 | , dhtParseId :: String -> Either String nid | 108 | , dhtParseId :: String -> Either String nid |
109 | , dhtSearches :: TVar (Map.Map (String,nid) (DHTSearch nid ni)) | ||
92 | } | 110 | } |
93 | 111 | ||
94 | nodesFileName :: String -> String | 112 | nodesFileName :: String -> String |
@@ -135,26 +153,102 @@ reportTable bkts = map (show *** show . fst) | |||
135 | $ R.toList | 153 | $ R.toList |
136 | $ bkts | 154 | $ bkts |
137 | 155 | ||
138 | reportResult :: Show ni => | 156 | reportResult :: |
139 | String | 157 | String |
140 | -> (r -> String) | 158 | -> (r -> String) |
141 | -> (tok -> Maybe String) | 159 | -> (tok -> Maybe String) |
160 | -> (ni -> String) | ||
142 | -> Handle | 161 | -> Handle |
143 | -> Either String ([ni],[r],tok) | 162 | -> Either String ([ni],[r],tok) |
144 | -> IO () | 163 | -> IO () |
145 | reportResult meth showR showTok h (Left e) = hPutClient h e | 164 | reportResult meth showR showTok showN h (Left e) = hPutClient h e |
146 | reportResult meth showR showTok h (Right (ns,rs,tok)) = do | 165 | reportResult meth showR showTok showN h (Right (ns,rs,tok)) = do |
147 | hPutClient h $ showReport report | 166 | hPutClient h $ showReport report |
148 | where | 167 | where |
149 | report = intercalate [("","")] [ tok_r , node_r , result_r ] | 168 | report = intercalate [("","")] [ tok_r , node_r , result_r ] |
150 | 169 | ||
151 | tok_r = maybe [] (pure . ("token:",)) $ showTok tok | 170 | tok_r = maybe [] (pure . ("token:",)) $ showTok tok |
152 | 171 | ||
153 | node_r = map ( ("n",) . show ) ns | 172 | node_r = map ( ("n",) . showN ) ns |
154 | 173 | ||
155 | result_r | (meth=="node") = [] | 174 | result_r | (meth=="node") = [] |
156 | | otherwise = map ( (take 1 meth,) . showR ) rs | 175 | | otherwise = map ( (take 1 meth,) . showR ) rs |
157 | 176 | ||
177 | -- example: | ||
178 | -- * 10 peer 141d6c6ee2810f46d28bbe8373d4f454a4122535 | ||
179 | -- - 1 peer 141d6c6ee2810f46d28bbe8373d4f454a4122535 | ||
180 | -- 22 node 141d6c6ee2810f46d28bbe8373d4f454a4122535 | ||
181 | -- | ||
182 | -- key: '*' in progress | ||
183 | -- '-' stopped | ||
184 | -- ' ' finished | ||
185 | showSearches :: ( Show nid | ||
186 | , Ord nid | ||
187 | , Hashable nid | ||
188 | , Ord ni | ||
189 | , Hashable ni | ||
190 | ) => Map.Map (String,nid) (DHTSearch nid ni) -> IO String | ||
191 | showSearches searches = do | ||
192 | tups <- forM (Map.toList searches) $ \((meth,nid),DHTSearch{..}) -> do | ||
193 | (is'fin, cnt) <- atomically $ | ||
194 | (,) <$> searchIsFinished searchState | ||
195 | <*> (Set.size <$> readTVar searchResults) | ||
196 | tstat <- threadStatus searchThread | ||
197 | let stat = case tstat of | ||
198 | _ | is'fin -> ' ' | ||
199 | ThreadFinished -> '-' | ||
200 | ThreadDied -> '-' | ||
201 | _ -> '*' | ||
202 | return (stat,show cnt,meth,show nid) | ||
203 | let cnt'width = maximum $ map (\(_,cnt,_,_)->length cnt) tups | ||
204 | mth'width = maximum $ map (\(_,_,mth,_)->length mth) tups | ||
205 | return $ do -- List monad. | ||
206 | (stat,cnt,meth,nid) <- tups | ||
207 | printf " %c %-*s %-*s %s\n" stat cnt'width cnt mth'width meth nid | ||
208 | |||
209 | forkSearch :: | ||
210 | ( Ord nid | ||
211 | , Hashable nid | ||
212 | , Ord ni | ||
213 | , Hashable ni | ||
214 | , Show nid | ||
215 | ) => | ||
216 | String | ||
217 | -> nid | ||
218 | -> DHTQuery nid ni | ||
219 | -> TVar (Map.Map (String,nid) (DHTSearch nid ni)) | ||
220 | -> TVar (BucketList ni) | ||
221 | -> ThreadId | ||
222 | -> TVar (Maybe (IO ())) | ||
223 | -> STM () | ||
224 | forkSearch method nid DHTQuery{qsearch,qshowTok,qshowR} dhtSearches dhtBuckets tid kvar = do | ||
225 | ns <- R.kclosest (searchSpace qsearch) searchK nid <$> readTVar dhtBuckets | ||
226 | st <- newSearch qsearch nid ns | ||
227 | results <- newTVar Set.empty | ||
228 | let storeResult r = modifyTVar' results (Set.insert (qshowR r)) | ||
229 | >> return True | ||
230 | new = DHTSearch | ||
231 | { searchThread = tid | ||
232 | , searchState = st | ||
233 | , searchShowTok = qshowTok | ||
234 | , searchResults = results | ||
235 | } | ||
236 | modifyTVar' dhtSearches $ Map.insert (method,nid) new | ||
237 | writeTVar kvar $ Just $ searchLoop qsearch nid storeResult st | ||
238 | |||
239 | reportSearchResults meth h DHTSearch{searchShowTok,searchState,searchResults} = do | ||
240 | (ns,rs) <- atomically $ do | ||
241 | mm <- readTVar $ searchInformant searchState | ||
242 | rset <- readTVar searchResults | ||
243 | let ns = map (\(MM.Binding ni tok _) -> (ni,tok)) | ||
244 | $ MM.toList mm | ||
245 | rs = Set.toList rset | ||
246 | return (ns,rs) | ||
247 | let n'width = succ $ maximum $ map (length . show . fst) ns | ||
248 | showN (n,tok) = take n'width (show n ++ repeat ' ') ++ (fromMaybe "" $ searchShowTok tok) | ||
249 | ns' = map showN ns | ||
250 | reportResult meth id (const Nothing) id h (Right (ns',rs,())) | ||
251 | |||
158 | data Session = Session | 252 | data Session = Session |
159 | { netname :: String | 253 | { netname :: String |
160 | , dhts :: Map.Map String DHT | 254 | , dhts :: Map.Map String DHT |
@@ -189,6 +283,7 @@ clientSession s@Session{..} sock cnum h = do | |||
189 | tm <- getCurrentTime | 283 | tm <- getCurrentTime |
190 | let r = map (\PerThread{..} -> (show lbl,show (diffUTCTime tm startTime))) ts | 284 | let r = map (\PerThread{..} -> (show lbl,show (diffUTCTime tm startTime))) ts |
191 | hPutClient h $ showReport r | 285 | hPutClient h $ showReport r |
286 | #endif | ||
192 | ("mem", s) -> cmd0 $ do | 287 | ("mem", s) -> cmd0 $ do |
193 | case s of | 288 | case s of |
194 | "gc" -> do hPutClient h "Performing garbage collection..." | 289 | "gc" -> do hPutClient h "Performing garbage collection..." |
@@ -221,7 +316,6 @@ clientSession s@Session{..} sock cnum h = do | |||
221 | else hPutClient h "Run with +RTS -T to obtain live memory-usage information." | 316 | else hPutClient h "Run with +RTS -T to obtain live memory-usage information." |
222 | _ -> hPutClient h "error." | 317 | _ -> hPutClient h "error." |
223 | 318 | ||
224 | #endif | ||
225 | ("ls", _) | Just DHT{dhtBuckets} <- Map.lookup netname dhts | 319 | ("ls", _) | Just DHT{dhtBuckets} <- Map.lookup netname dhts |
226 | -> cmd0 $ do | 320 | -> cmd0 $ do |
227 | bkts <- atomically $ readTVar dhtBuckets | 321 | bkts <- atomically $ readTVar dhtBuckets |
@@ -252,7 +346,7 @@ clientSession s@Session{..} sock cnum h = do | |||
252 | (goTarget qry) | 346 | (goTarget qry) |
253 | $ dhtParseId nidstr | 347 | $ dhtParseId nidstr |
254 | goTarget DHTQuery{..} nid = | 348 | goTarget DHTQuery{..} nid = |
255 | go nid >>= reportResult method qshowR qshowTok h | 349 | go nid >>= reportResult method qshowR qshowTok show h |
256 | where | 350 | where |
257 | go | null destination = fmap Right . qhandler self | 351 | go | null destination = fmap Right . qhandler self |
258 | | otherwise = case readEither destination of | 352 | | otherwise = case readEither destination of |
@@ -263,6 +357,49 @@ clientSession s@Session{..} sock cnum h = do | |||
263 | goQuery | 357 | goQuery |
264 | $ Map.lookup method dhtQuery | 358 | $ Map.lookup method dhtQuery |
265 | 359 | ||
360 | ("s", s) | Just dht@DHT{..} <- Map.lookup netname dhts | ||
361 | -> cmd0 $ do | ||
362 | let (method,xs) = break isSpace s | ||
363 | (nidstr,ys) = break isSpace $ dropWhile isSpace xs | ||
364 | presentSearches = hPutClient h | ||
365 | =<< showSearches | ||
366 | =<< atomically (readTVar dhtSearches) | ||
367 | goTarget qry nid = do | ||
368 | kvar <- atomically $ newTVar Nothing | ||
369 | -- Forking a thread, but it may ubruptly quit if the following | ||
370 | -- STM action decides not to add a new search. This is so that | ||
371 | -- I can store the ThreadId into new DHTSearch structure. | ||
372 | tid <- fork $ join $ atomically (readTVar kvar >>= maybe retry return) | ||
373 | join $ atomically $ do | ||
374 | schs <- readTVar dhtSearches | ||
375 | case Map.lookup (method,nid) schs of | ||
376 | Nothing -> do forkSearch method nid qry dhtSearches dhtBuckets tid kvar | ||
377 | return $ presentSearches | ||
378 | Just sch -> do writeTVar kvar (Just $ return ()) | ||
379 | return $ reportSearchResults method h sch | ||
380 | goQuery qry = either (hPutClient h . ("Bad search target: "++)) | ||
381 | (goTarget qry) | ||
382 | $ dhtParseId nidstr | ||
383 | if null method then presentSearches | ||
384 | else maybe (hPutClient h ("Unsupported method: "++method)) | ||
385 | goQuery | ||
386 | $ Map.lookup method dhtQuery | ||
387 | |||
388 | ("x", s) | Just DHT{..} <- Map.lookup netname dhts | ||
389 | -> cmd0 $ do | ||
390 | let (method,xs) = break isSpace s | ||
391 | (nidstr,ys) = break isSpace $ dropWhile isSpace xs | ||
392 | go nid = join $ atomically $ do | ||
393 | schs <- readTVar dhtSearches | ||
394 | case Map.lookup (method,nid) schs of | ||
395 | Nothing -> return $ hPutClient h "No match." | ||
396 | Just DHTSearch{searchThread} -> do | ||
397 | modifyTVar' dhtSearches (Map.delete (method,nid)) | ||
398 | return $ do | ||
399 | killThread searchThread | ||
400 | hPutClient h "Removed search." | ||
401 | either (hPutClient h . ("Bad search target: "++)) go $ dhtParseId nidstr | ||
402 | |||
266 | ("save", _) | Just dht <- Map.lookup netname dhts | 403 | ("save", _) | Just dht <- Map.lookup netname dhts |
267 | -> cmd0 $ do | 404 | -> cmd0 $ do |
268 | saveNodes netname dht | 405 | saveNodes netname dht |
@@ -298,8 +435,11 @@ clientSession s@Session{..} sock cnum h = do | |||
298 | readExternals :: [TVar (BucketList Mainline.NodeInfo)] -> IO [SockAddr] | 435 | readExternals :: [TVar (BucketList Mainline.NodeInfo)] -> IO [SockAddr] |
299 | readExternals vars = do | 436 | readExternals vars = do |
300 | as <- atomically $ mapM (fmap (Mainline.nodeAddr . selfNode) . readTVar) vars | 437 | as <- atomically $ mapM (fmap (Mainline.nodeAddr . selfNode) . readTVar) vars |
438 | let unspecified (SockAddrInet _ 0) = True | ||
439 | unspecified (SockAddrInet6 _ _ (0,0,0,0) _) = True | ||
440 | unspecified _ = False | ||
301 | -- TODO: Filter to only global addresses? | 441 | -- TODO: Filter to only global addresses? |
302 | return as | 442 | return $ filter (not . unspecified) as |
303 | 443 | ||
304 | defaultPort = "6881" | 444 | defaultPort = "6881" |
305 | 445 | ||
@@ -324,6 +464,8 @@ main = do | |||
324 | tox <- return $ error "TODO: Tox.newClient" | 464 | tox <- return $ error "TODO: Tox.newClient" |
325 | quitTox <- return $ return () -- TODO: forkListener tox | 465 | quitTox <- return $ return () -- TODO: forkListener tox |
326 | 466 | ||
467 | mainlineSearches <- atomically $ newTVar Map.empty | ||
468 | |||
327 | let mainlineDHT bkts = DHT | 469 | let mainlineDHT bkts = DHT |
328 | { dhtBuckets = bkts btR | 470 | { dhtBuckets = bkts btR |
329 | , dhtPing = Mainline.ping bt | 471 | , dhtPing = Mainline.ping bt |
@@ -342,7 +484,8 @@ main = do | |||
342 | show | 484 | show |
343 | (Just . show)) | 485 | (Just . show)) |
344 | ] | 486 | ] |
345 | , dhtParseId = readEither :: String -> Either String Mainline.NodeId | 487 | , dhtParseId = readEither :: String -> Either String Mainline.NodeId |
488 | , dhtSearches = mainlineSearches | ||
346 | } | 489 | } |
347 | dhts = Map.fromList | 490 | dhts = Map.fromList |
348 | [ ("bt4", mainlineDHT Mainline.routing4) | 491 | [ ("bt4", mainlineDHT Mainline.routing4) |