diff options
author | Sam Truzjan <pxqr.sta@gmail.com> | 2014-01-12 13:25:29 +0400 |
---|---|---|
committer | Sam Truzjan <pxqr.sta@gmail.com> | 2014-01-12 13:25:29 +0400 |
commit | e3d3fb375ca01aa844e86b8a4c5ca507919518d3 (patch) | |
tree | 75b923d8298577d6328ff6985399ee3625132ca8 | |
parent | b8f976b3df0af5d27f926022d7c7624609fc1072 (diff) |
Fetch infodict concurrently
-rw-r--r-- | bittorrent.cabal | 2 | ||||
-rw-r--r-- | examples/MkTorrent.hs | 50 |
2 files changed, 38 insertions, 14 deletions
diff --git a/bittorrent.cabal b/bittorrent.cabal index 912af6a7..b661cf72 100644 --- a/bittorrent.cabal +++ b/bittorrent.cabal | |||
@@ -105,6 +105,7 @@ library | |||
105 | 105 | ||
106 | -- Concurrency | 106 | -- Concurrency |
107 | , SafeSemaphore | 107 | , SafeSemaphore |
108 | , lifted-async | ||
108 | -- , BoundedChan >= 1.0.1.0 | 109 | -- , BoundedChan >= 1.0.1.0 |
109 | , stm >= 2.4 | 110 | , stm >= 2.4 |
110 | 111 | ||
@@ -259,6 +260,7 @@ executable mktorrent | |||
259 | , mtl | 260 | , mtl |
260 | , conduit | 261 | , conduit |
261 | , lens | 262 | , lens |
263 | , async | ||
262 | , parallel-io | 264 | , parallel-io |
263 | 265 | ||
264 | , network | 266 | , network |
diff --git a/examples/MkTorrent.hs b/examples/MkTorrent.hs index 124bb996..a77b908d 100644 --- a/examples/MkTorrent.hs +++ b/examples/MkTorrent.hs | |||
@@ -6,6 +6,7 @@ module Main (main) where | |||
6 | 6 | ||
7 | import Prelude as P | 7 | import Prelude as P |
8 | import Control.Concurrent | 8 | import Control.Concurrent |
9 | import Control.Concurrent.Async | ||
9 | import Control.Concurrent.ParallelIO | 10 | import Control.Concurrent.ParallelIO |
10 | import Control.Exception | 11 | import Control.Exception |
11 | import Control.Lens hiding (argument, (<.>)) | 12 | import Control.Lens hiding (argument, (<.>)) |
@@ -20,6 +21,7 @@ import Data.Text as T | |||
20 | import qualified Data.Text.IO as T | 21 | import qualified Data.Text.IO as T |
21 | import Data.Text.Read as T | 22 | import Data.Text.Read as T |
22 | import Data.Version | 23 | import Data.Version |
24 | import Network | ||
23 | import Network.URI | 25 | import Network.URI |
24 | import Options.Applicative | 26 | import Options.Applicative |
25 | import System.Exit | 27 | import System.Exit |
@@ -205,8 +207,10 @@ checkContent s pinfo = do | |||
205 | 207 | ||
206 | checkTorrent :: CheckOpts -> IO () | 208 | checkTorrent :: CheckOpts -> IO () |
207 | checkTorrent CheckOpts {..} = do | 209 | checkTorrent CheckOpts {..} = do |
210 | infoM "check" "openning torrent file..." | ||
208 | InfoDict {..} <- tInfoDict <$> fromFile checkTorrentPath | 211 | InfoDict {..} <- tInfoDict <$> fromFile checkTorrentPath |
209 | let layout = flatLayout checkContentPath idLayoutInfo | 212 | let layout = flatLayout checkContentPath idLayoutInfo |
213 | infoM "check" "mapping content files..." | ||
210 | withStorage ReadOnly (piPieceLength idPieceInfo) layout $ \ s -> do | 214 | withStorage ReadOnly (piPieceLength idPieceInfo) layout $ \ s -> do |
211 | infoM "check" "files mapped" | 215 | infoM "check" "files mapped" |
212 | checkContent s idPieceInfo | 216 | checkContent s idPieceInfo |
@@ -322,17 +326,20 @@ putTorrent opts @ ShowOpts {..} = do | |||
322 | 326 | ||
323 | data GetOpts = GetOpts | 327 | data GetOpts = GetOpts |
324 | { topic :: InfoHash | 328 | { topic :: InfoHash |
325 | , thisNode :: NodeAddr IPv4 | 329 | , servPort :: PortNumber |
326 | , bootNode :: NodeAddr IPv4 | 330 | , bootNode :: NodeAddr IPv4 |
327 | , buckets :: Int | 331 | , buckets :: Int |
328 | } deriving Show | 332 | } deriving Show |
329 | 333 | ||
334 | instance Read PortNumber where | ||
335 | readsPrec i s = [ (toEnum a, t) | (a, t) <- readsPrec i s] | ||
336 | |||
330 | paramsParser :: Parser GetOpts | 337 | paramsParser :: Parser GetOpts |
331 | paramsParser = GetOpts | 338 | paramsParser = GetOpts |
332 | <$> option (long "infohash" <> short 'i' | 339 | <$> argument readMaybe |
333 | <> metavar "SHA1" <> help "infohash of torrent file") | 340 | (metavar "SHA1" <> help "infohash of torrent file") |
334 | <*> option (long "port" <> short 'p' | 341 | <*> option (long "port" <> short 'p' |
335 | <> value def <> showDefault | 342 | <> value 7000 <> showDefault |
336 | <> metavar "NUM" <> help "port number to bind" | 343 | <> metavar "NUM" <> help "port number to bind" |
337 | ) | 344 | ) |
338 | <*> option (long "boot" <> short 'b' | 345 | <*> option (long "boot" <> short 'b' |
@@ -350,8 +357,8 @@ getInfo = info (helper <*> paramsParser) | |||
350 | <> header "get torrent file by infohash" | 357 | <> header "get torrent file by infohash" |
351 | ) | 358 | ) |
352 | 359 | ||
353 | exchangeTorrent :: PeerAddr IP -> InfoHash -> IO InfoDict | 360 | exchangeTorrent :: InfoHash -> PeerAddr IP -> IO InfoDict |
354 | exchangeTorrent addr ih = do | 361 | exchangeTorrent ih addr = do |
355 | pid <- genPeerId | 362 | pid <- genPeerId |
356 | var <- newEmptyMVar | 363 | var <- newEmptyMVar |
357 | let hs = Handshake def (toCaps [ExtExtended]) ih pid | 364 | let hs = Handshake def (toCaps [ExtExtended]) ih pid |
@@ -360,17 +367,32 @@ exchangeTorrent addr ih = do | |||
360 | liftIO $ putMVar var infodict | 367 | liftIO $ putMVar var infodict |
361 | takeMVar var | 368 | takeMVar var |
362 | 369 | ||
370 | exchangeConc :: InfoHash -> [PeerAddr IP] -> IO (Maybe InfoDict) | ||
371 | exchangeConc ih peers = do | ||
372 | workers <- forM peers $ async . exchangeTorrent ih | ||
373 | (_, result) <- waitAnyCatchCancel workers | ||
374 | return $ either (const Nothing) Just result | ||
375 | |||
376 | sinkInfoDict :: InfoHash -> Sink [PeerAddr IPv4] (DHT ip) InfoDict | ||
377 | sinkInfoDict ih = do | ||
378 | m <- await | ||
379 | case m of | ||
380 | Nothing -> liftIO $ throwIO $ userError "impossible: end of peer stream" | ||
381 | Just peers -> do | ||
382 | minfodict <- liftIO $ exchangeConc ih (fmap IPv4 <$> peers) | ||
383 | maybe (sinkInfoDict ih) return minfodict | ||
384 | |||
385 | -- TODO add tNodes, tCreated, etc? | ||
363 | getTorrent :: GetOpts -> IO () | 386 | getTorrent :: GetOpts -> IO () |
364 | getTorrent GetOpts {..} = do | 387 | getTorrent GetOpts {..} = do |
365 | dht (def { optBucketCount = buckets }) thisNode $ do | 388 | infoM "get" "starting..." |
389 | dht (def { optBucketCount = buckets }) (NodeAddr "0.0.0.0" servPort) $ do | ||
366 | bootstrap [bootNode] | 390 | bootstrap [bootNode] |
367 | DHT.lookup topic $$ C.mapM_ $ \ peers -> do | 391 | liftIO $ infoM "get" "searching for peers..." |
368 | liftIO $ forM_ peers $ \ peer -> do | 392 | infodict <- DHT.lookup topic $$ sinkInfoDict topic |
369 | infodict <- exchangeTorrent (IPv4 <$> peer) topic | 393 | liftIO $ infoM "get" "saving torrent file..." |
370 | -- TODO add tNodes, tCreated, etc? | 394 | liftIO $ toFile (show topic <.> torrentExt) $ nullTorrent infodict |
371 | let torrent = nullTorrent infodict | 395 | infoM "get" "done" |
372 | toFile (show topic <.> torrentExt) torrent | ||
373 | exitSuccess | ||
374 | 396 | ||
375 | {----------------------------------------------------------------------- | 397 | {----------------------------------------------------------------------- |
376 | -- Command | 398 | -- Command |