From 11987749fc6e6d3e53ea737d46d5ab13a16faeb8 Mon Sep 17 00:00:00 2001 From: James Crayne Date: Sat, 28 Sep 2019 13:43:29 -0400 Subject: Factor out some new libraries word64-map: Data.Word64Map network-addr: Network.Address tox-crypto: Crypto.Tox lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ minmax-psq: Data.MinMaxPSQ tasks: Control.Concurrent.Tasks kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search --- dht/bittorrent/src/Network/BitTorrent.hs | 61 + dht/bittorrent/src/Network/BitTorrent/Client.hs | 195 +++ .../src/Network/BitTorrent/Client/Handle.hs | 188 +++ .../src/Network/BitTorrent/Client/Types.hs | 163 +++ dht/bittorrent/src/Network/BitTorrent/Exchange.hs | 35 + .../src/Network/BitTorrent/Exchange/Bitfield.hs | 405 +++++++ .../src/Network/BitTorrent/Exchange/Block.hs | 369 ++++++ .../src/Network/BitTorrent/Exchange/Connection.hs | 1012 ++++++++++++++++ .../src/Network/BitTorrent/Exchange/Download.hs | 296 +++++ .../src/Network/BitTorrent/Exchange/Manager.hs | 62 + .../src/Network/BitTorrent/Exchange/Message.hs | 1237 ++++++++++++++++++++ .../src/Network/BitTorrent/Exchange/Session.hs | 586 ++++++++++ .../src/Network/BitTorrent/Internal/Cache.hs | 169 +++ .../src/Network/BitTorrent/Internal/Progress.hs | 154 +++ .../src/Network/BitTorrent/Internal/Types.hs | 10 + dht/bittorrent/src/Network/BitTorrent/Readme.md | 10 + dht/bittorrent/src/Network/BitTorrent/Tracker.hs | 51 + .../src/Network/BitTorrent/Tracker/List.hs | 197 ++++ .../src/Network/BitTorrent/Tracker/Message.hs | 925 +++++++++++++++ .../src/Network/BitTorrent/Tracker/RPC.hs | 175 +++ .../src/Network/BitTorrent/Tracker/RPC/HTTP.hs | 191 +++ .../src/Network/BitTorrent/Tracker/RPC/UDP.hs | 454 +++++++ .../src/Network/BitTorrent/Tracker/Session.hs | 306 +++++ 23 files changed, 7251 insertions(+) create mode 100644 dht/bittorrent/src/Network/BitTorrent.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Client.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Client/Handle.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Client/Types.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Exchange.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Exchange/Bitfield.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Exchange/Block.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Exchange/Download.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Exchange/Manager.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Exchange/Message.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Exchange/Session.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Internal/Cache.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Internal/Progress.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Internal/Types.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Readme.md create mode 100644 dht/bittorrent/src/Network/BitTorrent/Tracker.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Tracker/List.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Tracker/Message.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Tracker/RPC.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Tracker/RPC/HTTP.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Tracker/RPC/UDP.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Tracker/Session.hs (limited to 'dht/bittorrent/src/Network') diff --git a/dht/bittorrent/src/Network/BitTorrent.hs b/dht/bittorrent/src/Network/BitTorrent.hs new file mode 100644 index 00000000..91a58887 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent.hs @@ -0,0 +1,61 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +{-# LANGUAGE RecordWildCards #-} +module Network.BitTorrent + ( -- * Client + Options (..) + + -- ** Session + , Client + , clientPeerId + , clientListenerPort + , allowedExtensions + + -- ** Initialization + , LogFun + , newClient + , closeClient + , withClient + + -- ** Monadic + , MonadBitTorrent (..) + , BitTorrent + , runBitTorrent + , getClient + , simpleClient + + -- * Torrent + -- ** Source + , InfoHash + , Magnet + , InfoDict + , Torrent + + -- ** Handle + , Handle + , handleTopic + , handleTrackers + , handleExchange + + , TorrentSource(openHandle) + , closeHandle + , getHandle + , getIndex + + -- ** Control + , start + , pause + , stop + + -- * Events + , EventSource (..) + ) where + +import Data.Torrent +import Network.BitTorrent.Client +import Network.BitTorrent.Internal.Types \ No newline at end of file diff --git a/dht/bittorrent/src/Network/BitTorrent/Client.hs b/dht/bittorrent/src/Network/BitTorrent/Client.hs new file mode 100644 index 00000000..c84290dd --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Client.hs @@ -0,0 +1,195 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE TypeSynonymInstances #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE TemplateHaskell #-} +module Network.BitTorrent.Client + ( -- * Options + Options (..) + + -- * Client session + , Client + + -- ** Session data + , clientPeerId + , clientListenerPort + , allowedExtensions + + -- ** Session initialization + , LogFun + , newClient + , closeClient + , withClient + , simpleClient + + -- * BitTorrent monad + , MonadBitTorrent (..) + , BitTorrent + , runBitTorrent + , getClient + + -- * Handle + , Handle + , handleTopic + , handleTrackers + , handleExchange + + -- ** Construction + , TorrentSource (..) + , closeHandle + + -- ** Query + , getHandle + , getIndex + + -- ** Management + , start + , pause + , stop + ) where + +import Control.Applicative +import Control.Exception +import Control.Concurrent +import Control.Concurrent.Chan.Split as CS +import Control.Monad.Logger +import Control.Monad.Trans +import Control.Monad.Trans.Resource + +import Data.Default +import Data.HashMap.Strict as HM +import Data.Text +import Network + +import Data.Torrent +import Network.Address +import Network.BitTorrent.Client.Types +import Network.BitTorrent.Client.Handle +import Network.BitTorrent.DHT as DHT hiding (Options) +import Network.BitTorrent.Tracker as Tracker hiding (Options) +import Network.BitTorrent.Exchange as Exchange hiding (Options) +import qualified Network.BitTorrent.Exchange as Exchange (Options(..)) + + +data Options = Options + { optFingerprint :: Fingerprint + , optName :: Text + , optPort :: PortNumber + , optExtensions :: [Extension] + , optNodeAddr :: NodeAddr IPv4 + , optBootNode :: Maybe (NodeAddr IPv4) + } + +instance Default Options where + def = Options + { optFingerprint = def + , optName = "hs-bittorrent" + , optPort = 6882 + , optExtensions = [] + , optNodeAddr = "0.0.0.0:6882" + , optBootNode = Nothing + } + +exchangeOptions :: PeerId -> Options -> Exchange.Options +exchangeOptions pid Options {..} = Exchange.Options + { optPeerAddr = PeerAddr (Just pid) (peerHost def) optPort + , optBacklog = optBacklog def + } + +connHandler :: MVar (HashMap InfoHash Handle) -> Exchange.Handler +connHandler tmap ih = do + m <- readMVar tmap + case HM.lookup ih m of + Nothing -> error "torrent not found" + Just (Handle {..}) -> return handleExchange + +initClient :: Options -> LogFun -> ResIO Client +initClient opts @ Options {..} logFun = do + pid <- liftIO genPeerId + tmap <- liftIO $ newMVar HM.empty + + let peerInfo = PeerInfo pid Nothing optPort + let mkTracker = Tracker.newManager def peerInfo + (_, tmgr) <- allocate mkTracker Tracker.closeManager + + let mkEx = Exchange.newManager (exchangeOptions pid opts) (connHandler tmap) + (_, emgr) <- allocate mkEx Exchange.closeManager + + let mkNode = DHT.newNode defaultHandlers def optNodeAddr logFun Nothing + (_, node) <- allocate mkNode DHT.closeNode + + resourceMap <- getInternalState + eventStream <- liftIO newSendPort + + return Client + { clientPeerId = pid + , clientListenerPort = optPort + , allowedExtensions = toCaps optExtensions + , clientResources = resourceMap + , trackerManager = tmgr + , exchangeManager = emgr + , clientNode = node + , clientTorrents = tmap + , clientLogger = logFun + , clientEvents = eventStream + } + +newClient :: Options -> LogFun -> IO Client +newClient opts logFun = do + s <- createInternalState + runInternalState (initClient opts logFun) s + `onException` closeInternalState s + +closeClient :: Client -> IO () +closeClient Client {..} = closeInternalState clientResources + +withClient :: Options -> LogFun -> (Client -> IO a) -> IO a +withClient opts lf action = bracket (newClient opts lf) closeClient action + +-- do not perform IO in 'initClient', do it in the 'boot' +--boot :: BitTorrent () +--boot = do +-- Options {..} <- asks options +-- liftDHT $ bootstrap (maybeToList optBootNode) + +-- | Run bittorrent client with default options and log to @stderr@. +-- +-- For testing purposes only. +-- +simpleClient :: BitTorrent () -> IO () +simpleClient m = do + runStderrLoggingT $ LoggingT $ \ logger -> do + withClient def logger (`runBitTorrent` m) + +{----------------------------------------------------------------------- +-- Torrent identifiers +-----------------------------------------------------------------------} + +class TorrentSource s where + openHandle :: FilePath -> s -> BitTorrent Handle + +instance TorrentSource InfoHash where + openHandle path ih = openMagnet path (nullMagnet ih) + {-# INLINE openHandle #-} + +instance TorrentSource Magnet where + openHandle = openMagnet + {-# INLINE openHandle #-} + +instance TorrentSource InfoDict where + openHandle path dict = openTorrent path (nullTorrent dict) + {-# INLINE openHandle #-} + +instance TorrentSource Torrent where + openHandle = openTorrent + {-# INLINE openHandle #-} + +instance TorrentSource FilePath where + openHandle contentDir torrentPath = do + t <- liftIO $ fromFile torrentPath + openTorrent contentDir t + {-# INLINE openHandle #-} + +getIndex :: BitTorrent [Handle] +getIndex = do + Client {..} <- getClient + elems <$> liftIO (readMVar clientTorrents) diff --git a/dht/bittorrent/src/Network/BitTorrent/Client/Handle.hs b/dht/bittorrent/src/Network/BitTorrent/Client/Handle.hs new file mode 100644 index 00000000..66baac48 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Client/Handle.hs @@ -0,0 +1,188 @@ +module Network.BitTorrent.Client.Handle + ( -- * Handle + Handle + + -- * Initialization + , openTorrent + , openMagnet + , closeHandle + + -- * Control + , start + , pause + , stop + + -- * Query + , getHandle + , getStatus + ) where + +import Control.Concurrent.Chan.Split +import Control.Concurrent.Lifted as L +import Control.Monad +import Control.Monad.Trans +import Data.Default +import Data.List as L +import Data.HashMap.Strict as HM + +import Data.Torrent +import Network.BitTorrent.Client.Types as Types +import Network.BitTorrent.DHT as DHT +import Network.BitTorrent.Exchange as Exchange +import Network.BitTorrent.Tracker as Tracker + +{----------------------------------------------------------------------- +-- Safe handle set manupulation +-----------------------------------------------------------------------} + +allocHandle :: InfoHash -> BitTorrent Handle -> BitTorrent Handle +allocHandle ih m = do + Client {..} <- getClient + + (h, added) <- modifyMVar clientTorrents $ \ handles -> do + case HM.lookup ih handles of + Just h -> return (handles, (h, False)) + Nothing -> do + h <- m + return (HM.insert ih h handles, (h, True)) + + when added $ do + liftIO $ send clientEvents (TorrentAdded ih) + + return h + +freeHandle :: InfoHash -> BitTorrent () -> BitTorrent () +freeHandle ih finalizer = do + Client {..} <- getClient + + modifyMVar_ clientTorrents $ \ handles -> do + case HM.lookup ih handles of + Nothing -> return handles + Just _ -> do + finalizer + return (HM.delete ih handles) + +lookupHandle :: InfoHash -> BitTorrent (Maybe Handle) +lookupHandle ih = do + Client {..} <- getClient + handles <- readMVar clientTorrents + return (HM.lookup ih handles) + +{----------------------------------------------------------------------- +-- Initialization +-----------------------------------------------------------------------} + +newExchangeSession :: FilePath -> Either InfoHash InfoDict -> BitTorrent Exchange.Session +newExchangeSession rootPath source = do + c @ Client {..} <- getClient + liftIO $ Exchange.newSession clientLogger (externalAddr c) rootPath source + +-- | Open a torrent in 'stop'ed state. Use 'nullTorrent' to open +-- handle from 'InfoDict'. This operation do not block. +openTorrent :: FilePath -> Torrent -> BitTorrent Handle +openTorrent rootPath t @ Torrent {..} = do + let ih = idInfoHash tInfoDict + allocHandle ih $ do + statusVar <- newMVar Types.Stopped + tses <- liftIO $ Tracker.newSession ih (trackerList t) + eses <- newExchangeSession rootPath (Right tInfoDict) + eventStream <- liftIO newSendPort + return $ Handle + { handleTopic = ih + , handlePrivate = idPrivate tInfoDict + , handleStatus = statusVar + , handleTrackers = tses + , handleExchange = eses + , handleEvents = eventStream + } + +-- | Use 'nullMagnet' to open handle from 'InfoHash'. +openMagnet :: FilePath -> Magnet -> BitTorrent Handle +openMagnet rootPath Magnet {..} = do + allocHandle exactTopic $ do + statusVar <- newMVar Types.Stopped + tses <- liftIO $ Tracker.newSession exactTopic def + eses <- newExchangeSession rootPath (Left exactTopic) + eventStream <- liftIO newSendPort + return $ Handle + { handleTopic = exactTopic + , handlePrivate = False + , handleStatus = statusVar + , handleTrackers = tses + , handleExchange = eses + , handleEvents = eventStream + } + +-- | Stop torrent and destroy all sessions. You don't need to close +-- handles at application exit, all handles will be automatically +-- closed at 'Network.BitTorrent.Client.closeClient'. This operation +-- may block. +closeHandle :: Handle -> BitTorrent () +closeHandle h @ Handle {..} = do + freeHandle handleTopic $ do + Client {..} <- getClient + stop h + liftIO $ Exchange.closeSession handleExchange + liftIO $ Tracker.closeSession trackerManager handleTrackers + +{----------------------------------------------------------------------- +-- Control +-----------------------------------------------------------------------} + +modifyStatus :: HandleStatus -> Handle -> (HandleStatus -> BitTorrent ()) -> BitTorrent () +modifyStatus targetStatus Handle {..} targetAction = do + modifyMVar_ handleStatus $ \ actualStatus -> do + unless (actualStatus == targetStatus) $ do + targetAction actualStatus + return targetStatus + liftIO $ send handleEvents (StatusChanged targetStatus) + +-- | Start downloading, uploading and announcing this torrent. +-- +-- This operation is blocking, use +-- 'Control.Concurrent.Async.Lifted.async' if needed. +start :: Handle -> BitTorrent () +start h @ Handle {..} = do + modifyStatus Types.Running h $ \ status -> do + case status of + Types.Running -> return () + Types.Stopped -> do + Client {..} <- getClient + liftIO $ Tracker.notify trackerManager handleTrackers Tracker.Started + unless handlePrivate $ do + liftDHT $ DHT.insert handleTopic (error "start") + liftIO $ do + peers <- askPeers trackerManager handleTrackers + print $ "got: " ++ show (L.length peers) ++ " peers" + forM_ peers $ \ peer -> do + Exchange.connect peer handleExchange + +-- | Stop downloading this torrent. +pause :: Handle -> BitTorrent () +pause _ = return () + +-- | Stop downloading, uploading and announcing this torrent. +stop :: Handle -> BitTorrent () +stop h @ Handle {..} = do + modifyStatus Types.Stopped h $ \ status -> do + case status of + Types.Stopped -> return () + Types.Running -> do + Client {..} <- getClient + unless handlePrivate $ do + liftDHT $ DHT.delete handleTopic (error "stop") + liftIO $ Tracker.notify trackerManager handleTrackers Tracker.Stopped + +{----------------------------------------------------------------------- +-- Query +-----------------------------------------------------------------------} + +getHandle :: InfoHash -> BitTorrent Handle +getHandle ih = do + mhandle <- lookupHandle ih + case mhandle of + Nothing -> error "should we throw some exception?" + Just h -> return h + +getStatus :: Handle -> IO HandleStatus +getStatus Handle {..} = readMVar handleStatus diff --git a/dht/bittorrent/src/Network/BitTorrent/Client/Types.hs b/dht/bittorrent/src/Network/BitTorrent/Client/Types.hs new file mode 100644 index 00000000..e2ad858f --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Client/Types.hs @@ -0,0 +1,163 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +module Network.BitTorrent.Client.Types + ( -- * Core types + HandleStatus (..) + , Handle (..) + , Client (..) + , externalAddr + + -- * Monad BitTorrent + , BitTorrent (..) + , runBitTorrent + , getClient + + , MonadBitTorrent (..) + + -- * Events + , Types.Event (..) + ) where + +import Control.Applicative +import Control.Concurrent +import Control.Concurrent.Chan.Split as CS +import Control.Monad.Base +import Control.Monad.Logger +import Control.Monad.Reader +import Control.Monad.Trans.Control +import Control.Monad.Trans.Resource +import Data.Function +import Data.HashMap.Strict as HM +import Data.Ord +import Network +import System.Log.FastLogger + +import Data.Torrent +import Network.Address +import Network.BitTorrent.Internal.Types as Types +import Network.BitTorrent.DHT as DHT +import Network.BitTorrent.Exchange as Exchange +import Network.BitTorrent.Tracker as Tracker hiding (Event) + +data HandleStatus + = Running + | Stopped + deriving (Show, Eq) + +data Handle = Handle + { handleTopic :: !InfoHash + , handlePrivate :: !Bool + + , handleStatus :: !(MVar HandleStatus) + , handleTrackers :: !Tracker.Session + , handleExchange :: !Exchange.Session + , handleEvents :: !(SendPort (Event Handle)) + } + +instance EventSource Handle where + data Event Handle = StatusChanged HandleStatus + listen Handle {..} = CS.listen undefined + +data Client = Client + { clientPeerId :: !PeerId + , clientListenerPort :: !PortNumber + , allowedExtensions :: !Caps + , clientResources :: !InternalState + , trackerManager :: !Tracker.Manager + , exchangeManager :: !Exchange.Manager + , clientNode :: !(Node IPv4) + , clientTorrents :: !(MVar (HashMap InfoHash Handle)) + , clientLogger :: !LogFun + , clientEvents :: !(SendPort (Event Client)) + } + +instance Eq Client where + (==) = (==) `on` clientPeerId + +instance Ord Client where + compare = comparing clientPeerId + +instance EventSource Client where + data Event Client = TorrentAdded InfoHash + listen Client {..} = CS.listen clientEvents + +-- | External IP address of a host running a bittorrent client +-- software may be used to acknowledge remote peer the host connected +-- to. See 'Network.BitTorrent.Exchange.Message.ExtendedHandshake'. +externalAddr :: Client -> PeerAddr (Maybe IP) +externalAddr Client {..} = PeerAddr + { peerId = Just clientPeerId + , peerHost = Nothing -- TODO return external IP address, if known + , peerPort = clientListenerPort + } + +{----------------------------------------------------------------------- +-- BitTorrent monad +-----------------------------------------------------------------------} + +newtype BitTorrent a = BitTorrent + { unBitTorrent :: ReaderT Client IO a + } deriving ( Functor, Applicative, Monad + , MonadIO, MonadThrow, MonadBase IO + ) + +class MonadBitTorrent m where + liftBT :: BitTorrent a -> m a + +#if MIN_VERSION_monad_control(1,0,0) +newtype BTStM a = BTStM { unBTSt :: StM (ReaderT Client IO) a } + +instance MonadBaseControl IO BitTorrent where + type StM BitTorrent a = BTStM a + liftBaseWith cc = BitTorrent $ liftBaseWith $ \ cc' -> + cc $ \ (BitTorrent m) -> BTStM <$> cc' m + {-# INLINE liftBaseWith #-} + + restoreM = BitTorrent . restoreM . unBTSt + {-# INLINE restoreM #-} +#else +instance MonadBaseControl IO BitTorrent where + newtype StM BitTorrent a = StM { unSt :: StM (ReaderT Client IO) a } + liftBaseWith cc = BitTorrent $ liftBaseWith $ \ cc' -> + cc $ \ (BitTorrent m) -> StM <$> cc' m + {-# INLINE liftBaseWith #-} + + restoreM = BitTorrent . restoreM . unSt + {-# INLINE restoreM #-} +#endif + +-- | NOP. +instance MonadBitTorrent BitTorrent where + liftBT = id + +instance MonadTrans t => MonadBitTorrent (t BitTorrent) where + liftBT = lift + +-- | Registered but not closed manually resources will be +-- automatically closed at 'Network.BitTorrent.Client.closeClient' +instance MonadResource BitTorrent where + liftResourceT m = BitTorrent $ do + s <- asks clientResources + liftIO $ runInternalState m s + +-- | Run DHT operation, only if the client node is running. +instance MonadDHT BitTorrent where + liftDHT action = BitTorrent $ do + node <- asks clientNode + liftIO $ runDHT node action + +instance MonadLogger BitTorrent where + monadLoggerLog loc src lvl msg = BitTorrent $ do + logger <- asks clientLogger + liftIO $ logger loc src lvl (toLogStr msg) + +runBitTorrent :: Client -> BitTorrent a -> IO a +runBitTorrent client action = runReaderT (unBitTorrent action) client +{-# INLINE runBitTorrent #-} + +getClient :: BitTorrent Client +getClient = BitTorrent ask +{-# INLINE getClient #-} diff --git a/dht/bittorrent/src/Network/BitTorrent/Exchange.hs b/dht/bittorrent/src/Network/BitTorrent/Exchange.hs new file mode 100644 index 00000000..143bf090 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Exchange.hs @@ -0,0 +1,35 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +module Network.BitTorrent.Exchange + ( -- * Manager + Options (..) + , Manager + , Handler + , newManager + , closeManager + + -- * Session + , Caps + , Extension + , toCaps + , Session + , newSession + , closeSession + + -- * Query + , waitMetadata + , takeMetadata + + -- * Connections + , connect + , connectSink + ) where + +import Network.BitTorrent.Exchange.Manager +import Network.BitTorrent.Exchange.Message +import Network.BitTorrent.Exchange.Session diff --git a/dht/bittorrent/src/Network/BitTorrent/Exchange/Bitfield.hs b/dht/bittorrent/src/Network/BitTorrent/Exchange/Bitfield.hs new file mode 100644 index 00000000..1be9f970 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Exchange/Bitfield.hs @@ -0,0 +1,405 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- This modules provides all necessary machinery to work with +-- bitfields. Bitfields are used to keep track indices of complete +-- pieces either this peer have or remote peer have. +-- +-- There are also commonly used piece selection algorithms +-- which used to find out which one next piece to download. +-- Selectors considered to be used in the following order: +-- +-- * 'randomFirst' - at the start of download. +-- +-- * 'rarestFirst' - performed to avoid situation when +-- rarest piece is unaccessible. +-- +-- * 'endGame' - performed after a peer has requested all +-- the subpieces of the content. +-- +-- Note that BitTorrent protocol recommend (TODO link?) the +-- 'strictFirst' priority policy for /subpiece/ or /blocks/ +-- selection. +-- +{-# LANGUAGE CPP #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE RecordWildCards #-} +module Network.BitTorrent.Exchange.Bitfield + ( -- * Bitfield + PieceIx + , PieceCount + , Bitfield + + -- * Construction + , haveAll + , haveNone + , have + , singleton + , interval + , adjustSize + + -- * Query + -- ** Cardinality + , Network.BitTorrent.Exchange.Bitfield.null + , Network.BitTorrent.Exchange.Bitfield.full + , haveCount + , totalCount + , completeness + + -- ** Membership + , member + , notMember + , findMin + , findMax + , isSubsetOf + + -- ** Availability + , complement + , Frequency + , frequencies + , rarest + + -- * Combine + , insert + , union + , intersection + , difference + + -- * Conversion + , toList + , fromList + + -- * Serialization + , fromBitmap + , toBitmap + + -- * Piece selection + , Selector + , selector + , strategyClass + + , strictFirst + , strictLast + , rarestFirst + , randomFirst + , endGame + ) where + +import Control.Monad +import Control.Monad.ST +import Data.ByteString (ByteString) +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as Lazy +import Data.Vector.Unboxed (Vector) +import qualified Data.Vector.Unboxed as V +import qualified Data.Vector.Unboxed.Mutable as VM +import Data.IntervalSet (IntSet) +import qualified Data.IntervalSet as S +import qualified Data.IntervalSet.ByteString as S +import Data.List (foldl') +import Data.Monoid +import Data.Ratio + +import Data.Torrent + +-- TODO cache some operations + +-- | Bitfields are represented just as integer sets but with a restriction: +-- each integer in the set should be within the given interval. The greatest +-- lower bound of the interval must be zero, so intervals may be specified by +-- providing a maximum set size. For example, a bitfield of size 10 might +-- contain only indices in interval [0..9]. +-- +-- By convention, we use the following aliases for Int: +-- +-- [ PieceIx ] an Int member of the Bitfield. +-- +-- [ PieceCount ] maximum set size for a Bitfield. +data Bitfield = Bitfield { + bfSize :: !PieceCount + , bfSet :: !IntSet + } deriving (Show, Read, Eq) + +-- Invariants: all elements of bfSet lie in [0..bfSize - 1]; + +instance Monoid Bitfield where + {-# SPECIALIZE instance Monoid Bitfield #-} + mempty = haveNone 0 + mappend = union + mconcat = unions + +{----------------------------------------------------------------------- + Construction +-----------------------------------------------------------------------} + +-- | The empty bitfield of the given size. +haveNone :: PieceCount -> Bitfield +haveNone s = Bitfield s S.empty + +-- | The full bitfield containing all piece indices for the given size. +haveAll :: PieceCount -> Bitfield +haveAll s = Bitfield s (S.interval 0 (s - 1)) + +-- | Insert the index in the set ignoring out of range indices. +have :: PieceIx -> Bitfield -> Bitfield +have ix Bitfield {..} + | 0 <= ix && ix < bfSize = Bitfield bfSize (S.insert ix bfSet) + | otherwise = Bitfield bfSize bfSet + +singleton :: PieceIx -> PieceCount -> Bitfield +singleton ix pc = have ix (haveNone pc) + +-- | Assign new size to bitfield. FIXME Normally, size should be only +-- decreased, otherwise exception raised. +adjustSize :: PieceCount -> Bitfield -> Bitfield +adjustSize s Bitfield {..} = Bitfield s bfSet + +-- | NOTE: for internal use only +interval :: PieceCount -> PieceIx -> PieceIx -> Bitfield +interval pc a b = Bitfield pc (S.interval a b) + +{----------------------------------------------------------------------- + Query +-----------------------------------------------------------------------} + +-- | Test if bitifield have no one index: peer do not have anything. +null :: Bitfield -> Bool +null Bitfield {..} = S.null bfSet + +-- | Test if bitfield have all pieces. +full :: Bitfield -> Bool +full Bitfield {..} = S.size bfSet == bfSize + +-- | Count of peer have pieces. +haveCount :: Bitfield -> PieceCount +haveCount = S.size . bfSet + +-- | Total count of pieces and its indices. +totalCount :: Bitfield -> PieceCount +totalCount = bfSize + +-- | Ratio of /have/ piece count to the /total/ piece count. +-- +-- > forall bf. 0 <= completeness bf <= 1 +-- +completeness :: Bitfield -> Ratio PieceCount +completeness b = haveCount b % totalCount b + +inRange :: PieceIx -> Bitfield -> Bool +inRange ix Bitfield {..} = 0 <= ix && ix < bfSize + +member :: PieceIx -> Bitfield -> Bool +member ix bf @ Bitfield {..} + | ix `inRange` bf = ix `S.member` bfSet + | otherwise = False + +notMember :: PieceIx -> Bitfield -> Bool +notMember ix bf @ Bitfield {..} + | ix `inRange` bf = ix `S.notMember` bfSet + | otherwise = True + +-- | Find first available piece index. +findMin :: Bitfield -> PieceIx +findMin = S.findMin . bfSet +{-# INLINE findMin #-} + +-- | Find last available piece index. +findMax :: Bitfield -> PieceIx +findMax = S.findMax . bfSet +{-# INLINE findMax #-} + +-- | Check if all pieces from first bitfield present if the second bitfield +isSubsetOf :: Bitfield -> Bitfield -> Bool +isSubsetOf a b = bfSet a `S.isSubsetOf` bfSet b +{-# INLINE isSubsetOf #-} + +-- | Resulting bitfield includes only missing pieces. +complement :: Bitfield -> Bitfield +complement Bitfield {..} = Bitfield + { bfSet = uni `S.difference` bfSet + , bfSize = bfSize + } + where + Bitfield _ uni = haveAll bfSize +{-# INLINE complement #-} + +{----------------------------------------------------------------------- +-- Availability +-----------------------------------------------------------------------} + +-- | Frequencies are needed in piece selection startegies which use +-- availability quantity to find out the optimal next piece index to +-- download. +type Frequency = Int + +-- TODO rename to availability +-- | How many times each piece index occur in the given bitfield set. +frequencies :: [Bitfield] -> Vector Frequency +frequencies [] = V.fromList [] +frequencies xs = runST $ do + v <- VM.new size + VM.set v 0 + forM_ xs $ \ Bitfield {..} -> do + forM_ (S.toList bfSet) $ \ x -> do + fr <- VM.read v x + VM.write v x (succ fr) + V.unsafeFreeze v + where + size = maximum (map bfSize xs) + +-- TODO it seems like this operation is veeery slow + +-- | Find least available piece index. If no piece available return +-- 'Nothing'. +rarest :: [Bitfield] -> Maybe PieceIx +rarest xs + | V.null freqMap = Nothing + | otherwise + = Just $ fst $ V.ifoldr' minIx (0, freqMap V.! 0) freqMap + where + freqMap = frequencies xs + {-# NOINLINE freqMap #-} + + minIx :: PieceIx -> Frequency + -> (PieceIx, Frequency) + -> (PieceIx, Frequency) + minIx ix fr acc@(_, fra) + | fr < fra && fr > 0 = (ix, fr) + | otherwise = acc + + +{----------------------------------------------------------------------- + Combine +-----------------------------------------------------------------------} + +insert :: PieceIx -> Bitfield -> Bitfield +insert pix bf @ Bitfield {..} + | 0 <= pix && pix < bfSize = Bitfield + { bfSet = S.insert pix bfSet + , bfSize = bfSize + } + | otherwise = bf + +-- | Find indices at least one peer have. +union :: Bitfield -> Bitfield -> Bitfield +union a b = {-# SCC union #-} Bitfield { + bfSize = bfSize a `max` bfSize b + , bfSet = bfSet a `S.union` bfSet b + } + +-- | Find indices both peers have. +intersection :: Bitfield -> Bitfield -> Bitfield +intersection a b = {-# SCC intersection #-} Bitfield { + bfSize = bfSize a `min` bfSize b + , bfSet = bfSet a `S.intersection` bfSet b + } + +-- | Find indices which have first peer but do not have the second peer. +difference :: Bitfield -> Bitfield -> Bitfield +difference a b = {-# SCC difference #-} Bitfield { + bfSize = bfSize a -- FIXME is it reasonable? + , bfSet = bfSet a `S.difference` bfSet b + } + +-- | Find indices the any of the peers have. +unions :: [Bitfield] -> Bitfield +unions = {-# SCC unions #-} foldl' union (haveNone 0) + +{----------------------------------------------------------------------- + Serialization +-----------------------------------------------------------------------} + +-- | List all /have/ indexes. +toList :: Bitfield -> [PieceIx] +toList Bitfield {..} = S.toList bfSet + +-- | Make bitfield from list of /have/ indexes. +fromList :: PieceCount -> [PieceIx] -> Bitfield +fromList s ixs = Bitfield { + bfSize = s + , bfSet = S.splitGT (-1) $ S.splitLT s $ S.fromList ixs + } + +-- | Unpack 'Bitfield' from tightly packed bit array. Note resulting +-- size might be more than real bitfield size, use 'adjustSize'. +fromBitmap :: ByteString -> Bitfield +fromBitmap bs = {-# SCC fromBitmap #-} Bitfield { + bfSize = B.length bs * 8 + , bfSet = S.fromByteString bs + } +{-# INLINE fromBitmap #-} + +-- | Pack a 'Bitfield' to tightly packed bit array. +toBitmap :: Bitfield -> Lazy.ByteString +toBitmap Bitfield {..} = {-# SCC toBitmap #-} Lazy.fromChunks [intsetBM, alignment] + where + byteSize = bfSize `div` 8 + if bfSize `mod` 8 == 0 then 0 else 1 + alignment = B.replicate (byteSize - B.length intsetBM) 0 + intsetBM = S.toByteString bfSet + +{----------------------------------------------------------------------- +-- Piece selection +-----------------------------------------------------------------------} + +type Selector = Bitfield -- ^ Indices of client /have/ pieces. + -> Bitfield -- ^ Indices of peer /have/ pieces. + -> [Bitfield] -- ^ Indices of other peers /have/ pieces. + -> Maybe PieceIx -- ^ Zero-based index of piece to request + -- to, if any. + +selector :: Selector -- ^ Selector to use at the start. + -> Ratio PieceCount + -> Selector -- ^ Selector to use after the client have + -- the C pieces. + -> Selector -- ^ Selector that changes behaviour based + -- on completeness. +selector start pt ready h a xs = + case strategyClass pt h of + SCBeginning -> start h a xs + SCReady -> ready h a xs + SCEnd -> endGame h a xs + +data StartegyClass + = SCBeginning + | SCReady + | SCEnd + deriving (Show, Eq, Ord, Enum, Bounded) + + +strategyClass :: Ratio PieceCount -> Bitfield -> StartegyClass +strategyClass threshold = classify . completeness + where + classify c + | c < threshold = SCBeginning + | c + 1 % numerator c < 1 = SCReady + -- FIXME numerator have is not total count + | otherwise = SCEnd + + +-- | Select the first available piece. +strictFirst :: Selector +strictFirst h a _ = Just $ findMin (difference a h) + +-- | Select the last available piece. +strictLast :: Selector +strictLast h a _ = Just $ findMax (difference a h) + +-- | +rarestFirst :: Selector +rarestFirst h a xs = rarest (map (intersection want) xs) + where + want = difference h a + +-- | In average random first is faster than rarest first strategy but +-- only if all pieces are available. +randomFirst :: Selector +randomFirst = do +-- randomIO + error "TODO: randomFirst" + +endGame :: Selector +endGame = strictLast diff --git a/dht/bittorrent/src/Network/BitTorrent/Exchange/Block.hs b/dht/bittorrent/src/Network/BitTorrent/Exchange/Block.hs new file mode 100644 index 00000000..bc9a3d24 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Exchange/Block.hs @@ -0,0 +1,369 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- Blocks are used to transfer pieces. +-- +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +module Network.BitTorrent.Exchange.Block + ( -- * Block attributes + BlockOffset + , BlockCount + , BlockSize + , defaultTransferSize + + -- * Block index + , BlockIx(..) + , blockIxRange + + -- * Block data + , Block(..) + , blockIx + , blockSize + , blockRange + , isPiece + , leadingBlock + + -- * Block bucket + , Bucket + + -- ** Query + , Network.BitTorrent.Exchange.Block.null + , Network.BitTorrent.Exchange.Block.full + , Network.BitTorrent.Exchange.Block.size + , Network.BitTorrent.Exchange.Block.spans + + -- ** Construction + , Network.BitTorrent.Exchange.Block.empty + , Network.BitTorrent.Exchange.Block.insert + , Network.BitTorrent.Exchange.Block.insertLazy + , Network.BitTorrent.Exchange.Block.merge + , Network.BitTorrent.Exchange.Block.fromList + + -- ** Rendering + , Network.BitTorrent.Exchange.Block.toPiece + + -- ** Debug + , Network.BitTorrent.Exchange.Block.valid + ) where + +import Prelude hiding (span) +import Control.Applicative +import Data.ByteString as BS hiding (span) +import Data.ByteString.Lazy as BL hiding (span) +import Data.ByteString.Lazy.Builder as BS +import Data.Default +import Data.Monoid +import Data.List as L hiding (span) +import Data.Serialize as S +import Data.Typeable +import Numeric +import Text.PrettyPrint as PP hiding ((<>)) +import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) + +import Data.Torrent + +{----------------------------------------------------------------------- +-- Block attributes +-----------------------------------------------------------------------} + +-- | Offset of a block in a piece in bytes. Should be multiple of +-- the choosen block size. +type BlockOffset = Int + +-- | Size of a block in bytes. Should be power of 2. +-- +-- Normally block size is equal to 'defaultTransferSize'. +-- +type BlockSize = Int + +-- | Number of block in a piece of a torrent. Used to distinguish +-- block count from piece count. +type BlockCount = Int + +-- | Widely used semi-official block size. Some clients can ignore if +-- block size of BlockIx in Request message is not equal to this +-- value. +-- +defaultTransferSize :: BlockSize +defaultTransferSize = 16 * 1024 + +{----------------------------------------------------------------------- + Block Index +-----------------------------------------------------------------------} + +-- | BlockIx correspond. +data BlockIx = BlockIx { + -- | Zero-based piece index. + ixPiece :: {-# UNPACK #-} !PieceIx + + -- | Zero-based byte offset within the piece. + , ixOffset :: {-# UNPACK #-} !BlockOffset + + -- | Block size starting from offset. + , ixLength :: {-# UNPACK #-} !BlockSize + } deriving (Show, Eq, Typeable) + +-- | First block in torrent. Useful for debugging. +instance Default BlockIx where + def = BlockIx 0 0 defaultTransferSize + +getInt :: S.Get Int +getInt = fromIntegral <$> S.getWord32be +{-# INLINE getInt #-} + +putInt :: S.Putter Int +putInt = S.putWord32be . fromIntegral +{-# INLINE putInt #-} + +instance Serialize BlockIx where + {-# SPECIALIZE instance Serialize BlockIx #-} + get = BlockIx <$> getInt + <*> getInt + <*> getInt + {-# INLINE get #-} + + put BlockIx {..} = do + putInt ixPiece + putInt ixOffset + putInt ixLength + {-# INLINE put #-} + +instance Pretty BlockIx where + pPrint BlockIx {..} = + ("piece = " <> int ixPiece <> ",") <+> + ("offset = " <> int ixOffset <> ",") <+> + ("length = " <> int ixLength) + +-- | Get location of payload bytes in the torrent content. +blockIxRange :: (Num a, Integral a) => PieceSize -> BlockIx -> (a, a) +blockIxRange piSize BlockIx {..} = (offset, offset + len) + where + offset = fromIntegral piSize * fromIntegral ixPiece + + fromIntegral ixOffset + len = fromIntegral ixLength +{-# INLINE blockIxRange #-} + +{----------------------------------------------------------------------- + Block +-----------------------------------------------------------------------} + +data Block payload = Block { + -- | Zero-based piece index. + blkPiece :: {-# UNPACK #-} !PieceIx + + -- | Zero-based byte offset within the piece. + , blkOffset :: {-# UNPACK #-} !BlockOffset + + -- | Payload bytes. + , blkData :: !payload + } deriving (Show, Eq, Functor, Typeable) + +-- | Payload is ommitted. +instance Pretty (Block BL.ByteString) where + pPrint = pPrint . blockIx + {-# INLINE pPrint #-} + +-- | Get size of block /payload/ in bytes. +blockSize :: Block BL.ByteString -> BlockSize +blockSize = fromIntegral . BL.length . blkData +{-# INLINE blockSize #-} + +-- | Get block index of a block. +blockIx :: Block BL.ByteString -> BlockIx +blockIx = BlockIx <$> blkPiece <*> blkOffset <*> blockSize + +-- | Get location of payload bytes in the torrent content. +blockRange :: (Num a, Integral a) + => PieceSize -> Block BL.ByteString -> (a, a) +blockRange piSize = blockIxRange piSize . blockIx +{-# INLINE blockRange #-} + +-- | Test if a block can be safely turned into a piece. +isPiece :: PieceSize -> Block BL.ByteString -> Bool +isPiece pieceLen blk @ (Block i offset _) = + offset == 0 && blockSize blk == pieceLen && i >= 0 +{-# INLINE isPiece #-} + +-- | First block in the piece. +leadingBlock :: PieceIx -> BlockSize -> BlockIx +leadingBlock pix blockSize = BlockIx + { ixPiece = pix + , ixOffset = 0 + , ixLength = blockSize + } +{-# INLINE leadingBlock #-} + +{----------------------------------------------------------------------- +-- Bucket +-----------------------------------------------------------------------} + +type Pos = Int +type ChunkSize = Int + +-- | A sparse set of blocks used to represent an /in progress/ piece. +data Bucket + = Nil + | Span {-# UNPACK #-} !ChunkSize !Bucket + | Fill {-# UNPACK #-} !ChunkSize !Builder !Bucket + +instance Show Bucket where + showsPrec i Nil = showString "" + showsPrec i (Span s xs) = showString "Span " <> showInt s + <> showString " " <> showsPrec i xs + showsPrec i (Fill s _ xs) = showString "Fill " <> showInt s + <> showString " " <> showsPrec i xs + +-- | INVARIANT: 'Nil' should appear only after 'Span' of 'Fill'. +nilInvFailed :: a +nilInvFailed = error "Nil: bucket invariant failed" + +valid :: Bucket -> Bool +valid = check Nothing + where + check Nothing Nil = False -- see 'nilInvFailed' + check (Just _) _ = True + check prevIsSpan (Span sz xs) = + prevIsSpan /= Just True && -- Span n (NotSpan .. ) invariant + sz > 0 && -- Span is always non-empty + check (Just True) xs + check prevIsSpan (Fill sz b xs) = + prevIsSpan /= Just True && -- Fill n (NotFill .. ) invariant + sz > 0 && -- Fill is always non-empty + check (Just False) xs + +instance Pretty Bucket where + pPrint Nil = nilInvFailed + pPrint bkt = go bkt + where + go Nil = PP.empty + go (Span sz xs) = "Span" <+> PP.int sz <+> go xs + go (Fill sz b xs) = "Fill" <+> PP.int sz <+> go xs + +-- | Smart constructor: use it when some block is /deleted/ from +-- bucket. +span :: ChunkSize -> Bucket -> Bucket +span sz (Span sz' xs) = Span (sz + sz') xs +span sz xxs = Span sz xxs +{-# INLINE span #-} + +-- | Smart constructor: use it when some block is /inserted/ to +-- bucket. +fill :: ChunkSize -> Builder -> Bucket -> Bucket +fill sz b (Fill sz' b' xs) = Fill (sz + sz') (b <> b') xs +fill sz b xxs = Fill sz b xxs +{-# INLINE fill #-} + +{----------------------------------------------------------------------- +-- Bucket queries +-----------------------------------------------------------------------} + +-- | /O(1)/. Test if this bucket is empty. +null :: Bucket -> Bool +null Nil = nilInvFailed +null (Span _ Nil) = True +null _ = False +{-# INLINE null #-} + +-- | /O(1)/. Test if this bucket is complete. +full :: Bucket -> Bool +full Nil = nilInvFailed +full (Fill _ _ Nil) = True +full _ = False +{-# INLINE full #-} + +-- | /O(n)/. Total size of the incompleted piece. +size :: Bucket -> PieceSize +size Nil = nilInvFailed +size bkt = go bkt + where + go Nil = 0 + go (Span sz xs) = sz + go xs + go (Fill sz _ xs) = sz + go xs + +-- | /O(n)/. List incomplete blocks to download. If some block have +-- size more than the specified 'BlockSize' then block is split into +-- smaller blocks to satisfy given 'BlockSize'. Small (for +-- e.g. trailing) blocks is not ignored, but returned in-order. +spans :: BlockSize -> Bucket -> [(BlockOffset, BlockSize)] +spans expectedSize = go 0 + where + go _ Nil = [] + go off (Span sz xs) = listChunks off sz ++ go (off + sz) xs + go off (Fill sz _ xs) = go (off + sz) xs + + listChunks off restSize + | restSize <= 0 = [] + | otherwise = (off, blkSize) + : listChunks (off + blkSize) (restSize - blkSize) + where + blkSize = min expectedSize restSize + +{----------------------------------------------------------------------- +-- Bucket contstruction +-----------------------------------------------------------------------} + +-- | /O(1)/. A new empty bucket capable to alloof specified size. +empty :: PieceSize -> Bucket +empty sz + | sz < 0 = error "empty: Bucket size must be a non-negative value" + | otherwise = Span sz Nil +{-# INLINE empty #-} + +insertSpan :: Pos -> BS.ByteString -> ChunkSize -> Bucket -> Bucket +insertSpan !pos !bs !span_sz !xs = + let pref_len = pos + fill_len = span_sz - pos `min` BS.length bs + suff_len = (span_sz - pos) - fill_len + in mkSpan pref_len $ + fill fill_len (byteString (BS.take fill_len bs)) $ + mkSpan suff_len $ + xs + where + mkSpan 0 xs = xs + mkSpan sz xs = Span sz xs + +-- | /O(n)/. Insert a strict bytestring at specified position. +-- +-- Best case: if blocks are inserted in sequential order, then this +-- operation should take /O(1)/. +-- +insert :: Pos -> BS.ByteString -> Bucket -> Bucket +insert _ _ Nil = nilInvFailed +insert dstPos bs bucket = go 0 bucket + where + intersects curPos sz = dstPos >= curPos && dstPos <= curPos + sz + + go _ Nil = Nil + go curPos (Span sz xs) + | intersects curPos sz = insertSpan (dstPos - curPos) bs sz xs + | otherwise = span sz (go (curPos + sz) xs) + go curPos bkt @ (Fill sz br xs) + | intersects curPos sz = bkt + | otherwise = fill sz br (go (curPos + sz) xs) + +fromList :: PieceSize -> [(Pos, BS.ByteString)] -> Bucket +fromList s = L.foldr (uncurry Network.BitTorrent.Exchange.Block.insert) + (Network.BitTorrent.Exchange.Block.empty s) + +-- TODO zero-copy +insertLazy :: Pos -> BL.ByteString -> Bucket -> Bucket +insertLazy pos bl = Network.BitTorrent.Exchange.Block.insert pos (BL.toStrict bl) + +-- | /O(n)/. +merge :: Bucket -> Bucket -> Bucket +merge = error "Bucket.merge: not implemented" + +-- | /O(1)/. +toPiece :: Bucket -> Maybe BL.ByteString +toPiece Nil = nilInvFailed +toPiece (Fill _ b Nil) = Just (toLazyByteString b) +toPiece _ = Nothing diff --git a/dht/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs b/dht/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs new file mode 100644 index 00000000..6804d0a2 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs @@ -0,0 +1,1012 @@ +-- | +-- Module : Network.BitTorrent.Exchange.Wire +-- Copyright : (c) Sam Truzjan 2013 +-- (c) Daniel Gröber 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- Each peer wire connection is identified by triple @(topic, +-- remote_addr, this_addr)@. This means that connections are the +-- same if and only if their 'ConnectionId' are the same. Of course, +-- you /must/ avoid duplicated connections. +-- +-- This module control /integrity/ of data send and received. +-- +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +module Network.BitTorrent.Exchange.Connection + ( -- * Wire + Connected + , Wire + , ChannelSide (..) + + -- * Connection + , Connection + , connInitiatedBy + + -- ** Identity + , connRemoteAddr + , connTopic + , connRemotePeerId + , connThisPeerId + + -- ** Capabilities + , connProtocol + , connCaps + , connExtCaps + , connRemoteEhs + + -- ** State + , connStatus + , connBitfield + + -- ** Env + , connOptions + , connSession + , connStats + + -- ** Status + , PeerStatus (..) + , ConnectionStatus (..) + , updateStatus + , statusUpdates + , clientStatus + , remoteStatus + , canUpload + , canDownload + , defaultUnchokeSlots + , defaultRechokeInterval + + + -- * Setup + , ConnectionPrefs (..) + , SessionLink (..) + , ConnectionConfig (..) + + -- ** Initiate + , connectWire + + -- ** Accept + , PendingConnection + , newPendingConnection + , pendingPeer + , pendingCaps + , pendingTopic + , closePending + , acceptWire + + -- ** Post setup actions + , resizeBitfield + + -- * Messaging + , recvMessage + , sendMessage + , filterQueue + , getMaxQueueLength + + -- * Exceptions + , ProtocolError (..) + , WireFailure (..) + , peerPenalty + , isWireFailure + , disconnectPeer + + -- * Stats + , ByteStats (..) + , FlowStats (..) + , ConnectionStats (..) + + -- * Flood detection + , FloodDetector (..) + + -- * Options + , Options (..) + ) where + +import Control.Applicative +import Control.Concurrent hiding (yield) +import Control.Exception +import Control.Monad.Reader +import Control.Monad.State +import Control.Monad.Trans.Resource +import Control.Lens +import Data.ByteString as BS +import Data.ByteString.Lazy as BSL +import Data.Conduit as C +import Data.Conduit.Cereal +import Data.Conduit.List +import Data.Conduit.Network +import Data.Default +import Data.IORef +import Data.List as L +import Data.Maybe as M +import Data.Monoid +import Data.Serialize as S +import Data.Typeable +import Network +import Network.Socket hiding (Connected) +import Network.Socket.ByteString as BS +import Text.PrettyPrint as PP hiding ((<>)) +import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) +import Text.Show.Functions () +import System.Log.FastLogger (ToLogStr(..)) +import System.Timeout + +import Data.Torrent +import Network.Address +import Network.BitTorrent.Exchange.Bitfield as BF +import Network.BitTorrent.Exchange.Message as Msg + +-- TODO handle port message? +-- TODO handle limits? +-- TODO filter not requested PIECE messages +-- TODO metadata piece request flood protection +-- TODO piece request flood protection +-- TODO protect against flood attacks +{----------------------------------------------------------------------- +-- Exceptions +-----------------------------------------------------------------------} + +-- | Used to specify initiator of 'ProtocolError'. +data ChannelSide + = ThisPeer + | RemotePeer + deriving (Show, Eq, Enum, Bounded) + +instance Default ChannelSide where + def = ThisPeer + +instance Pretty ChannelSide where + pPrint = PP.text . show + +-- | A protocol errors occur when a peer violates protocol +-- specification. +data ProtocolError + -- | Protocol string should be 'BitTorrent Protocol' but remote + -- peer have sent a different string. + = InvalidProtocol ProtocolName + + -- | Sent and received protocol strings do not match. Can occur + -- in 'connectWire' only. + | UnexpectedProtocol ProtocolName + + -- | /Remote/ peer replied with invalid 'hsInfoHash' which do not + -- match with 'hsInfoHash' /this/ peer have sent. Can occur in + -- 'connectWire' or 'acceptWire' only. + | UnexpectedTopic InfoHash + + -- | Some trackers or DHT can return 'PeerId' of a peer. If a + -- remote peer handshaked with different 'hsPeerId' then this + -- exception is raised. Can occur in 'connectWire' only. + | UnexpectedPeerId PeerId + + -- | Accepted peer have sent unknown torrent infohash in + -- 'hsInfoHash' field. This situation usually happen when /this/ + -- peer have deleted the requested torrent. The error can occur in + -- 'acceptWire' function only. + | UnknownTopic InfoHash + + -- | A remote peer have 'ExtExtended' enabled but did not send an + -- 'ExtendedHandshake' back. + | HandshakeRefused + + -- | 'Network.BitTorrent.Exchange.Message.Bitfield' message MUST + -- be send either once or zero times, but either this peer or + -- remote peer send a bitfield message the second time. + | BitfieldAlreadySent ChannelSide + + -- | Capabilities violation. For example this exception can occur + -- when a peer have sent 'Port' message but 'ExtDHT' is not + -- allowed in 'connCaps'. + | DisallowedMessage + { -- | Who sent invalid message. + violentSender :: ChannelSide + + -- | If the 'violentSender' reconnect with this extension + -- enabled then he can try to send this message. + , extensionRequired :: Extension + } + deriving Show + +instance Pretty ProtocolError where + pPrint = PP.text . show + +errorPenalty :: ProtocolError -> Int +errorPenalty (InvalidProtocol _) = 1 +errorPenalty (UnexpectedProtocol _) = 1 +errorPenalty (UnexpectedTopic _) = 1 +errorPenalty (UnexpectedPeerId _) = 1 +errorPenalty (UnknownTopic _) = 0 +errorPenalty (HandshakeRefused ) = 1 +errorPenalty (BitfieldAlreadySent _) = 1 +errorPenalty (DisallowedMessage _ _) = 1 + +-- | Exceptions used to interrupt the current P2P session. +data WireFailure + = ConnectionRefused IOError + + -- | Force termination of wire connection. + -- + -- Normally you should throw only this exception from event loop + -- using 'disconnectPeer', other exceptions are thrown + -- automatically by functions from this module. + -- + | DisconnectPeer + + -- | A peer not responding and did not send a 'KeepAlive' message + -- for a specified period of time. + | PeerDisconnected + + -- | A remote peer have sent some unknown message we unable to + -- parse. + | DecodingError GetException + + -- | See 'ProtocolError' for more details. + | ProtocolError ProtocolError + + -- | A possible malicious peer have sent too many control messages + -- without making any progress. + | FloodDetected ConnectionStats + deriving (Show, Typeable) + +instance Exception WireFailure + +instance Pretty WireFailure where + pPrint = PP.text . show + +-- TODO +-- data Penalty = Ban | Penalty Int + +peerPenalty :: WireFailure -> Int +peerPenalty DisconnectPeer = 0 +peerPenalty PeerDisconnected = 0 +peerPenalty (DecodingError _) = 1 +peerPenalty (ProtocolError e) = errorPenalty e +peerPenalty (FloodDetected _) = 1 + +-- | Do nothing with exception, used with 'handle' or 'try'. +isWireFailure :: Monad m => WireFailure -> m () +isWireFailure _ = return () + +protocolError :: MonadThrow m => ProtocolError -> m a +protocolError = monadThrow . ProtocolError + +{----------------------------------------------------------------------- +-- Stats +-----------------------------------------------------------------------} + +-- | Message stats in one direction. +data FlowStats = FlowStats + { -- | Number of the messages sent or received. + messageCount :: {-# UNPACK #-} !Int + -- | Sum of byte sequences of all messages. + , messageBytes :: {-# UNPACK #-} !ByteStats + } deriving Show + +instance Pretty FlowStats where + pPrint FlowStats {..} = + PP.int messageCount <+> "messages" $+$ + pPrint messageBytes + +-- | Zeroed stats. +instance Default FlowStats where + def = FlowStats 0 def + +-- | Monoid under addition. +instance Monoid FlowStats where + mempty = def + mappend a b = FlowStats + { messageBytes = messageBytes a <> messageBytes b + , messageCount = messageCount a + messageCount b + } + +-- | Find average length of byte sequences per message. +avgByteStats :: FlowStats -> ByteStats +avgByteStats (FlowStats n ByteStats {..}) = ByteStats + { overhead = overhead `quot` n + , control = control `quot` n + , payload = payload `quot` n + } + +-- | Message stats in both directions. This data can be retrieved +-- using 'getStats' function. +-- +-- Note that this stats is completely different from +-- 'Data.Torrent.Progress.Progress': payload bytes not necessary +-- equal to downloaded\/uploaded bytes since a peer can send a +-- broken block. +-- +data ConnectionStats = ConnectionStats + { -- | Received messages stats. + incomingFlow :: !FlowStats + -- | Sent messages stats. + , outcomingFlow :: !FlowStats + } deriving Show + +instance Pretty ConnectionStats where + pPrint ConnectionStats {..} = vcat + [ "Recv:" <+> pPrint incomingFlow + , "Sent:" <+> pPrint outcomingFlow + , "Both:" <+> pPrint (incomingFlow <> outcomingFlow) + ] + +-- | Zeroed stats. +instance Default ConnectionStats where + def = ConnectionStats def def + +-- | Monoid under addition. +instance Monoid ConnectionStats where + mempty = def + mappend a b = ConnectionStats + { incomingFlow = incomingFlow a <> incomingFlow b + , outcomingFlow = outcomingFlow a <> outcomingFlow b + } + +-- | Aggregate one more message stats in the /specified/ direction. +addStats :: ChannelSide -> ByteStats -> ConnectionStats -> ConnectionStats +addStats ThisPeer x s = s { outcomingFlow = (FlowStats 1 x) <> (outcomingFlow s) } +addStats RemotePeer x s = s { incomingFlow = (FlowStats 1 x) <> (incomingFlow s) } + +-- | Sum of overhead and control bytes in both directions. +wastedBytes :: ConnectionStats -> Int +wastedBytes ConnectionStats {..} = overhead + control + where + FlowStats _ ByteStats {..} = incomingFlow <> outcomingFlow + +-- | Sum of payload bytes in both directions. +payloadBytes :: ConnectionStats -> Int +payloadBytes ConnectionStats {..} = + payload (messageBytes (incomingFlow <> outcomingFlow)) + +-- | Sum of any bytes in both directions. +transmittedBytes :: ConnectionStats -> Int +transmittedBytes ConnectionStats {..} = + byteLength (messageBytes (incomingFlow <> outcomingFlow)) + +{----------------------------------------------------------------------- +-- Flood protection +-----------------------------------------------------------------------} + +defaultFloodFactor :: Int +defaultFloodFactor = 1 + +-- | This is a very permissive value, connection setup usually takes +-- around 10-100KB, including both directions. +defaultFloodThreshold :: Int +defaultFloodThreshold = 2 * 1024 * 1024 + +-- | A flood detection function. +type Detector stats = Int -- ^ Factor; + -> Int -- ^ Threshold; + -> stats -- ^ Stats to analyse; + -> Bool -- ^ Is this a flooded connection? + +defaultDetector :: Detector ConnectionStats +defaultDetector factor threshold s = + transmittedBytes s > threshold && + factor * wastedBytes s > payloadBytes s + +-- | Flood detection is used to protect /this/ peer against a /remote/ +-- malicious peer sending meaningless control messages. +data FloodDetector = FloodDetector + { -- | Max ratio of payload bytes to control bytes. + floodFactor :: {-# UNPACK #-} !Int + + -- | Max count of bytes connection /setup/ can take including + -- 'Handshake', 'ExtendedHandshake', 'Bitfield', 'Have' and 'Port' + -- messages. This value is used to avoid false positives at the + -- connection initialization. + , floodThreshold :: {-# UNPACK #-} !Int + + -- | Flood predicate on the /current/ 'ConnectionStats'. + , floodPredicate :: Detector ConnectionStats + } deriving Show + +instance Eq FloodDetector where + a == b = floodFactor a == floodFactor b + && floodThreshold a == floodThreshold b + +-- | Flood detector with very permissive options. +instance Default FloodDetector where + def = FloodDetector + { floodFactor = defaultFloodFactor + , floodThreshold = defaultFloodThreshold + , floodPredicate = defaultDetector + } + +-- | This peer might drop connection if the detector gives positive answer. +runDetector :: FloodDetector -> ConnectionStats -> Bool +runDetector FloodDetector {..} = floodPredicate floodFactor floodThreshold + +{----------------------------------------------------------------------- +-- Options +-----------------------------------------------------------------------} + +-- | Various connection settings and limits. +data Options = Options + { -- | How often /this/ peer should send 'KeepAlive' messages. + keepaliveInterval :: {-# UNPACK #-} !Int + + -- | /This/ peer will drop connection if a /remote/ peer did not + -- send any message for this period of time. + , keepaliveTimeout :: {-# UNPACK #-} !Int + + , requestQueueLength :: {-# UNPACK #-} !Int + + -- | Used to protect against flood attacks. + , floodDetector :: FloodDetector + + -- | Used to protect against flood attacks in /metadata + -- exchange/. Normally, a requesting peer should request each + -- 'InfoDict' piece only one time, but a malicious peer can + -- saturate wire with 'MetadataRequest' messages thus flooding + -- responding peer. + -- + -- This value set upper bound for number of 'MetadataRequests' + -- for each piece. + -- + , metadataFactor :: {-# UNPACK #-} !Int + + -- | Used to protect against out-of-memory attacks: malicious peer + -- can claim that 'totalSize' is, say, 100TB and send some random + -- data instead of infodict pieces. Since requesting peer unable + -- to check not completed infodict via the infohash, the + -- accumulated pieces will allocate the all available memory. + -- + -- This limit set upper bound for 'InfoDict' size. See + -- 'ExtendedMetadata' for more info. + -- + , maxInfoDictSize :: {-# UNPACK #-} !Int + } deriving (Show, Eq) + +-- | Permissive default parameters, most likely you don't need to +-- change them. +instance Default Options where + def = Options + { keepaliveInterval = defaultKeepAliveInterval + , keepaliveTimeout = defaultKeepAliveTimeout + , requestQueueLength = defaultRequestQueueLength + , floodDetector = def + , metadataFactor = defaultMetadataFactor + , maxInfoDictSize = defaultMaxInfoDictSize + } + +{----------------------------------------------------------------------- +-- Peer status +-----------------------------------------------------------------------} + +-- | Connections contain two bits of state on either end: choked or +-- not, and interested or not. +data PeerStatus = PeerStatus + { -- | Choking is a notification that no data will be sent until + -- unchoking happens. + _choking :: !Bool + + -- | + , _interested :: !Bool + } deriving (Show, Eq, Ord) + +$(makeLenses ''PeerStatus) + +instance Pretty PeerStatus where + pPrint PeerStatus {..} = + pPrint (Choking _choking) <+> "and" <+> pPrint (Interested _interested) + +-- | Connections start out choked and not interested. +instance Default PeerStatus where + def = PeerStatus True False + +instance Monoid PeerStatus where + mempty = def + mappend a b = PeerStatus + { _choking = _choking a && _choking b + , _interested = _interested a || _interested b + } + +-- | Can be used to update remote peer status using incoming 'Status' +-- message. +updateStatus :: StatusUpdate -> PeerStatus -> PeerStatus +updateStatus (Choking b) = choking .~ b +updateStatus (Interested b) = interested .~ b + +-- | Can be used to generate outcoming messages. +statusUpdates :: PeerStatus -> PeerStatus -> [StatusUpdate] +statusUpdates a b = M.catMaybes $ + [ if _choking a == _choking b then Nothing + else Just $ Choking $ _choking b + , if _interested a == _interested b then Nothing + else Just $ Interested $ _interested b + ] + +{----------------------------------------------------------------------- +-- Connection status +-----------------------------------------------------------------------} + +-- | Status of the both endpoints. +data ConnectionStatus = ConnectionStatus + { _clientStatus :: !PeerStatus + , _remoteStatus :: !PeerStatus + } deriving (Show, Eq) + +$(makeLenses ''ConnectionStatus) + +instance Pretty ConnectionStatus where + pPrint ConnectionStatus {..} = + "this " PP.<+> pPrint _clientStatus PP.$$ + "remote" PP.<+> pPrint _remoteStatus + +-- | Connections start out choked and not interested. +instance Default ConnectionStatus where + def = ConnectionStatus def def + +-- | Can the client transfer to the remote peer? +canUpload :: ConnectionStatus -> Bool +canUpload ConnectionStatus {..} + = _interested _remoteStatus && not (_choking _clientStatus) + +-- | Can the client transfer from the remote peer? +canDownload :: ConnectionStatus -> Bool +canDownload ConnectionStatus {..} + = _interested _clientStatus && not (_choking _remoteStatus) + +-- | Indicates how many peers are allowed to download from the client +-- by default. +defaultUnchokeSlots :: Int +defaultUnchokeSlots = 4 + +-- | +defaultRechokeInterval :: Int +defaultRechokeInterval = 10 * 1000 * 1000 + +{----------------------------------------------------------------------- +-- Connection +-----------------------------------------------------------------------} + +data ConnectionState = ConnectionState { + -- | If @not (allowed ExtExtended connCaps)@ then this set is always + -- empty. Otherwise it has the BEP10 extension protocol mandated mapping of + -- 'MessageId' to the message type for the remote peer. + -- + -- Note that this value can change in current session if either + -- this or remote peer will initiate rehandshaking. + -- + _connExtCaps :: !ExtendedCaps + + -- | Current extended handshake information from the remote peer + , _connRemoteEhs :: !ExtendedHandshake + + -- | Various stats about messages sent and received. Stats can be + -- used to protect /this/ peer against flood attacks. + -- + -- Note that this value will change with the next sent or received + -- message. + , _connStats :: !ConnectionStats + + , _connStatus :: !ConnectionStatus + + -- | Bitfield of remote endpoint. + , _connBitfield :: !Bitfield + } + +makeLenses ''ConnectionState + +instance Default ConnectionState where + def = ConnectionState + { _connExtCaps = def + , _connRemoteEhs = def + , _connStats = def + , _connStatus = def + , _connBitfield = BF.haveNone 0 + } + +-- | Connection keep various info about both peers. +data Connection s = Connection + { connInitiatedBy :: !ChannelSide + + , connRemoteAddr :: !(PeerAddr IP) + + -- | /Both/ peers handshaked with this protocol string. The only + -- value is \"Bittorrent Protocol\" but this can be changed in + -- future. + , connProtocol :: !ProtocolName + + -- | Set of enabled core extensions, i.e. the pre BEP10 extension + -- mechanism. This value is used to check if a message is allowed + -- to be sent or received. + , connCaps :: !Caps + + -- | /Both/ peers handshaked with this infohash. A connection can + -- handle only one topic, use 'reconnect' to change the current + -- topic. + , connTopic :: !InfoHash + + -- | Typically extracted from handshake. + , connRemotePeerId :: !PeerId + + -- | Typically extracted from handshake. + , connThisPeerId :: !PeerId + + -- | + , connOptions :: !Options + + -- | Mutable connection state, see 'ConnectionState' + , connState :: !(IORef ConnectionState) + +-- -- | Max request queue length. +-- , connMaxQueueLen :: !Int + + -- | Environment data. + , connSession :: !s + + , connChan :: !(Chan Message) + } + +instance Pretty (Connection s) where + pPrint Connection {..} = "Connection" + +instance ToLogStr (Connection s) where + toLogStr Connection {..} = mconcat + [ toLogStr (show connRemoteAddr) + , toLogStr (show connProtocol) + , toLogStr (show connCaps) + , toLogStr (show connTopic) + , toLogStr (show connRemotePeerId) + , toLogStr (show connThisPeerId) + , toLogStr (show connOptions) + ] + +-- TODO check extended messages too +isAllowed :: Connection s -> Message -> Bool +isAllowed Connection {..} msg + | Just ext <- requires msg = ext `allowed` connCaps + | otherwise = True + +{----------------------------------------------------------------------- +-- Hanshaking +-----------------------------------------------------------------------} + +sendHandshake :: Socket -> Handshake -> IO () +sendHandshake sock hs = sendAll sock (S.encode hs) + +recvHandshake :: Socket -> IO Handshake +recvHandshake sock = do + header <- BS.recv sock 1 + unless (BS.length header == 1) $ + throw $ userError "Unable to receive handshake header." + + let protocolLen = BS.head header + let restLen = handshakeSize protocolLen - 1 + + body <- BS.recv sock restLen + let resp = BS.cons protocolLen body + either (throwIO . userError) return $ S.decode resp + +-- | Handshaking with a peer specified by the second argument. +-- +-- It's important to send handshake first because /accepting/ peer +-- do not know handshake topic and will wait until /connecting/ peer +-- will send handshake. +-- +initiateHandshake :: Socket -> Handshake -> IO Handshake +initiateHandshake sock hs = do + sendHandshake sock hs + recvHandshake sock + +data HandshakePair = HandshakePair + { handshakeSent :: !Handshake + , handshakeRecv :: !Handshake + } deriving (Show, Eq) + +validatePair :: HandshakePair -> PeerAddr IP -> IO () +validatePair (HandshakePair hs hs') addr = Prelude.mapM_ checkProp + [ (def == hsProtocol hs', InvalidProtocol $ hsProtocol hs') + , (hsProtocol hs == hsProtocol hs', UnexpectedProtocol $ hsProtocol hs') + , (hsInfoHash hs == hsInfoHash hs', UnexpectedTopic $ hsInfoHash hs') + , (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerId addr) + , UnexpectedPeerId $ hsPeerId hs') + ] + where + checkProp (t, e) = unless t $ throwIO $ ProtocolError e + +-- | Connection state /right/ after handshaking. +establishedStats :: HandshakePair -> ConnectionStats +establishedStats HandshakePair {..} = ConnectionStats + { outcomingFlow = FlowStats 1 $ handshakeStats handshakeSent + , incomingFlow = FlowStats 1 $ handshakeStats handshakeRecv + } + +{----------------------------------------------------------------------- +-- Wire +-----------------------------------------------------------------------} + +-- | do not expose this so we can change it without breaking api +newtype Connected s a = Connected { runConnected :: (ReaderT (Connection s) IO a) } + deriving (Functor, Applicative, Monad + , MonadIO, MonadReader (Connection s), MonadThrow + ) + +instance MonadState ConnectionState (Connected s) where + get = Connected (asks connState) >>= liftIO . readIORef + put x = Connected (asks connState) >>= liftIO . flip writeIORef x + +-- | A duplex channel connected to a remote peer which keep tracks +-- connection parameters. +type Wire s a = ConduitM Message Message (Connected s) a + +{----------------------------------------------------------------------- +-- Wrapper +-----------------------------------------------------------------------} + +putStats :: ChannelSide -> Message -> Connected s () +putStats side msg = connStats %= addStats side (stats msg) + +validate :: ChannelSide -> Message -> Connected s () +validate side msg = do + caps <- asks connCaps + case requires msg of + Nothing -> return () + Just ext + | ext `allowed` caps -> return () + | otherwise -> protocolError $ DisallowedMessage side ext + +trackFlow :: ChannelSide -> Wire s () +trackFlow side = iterM $ do + validate side + putStats side + +{----------------------------------------------------------------------- +-- Setup +-----------------------------------------------------------------------} + +-- System.Timeout.timeout multiplier +seconds :: Int +seconds = 1000000 + +sinkChan :: MonadIO m => Chan Message -> Sink Message m () +sinkChan chan = await >>= maybe (return ()) (liftIO . writeChan chan) + +sourceChan :: MonadIO m => Int -> Chan Message -> Source m Message +sourceChan interval chan = do + mmsg <- liftIO $ timeout (interval * seconds) $ readChan chan + yield $ fromMaybe Msg.KeepAlive mmsg + +-- | Normally you should use 'connectWire' or 'acceptWire'. +runWire :: Wire s () -> Socket -> Chan Message -> Connection s -> IO () +runWire action sock chan conn = flip runReaderT conn $ runConnected $ + sourceSocket sock $= + conduitGet S.get $= + trackFlow RemotePeer $= + action $= + trackFlow ThisPeer C.$$ + sinkChan chan + +-- | This function will block until a peer send new message. You can +-- also use 'await'. +recvMessage :: Wire s Message +recvMessage = await >>= maybe (monadThrow PeerDisconnected) return + +-- | You can also use 'yield'. +sendMessage :: PeerMessage msg => msg -> Wire s () +sendMessage msg = do + ecaps <- use connExtCaps + yield $ envelop ecaps msg + +getMaxQueueLength :: Connected s Int +getMaxQueueLength = do + advertisedLen <- ehsQueueLength <$> use connRemoteEhs + defaultLen <- asks (requestQueueLength . connOptions) + return $ fromMaybe defaultLen advertisedLen + +-- | Filter pending messages from send buffer. +filterQueue :: (Message -> Bool) -> Wire s () +filterQueue p = lift $ do + chan <- asks connChan + liftIO $ getChanContents chan >>= writeList2Chan chan . L.filter p + +-- | Forcefully terminate wire session and close socket. +disconnectPeer :: Wire s a +disconnectPeer = monadThrow DisconnectPeer + +extendedHandshake :: ExtendedCaps -> Wire s () +extendedHandshake caps = do + -- TODO add other params to the handshake + sendMessage $ nullExtendedHandshake caps + msg <- recvMessage + case msg of + Extended (EHandshake remoteEhs@(ExtendedHandshake {..})) -> do + connExtCaps .= (ehsCaps <> caps) + connRemoteEhs .= remoteEhs + _ -> protocolError HandshakeRefused + +rehandshake :: ExtendedCaps -> Wire s () +rehandshake caps = error "rehandshake" + +reconnect :: Wire s () +reconnect = error "reconnect" + +data ConnectionId = ConnectionId + { topic :: !InfoHash + , remoteAddr :: !(PeerAddr IP) + , thisAddr :: !(PeerAddr (Maybe IP)) -- ^ foreign address of this node. + } + +-- | /Preffered/ settings of wire. To get the real use 'ask'. +data ConnectionPrefs = ConnectionPrefs + { prefOptions :: !Options + , prefProtocol :: !ProtocolName + , prefCaps :: !Caps + , prefExtCaps :: !ExtendedCaps + } deriving (Show, Eq) + +instance Default ConnectionPrefs where + def = ConnectionPrefs + { prefOptions = def + , prefProtocol = def + , prefCaps = def + , prefExtCaps = def + } + +normalize :: ConnectionPrefs -> ConnectionPrefs +normalize = error "normalize" + +-- | Bridge between 'Connection' and 'Network.BitTorrent.Exchange.Session'. +data SessionLink s = SessionLink + { linkTopic :: !(InfoHash) + , linkPeerId :: !(PeerId) + , linkMetadataSize :: !(Maybe Int) + , linkOutputChan :: !(Maybe (Chan Message)) + , linkSession :: !(s) + } + +data ConnectionConfig s = ConnectionConfig + { cfgPrefs :: !(ConnectionPrefs) + , cfgSession :: !(SessionLink s) + , cfgWire :: !(Wire s ()) + } + +configHandshake :: ConnectionConfig s -> Handshake +configHandshake ConnectionConfig {..} = Handshake + { hsProtocol = prefProtocol cfgPrefs + , hsReserved = prefCaps cfgPrefs + , hsInfoHash = linkTopic cfgSession + , hsPeerId = linkPeerId cfgSession + } + +{----------------------------------------------------------------------- +-- Pending connections +-----------------------------------------------------------------------} + +-- | Connection in half opened state. A normal usage scenario: +-- +-- * Opened using 'newPendingConnection', usually in the listener +-- loop; +-- +-- * Closed using 'closePending' if 'pendingPeer' is banned, +-- 'pendingCaps' is prohibited or pendingTopic is unknown; +-- +-- * Accepted using 'acceptWire' otherwise. +-- +data PendingConnection = PendingConnection + { pendingSock :: Socket + , pendingPeer :: PeerAddr IP -- ^ 'peerId' is always non empty; + , pendingCaps :: Caps -- ^ advertised by the peer; + , pendingTopic :: InfoHash -- ^ possible non-existent topic. + } + +-- | Reconstruct handshake sent by the remote peer. +pendingHandshake :: PendingConnection -> Handshake +pendingHandshake PendingConnection {..} = Handshake + { hsProtocol = def + , hsReserved = pendingCaps + , hsInfoHash = pendingTopic + , hsPeerId = fromMaybe (error "pendingHandshake: impossible") + (peerId pendingPeer) + } + +-- | +-- +-- This function can throw 'WireFailure' exception. +-- +newPendingConnection :: Socket -> PeerAddr IP -> IO PendingConnection +newPendingConnection sock addr = do + Handshake {..} <- recvHandshake sock + unless (hsProtocol == def) $ do + throwIO $ ProtocolError $ InvalidProtocol hsProtocol + return PendingConnection + { pendingSock = sock + , pendingPeer = addr { peerId = Just hsPeerId } + , pendingCaps = hsReserved + , pendingTopic = hsInfoHash + } + +-- | Release all resources associated with the given connection. Note +-- that you /must not/ 'closePending' if you 'acceptWire'. +closePending :: PendingConnection -> IO () +closePending PendingConnection {..} = do + close pendingSock + +{----------------------------------------------------------------------- +-- Connection setup +-----------------------------------------------------------------------} + +chanToSock :: Int -> Chan Message -> Socket -> IO () +chanToSock ka chan sock = + sourceChan ka chan $= conduitPut S.put C.$$ sinkSocket sock + +afterHandshaking :: ChannelSide -> PeerAddr IP -> Socket -> HandshakePair + -> ConnectionConfig s -> IO () +afterHandshaking initiator addr sock + hpair @ (HandshakePair hs hs') + (ConnectionConfig + { cfgPrefs = ConnectionPrefs {..} + , cfgSession = SessionLink {..} + , cfgWire = wire + }) = do + let caps = hsReserved hs <> hsReserved hs' + cstate <- newIORef def { _connStats = establishedStats hpair } + chan <- maybe newChan return linkOutputChan + let conn = Connection { + connInitiatedBy = initiator + , connRemoteAddr = addr + , connProtocol = hsProtocol hs + , connCaps = caps + , connTopic = hsInfoHash hs + , connRemotePeerId = hsPeerId hs' + , connThisPeerId = hsPeerId hs + , connOptions = def + , connState = cstate + , connSession = linkSession + , connChan = chan + } + + -- TODO make KA interval configurable + let kaInterval = defaultKeepAliveInterval + wire' = if ExtExtended `allowed` caps + then extendedHandshake prefExtCaps >> wire + else wire + + bracket (forkIO (chanToSock kaInterval chan sock)) + (killThread) + (\ _ -> runWire wire' sock chan conn) + +-- | Initiate 'Wire' connection and handshake with a peer. This function will +-- also do the BEP10 extension protocol handshake if 'ExtExtended' is enabled on +-- both sides. +-- +-- This function can throw 'WireFailure' exception. +-- +connectWire :: PeerAddr IP -> ConnectionConfig s -> IO () +connectWire addr cfg = do + let catchRefusal m = try m >>= either (throwIO . ConnectionRefused) return + bracket (catchRefusal (peerSocket Stream addr)) close $ \ sock -> do + let hs = configHandshake cfg + hs' <- initiateHandshake sock hs + let hpair = HandshakePair hs hs' + validatePair hpair addr + afterHandshaking ThisPeer addr sock hpair cfg + +-- | Accept 'Wire' connection using already 'Network.Socket.accept'ed +-- socket. For peer listener loop the 'acceptSafe' should be +-- prefered against 'accept'. The socket will be closed at exit. +-- +-- This function can throw 'WireFailure' exception. +-- +acceptWire :: PendingConnection -> ConnectionConfig s -> IO () +acceptWire pc @ PendingConnection {..} cfg = do + bracket (return pendingSock) close $ \ _ -> do + unless (linkTopic (cfgSession cfg) == pendingTopic) $ do + throwIO (ProtocolError (UnexpectedTopic pendingTopic)) + + let hs = configHandshake cfg + sendHandshake pendingSock hs + let hpair = HandshakePair hs (pendingHandshake pc) + + afterHandshaking RemotePeer pendingPeer pendingSock hpair cfg + +-- | Used when size of bitfield becomes known. +resizeBitfield :: Int -> Connected s () +resizeBitfield n = connBitfield %= adjustSize n diff --git a/dht/bittorrent/src/Network/BitTorrent/Exchange/Download.hs b/dht/bittorrent/src/Network/BitTorrent/Exchange/Download.hs new file mode 100644 index 00000000..981db2fb --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Exchange/Download.hs @@ -0,0 +1,296 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- +-- +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE FunctionalDependencies #-} +{-# LANGUAGE TemplateHaskell #-} +module Network.BitTorrent.Exchange.Download + ( -- * Downloading + Download (..) + , Updates + , runDownloadUpdates + + -- ** Metadata + -- $metadata-download + , MetadataDownload + , metadataDownload + + -- ** Content + -- $content-download + , ContentDownload + , contentDownload + ) where + +import Control.Applicative +import Control.Concurrent +import Control.Lens +import Control.Monad.State +import Data.BEncode as BE +import Data.ByteString as BS +import Data.ByteString.Lazy as BL +import Data.Default +import Data.List as L +import Data.Maybe +import Data.Map as M +import Data.Tuple + +import Data.Torrent as Torrent +import Network.Address +import Network.BitTorrent.Exchange.Bitfield as BF +import Network.BitTorrent.Exchange.Block as Block +import Network.BitTorrent.Exchange.Message as Msg +import System.Torrent.Storage (Storage, writePiece) + + +{----------------------------------------------------------------------- +-- Class +-----------------------------------------------------------------------} + +type Updates s a = StateT s IO a + +runDownloadUpdates :: MVar s -> Updates s a -> IO a +runDownloadUpdates var m = modifyMVar var (fmap swap . runStateT m) + +class Download s chunk | s -> chunk where + scheduleBlocks :: Int -> PeerAddr IP -> Bitfield -> Updates s [BlockIx] + + -- | + scheduleBlock :: PeerAddr IP -> Bitfield -> Updates s (Maybe BlockIx) + scheduleBlock addr bf = listToMaybe <$> scheduleBlocks 1 addr bf + + -- | Get number of sent requests to this peer. + getRequestQueueLength :: PeerAddr IP -> Updates s Int + + -- | Remove all pending block requests to the remote peer. May be used + -- when: + -- + -- * a peer closes connection; + -- + -- * remote peer choked this peer; + -- + -- * timeout expired. + -- + resetPending :: PeerAddr IP -> Updates s () + + -- | MAY write to storage, if a new piece have been completed. + -- + -- You should check if a returned by peer block is actually have + -- been requested and in-flight. This is needed to avoid "I send + -- random corrupted block" attacks. + pushBlock :: PeerAddr IP -> chunk -> Updates s (Maybe Bool) + +{----------------------------------------------------------------------- +-- Metadata download +-----------------------------------------------------------------------} +-- $metadata-download +-- TODO + +data MetadataDownload = MetadataDownload + { _pendingPieces :: [(PeerAddr IP, PieceIx)] + , _bucket :: Bucket + , _topic :: InfoHash + } + +makeLenses ''MetadataDownload + +-- | Create a new scheduler for infodict of the given size. +metadataDownload :: Int -> InfoHash -> MetadataDownload +metadataDownload ps = MetadataDownload [] (Block.empty ps) + +instance Default MetadataDownload where + def = error "instance Default MetadataDownload" + +--cancelPending :: PieceIx -> Updates () +cancelPending pix = pendingPieces %= L.filter ((pix ==) . snd) + +instance Download MetadataDownload (Piece BS.ByteString) where + scheduleBlock addr bf = do + bkt <- use bucket + case spans metadataPieceSize bkt of + [] -> return Nothing + ((off, _ ) : _) -> do + let pix = off `div` metadataPieceSize + pendingPieces %= ((addr, pix) :) + return (Just (BlockIx pix 0 metadataPieceSize)) + + resetPending addr = pendingPieces %= L.filter ((addr ==) . fst) + + pushBlock addr Torrent.Piece {..} = do + p <- use pendingPieces + when ((addr, pieceIndex) `L.notElem` p) $ + error "not requested" + cancelPending pieceIndex + + bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData + b <- use bucket + case toPiece b of + Nothing -> return Nothing + Just chunks -> do + t <- use topic + case parseInfoDict (BL.toStrict chunks) t of + Right x -> do + pendingPieces .= [] + return undefined -- (Just x) + Left e -> do + pendingPieces .= [] + bucket .= Block.empty (Block.size b) + return undefined -- Nothing + where + -- todo use incremental parsing to avoid BS.concat call + parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict + parseInfoDict chunk topic = + case BE.decode chunk of + Right (infodict @ InfoDict {..}) + | topic == idInfoHash -> return infodict + | otherwise -> Left "broken infodict" + Left err -> Left $ "unable to parse infodict " ++ err + +{----------------------------------------------------------------------- +-- Content download +-----------------------------------------------------------------------} +-- $content-download +-- +-- A block can have one of the following status: +-- +-- 1) /not allowed/: Piece is not in download set. +-- +-- 2) /waiting/: (allowed?) Block have been allowed to download, +-- but /this/ peer did not send any 'Request' message for this +-- block. To allow some piece use +-- 'Network.BitTorrent.Exchange.Selector' and then 'allowedSet' +-- and 'allowPiece'. +-- +-- 3) /inflight/: (pending?) Block have been requested but +-- /remote/ peer did not send any 'Piece' message for this block. +-- Related functions 'markInflight' +-- +-- 4) /pending/: (stalled?) Block have have been downloaded +-- Related functions 'insertBlock'. +-- +-- Piece status: +-- +-- 1) /assembled/: (downloaded?) All blocks in piece have been +-- downloaded but the piece did not verified yet. +-- +-- * Valid: go to completed; +-- +-- * Invalid: go to waiting. +-- +-- 2) /corrupted/: +-- +-- 3) /downloaded/: (verified?) A piece have been successfully +-- verified via the hash. Usually the piece should be stored to +-- the 'System.Torrent.Storage' and /this/ peer should send 'Have' +-- messages to the /remote/ peers. +-- + +data PieceEntry = PieceEntry + { pending :: [(PeerAddr IP, BlockIx)] + , stalled :: Bucket + } + +pieceEntry :: PieceSize -> PieceEntry +pieceEntry s = PieceEntry [] (Block.empty s) + +isEmpty :: PieceEntry -> Bool +isEmpty PieceEntry {..} = L.null pending && Block.null stalled + +_holes :: PieceIx -> PieceEntry -> [BlockIx] +_holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled) + where + mkBlockIx (off, sz) = BlockIx pix off sz + +data ContentDownload = ContentDownload + { inprogress :: !(Map PieceIx PieceEntry) + , bitfield :: !Bitfield + , pieceSize :: !PieceSize + , contentStorage :: Storage + } + +contentDownload :: Bitfield -> PieceSize -> Storage -> ContentDownload +contentDownload = ContentDownload M.empty + +--modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> DownloadUpdates () +modifyEntry pix f = modify $ \ s @ ContentDownload {..} -> s + { inprogress = alter (g pieceSize) pix inprogress } + where + g s = h . f . fromMaybe (pieceEntry s) + h e + | isEmpty e = Nothing + | otherwise = Just e + +instance Download ContentDownload (Block BL.ByteString) where + scheduleBlocks n addr maskBF = do + ContentDownload {..} <- get + let wantPieces = maskBF `BF.difference` bitfield + let wantBlocks = L.concat $ M.elems $ M.mapWithKey _holes $ + M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) + inprogress + + bixs <- if L.null wantBlocks + then do + mpix <- choosePiece wantPieces + case mpix of -- TODO return 'n' blocks + Nothing -> return [] + Just pix -> return [leadingBlock pix defaultTransferSize] + else chooseBlocks wantBlocks n + + forM_ bixs $ \ bix -> do + modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e + { pending = (addr, bix) : pending } + + return bixs + where + -- TODO choose block nearest to pending or stalled sets to reduce disk + -- seeks on remote machines + --chooseBlocks :: [BlockIx] -> Int -> DownloadUpdates [BlockIx] + chooseBlocks xs n = return (L.take n xs) + + -- TODO use selection strategies from Exchange.Selector + --choosePiece :: Bitfield -> DownloadUpdates (Maybe PieceIx) + choosePiece bf + | BF.null bf = return $ Nothing + | otherwise = return $ Just $ BF.findMin bf + + getRequestQueueLength addr = do + m <- gets (M.map (L.filter ((==) addr . fst) . pending) . inprogress) + return $ L.sum $ L.map L.length $ M.elems m + + resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) } + where + reset = fmap $ \ e -> e + { pending = L.filter (not . (==) addr . fst) (pending e) } + + pushBlock addr blk @ Block {..} = do + mpe <- gets (M.lookup blkPiece . inprogress) + case mpe of + Nothing -> return Nothing + Just (pe @ PieceEntry {..}) + | blockIx blk `L.notElem` fmap snd pending -> return Nothing + | otherwise -> do + let bkt' = Block.insertLazy blkOffset blkData stalled + case toPiece bkt' of + Nothing -> do + modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e + { pending = L.filter ((==) (blockIx blk) . snd) pending + , stalled = bkt' + } + return (Just False) + + Just pieceData -> do + -- TODO verify + storage <- gets contentStorage + liftIO $ writePiece (Torrent.Piece blkPiece pieceData) storage + modify $ \ s @ ContentDownload {..} -> s + { inprogress = M.delete blkPiece inprogress + , bitfield = BF.insert blkPiece bitfield + } + return (Just True) diff --git a/dht/bittorrent/src/Network/BitTorrent/Exchange/Manager.hs b/dht/bittorrent/src/Network/BitTorrent/Exchange/Manager.hs new file mode 100644 index 00000000..30a6a607 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Exchange/Manager.hs @@ -0,0 +1,62 @@ +module Network.BitTorrent.Exchange.Manager + ( Options (..) + , Manager + , Handler + , newManager + , closeManager + ) where + +import Control.Concurrent +import Control.Exception hiding (Handler) +import Control.Monad +import Data.Default +import Network.Socket + +import Data.Torrent +import Network.Address +import Network.BitTorrent.Exchange.Connection hiding (Options) +import Network.BitTorrent.Exchange.Session + + +data Options = Options + { optBacklog :: Int + , optPeerAddr :: PeerAddr IP + } deriving (Show, Eq) + +instance Default Options where + def = Options + { optBacklog = maxListenQueue + , optPeerAddr = def + } + +data Manager = Manager + { listener :: !ThreadId + } + +type Handler = InfoHash -> IO Session + +handleNewConn :: Socket -> PeerAddr IP -> Handler -> IO () +handleNewConn sock addr handler = do + conn <- newPendingConnection sock addr + ses <- handler (pendingTopic conn) `onException` closePending conn + establish conn ses + +listenIncoming :: Options -> Handler -> IO () +listenIncoming Options {..} handler = do + bracket (socket AF_INET Stream defaultProtocol) close $ \ sock -> do + bind sock (toSockAddr optPeerAddr) + listen sock optBacklog + forever $ do + (conn, sockAddr) <- accept sock + case fromSockAddr sockAddr of + Nothing -> return () + Just addr -> void $ forkIO $ handleNewConn sock addr handler + +newManager :: Options -> Handler -> IO Manager +newManager opts handler = do + tid <- forkIO $ listenIncoming opts handler + return (Manager tid) + +closeManager :: Manager -> IO () +closeManager Manager {..} = do + killThread listener \ No newline at end of file diff --git a/dht/bittorrent/src/Network/BitTorrent/Exchange/Message.hs b/dht/bittorrent/src/Network/BitTorrent/Exchange/Message.hs new file mode 100644 index 00000000..5c096523 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Exchange/Message.hs @@ -0,0 +1,1237 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- Normally peer to peer communication consisting of the following +-- steps: +-- +-- * In order to establish the connection between peers we should +-- send 'Handshake' message. The 'Handshake' is a required message +-- and must be the first message transmitted by the peer to the +-- another peer. Another peer should reply with a handshake as well. +-- +-- * Next peer might sent bitfield message, but might not. In the +-- former case we should update bitfield peer have. Again, if we +-- have some pieces we should send bitfield. Normally bitfield +-- message should sent after the handshake message. +-- +-- * Regular exchange messages. TODO docs +-- +-- For more high level API see "Network.BitTorrent.Exchange" module. +-- +-- For more infomation see: +-- +-- +{-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE CPP #-} +{-# OPTIONS -fno-warn-orphans #-} +module Network.BitTorrent.Exchange.Message + ( -- * Capabilities + Capabilities (..) + , Extension (..) + , Caps + + -- * Handshake + , ProtocolName + , Handshake(..) + , defaultHandshake + , handshakeSize + , handshakeMaxSize + , handshakeStats + + -- * Stats + , ByteCount + , ByteStats (..) + , byteLength + + -- * Messages + , Message (..) + , defaultKeepAliveTimeout + , defaultKeepAliveInterval + , PeerMessage (..) + + -- ** Core messages + , StatusUpdate (..) + , Available (..) + , Transfer (..) + , defaultRequestQueueLength + + -- ** Fast extension + , FastMessage (..) + + -- ** Extension protocol + , ExtendedMessage (..) + + -- *** Capabilities + , ExtendedExtension (..) + , ExtendedCaps (..) + + -- *** Handshake + , ExtendedHandshake (..) + , defaultQueueLength + , nullExtendedHandshake + + -- *** Metadata + , ExtendedMetadata (..) + , metadataPieceSize + , defaultMetadataFactor + , defaultMaxInfoDictSize + , isLastPiece + , isValidPiece + ) where + +import Control.Applicative +import Control.Arrow ((&&&), (***)) +import Control.Monad (when) +import Data.Attoparsec.ByteString.Char8 as BS +import Data.BEncode as BE +import Data.BEncode.BDict as BE +import Data.BEncode.Internal as BE (ppBEncode, parser) +import Data.BEncode.Types (BDict) +import Data.Bits +import Data.ByteString as BS +import Data.ByteString.Char8 as BC +import Data.ByteString.Lazy as BL +import Data.Default +import Data.List as L +import Data.Map.Strict as M +import Data.Maybe +import Data.Monoid +import Data.Ord +import Data.Serialize as S +import Data.String +import Data.Text as T +import Data.Typeable +import Data.Word +#if MIN_VERSION_iproute(1,7,4) +import Data.IP hiding (fromSockAddr) +#else +import Data.IP +#endif +import Network +import Network.Socket hiding (KeepAlive) +import Text.PrettyPrint as PP hiding ((<>)) +import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) + +import Data.Torrent hiding (Piece (..)) +import qualified Data.Torrent as P (Piece (..)) +import Network.Address +import Network.BitTorrent.Exchange.Bitfield +import Network.BitTorrent.Exchange.Block + +{----------------------------------------------------------------------- +-- Capabilities +-----------------------------------------------------------------------} + +-- | +class Capabilities caps where + type Ext caps :: * + + -- | Pack extensions to caps. + toCaps :: [Ext caps] -> caps + + -- | Unpack extensions from caps. + fromCaps :: caps -> [Ext caps] + + -- | Check if an extension is a member of the specified set. + allowed :: Ext caps -> caps -> Bool + +ppCaps :: Capabilities caps => Pretty (Ext caps) => caps -> Doc +ppCaps = hcat . punctuate ", " . L.map pPrint . fromCaps + +{----------------------------------------------------------------------- +-- Extensions +-----------------------------------------------------------------------} + +-- | Enumeration of message extension protocols. +-- +-- For more info see: +-- +data Extension + = ExtDHT -- ^ BEP 5: allow to send PORT messages. + | ExtFast -- ^ BEP 6: allow to send FAST messages. + | ExtExtended -- ^ BEP 10: allow to send the extension protocol messages. + deriving (Show, Eq, Ord, Enum, Bounded) + +-- | Full extension names, suitable for logging. +instance Pretty Extension where + pPrint ExtDHT = "Distributed Hash Table Protocol" + pPrint ExtFast = "Fast Extension" + pPrint ExtExtended = "Extension Protocol" + +-- | Extension bitmask as specified by BEP 4. +extMask :: Extension -> Word64 +extMask ExtDHT = 0x01 +extMask ExtFast = 0x04 +extMask ExtExtended = 0x100000 + +{----------------------------------------------------------------------- +-- Capabilities +-----------------------------------------------------------------------} + +-- | Capabilities is a set of 'Extension's usually sent in 'Handshake' +-- messages. +newtype Caps = Caps Word64 + deriving (Show, Eq) + +-- | Render set of extensions as comma separated list. +instance Pretty Caps where + pPrint = ppCaps + {-# INLINE pPrint #-} + +-- | The empty set. +instance Default Caps where + def = Caps 0 + {-# INLINE def #-} + +-- | Monoid under intersection. 'mempty' includes all known extensions. +instance Monoid Caps where + mempty = toCaps [minBound .. maxBound] + {-# INLINE mempty #-} + + mappend (Caps a) (Caps b) = Caps (a .&. b) + {-# INLINE mappend #-} + +-- | 'Handshake' compatible encoding. +instance Serialize Caps where + put (Caps caps) = S.putWord64be caps + {-# INLINE put #-} + + get = Caps <$> S.getWord64be + {-# INLINE get #-} + +instance Capabilities Caps where + type Ext Caps = Extension + + allowed e (Caps caps) = (extMask e .&. caps) /= 0 + {-# INLINE allowed #-} + + toCaps = Caps . L.foldr (.|.) 0 . L.map extMask + fromCaps caps = L.filter (`allowed` caps) [minBound..maxBound] + +{----------------------------------------------------------------------- + Handshake +-----------------------------------------------------------------------} + +maxProtocolNameSize :: Word8 +maxProtocolNameSize = maxBound + +-- | The protocol name is used to identify to the local peer which +-- version of BTP the remote peer uses. +newtype ProtocolName = ProtocolName BS.ByteString + deriving (Eq, Ord, Typeable) + +-- | In BTP/1.0 the name is 'BitTorrent protocol'. If this string is +-- different from the local peers own protocol name, then the +-- connection is to be dropped. +instance Default ProtocolName where + def = ProtocolName "BitTorrent protocol" + +instance Show ProtocolName where + show (ProtocolName bs) = show bs + +instance Pretty ProtocolName where + pPrint (ProtocolName bs) = PP.text $ BC.unpack bs + +instance IsString ProtocolName where + fromString str + | L.length str <= fromIntegral maxProtocolNameSize + = ProtocolName (fromString str) + | otherwise = error $ "fromString: ProtocolName too long: " ++ str + +instance Serialize ProtocolName where + put (ProtocolName bs) = do + putWord8 $ fromIntegral $ BS.length bs + putByteString bs + + get = do + len <- getWord8 + bs <- getByteString $ fromIntegral len + return (ProtocolName bs) + +-- | Handshake message is used to exchange all information necessary +-- to establish connection between peers. +-- +data Handshake = Handshake { + -- | Identifier of the protocol. This is usually equal to 'def'. + hsProtocol :: ProtocolName + + -- | Reserved bytes used to specify supported BEP's. + , hsReserved :: Caps + + -- | Info hash of the info part of the metainfo file. that is + -- transmitted in tracker requests. Info hash of the initiator + -- handshake and response handshake should match, otherwise + -- initiator should break the connection. + -- + , hsInfoHash :: InfoHash + + -- | Peer id of the initiator. This is usually the same peer id + -- that is transmitted in tracker requests. + -- + , hsPeerId :: PeerId + + } deriving (Show, Eq) + +instance Serialize Handshake where + put Handshake {..} = do + put hsProtocol + put hsReserved + put hsInfoHash + put hsPeerId + get = Handshake <$> get <*> get <*> get <*> get + +-- | Show handshake protocol string, caps and fingerprint. +instance Pretty Handshake where + pPrint Handshake {..} + = pPrint hsProtocol $$ + pPrint hsReserved $$ + pPrint (fingerprint hsPeerId) + +-- | Get handshake message size in bytes from the length of protocol +-- string. +handshakeSize :: Word8 -> Int +handshakeSize n = 1 + fromIntegral n + 8 + 20 + 20 + +-- | Maximum size of handshake message in bytes. +handshakeMaxSize :: Int +handshakeMaxSize = handshakeSize maxProtocolNameSize + +-- | Handshake with default protocol string and reserved bitmask. +defaultHandshake :: InfoHash -> PeerId -> Handshake +defaultHandshake = Handshake def def + +handshakeStats :: Handshake -> ByteStats +handshakeStats (Handshake (ProtocolName bs) _ _ _) + = ByteStats 1 (BS.length bs + 8 + 20 + 20) 0 + +{----------------------------------------------------------------------- +-- Stats +-----------------------------------------------------------------------} + +-- | Number of bytes. +type ByteCount = Int + +-- | Summary of encoded message byte layout can be used to collect +-- stats about message flow in both directions. This data can be +-- retrieved using 'stats' function. +data ByteStats = ByteStats + { -- | Number of bytes used to help encode 'control' and 'payload' + -- bytes: message size, message ID's, etc + overhead :: {-# UNPACK #-} !ByteCount + + -- | Number of bytes used to exchange peers state\/options: piece + -- and block indexes, infohash, port numbers, peer ID\/IP, etc. + , control :: {-# UNPACK #-} !ByteCount + + -- | Number of payload bytes: torrent data blocks and infodict + -- metadata. + , payload :: {-# UNPACK #-} !ByteCount + } deriving Show + +instance Pretty ByteStats where + pPrint s @ ByteStats {..} = fsep + [ PP.int overhead, "overhead" + , PP.int control, "control" + , PP.int payload, "payload" + , "bytes" + ] $+$ fsep + [ PP.int (byteLength s), "total bytes" + ] + +-- | Empty byte sequences. +instance Default ByteStats where + def = ByteStats 0 0 0 + +-- | Monoid under addition. +instance Monoid ByteStats where + mempty = def + mappend a b = ByteStats + { overhead = overhead a + overhead b + , control = control a + control b + , payload = payload a + payload b + } + +-- | Sum of the all byte sequences. +byteLength :: ByteStats -> Int +byteLength ByteStats {..} = overhead + control + payload + +{----------------------------------------------------------------------- +-- Regular messages +-----------------------------------------------------------------------} + +-- | Messages which can be sent after handshaking. Minimal complete +-- definition: 'envelop'. +class PeerMessage a where + -- | Construct a message to be /sent/. Note that if 'ExtendedCaps' + -- do not contain mapping for this message the default + -- 'ExtendedMessageId' is used. + envelop :: ExtendedCaps -- ^ The /receiver/ extended capabilities; + -> a -- ^ An regular message; + -> Message -- ^ Enveloped message to sent. + + -- | Find out the extension this message belong to. Can be used to + -- check if this message is allowed to send\/recv in current + -- session. + requires :: a -> Maybe Extension + requires _ = Nothing + + -- | Get sizes of overhead\/control\/payload byte sequences of + -- binary message representation without encoding message to binary + -- bytestring. + -- + -- This function should obey one law: + -- + -- * 'byteLength' ('stats' msg) == 'BL.length' ('encode' msg) + -- + stats :: a -> ByteStats + stats _ = ByteStats 4 0 0 + +{----------------------------------------------------------------------- +-- Status messages +-----------------------------------------------------------------------} + +-- | Notification that the sender have updated its +-- 'Network.BitTorrent.Exchange.Status.PeerStatus'. +data StatusUpdate + -- | Notification that the sender will not upload data to the + -- receiver until unchoking happen. + = Choking !Bool + + -- | Notification that the sender is interested (or not interested) + -- in any of the receiver's data pieces. + | Interested !Bool + deriving (Show, Eq, Ord, Typeable) + +instance Pretty StatusUpdate where + pPrint (Choking False) = "not choking" + pPrint (Choking True ) = "choking" + pPrint (Interested False) = "not interested" + pPrint (Interested True ) = "interested" + +instance PeerMessage StatusUpdate where + envelop _ = Status + {-# INLINE envelop #-} + + stats _ = ByteStats 4 1 0 + {-# INLINE stats #-} + +{----------------------------------------------------------------------- +-- Available messages +-----------------------------------------------------------------------} + +-- | Messages used to inform receiver which pieces of the torrent +-- sender have. +data Available = + -- | Zero-based index of a piece that has just been successfully + -- downloaded and verified via the hash. + Have ! PieceIx + + -- | The bitfield message may only be sent immediately after the + -- handshaking sequence is complete, and before any other message + -- are sent. If client have no pieces then bitfield need not to be + -- sent. + | Bitfield !Bitfield + deriving (Show, Eq) + +instance Pretty Available where + pPrint (Have ix ) = "Have" <+> int ix + pPrint (Bitfield _ ) = "Bitfield" + +instance PeerMessage Available where + envelop _ = Available + {-# INLINE envelop #-} + + stats (Have _) = ByteStats (4 + 1) 4 0 + stats (Bitfield bf) = ByteStats (4 + 1) (q + trailing) 0 + where + trailing = if r == 0 then 0 else 1 + (q, r) = quotRem (totalCount bf) 8 + +{----------------------------------------------------------------------- +-- Transfer messages +-----------------------------------------------------------------------} + +-- | Messages used to transfer 'Block's. +data Transfer + -- | Request for a particular block. If a client is requested a + -- block that another peer do not have the peer might not answer + -- at all. + = Request ! BlockIx + + -- | Response to a request for a block. + | Piece !(Block BL.ByteString) + + -- | Used to cancel block requests. It is typically used during + -- "End Game". + | Cancel !BlockIx + deriving (Show, Eq) + +instance Pretty Transfer where + pPrint (Request ix ) = "Request" <+> pPrint ix + pPrint (Piece blk) = "Piece" <+> pPrint blk + pPrint (Cancel i ) = "Cancel" <+> pPrint i + +instance PeerMessage Transfer where + envelop _ = Transfer + {-# INLINE envelop #-} + + stats (Request _ ) = ByteStats (4 + 1) (3 * 4) 0 + stats (Piece p ) = ByteStats (4 + 1) (4 + 4 + blockSize p) 0 + stats (Cancel _ ) = ByteStats (4 + 1) (3 * 4) 0 + +-- TODO increase +-- | Max number of pending 'Request's inflight. +defaultRequestQueueLength :: Int +defaultRequestQueueLength = 1 + +{----------------------------------------------------------------------- +-- Fast messages +-----------------------------------------------------------------------} + +-- | BEP6 messages. +data FastMessage = + -- | If a peer have all pieces it might send the 'HaveAll' message + -- instead of 'Bitfield' message. Used to save bandwidth. + HaveAll + + -- | If a peer have no pieces it might send 'HaveNone' message + -- intead of 'Bitfield' message. Used to save bandwidth. + | HaveNone + + -- | This is an advisory message meaning "you might like to + -- download this piece." Used to avoid excessive disk seeks and + -- amount of IO. + | SuggestPiece !PieceIx + + -- | Notifies a requesting peer that its request will not be + -- satisfied. + | RejectRequest !BlockIx + + -- | This is an advisory messsage meaning \"if you ask for this + -- piece, I'll give it to you even if you're choked.\" Used to + -- shorten starting phase. + | AllowedFast !PieceIx + deriving (Show, Eq) + +instance Pretty FastMessage where + pPrint (HaveAll ) = "Have all" + pPrint (HaveNone ) = "Have none" + pPrint (SuggestPiece pix) = "Suggest" <+> int pix + pPrint (RejectRequest bix) = "Reject" <+> pPrint bix + pPrint (AllowedFast pix) = "Allowed fast" <+> int pix + +instance PeerMessage FastMessage where + envelop _ = Fast + {-# INLINE envelop #-} + + requires _ = Just ExtFast + {-# INLINE requires #-} + + stats HaveAll = ByteStats 4 1 0 + stats HaveNone = ByteStats 4 1 0 + stats (SuggestPiece _) = ByteStats 5 4 0 + stats (RejectRequest _) = ByteStats 5 12 0 + stats (AllowedFast _) = ByteStats 5 4 0 + +{----------------------------------------------------------------------- +-- Extension protocol +-----------------------------------------------------------------------} + +{----------------------------------------------------------------------- +-- Extended capabilities +-----------------------------------------------------------------------} + +data ExtendedExtension + = ExtMetadata -- ^ BEP 9: Extension for Peers to Send Metadata Files + deriving (Show, Eq, Ord, Enum, Bounded, Typeable) + +instance IsString ExtendedExtension where + fromString = fromMaybe (error msg) . fromKey . fromString + where + msg = "fromString: could not parse ExtendedExtension" + +instance Pretty ExtendedExtension where + pPrint ExtMetadata = "Extension for Peers to Send Metadata Files" + +fromKey :: BKey -> Maybe ExtendedExtension +fromKey "ut_metadata" = Just ExtMetadata +fromKey _ = Nothing +{-# INLINE fromKey #-} + +toKey :: ExtendedExtension -> BKey +toKey ExtMetadata = "ut_metadata" +{-# INLINE toKey #-} + +type ExtendedMessageId = Word8 + +extId :: ExtendedExtension -> ExtendedMessageId +extId ExtMetadata = 1 +{-# INLINE extId #-} + +type ExtendedMap = Map ExtendedExtension ExtendedMessageId + +-- | The extension IDs must be stored for every peer, because every +-- peer may have different IDs for the same extension. +-- +newtype ExtendedCaps = ExtendedCaps { extendedCaps :: ExtendedMap } + deriving (Show, Eq) + +instance Pretty ExtendedCaps where + pPrint = ppCaps + {-# INLINE pPrint #-} + +-- | The empty set. +instance Default ExtendedCaps where + def = ExtendedCaps M.empty + +-- | Monoid under intersection: +-- +-- * The 'mempty' caps includes all known extensions; +-- +-- * the 'mappend' operation is NOT commutative: it return message +-- id from the first caps for the extensions existing in both caps. +-- +instance Monoid ExtendedCaps where + mempty = toCaps [minBound..maxBound] + mappend (ExtendedCaps a) (ExtendedCaps b) = + ExtendedCaps (M.intersection a b) + +appendBDict :: BDict -> ExtendedMap -> ExtendedMap +appendBDict (Cons key val xs) caps + | Just ext <- fromKey key + , Right eid <- fromBEncode val = M.insert ext eid (appendBDict xs caps) + | otherwise = appendBDict xs caps +appendBDict Nil caps = caps + +-- | Handshake compatible encoding. +instance BEncode ExtendedCaps where + toBEncode = BDict . BE.fromAscList . L.sortBy (comparing fst) + . L.map (toKey *** toBEncode) . M.toList . extendedCaps + + fromBEncode (BDict bd) = pure $ ExtendedCaps $ appendBDict bd M.empty + fromBEncode _ = decodingError "ExtendedCaps" + +instance Capabilities ExtendedCaps where + type Ext ExtendedCaps = ExtendedExtension + + toCaps = ExtendedCaps . M.fromList . L.map (id &&& extId) + + fromCaps = M.keys . extendedCaps + {-# INLINE fromCaps #-} + + allowed e (ExtendedCaps caps) = M.member e caps + {-# INLINE allowed #-} + +remoteMessageId :: ExtendedExtension -> ExtendedCaps -> ExtendedMessageId +remoteMessageId ext = fromMaybe (extId ext) . M.lookup ext . extendedCaps + +{----------------------------------------------------------------------- +-- Extended handshake +-----------------------------------------------------------------------} + +-- | This message should be sent immediately after the standard +-- bittorrent handshake to any peer that supports this extension +-- protocol. Extended handshakes can be sent more than once, however +-- an implementation may choose to ignore subsequent handshake +-- messages. +-- +data ExtendedHandshake = ExtendedHandshake + { -- | If this peer has an IPv4 interface, this is the compact + -- representation of that address. + ehsIPv4 :: Maybe HostAddress + + -- | If this peer has an IPv6 interface, this is the compact + -- representation of that address. + , ehsIPv6 :: Maybe HostAddress6 + + -- | Dictionary of supported extension messages which maps names + -- of extensions to an extended message ID for each extension + -- message. + , ehsCaps :: ExtendedCaps + + -- | Size of 'Data.Torrent.InfoDict' in bytes. This field should + -- be added if 'ExtMetadata' is enabled in current session /and/ + -- peer have the torrent file. + , ehsMetadataSize :: Maybe Int + + -- | Local TCP /listen/ port. Allows each side to learn about the + -- TCP port number of the other side. + , ehsPort :: Maybe PortNumber + + -- | Request queue the number of outstanding 'Request' messages + -- this client supports without dropping any. + , ehsQueueLength :: Maybe Int + + -- | Client name and version. + , ehsVersion :: Maybe Text + + -- | IP of the remote end + , ehsYourIp :: Maybe IP + } deriving (Show, Eq, Typeable) + +extHandshakeId :: ExtendedMessageId +extHandshakeId = 0 + +-- | Default 'Request' queue size. +defaultQueueLength :: Int +defaultQueueLength = 1 + +-- | All fields are empty. +instance Default ExtendedHandshake where + def = ExtendedHandshake def def def def def def def def + +instance Monoid ExtendedHandshake where + mempty = def { ehsCaps = mempty } + mappend old new = ExtendedHandshake { + ehsCaps = ehsCaps old <> ehsCaps new, + ehsIPv4 = ehsIPv4 old `mergeOld` ehsIPv4 new, + ehsIPv6 = ehsIPv6 old `mergeOld` ehsIPv6 new, + ehsMetadataSize = ehsMetadataSize old `mergeNew` ehsMetadataSize new, + ehsPort = ehsPort old `mergeOld` ehsPort new, + ehsQueueLength = ehsQueueLength old `mergeNew` ehsQueueLength new, + ehsVersion = ehsVersion old `mergeOld` ehsVersion new, + ehsYourIp = ehsYourIp old `mergeOld` ehsYourIp new + } + where + mergeOld mold mnew = mold <|> mnew + mergeNew mold mnew = mnew <|> mold + + +instance BEncode ExtendedHandshake where + toBEncode ExtendedHandshake {..} = toDict $ + "ipv4" .=? (S.encode <$> ehsIPv4) + .: "ipv6" .=? (S.encode <$> ehsIPv6) + .: "m" .=! ehsCaps + .: "metadata_size" .=? ehsMetadataSize + .: "p" .=? ehsPort + .: "reqq" .=? ehsQueueLength + .: "v" .=? ehsVersion + .: "yourip" .=? (runPut <$> either put put <$> toEither <$> ehsYourIp) + .: endDict + where + toEither (IPv4 v4) = Left v4 + toEither (IPv6 v6) = Right v6 + + fromBEncode = fromDict $ ExtendedHandshake + <$>? "ipv4" + <*>? "ipv6" + <*>! "m" + <*>? "metadata_size" + <*>? "p" + <*>? "reqq" + <*>? "v" + <*> (opt "yourip" >>= getYourIp) + +getYourIp :: Maybe BValue -> BE.Get (Maybe IP) +getYourIp f = + return $ do + BString ip <- f + either (const Nothing) Just $ + case BS.length ip of + 4 -> IPv4 <$> S.decode ip + 16 -> IPv6 <$> S.decode ip + _ -> fail "" + +instance Pretty ExtendedHandshake where + pPrint = PP.text . show + +-- | NOTE: Approximated 'stats'. +instance PeerMessage ExtendedHandshake where + envelop c = envelop c . EHandshake + {-# INLINE envelop #-} + + requires _ = Just ExtExtended + {-# INLINE requires #-} + + stats _ = ByteStats (4 + 1 + 1) 100 {- is it ok? -} 0 -- FIXME + {-# INLINE stats #-} + +-- | Set default values and the specified 'ExtendedCaps'. +nullExtendedHandshake :: ExtendedCaps -> ExtendedHandshake +nullExtendedHandshake caps = ExtendedHandshake + { ehsIPv4 = Nothing + , ehsIPv6 = Nothing + , ehsCaps = caps + , ehsMetadataSize = Nothing + , ehsPort = Nothing + , ehsQueueLength = Just defaultQueueLength + , ehsVersion = Just $ T.pack $ render $ pPrint libFingerprint + , ehsYourIp = Nothing + } + +{----------------------------------------------------------------------- +-- Metadata exchange extension +-----------------------------------------------------------------------} + +-- | A peer MUST verify that any piece it sends passes the info-hash +-- verification. i.e. until the peer has the entire metadata, it +-- cannot run SHA-1 to verify that it yields the same hash as the +-- info-hash. +-- +data ExtendedMetadata + -- | This message requests the a specified metadata piece. The + -- response to this message, from a peer supporting the extension, + -- is either a 'MetadataReject' or a 'MetadataData' message. + = MetadataRequest PieceIx + + -- | If sender requested a valid 'PieceIx' and receiver have the + -- corresponding piece then receiver should respond with this + -- message. + | MetadataData + { -- | A piece of 'Data.Torrent.InfoDict'. + piece :: P.Piece BS.ByteString + + -- | This key has the same semantics as the 'ehsMetadataSize' in + -- the 'ExtendedHandshake' — it is size of the torrent info + -- dict. + , totalSize :: Int + } + + -- | Peers that do not have the entire metadata MUST respond with + -- a reject message to any metadata request. + -- + -- Clients MAY implement flood protection by rejecting request + -- messages after a certain number of them have been + -- served. Typically the number of pieces of metadata times a + -- factor. + | MetadataReject PieceIx + + -- | Reserved. By specification we should ignore unknown metadata + -- messages. + | MetadataUnknown BValue + deriving (Show, Eq, Typeable) + +-- | Extended metadata message id used in 'msg_type_key'. +type MetadataId = Int + +msg_type_key, piece_key, total_size_key :: BKey +msg_type_key = "msg_type" +piece_key = "piece" +total_size_key = "total_size" + +-- | BEP9 compatible encoding. +instance BEncode ExtendedMetadata where + toBEncode (MetadataRequest pix) = toDict $ + msg_type_key .=! (0 :: MetadataId) + .: piece_key .=! pix + .: endDict + toBEncode (MetadataData (P.Piece pix _) totalSize) = toDict $ + msg_type_key .=! (1 :: MetadataId) + .: piece_key .=! pix + .: total_size_key .=! totalSize + .: endDict + toBEncode (MetadataReject pix) = toDict $ + msg_type_key .=! (2 :: MetadataId) + .: piece_key .=! pix + .: endDict + toBEncode (MetadataUnknown bval) = bval + + fromBEncode bval = (`fromDict` bval) $ do + mid <- field $ req msg_type_key + case mid :: MetadataId of + 0 -> MetadataRequest <$>! piece_key + 1 -> metadataData <$>! piece_key <*>! total_size_key + 2 -> MetadataReject <$>! piece_key + _ -> pure (MetadataUnknown bval) + where + metadataData pix s = MetadataData (P.Piece pix BS.empty) s + +-- | Piece data bytes are omitted. +instance Pretty ExtendedMetadata where + pPrint (MetadataRequest pix ) = "Request" <+> PP.int pix + pPrint (MetadataData p t) = "Data" <+> pPrint p <+> PP.int t + pPrint (MetadataReject pix ) = "Reject" <+> PP.int pix + pPrint (MetadataUnknown bval ) = "Unknown" <+> ppBEncode bval + +-- | NOTE: Approximated 'stats'. +instance PeerMessage ExtendedMetadata where + envelop c = envelop c . EMetadata (remoteMessageId ExtMetadata c) + {-# INLINE envelop #-} + + requires _ = Just ExtExtended + {-# INLINE requires #-} + + stats (MetadataRequest _) = ByteStats (4 + 1 + 1) {- ~ -} 25 0 + stats (MetadataData p _) = ByteStats (4 + 1 + 1) {- ~ -} 41 $ + BS.length (P.pieceData p) + stats (MetadataReject _) = ByteStats (4 + 1 + 1) {- ~ -} 25 0 + stats (MetadataUnknown _) = ByteStats (4 + 1 + 1) {- ? -} 0 0 + +-- | All 'Piece's in 'MetadataData' messages MUST have size equal to +-- this value. The last trailing piece can be shorter. +metadataPieceSize :: PieceSize +metadataPieceSize = 16 * 1024 + +isLastPiece :: P.Piece a -> Int -> Bool +isLastPiece P.Piece {..} total = succ pieceIndex == pcnt + where + pcnt = q + if r > 0 then 1 else 0 + (q, r) = quotRem total metadataPieceSize + +-- TODO we can check if the piece payload bytestring have appropriate +-- length; otherwise serialization MUST fail. +isValidPiece :: P.Piece BL.ByteString -> Int -> Bool +isValidPiece p @ P.Piece {..} total + | isLastPiece p total = pieceSize p <= metadataPieceSize + | otherwise = pieceSize p == metadataPieceSize + +setMetadataPayload :: BS.ByteString -> ExtendedMetadata -> ExtendedMetadata +setMetadataPayload bs (MetadataData (P.Piece pix _) t) = + MetadataData (P.Piece pix bs) t +setMetadataPayload _ msg = msg + +getMetadataPayload :: ExtendedMetadata -> Maybe BS.ByteString +getMetadataPayload (MetadataData (P.Piece _ bs) _) = Just bs +getMetadataPayload _ = Nothing + +-- | Metadata BDict usually contain only 'msg_type_key', 'piece_key' +-- and 'total_size_key' fields so it normally should take less than +-- 100 bytes. This limit is two order of magnitude larger to be +-- permissive to 'MetadataUnknown' messages. +-- +-- See 'maxMessageSize' for further explanation. +-- +maxMetadataBDictSize :: Int +maxMetadataBDictSize = 16 * 1024 + +maxMetadataSize :: Int +maxMetadataSize = maxMetadataBDictSize + metadataPieceSize + +-- to make MetadataData constructor fields a little bit prettier we +-- cheat here: first we read empty 'pieceData' from bdict, but then we +-- fill that field with the actual piece data — trailing bytes of +-- the message +getMetadata :: Int -> S.Get ExtendedMetadata +getMetadata len + | len > maxMetadataSize = fail $ parseError "size exceeded limit" + | otherwise = do + bs <- getByteString len + parseRes $ BS.parse BE.parser bs + where + parseError reason = "unable to parse metadata message: " ++ reason + + parseRes (BS.Fail _ _ m) = fail $ parseError $ "bdict: " ++ m + parseRes (BS.Partial _) = fail $ parseError "bdict: not enough bytes" + parseRes (BS.Done piece bvalueBS) + | BS.length piece > metadataPieceSize + = fail "infodict piece: size exceeded limit" + | otherwise = do + metadata <- either (fail . parseError) pure $ fromBEncode bvalueBS + return $ setMetadataPayload piece metadata + +putMetadata :: ExtendedMetadata -> BL.ByteString +putMetadata msg + | Just bs <- getMetadataPayload msg = BE.encode msg <> BL.fromStrict bs + | otherwise = BE.encode msg + +-- | Allows a requesting peer to send 2 'MetadataRequest's for the +-- each piece. +-- +-- See 'Network.BitTorrent.Wire.Options.metadataFactor' for +-- explanation why do we need this limit. +defaultMetadataFactor :: Int +defaultMetadataFactor = 2 + +-- | Usually torrent size do not exceed 1MB. This value limit torrent +-- /content/ size to about 8TB. +-- +-- See 'Network.BitTorrent.Wire.Options.maxInfoDictSize' for +-- explanation why do we need this limit. +defaultMaxInfoDictSize :: Int +defaultMaxInfoDictSize = 10 * 1024 * 1024 + +{----------------------------------------------------------------------- +-- Extension protocol messages +-----------------------------------------------------------------------} + +-- | For more info see +data ExtendedMessage + = EHandshake ExtendedHandshake + | EMetadata ExtendedMessageId ExtendedMetadata + | EUnknown ExtendedMessageId BS.ByteString + deriving (Show, Eq, Typeable) + +instance Pretty ExtendedMessage where + pPrint (EHandshake ehs) = pPrint ehs + pPrint (EMetadata _ msg) = "Metadata" <+> pPrint msg + pPrint (EUnknown mid _ ) = "Unknown" <+> PP.text (show mid) + +instance PeerMessage ExtendedMessage where + envelop _ = Extended + {-# INLINE envelop #-} + + requires _ = Just ExtExtended + {-# INLINE requires #-} + + stats (EHandshake hs) = stats hs + stats (EMetadata _ msg) = stats msg + stats (EUnknown _ msg) = ByteStats (4 + 1 + 1) (BS.length msg) 0 + +{----------------------------------------------------------------------- +-- The message datatype +-----------------------------------------------------------------------} + +type MessageId = Word8 + +-- | Messages used in communication between peers. +-- +-- Note: If some extensions are disabled (not present in extension +-- mask) and client receive message used by the disabled +-- extension then the client MUST close the connection. +-- +data Message + -- | Peers may close the TCP connection if they have not received + -- any messages for a given period of time, generally 2 + -- minutes. Thus, the KeepAlive message is sent to keep the + -- connection between two peers alive, if no /other/ message has + -- been sent in a given period of time. + = KeepAlive + | Status !StatusUpdate -- ^ Messages used to update peer status. + | Available !Available -- ^ Messages used to inform availability. + | Transfer !Transfer -- ^ Messages used to transfer 'Block's. + + -- | Peer receiving a handshake indicating the remote peer + -- supports the 'ExtDHT' should send a 'Port' message. Peers that + -- receive this message should attempt to ping the node on the + -- received port and IP address of the remote peer. + | Port !PortNumber + | Fast !FastMessage + | Extended !ExtendedMessage + deriving (Show, Eq) + +instance Default Message where + def = KeepAlive + {-# INLINE def #-} + +-- | Payload bytes are omitted. +instance Pretty Message where + pPrint (KeepAlive ) = "Keep alive" + pPrint (Status m) = "Status" <+> pPrint m + pPrint (Available m) = pPrint m + pPrint (Transfer m) = pPrint m + pPrint (Port p) = "Port" <+> int (fromEnum p) + pPrint (Fast m) = pPrint m + pPrint (Extended m) = pPrint m + +instance PeerMessage Message where + envelop _ = id + {-# INLINE envelop #-} + + requires KeepAlive = Nothing + requires (Status _) = Nothing + requires (Available _) = Nothing + requires (Transfer _) = Nothing + requires (Port _) = Just ExtDHT + requires (Fast _) = Just ExtFast + requires (Extended _) = Just ExtExtended + + stats KeepAlive = ByteStats 4 0 0 + stats (Status m) = stats m + stats (Available m) = stats m + stats (Transfer m) = stats m + stats (Port _) = ByteStats 5 2 0 + stats (Fast m) = stats m + stats (Extended m) = stats m + +-- | PORT message. +instance PeerMessage PortNumber where + envelop _ = Port + {-# INLINE envelop #-} + + requires _ = Just ExtDHT + {-# INLINE requires #-} + +-- | How long /this/ peer should wait before dropping connection, in +-- seconds. +defaultKeepAliveTimeout :: Int +defaultKeepAliveTimeout = 2 * 60 + +-- | How often /this/ peer should send 'KeepAlive' messages, in +-- seconds. +defaultKeepAliveInterval :: Int +defaultKeepAliveInterval = 60 + +getInt :: S.Get Int +getInt = fromIntegral <$> S.getWord32be +{-# INLINE getInt #-} + +putInt :: S.Putter Int +putInt = S.putWord32be . fromIntegral +{-# INLINE putInt #-} + +-- | This limit should protect against "out-of-memory" attacks: if a +-- malicious peer have sent a long varlength message then receiver can +-- accumulate too long bytestring in the 'Get'. +-- +-- Normal messages should never exceed this limits. +-- +-- See also 'maxBitfieldSize', 'maxBlockSize' limits. +-- +maxMessageSize :: Int +maxMessageSize = 20 + 1024 * 1024 + +-- | This also limit max torrent size to: +-- +-- max_bitfield_size * piece_ix_per_byte * max_piece_size = +-- 2 ^ 20 * 8 * 1MB = +-- 8TB +-- +maxBitfieldSize :: Int +maxBitfieldSize = 1024 * 1024 + +getBitfield :: Int -> S.Get Bitfield +getBitfield len + | len > maxBitfieldSize = fail "BITFIELD message size exceeded limit" + | otherwise = fromBitmap <$> getByteString len + +maxBlockSize :: Int +maxBlockSize = 4 * defaultTransferSize + +getBlock :: Int -> S.Get (Block BL.ByteString) +getBlock len + | len > maxBlockSize = fail "BLOCK message size exceeded limit" + | otherwise = Block <$> getInt <*> getInt + <*> getLazyByteString (fromIntegral len) +{-# INLINE getBlock #-} + +instance Serialize Message where + get = do + len <- getInt + + when (len > maxMessageSize) $ do + fail "message body size exceeded the limit" + + if len == 0 then return KeepAlive + else do + mid <- S.getWord8 + case mid of + 0x00 -> return $ Status (Choking True) + 0x01 -> return $ Status (Choking False) + 0x02 -> return $ Status (Interested True) + 0x03 -> return $ Status (Interested False) + 0x04 -> (Available . Have) <$> getInt + 0x05 -> (Available . Bitfield) <$> getBitfield (pred len) + 0x06 -> (Transfer . Request) <$> S.get + 0x07 -> (Transfer . Piece) <$> getBlock (len - 9) + 0x08 -> (Transfer . Cancel) <$> S.get + 0x09 -> Port <$> S.get + 0x0D -> (Fast . SuggestPiece) <$> getInt + 0x0E -> return $ Fast HaveAll + 0x0F -> return $ Fast HaveNone + 0x10 -> (Fast . RejectRequest) <$> S.get + 0x11 -> (Fast . AllowedFast) <$> getInt + 0x14 -> Extended <$> getExtendedMessage (pred len) + _ -> do + rm <- S.remaining >>= S.getBytes + fail $ "unknown message ID: " ++ show mid ++ "\n" + ++ "remaining available bytes: " ++ show rm + + put KeepAlive = putInt 0 + put (Status msg) = putStatus msg + put (Available msg) = putAvailable msg + put (Transfer msg) = putTransfer msg + put (Port p ) = putPort p + put (Fast msg) = putFast msg + put (Extended m ) = putExtendedMessage m + +statusUpdateId :: StatusUpdate -> MessageId +statusUpdateId (Choking choking) = fromIntegral (0 + fromEnum choking) +statusUpdateId (Interested choking) = fromIntegral (2 + fromEnum choking) + +putStatus :: Putter StatusUpdate +putStatus su = do + putInt 1 + putWord8 (statusUpdateId su) + +putAvailable :: Putter Available +putAvailable (Have i) = do + putInt 5 + putWord8 0x04 + putInt i +putAvailable (Bitfield (toBitmap -> bs)) = do + putInt $ 1 + fromIntegral (BL.length bs) + putWord8 0x05 + putLazyByteString bs + +putBlock :: Putter (Block BL.ByteString) +putBlock Block {..} = do + putInt blkPiece + putInt blkOffset + putLazyByteString blkData + +putTransfer :: Putter Transfer +putTransfer (Request blk) = putInt 13 >> S.putWord8 0x06 >> S.put blk +putTransfer (Piece blk) = do + putInt (9 + blockSize blk) + putWord8 0x07 + putBlock blk +putTransfer (Cancel blk) = putInt 13 >> S.putWord8 0x08 >> S.put blk + +putPort :: Putter PortNumber +putPort p = do + putInt 3 + putWord8 0x09 + put p + +putFast :: Putter FastMessage +putFast HaveAll = putInt 1 >> putWord8 0x0E +putFast HaveNone = putInt 1 >> putWord8 0x0F +putFast (SuggestPiece pix) = putInt 5 >> putWord8 0x0D >> putInt pix +putFast (RejectRequest i ) = putInt 13 >> putWord8 0x10 >> put i +putFast (AllowedFast i ) = putInt 5 >> putWord8 0x11 >> putInt i + +maxEHandshakeSize :: Int +maxEHandshakeSize = 16 * 1024 + +getExtendedHandshake :: Int -> S.Get ExtendedHandshake +getExtendedHandshake messageSize + | messageSize > maxEHandshakeSize + = fail "extended handshake size exceeded limit" + | otherwise = do + bs <- getByteString messageSize + either fail pure $ BE.decode bs + +maxEUnknownSize :: Int +maxEUnknownSize = 64 * 1024 + +getExtendedUnknown :: Int -> S.Get BS.ByteString +getExtendedUnknown len + | len > maxEUnknownSize = fail "unknown extended message size exceeded limit" + | otherwise = getByteString len + +getExtendedMessage :: Int -> S.Get ExtendedMessage +getExtendedMessage messageSize = do + msgId <- getWord8 + let msgBodySize = messageSize - 1 + case msgId of + 0 -> EHandshake <$> getExtendedHandshake msgBodySize + 1 -> EMetadata msgId <$> getMetadata msgBodySize + _ -> EUnknown msgId <$> getExtendedUnknown msgBodySize + +-- | By spec. +extendedMessageId :: MessageId +extendedMessageId = 20 + +putExt :: ExtendedMessageId -> BL.ByteString -> Put +putExt mid lbs = do + putWord32be $ fromIntegral (1 + 1 + BL.length lbs) + putWord8 extendedMessageId + putWord8 mid + putLazyByteString lbs + +-- NOTE: in contrast to getExtendedMessage this function put length +-- and message id too! +putExtendedMessage :: Putter ExtendedMessage +putExtendedMessage (EHandshake hs) = putExt extHandshakeId $ BE.encode hs +putExtendedMessage (EMetadata mid msg) = putExt mid $ putMetadata msg +putExtendedMessage (EUnknown mid bs) = putExt mid $ BL.fromStrict bs diff --git a/dht/bittorrent/src/Network/BitTorrent/Exchange/Session.hs b/dht/bittorrent/src/Network/BitTorrent/Exchange/Session.hs new file mode 100644 index 00000000..38a3c3a6 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Exchange/Session.hs @@ -0,0 +1,586 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE TypeFamilies #-} +module Network.BitTorrent.Exchange.Session + ( -- * Session + Session + , Event (..) + , LogFun + , sessionLogger + + -- * Construction + , newSession + , closeSession + , withSession + + -- * Connection Set + , connect + , connectSink + , establish + + -- * Query + , waitMetadata + , takeMetadata + ) where + +import Control.Applicative +import Control.Concurrent +import Control.Concurrent.Chan.Split as CS +import Control.Concurrent.STM +import Control.Exception hiding (Handler) +import Control.Lens +import Control.Monad as M +import Control.Monad.Logger +import Control.Monad.Reader +import Data.ByteString as BS +import Data.ByteString.Lazy as BL +import Data.Conduit as C (Sink, awaitForever, (=$=), ($=)) +import qualified Data.Conduit as C +import Data.Conduit.List as C +import Data.Map as M +import Data.Monoid +import Data.Set as S +import Data.Text as T +import Data.Typeable +import Text.PrettyPrint hiding ((<>)) +import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) +import System.Log.FastLogger (LogStr, ToLogStr (..)) + +import Data.BEncode as BE +import Data.Torrent as Torrent +import Network.BitTorrent.Internal.Types +import Network.Address +import Network.BitTorrent.Exchange.Bitfield as BF +import Network.BitTorrent.Exchange.Block as Block +import Network.BitTorrent.Exchange.Connection +import Network.BitTorrent.Exchange.Download as D +import Network.BitTorrent.Exchange.Message as Message +import System.Torrent.Storage + +#if !MIN_VERSION_iproute(1,2,12) +deriving instance Ord IP +#endif + +{----------------------------------------------------------------------- +-- Exceptions +-----------------------------------------------------------------------} + +data ExchangeError + = InvalidRequest BlockIx StorageFailure + | CorruptedPiece PieceIx + deriving (Show, Typeable) + +instance Exception ExchangeError + +packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a +packException f m = try m >>= either (throwIO . f) return + +{----------------------------------------------------------------------- +-- Session state +-----------------------------------------------------------------------} +-- TODO unmap storage on zero connections + +data Cached a = Cached + { cachedValue :: !a + , cachedData :: BL.ByteString -- keep lazy + } + +cache :: BEncode a => a -> Cached a +cache s = Cached s (BE.encode s) + +-- | Logger function. +type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () + +--data SessionStatus = Seeder | Leecher + +data SessionState + = WaitingMetadata + { metadataDownload :: MVar MetadataDownload + , metadataCompleted :: MVar InfoDict -- ^ used to unblock waiters + , contentRootPath :: FilePath + } + | HavingMetadata + { metadataCache :: Cached InfoDict + , contentDownload :: MVar ContentDownload + , contentStorage :: Storage + } + +newSessionState :: FilePath -> Either InfoHash InfoDict -> IO SessionState +newSessionState rootPath (Left ih ) = do + WaitingMetadata <$> newMVar def <*> newEmptyMVar <*> pure rootPath +newSessionState rootPath (Right dict) = do + storage <- openInfoDict ReadWriteEx rootPath dict + download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage)) + (piPieceLength (idPieceInfo dict)) + storage + return $ HavingMetadata (cache dict) download storage + +closeSessionState :: SessionState -> IO () +closeSessionState WaitingMetadata {..} = return () +closeSessionState HavingMetadata {..} = close contentStorage + +haveMetadata :: InfoDict -> SessionState -> IO SessionState +haveMetadata dict WaitingMetadata {..} = do + storage <- openInfoDict ReadWriteEx contentRootPath dict + download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage)) + (piPieceLength (idPieceInfo dict)) + storage + return HavingMetadata + { metadataCache = cache dict + , contentDownload = download + , contentStorage = storage + } +haveMetadata _ s = return s + +{----------------------------------------------------------------------- +-- Session +-----------------------------------------------------------------------} + +data Session = Session + { sessionPeerId :: !(PeerId) + , sessionTopic :: !(InfoHash) + , sessionLogger :: !(LogFun) + , sessionEvents :: !(SendPort (Event Session)) + + , sessionState :: !(MVar SessionState) + +------------------------------------------------------------------------ + , connectionsPrefs :: !ConnectionPrefs + + -- | Connections either waiting for TCP/uTP 'connect' or waiting + -- for BT handshake. + , connectionsPending :: !(TVar (Set (PeerAddr IP))) + + -- | Connections successfully handshaked and data transfer can + -- take place. + , connectionsEstablished :: !(TVar (Map (PeerAddr IP) (Connection Session))) + + -- | TODO implement choking mechanism + , connectionsUnchoked :: [PeerAddr IP] + + -- | Messages written to this channel will be sent to the all + -- connections, including pending connections (but right after + -- handshake). + , connectionsBroadcast :: !(Chan Message) + } + +instance EventSource Session where + data Event Session + = ConnectingTo (PeerAddr IP) + | ConnectionEstablished (PeerAddr IP) + | ConnectionAborted + | ConnectionClosed (PeerAddr IP) + | SessionClosed + deriving Show + + listen Session {..} = CS.listen sessionEvents + +newSession :: LogFun + -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer; + -> FilePath -- ^ root directory for content files; + -> Either InfoHash InfoDict -- ^ torrent info dictionary; + -> IO Session +newSession logFun addr rootPath source = do + let ih = either id idInfoHash source + pid <- maybe genPeerId return (peerId addr) + eventStream <- newSendPort + sState <- newSessionState rootPath source + sStateVar <- newMVar sState + pSetVar <- newTVarIO S.empty + eSetVar <- newTVarIO M.empty + chan <- newChan + return Session + { sessionPeerId = pid + , sessionTopic = ih + , sessionLogger = logFun + , sessionEvents = eventStream + , sessionState = sStateVar + , connectionsPrefs = def + , connectionsPending = pSetVar + , connectionsEstablished = eSetVar + , connectionsUnchoked = [] + , connectionsBroadcast = chan + } + +closeSession :: Session -> IO () +closeSession Session {..} = do + s <- readMVar sessionState + closeSessionState s +{- + hSet <- atomically $ do + pSet <- swapTVar connectionsPending S.empty + eSet <- swapTVar connectionsEstablished S.empty + return pSet + mapM_ kill hSet +-} + +withSession :: () +withSession = error "withSession" + +{----------------------------------------------------------------------- +-- Logging +-----------------------------------------------------------------------} + +instance MonadLogger (Connected Session) where + monadLoggerLog loc src lvl msg = do + conn <- ask + ses <- asks connSession + addr <- asks connRemoteAddr + let addrSrc = src <> " @ " <> T.pack (render (pPrint addr)) + liftIO $ sessionLogger ses loc addrSrc lvl (toLogStr msg) + +logMessage :: MonadLogger m => Message -> m () +logMessage msg = logDebugN $ T.pack (render (pPrint msg)) + +logEvent :: MonadLogger m => Text -> m () +logEvent = logInfoN + +{----------------------------------------------------------------------- +-- Connection set +-----------------------------------------------------------------------} +--- Connection status transition: +--- +--- pending -> established -> finished -> closed +--- | \|/ /|\ +--- \-------------------------------------| +--- +--- Purpose of slots: +--- 1) to avoid duplicates +--- 2) connect concurrently +--- + +-- | Add connection to the pending set. +pendingConnection :: PeerAddr IP -> Session -> STM Bool +pendingConnection addr Session {..} = do + pSet <- readTVar connectionsPending + eSet <- readTVar connectionsEstablished + if (addr `S.member` pSet) || (addr `M.member` eSet) + then return False + else do + modifyTVar' connectionsPending (S.insert addr) + return True + +-- | Pending connection successfully established, add it to the +-- established set. +establishedConnection :: Connected Session () +establishedConnection = do + conn <- ask + addr <- asks connRemoteAddr + Session {..} <- asks connSession + liftIO $ atomically $ do + modifyTVar connectionsPending (S.delete addr) + modifyTVar connectionsEstablished (M.insert addr conn) + +-- | Either this or remote peer decided to finish conversation +-- (conversation is alread /established/ connection), remote it from +-- the established set. +finishedConnection :: Connected Session () +finishedConnection = do + Session {..} <- asks connSession + addr <- asks connRemoteAddr + liftIO $ atomically $ do + modifyTVar connectionsEstablished $ M.delete addr + +-- | There are no state for this connection, remove it from the all +-- sets. +closedConnection :: PeerAddr IP -> Session -> STM () +closedConnection addr Session {..} = do + modifyTVar connectionsPending $ S.delete addr + modifyTVar connectionsEstablished $ M.delete addr + +getConnectionConfig :: Session -> IO (ConnectionConfig Session) +getConnectionConfig s @ Session {..} = do + chan <- dupChan connectionsBroadcast + let sessionLink = SessionLink { + linkTopic = sessionTopic + , linkPeerId = sessionPeerId + , linkMetadataSize = Nothing + , linkOutputChan = Just chan + , linkSession = s + } + return ConnectionConfig + { cfgPrefs = connectionsPrefs + , cfgSession = sessionLink + , cfgWire = mainWire + } + +type Finalizer = IO () +type Runner = (ConnectionConfig Session -> IO ()) + +runConnection :: Runner -> Finalizer -> PeerAddr IP -> Session -> IO () +runConnection runner finalize addr set @ Session {..} = do + _ <- forkIO (action `finally` cleanup) + return () + where + action = do + notExist <- atomically $ pendingConnection addr set + when notExist $ do + cfg <- getConnectionConfig set + runner cfg + + cleanup = do + finalize +-- runStatusUpdates status (SS.resetPending addr) + -- TODO Metata.resetPending addr + atomically $ closedConnection addr set + +-- | Establish connection from scratch. If this endpoint is already +-- connected, no new connections is created. This function do not block. +connect :: PeerAddr IP -> Session -> IO () +connect addr = runConnection (connectWire addr) (return ()) addr + +-- | Establish connection with already pre-connected endpoint. If this +-- endpoint is already connected, no new connections is created. This +-- function do not block. +-- +-- 'PendingConnection' will be closed automatically, you do not need +-- to call 'closePending'. +establish :: PendingConnection -> Session -> IO () +establish conn = runConnection (acceptWire conn) (closePending conn) + (pendingPeer conn) + +-- | Conduit version of 'connect'. +connectSink :: MonadIO m => Session -> Sink [PeerAddr IPv4] m () +connectSink s = C.mapM_ (liftIO . connectBatch) + where + connectBatch = M.mapM_ (\ addr -> connect (IPv4 <$> addr) s) + +-- | Why do we need this message? +type BroadcastMessage = ExtendedCaps -> Message + +broadcast :: BroadcastMessage -> Session -> IO () +broadcast = error "broadcast" + +{----------------------------------------------------------------------- +-- Helpers +-----------------------------------------------------------------------} + +waitMVar :: MVar a -> IO () +waitMVar m = withMVar m (const (return ())) + +-- This function appear in new GHC "out of box". (moreover it is atomic) +tryReadMVar :: MVar a -> IO (Maybe a) +tryReadMVar m = do + ma <- tryTakeMVar m + maybe (return ()) (putMVar m) ma + return ma + +readBlock :: BlockIx -> Storage -> IO (Block BL.ByteString) +readBlock bix @ BlockIx {..} s = do + p <- packException (InvalidRequest bix) $ do readPiece ixPiece s + let chunk = BL.take (fromIntegral ixLength) $ + BL.drop (fromIntegral ixOffset) (pieceData p) + if BL.length chunk == fromIntegral ixLength + then return $ Block ixPiece ixOffset chunk + else throwIO $ InvalidRequest bix (InvalidSize ixLength) + +-- | +tryReadMetadataBlock :: PieceIx + -> Connected Session (Maybe (Torrent.Piece BS.ByteString, Int)) +tryReadMetadataBlock pix = do + Session {..} <- asks connSession + s <- liftIO (readMVar sessionState) + case s of + WaitingMetadata {..} -> error "tryReadMetadataBlock" + HavingMetadata {..} -> error "tryReadMetadataBlock" + +sendBroadcast :: PeerMessage msg => msg -> Wire Session () +sendBroadcast msg = do + Session {..} <- asks connSession + error "sendBroadcast" +-- liftIO $ msg `broadcast` sessionConnections + +waitMetadata :: Session -> IO InfoDict +waitMetadata Session {..} = do + s <- readMVar sessionState + case s of + WaitingMetadata {..} -> readMVar metadataCompleted + HavingMetadata {..} -> return (cachedValue metadataCache) + +takeMetadata :: Session -> IO (Maybe InfoDict) +takeMetadata Session {..} = do + s <- readMVar sessionState + case s of + WaitingMetadata {..} -> return Nothing + HavingMetadata {..} -> return (Just (cachedValue metadataCache)) + +{----------------------------------------------------------------------- +-- Triggers +-----------------------------------------------------------------------} + +-- | Trigger is the reaction of a handler at some event. +type Trigger = Wire Session () + +interesting :: Trigger +interesting = do + addr <- asks connRemoteAddr + sendMessage (Interested True) + sendMessage (Choking False) + tryFillRequestQueue + +fillRequestQueue :: Trigger +fillRequestQueue = do + maxN <- lift getMaxQueueLength + rbf <- use connBitfield + addr <- asks connRemoteAddr +-- blks <- withStatusUpdates $ do +-- n <- getRequestQueueLength addr +-- scheduleBlocks addr rbf (maxN - n) +-- mapM_ (sendMessage . Request) blks + return () + +tryFillRequestQueue :: Trigger +tryFillRequestQueue = do + allowed <- canDownload <$> use connStatus + when allowed $ do + fillRequestQueue + +{----------------------------------------------------------------------- +-- Incoming message handling +-----------------------------------------------------------------------} + +type Handler msg = msg -> Wire Session () + +handleStatus :: Handler StatusUpdate +handleStatus s = do + connStatus %= over remoteStatus (updateStatus s) + case s of + Interested _ -> return () + Choking True -> do + addr <- asks connRemoteAddr +-- withStatusUpdates (SS.resetPending addr) + return () + Choking False -> tryFillRequestQueue + +handleAvailable :: Handler Available +handleAvailable msg = do + connBitfield %= case msg of + Have ix -> BF.insert ix + Bitfield bf -> const bf + + --thisBf <- getThisBitfield + thisBf <- undefined + case msg of + Have ix + | ix `BF.member` thisBf -> return () + | otherwise -> interesting + Bitfield bf + | bf `BF.isSubsetOf` thisBf -> return () + | otherwise -> interesting + +handleTransfer :: Handler Transfer +handleTransfer (Request bix) = do + Session {..} <- asks connSession + s <- liftIO $ readMVar sessionState + case s of + WaitingMetadata {..} -> return () + HavingMetadata {..} -> do + bitfield <- undefined -- getThisBitfield + upload <- canUpload <$> use connStatus + when (upload && ixPiece bix `BF.member` bitfield) $ do + blk <- liftIO $ readBlock bix contentStorage + sendMessage (Message.Piece blk) + +handleTransfer (Message.Piece blk) = do + Session {..} <- asks connSession + s <- liftIO $ readMVar sessionState + case s of + WaitingMetadata {..} -> return () -- TODO (?) break connection + HavingMetadata {..} -> do + isSuccess <- undefined -- withStatusUpdates (SS.pushBlock blk storage) + case isSuccess of + Nothing -> liftIO $ throwIO $ userError "block is not requested" + Just isCompleted -> do + when isCompleted $ do + sendBroadcast (Have (blkPiece blk)) +-- maybe send not interested + tryFillRequestQueue + +handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) + where + transferResponse bix (Transfer (Message.Piece blk)) = blockIx blk == bix + transferResponse _ _ = False + +{----------------------------------------------------------------------- +-- Metadata exchange +-----------------------------------------------------------------------} +-- TODO introduce new metadata exchange specific exceptions + +waitForMetadata :: Trigger +waitForMetadata = do + Session {..} <- asks connSession + needFetch <- undefined --liftIO (isEmptyMVar infodict) + when needFetch $ do + canFetch <- allowed ExtMetadata <$> use connExtCaps + if canFetch + then tryRequestMetadataBlock + else undefined -- liftIO (waitMVar infodict) + +tryRequestMetadataBlock :: Trigger +tryRequestMetadataBlock = do + mpix <- lift $ undefined --withMetadataUpdates Metadata.scheduleBlock + case mpix of + Nothing -> error "tryRequestMetadataBlock" + Just pix -> sendMessage (MetadataRequest pix) + +handleMetadata :: Handler ExtendedMetadata +handleMetadata (MetadataRequest pix) = + lift (tryReadMetadataBlock pix) >>= sendMessage . mkResponse + where + mkResponse Nothing = MetadataReject pix + mkResponse (Just (piece, total)) = MetadataData piece total + +handleMetadata (MetadataData {..}) = do + ih <- asks connTopic + mdict <- lift $ undefined --withMetadataUpdates (Metadata.pushBlock piece ih) + case mdict of + Nothing -> tryRequestMetadataBlock -- not completed, need all blocks + Just dict -> do -- complete, wake up payload fetch + Session {..} <- asks connSession + liftIO $ modifyMVar_ sessionState (haveMetadata dict) + +handleMetadata (MetadataReject pix) = do + lift $ undefined -- withMetadataUpdates (Metadata.cancelPending pix) + +handleMetadata (MetadataUnknown _ ) = do + logInfoN "Unknown metadata message" + +{----------------------------------------------------------------------- +-- Main entry point +-----------------------------------------------------------------------} + +acceptRehandshake :: ExtendedHandshake -> Trigger +acceptRehandshake ehs = error "acceptRehandshake" + +handleExtended :: Handler ExtendedMessage +handleExtended (EHandshake ehs) = acceptRehandshake ehs +handleExtended (EMetadata _ msg) = handleMetadata msg +handleExtended (EUnknown _ _ ) = logWarnN "Unknown extension message" + +handleMessage :: Handler Message +handleMessage KeepAlive = return () +handleMessage (Status s) = handleStatus s +handleMessage (Available msg) = handleAvailable msg +handleMessage (Transfer msg) = handleTransfer msg +handleMessage (Port n) = error "handleMessage" +handleMessage (Fast _) = error "handleMessage" +handleMessage (Extended msg) = handleExtended msg + +exchange :: Wire Session () +exchange = do + waitForMetadata + bf <- undefined --getThisBitfield + sendMessage (Bitfield bf) + awaitForever handleMessage + +mainWire :: Wire Session () +mainWire = do + lift establishedConnection + Session {..} <- asks connSession +-- lift $ resizeBitfield (totalPieces storage) + logEvent "Connection established" + iterM logMessage =$= exchange =$= iterM logMessage + lift finishedConnection diff --git a/dht/bittorrent/src/Network/BitTorrent/Internal/Cache.hs b/dht/bittorrent/src/Network/BitTorrent/Internal/Cache.hs new file mode 100644 index 00000000..8c74467a --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Internal/Cache.hs @@ -0,0 +1,169 @@ +-- | +-- Copyright : (c) Sam Truzjan 2014 +-- License : BSD +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- Cached data for tracker responses. +-- +module Network.BitTorrent.Internal.Cache + ( -- * Cache + Cached + , lastUpdated + , updateInterval + , minUpdateInterval + + -- * Construction + , newCached + , newCached_ + + -- * Query + , isAlive + , isStalled + , isExpired + , canUpdate + , shouldUpdate + + -- * Cached data + , tryTakeData + , unsafeTryTakeData + , takeData + ) where + +import Control.Applicative +import Data.Monoid +import Data.Default +import Data.Time +import Data.Time.Clock.POSIX +import System.IO.Unsafe + + +data Cached a = Cached + { -- | Time of resource creation. + lastUpdated :: !POSIXTime + + -- | Minimum invalidation timeout. + , minUpdateInterval :: !NominalDiffTime + + -- | Resource lifetime. + , updateInterval :: !NominalDiffTime + + -- | Resource data. + , cachedData :: a + } deriving (Show, Eq) + +-- INVARIANT: minUpdateInterval <= updateInterval + +instance Default (Cached a) where + def = mempty + +instance Functor Cached where + fmap f (Cached t i m a) = Cached t i m (f a) + +posixEpoch :: NominalDiffTime +posixEpoch = 1000000000000000000000000000000000000000000000000000000 + +instance Applicative Cached where + pure = Cached 0 posixEpoch posixEpoch + f <*> c = Cached + { lastUpdated = undefined + , minUpdateInterval = undefined + , updateInterval = undefined + , cachedData = cachedData f (cachedData c) + } + +instance Alternative Cached where + empty = mempty + (<|>) = error "cached alternative instance: not implemented" + +instance Monad Cached where + return = pure + Cached {..} >>= f = Cached + { lastUpdated = undefined + , updateInterval = undefined + , minUpdateInterval = undefined + , cachedData = undefined + } + +instance Monoid (Cached a) where + mempty = Cached + { lastUpdated = 0 + , minUpdateInterval = 0 + , updateInterval = 0 + , cachedData = error "cached mempty: impossible happen" + } + + mappend a b + | expirationTime a > expirationTime b = a + | otherwise = b + +normalize :: NominalDiffTime -> NominalDiffTime + -> (NominalDiffTime, NominalDiffTime) +normalize a b + | a < b = (a, b) + | otherwise = (b, a) +{-# INLINE normalize #-} + +newCached :: NominalDiffTime -> NominalDiffTime -> a -> IO (Cached a) +newCached minInterval interval x = do + t <- getPOSIXTime + let (mui, ui) = normalize minInterval interval + return Cached + { lastUpdated = t + , minUpdateInterval = mui + , updateInterval = ui + , cachedData = x + } + +newCached_ :: NominalDiffTime -> a -> IO (Cached a) +newCached_ interval x = newCached interval interval x +{-# INLINE newCached_ #-} + +expirationTime :: Cached a -> POSIXTime +expirationTime Cached {..} = undefined + +isAlive :: Cached a -> IO Bool +isAlive Cached {..} = do + currentTime <- getPOSIXTime + return $ lastUpdated + updateInterval > currentTime + +isExpired :: Cached a -> IO Bool +isExpired Cached {..} = undefined + +isStalled :: Cached a -> IO Bool +isStalled Cached {..} = undefined + +canUpdate :: Cached a -> IO (Maybe NominalDiffTime) +canUpdate = undefined --isStaled + +shouldUpdate :: Cached a -> IO (Maybe NominalDiffTime) +shouldUpdate = undefined -- isExpired + +tryTakeData :: Cached a -> IO (Maybe a) +tryTakeData c = do + alive <- isAlive c + return $ if alive then Just (cachedData c) else Nothing + +unsafeTryTakeData :: Cached a -> Maybe a +unsafeTryTakeData = unsafePerformIO . tryTakeData + +invalidateData :: Cached a -> IO a -> IO (Cached a) +invalidateData Cached {..} action = do + t <- getPOSIXTime + x <- action + return Cached + { lastUpdated = t + , updateInterval = updateInterval + , minUpdateInterval = minUpdateInterval + , cachedData = x + } + +takeData :: Cached a -> IO a -> IO a +takeData c action = do + mdata <- tryTakeData c + case mdata of + Just a -> return a + Nothing -> do + c' <- invalidateData c action + takeData c' action diff --git a/dht/bittorrent/src/Network/BitTorrent/Internal/Progress.hs b/dht/bittorrent/src/Network/BitTorrent/Internal/Progress.hs new file mode 100644 index 00000000..6ac889e2 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Internal/Progress.hs @@ -0,0 +1,154 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- 'Progress' used to track amount downloaded\/left\/upload bytes +-- either on per client or per torrent basis. This value is used to +-- notify the tracker and usually shown to the user. To aggregate +-- total progress you can use the Monoid instance. +-- +{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE ViewPatterns #-} +{-# OPTIONS -fno-warn-orphans #-} +module Network.BitTorrent.Internal.Progress + ( -- * Progress + Progress (..) + + -- * Lens + , left + , uploaded + , downloaded + + -- * Construction + , startProgress + , downloadedProgress + , enqueuedProgress + , uploadedProgress + , dequeuedProgress + + -- * Query + , canDownload + , canUpload + ) where + +import Control.Applicative +import Control.Lens hiding ((%=)) +import Data.ByteString.Lazy.Builder as BS +import Data.ByteString.Lazy.Builder.ASCII as BS +import Data.Default +import Data.Monoid +import Data.Serialize as S +import Data.Ratio +import Data.Word +import Network.HTTP.Types.QueryLike +import Text.PrettyPrint as PP +import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) + + +-- | Progress data is considered as dynamic within one client +-- session. This data also should be shared across client application +-- sessions (e.g. files), otherwise use 'startProgress' to get initial +-- 'Progress' value. +-- +data Progress = Progress + { _downloaded :: {-# UNPACK #-} !Word64 -- ^ Total amount of bytes downloaded; + , _left :: {-# UNPACK #-} !Word64 -- ^ Total amount of bytes left; + , _uploaded :: {-# UNPACK #-} !Word64 -- ^ Total amount of bytes uploaded. + } deriving (Show, Read, Eq) + +$(makeLenses ''Progress) + +-- | UDP tracker compatible encoding. +instance Serialize Progress where + put Progress {..} = do + putWord64be $ fromIntegral _downloaded + putWord64be $ fromIntegral _left + putWord64be $ fromIntegral _uploaded + + get = Progress + <$> (fromIntegral <$> getWord64be) + <*> (fromIntegral <$> getWord64be) + <*> (fromIntegral <$> getWord64be) + +instance Default Progress where + def = Progress 0 0 0 + {-# INLINE def #-} + +-- | Can be used to aggregate total progress. +instance Monoid Progress where + mempty = def + {-# INLINE mempty #-} + + mappend (Progress da la ua) (Progress db lb ub) = Progress + { _downloaded = da + db + , _left = la + lb + , _uploaded = ua + ub + } + {-# INLINE mappend #-} + +instance QueryValueLike Builder where + toQueryValue = toQueryValue . BS.toLazyByteString + +instance QueryValueLike Word64 where + toQueryValue = toQueryValue . BS.word64Dec + +-- | HTTP Tracker protocol compatible encoding. +instance QueryLike Progress where + toQuery Progress {..} = + [ ("uploaded" , toQueryValue _uploaded) + , ("left" , toQueryValue _left) + , ("downloaded", toQueryValue _downloaded) + ] + +instance Pretty Progress where + pPrint Progress {..} = + "/\\" <+> PP.text (show _uploaded) $$ + "\\/" <+> PP.text (show _downloaded) $$ + "left" <+> PP.text (show _left) + +-- | Initial progress is used when there are no session before. +-- +-- Please note that tracker might penalize client some way if the do +-- not accumulate progress. If possible and save 'Progress' between +-- client sessions to avoid that. +-- +startProgress :: Integer -> Progress +startProgress = Progress 0 0 . fromIntegral +{-# INLINE startProgress #-} + +-- | Used when the client download some data from /any/ peer. +downloadedProgress :: Int -> Progress -> Progress +downloadedProgress (fromIntegral -> amount) + = (left -~ amount) + . (downloaded +~ amount) +{-# INLINE downloadedProgress #-} + +-- | Used when the client upload some data to /any/ peer. +uploadedProgress :: Int -> Progress -> Progress +uploadedProgress (fromIntegral -> amount) = uploaded +~ amount +{-# INLINE uploadedProgress #-} + +-- | Used when leecher join client session. +enqueuedProgress :: Integer -> Progress -> Progress +enqueuedProgress amount = left +~ fromIntegral amount +{-# INLINE enqueuedProgress #-} + +-- | Used when leecher leave client session. +-- (e.g. user deletes not completed torrent) +dequeuedProgress :: Integer -> Progress -> Progress +dequeuedProgress amount = left -~ fromIntegral amount +{-# INLINE dequeuedProgress #-} + +ri2rw64 :: Ratio Int -> Ratio Word64 +ri2rw64 x = fromIntegral (numerator x) % fromIntegral (denominator x) + +-- | Check global /download/ limit by uploaded \/ downloaded ratio. +canDownload :: Ratio Int -> Progress -> Bool +canDownload limit Progress {..} = _uploaded % _downloaded > ri2rw64 limit + +-- | Check global /upload/ limit by downloaded \/ uploaded ratio. +canUpload :: Ratio Int -> Progress -> Bool +canUpload limit Progress {..} = _downloaded % _uploaded > ri2rw64 limit diff --git a/dht/bittorrent/src/Network/BitTorrent/Internal/Types.hs b/dht/bittorrent/src/Network/BitTorrent/Internal/Types.hs new file mode 100644 index 00000000..d157db3e --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Internal/Types.hs @@ -0,0 +1,10 @@ +{-# LANGUAGE TypeFamilies #-} +module Network.BitTorrent.Internal.Types + ( EventSource (..) + ) where + +import Control.Concurrent.Chan.Split + +class EventSource source where + data Event source + listen :: source -> IO (ReceivePort (Event source)) diff --git a/dht/bittorrent/src/Network/BitTorrent/Readme.md b/dht/bittorrent/src/Network/BitTorrent/Readme.md new file mode 100644 index 00000000..ebf9545e --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Readme.md @@ -0,0 +1,10 @@ +Layout +====== + +| module group | can import | main purpose | +|:-------------|:------------:|:--------------------------------------:| +| Core | | common datatypes | +| DHT | Core | centralized peer discovery | +| Tracker | Core | decentralized peer discovery | +| Exchange | Core | torrent content exchange | +| Client | any other | core of bittorrent client application | diff --git a/dht/bittorrent/src/Network/BitTorrent/Tracker.hs b/dht/bittorrent/src/Network/BitTorrent/Tracker.hs new file mode 100644 index 00000000..1191f921 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Tracker.hs @@ -0,0 +1,51 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : non-portable +-- +-- This module provides high level API for peer -> tracker +-- communication. Tracker is used to discover other peers in the +-- network using torrent info hash. +-- +{-# LANGUAGE TemplateHaskell #-} +module Network.BitTorrent.Tracker + ( -- * RPC Manager + PeerInfo (..) + , Options + , Manager + , newManager + , closeManager + , withManager + + -- * Multitracker session + , trackerList + , Session + , Event (..) + , trackers + , newSession + , closeSession + , withSession + + -- ** Events + , AnnounceEvent (..) + , notify + , askPeers + + -- ** Session state + , TrackerSession + , trackerPeers + , trackerScrape + + , tryTakeData + , unsafeTryTakeData + + , getSessionState + ) where + +import Network.BitTorrent.Internal.Cache (tryTakeData, unsafeTryTakeData) +import Network.BitTorrent.Tracker.Message +import Network.BitTorrent.Tracker.List +import Network.BitTorrent.Tracker.RPC +import Network.BitTorrent.Tracker.Session diff --git a/dht/bittorrent/src/Network/BitTorrent/Tracker/List.hs b/dht/bittorrent/src/Network/BitTorrent/Tracker/List.hs new file mode 100644 index 00000000..1507b4be --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Tracker/List.hs @@ -0,0 +1,197 @@ +-- | +-- Copyright : (c) Sam Truzjan 2014 +-- License : BSD +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- Multitracker Metadata Extension support. +-- +-- For more info see: +-- +{-# LANGUAGE FlexibleInstances #-} +module Network.BitTorrent.Tracker.List + ( -- * Tracker list + TierEntry + , TrackerList + + -- * Construction + , trackers + , trackerList + , shuffleTiers + , mapWithURI + , Network.BitTorrent.Tracker.List.toList + + -- * Traversals + , traverseAll + , traverseTiers + ) where + +import Prelude hiding (mapM, foldr) +import Control.Arrow +import Control.Applicative +import Control.Exception +import Data.Default +import Data.List as L (map, elem, any, filter, null) +import Data.Maybe +import Data.Foldable +import Data.Traversable +import Network.URI +import System.Random.Shuffle + +import Data.Torrent +import Network.BitTorrent.Tracker.RPC as RPC + +{----------------------------------------------------------------------- +-- Tracker list datatype +-----------------------------------------------------------------------} + +type TierEntry a = (URI, a) +type Tier a = [TierEntry a] + +-- | Tracker list is either a single tracker or list of tiers. All +-- trackers in each tier must be checked before the client goes on to +-- the next tier. +data TrackerList a + = Announce (TierEntry a) -- ^ torrent file 'announce' field only + | TierList [Tier a] -- ^ torrent file 'announce-list' field only + deriving (Show, Eq) + +-- | Empty tracker list. Can be used for trackerless torrents. +instance Default (TrackerList a) where + def = TierList [] + +instance Functor TrackerList where + fmap f (Announce (uri, a)) = Announce (uri, f a) + fmap f (TierList a) = TierList (fmap (fmap (second f)) a) + +instance Foldable TrackerList where + foldr f z (Announce e ) = f (snd e) z + foldr f z (TierList xs) = foldr (flip (foldr (f . snd))) z xs + +_traverseEntry f (uri, a) = (,) uri <$> f a + +instance Traversable TrackerList where + traverse f (Announce e ) = Announce <$> _traverseEntry f e + traverse f (TierList xs) = + TierList <$> traverse (traverse (_traverseEntry f)) xs + +traverseWithURI :: Applicative f + => (TierEntry a -> f b) -> TrackerList a -> f (TrackerList b) +traverseWithURI f (Announce (uri, a)) = (Announce . (,) uri) <$> f (uri, a) +traverseWithURI f (TierList xxs ) = + TierList <$> traverse (traverse (traverseEntry f)) xxs + where + traverseEntry f (uri, a) = (,) uri <$> f (uri, a) + +{----------------------------------------------------------------------- +-- List extraction +-----------------------------------------------------------------------} +-- BEP12 do not expose any restrictions for the content of +-- 'announce-list' key - there are some /bad/ cases can happen with +-- poorly designed or even malicious torrent creation software. +-- +-- Bad case #1: announce-list is present, but empty. +-- +-- { tAnnounce = Just "http://a.com" +-- , tAnnounceList = Just [[]] +-- } +-- +-- Bad case #2: announce uri do not present in announce list. +-- +-- { tAnnounce = Just "http://a.com" +-- , tAnnounceList = Just [["udp://a.com"]] +-- } +-- +-- The addBackup function solves both problems by adding announce uri +-- as backup tier. +-- +addBackup :: [[URI]] -> URI -> [[URI]] +addBackup tiers bkp + | L.any (L.elem bkp) tiers = tiers + | otherwise = tiers ++ [[bkp]] + +fixList :: Maybe [[URI]] -> Maybe URI -> Maybe [[URI]] +fixList mxss mx = do + xss <- mxss + let xss' = L.filter (not . L.null) xss + return $ maybe xss' (addBackup xss') mx + +trackers :: [URI] -> TrackerList () +trackers uris = TierList $ map (\uri -> [(uri,())]) uris + +-- | Extract set of trackers from torrent file. The 'tAnnounce' key is +-- only ignored if the 'tAnnounceList' key is present. +trackerList :: Torrent -> TrackerList () +trackerList Torrent {..} = fromMaybe (TierList []) $ do + (TierList . tierList) <$> (tAnnounceList `fixList` tAnnounce) + <|> (Announce . nullEntry) <$> tAnnounce + where + nullEntry uri = (uri, ()) + tierList = L.map (L.map nullEntry) + +-- | Shuffle /order of trackers/ in each tier, preserving original +-- /order of tiers/. This can help to balance the load between the +-- trackers. +shuffleTiers :: TrackerList a -> IO (TrackerList a) +shuffleTiers (Announce a ) = return (Announce a) +shuffleTiers (TierList xs) = TierList <$> mapM shuffleM xs + +mapWithURI :: (URI -> a -> b) -> TrackerList a -> TrackerList b +mapWithURI f (Announce (uri, a)) = Announce (uri, f uri a) +mapWithURI f (TierList xs ) = TierList (L.map (L.map mapEntry) xs) + where + mapEntry (uri, a) = (uri, f uri a) + +toList :: TrackerList a -> [[TierEntry a]] +toList (Announce e) = [[e]] +toList (TierList xxs) = xxs + +{----------------------------------------------------------------------- +-- Special traversals (suppressed RPC exceptions) +-----------------------------------------------------------------------} + +catchRPC :: IO a -> IO a -> IO a +catchRPC a b = catch a (f b) + where + f :: a -> RpcException -> a + f = const + +throwRPC :: String -> IO a +throwRPC = throwIO . GenericException + +-- | Like 'traverse' but ignores 'RpcExceptions'. +traverseAll :: (TierEntry a -> IO a) -> TrackerList a -> IO (TrackerList a) +traverseAll action = traverseWithURI (action $?) + where + f $? x = catchRPC (f x) (return (snd x)) + +-- | Like 'traverse' but put working trackers to the head of tiers. +-- This can help to avoid exceessive requests to not available +-- trackers at each reannounce. If no one action succeed then original +-- list is returned. +traverseTiers :: (TierEntry a -> IO a) -> TrackerList a -> IO (TrackerList a) +traverseTiers action ts = catchRPC (goList ts) (return ts) + where + goList tl @ (Announce _ ) = traverseWithURI action tl + goList (TierList tiers) = TierList <$> goTiers (goTier []) tiers + + goTiers _ [] = throwRPC "traverseTiers: no tiers" + goTiers f (x : xs) = catchRPC shortcut failback + where + shortcut = do + x' <- f x + return (x' : xs) + + failback = do + xs' <- goTiers f xs + return (x : xs') + + goTier _ [] = throwRPC "traverseTiers: no trackers in tier" + goTier failed ((uri, a) : as) = catchRPC shortcut failback + where + shortcut = do + a' <- action (uri, a) + return ((uri, a') : as ++ failed) -- failed trackers at the end + + failback = goTier ((uri, a) : failed) as diff --git a/dht/bittorrent/src/Network/BitTorrent/Tracker/Message.hs b/dht/bittorrent/src/Network/BitTorrent/Tracker/Message.hs new file mode 100644 index 00000000..ab492275 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Tracker/Message.hs @@ -0,0 +1,925 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- (c) Daniel Gröber 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- Every tracker should support announce query. This query is used +-- to discover peers within a swarm and have two-fold effect: +-- +-- * peer doing announce discover other peers using peer list from +-- the response to the announce query. +-- +-- * tracker store peer information and use it in the succeeding +-- requests made by other peers, until the peer info expires. +-- +-- By convention most trackers support another form of request — +-- scrape query — which queries the state of a given torrent (or +-- a list of torrents) that the tracker is managing. +-- +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE CPP #-} +{-# OPTIONS -fno-warn-orphans #-} +module Network.BitTorrent.Tracker.Message + ( -- * Announce + -- ** Query + AnnounceEvent (..) + , AnnounceQuery (..) + , renderAnnounceQuery + , ParamParseFailure + , parseAnnounceQuery + + -- ** Info + , PeerList (..) + , getPeerList + , AnnounceInfo(..) + , defaultNumWant + , defaultMaxNumWant + , defaultReannounceInterval + + -- * Scrape + -- ** Query + , ScrapeQuery + , renderScrapeQuery + , parseScrapeQuery + + -- ** Info + , ScrapeEntry (..) + , ScrapeInfo + + -- * HTTP specific + -- ** Routes + , PathPiece + , defaultAnnouncePath + , defaultScrapePath + + -- ** Preferences + , AnnouncePrefs (..) + , renderAnnouncePrefs + , parseAnnouncePrefs + + -- ** Request + , AnnounceRequest (..) + , parseAnnounceRequest + , renderAnnounceRequest + + -- ** Response + , announceType + , scrapeType + , parseFailureStatus + + -- ** Extra + , queryToSimpleQuery + + -- * UDP specific + -- ** Connection + , ConnectionId + , initialConnectionId + + -- ** Messages + , Request (..) + , Response (..) + , responseName + + -- ** Transaction + , genTransactionId + , TransactionId + , Transaction (..) + ) + where + +import Control.Applicative +import Control.Monad +import Data.BEncode as BE hiding (Result) +import Data.BEncode.BDict as BE +import Data.ByteString as BS +import Data.ByteString.Char8 as BC +import Data.Char as Char +import Data.Convertible +import Data.Default +import Data.Either +import Data.List as L +import Data.Maybe +import Data.Monoid +import Data.Serialize as S hiding (Result) +import Data.String +import Data.Text (Text) +import Data.Text.Encoding +import Data.Typeable +import Data.Word +#if MIN_VERSION_iproute(1,7,4) +import Data.IP hiding (fromSockAddr) +#else +import Data.IP +#endif +import Network +import Network.HTTP.Types.QueryLike +import Network.HTTP.Types.URI hiding (urlEncode) +import Network.HTTP.Types.Status +import Network.Socket hiding (Connected) +import Numeric +import System.Entropy +import Text.Read (readMaybe) + +import Data.Torrent +import Network.Address +import Network.BitTorrent.Internal.Progress + +{----------------------------------------------------------------------- +-- Events +-----------------------------------------------------------------------} + +-- | Events are used to specify which kind of announce query is performed. +data AnnounceEvent + -- | For the first request: when download first begins. + = Started + + -- | This peer stopped downloading /and/ uploading the torrent or + -- just shutting down. + | Stopped + + -- | This peer completed downloading the torrent. This only happen + -- right after last piece have been verified. No 'Completed' is + -- sent if the file was completed when 'Started'. + | Completed + deriving (Show, Read, Eq, Ord, Enum, Bounded, Typeable) + +-- | HTTP tracker protocol compatible encoding. +instance QueryValueLike AnnounceEvent where + toQueryValue e = toQueryValue (Char.toLower x : xs) + where + (x : xs) = show e -- INVARIANT: this is always nonempty list + +type EventId = Word32 + +-- | UDP tracker encoding event codes. +eventId :: AnnounceEvent -> EventId +eventId Completed = 1 +eventId Started = 2 +eventId Stopped = 3 + +-- TODO add Regular event +putEvent :: Putter (Maybe AnnounceEvent) +putEvent Nothing = putWord32be 0 +putEvent (Just e) = putWord32be (eventId e) + +getEvent :: S.Get (Maybe AnnounceEvent) +getEvent = do + eid <- getWord32be + case eid of + 0 -> return Nothing + 1 -> return $ Just Completed + 2 -> return $ Just Started + 3 -> return $ Just Stopped + _ -> fail "unknown event id" + +{----------------------------------------------------------------------- + Announce query +-----------------------------------------------------------------------} +-- TODO add &ipv6= and &ipv4= params to AnnounceQuery +-- http://www.bittorrent.org/beps/bep_0007.html#announce-parameter + +-- | A tracker request is HTTP GET request; used to include metrics +-- from clients that help the tracker keep overall statistics about +-- the torrent. The most important, requests are used by the tracker +-- to keep track lists of active peer for a particular torrent. +-- +data AnnounceQuery = AnnounceQuery + { + -- | Hash of info part of the torrent usually obtained from + -- 'Torrent' or 'Magnet'. + reqInfoHash :: !InfoHash + + -- | ID of the peer doing request. + , reqPeerId :: !PeerId + + -- | Port to listen to for connections from other + -- peers. Tracker should respond with this port when + -- some /other/ peer request the tracker with the same info hash. + -- Normally, this port is choosed from 'defaultPorts'. + , reqPort :: !PortNumber + + -- | Current progress of peer doing request. + , reqProgress :: !Progress + + -- | The peer IP. Needed only when client communicated with + -- tracker throught a proxy. + , reqIP :: Maybe HostAddress + + -- | Number of peers that the peers wants to receive from. It is + -- optional for trackers to honor this limit. See note for + -- 'defaultNumWant'. + , reqNumWant :: Maybe Int + + -- | If not specified, the request is regular periodic + -- request. Regular request should be sent + , reqEvent :: Maybe AnnounceEvent + } deriving (Show, Eq, Typeable) + +-- | UDP tracker protocol compatible encoding. +instance Serialize AnnounceQuery where + put AnnounceQuery {..} = do + put reqInfoHash + put reqPeerId + put reqProgress + putEvent reqEvent + putWord32host $ fromMaybe 0 reqIP + putWord32be $ 0 -- TODO what the fuck is "key"? + putWord32be $ fromIntegral $ fromMaybe (-1) reqNumWant + + put reqPort + + get = do + ih <- get + pid <- get + + progress <- get + + ev <- getEvent + ip <- getWord32be +-- key <- getWord32be -- TODO + want <- getWord32be + + port <- get + + return $ AnnounceQuery { + reqInfoHash = ih + , reqPeerId = pid + , reqPort = port + , reqProgress = progress + , reqIP = if ip == 0 then Nothing else Just ip + , reqNumWant = if want == -1 then Nothing + else Just (fromIntegral want) + , reqEvent = ev + } + +instance QueryValueLike PortNumber where + toQueryValue = toQueryValue . show . fromEnum + +instance QueryValueLike Word32 where + toQueryValue = toQueryValue . show + +instance QueryValueLike Int where + toQueryValue = toQueryValue . show + +-- | HTTP tracker protocol compatible encoding. +instance QueryLike AnnounceQuery where + toQuery AnnounceQuery {..} = + toQuery reqProgress ++ + [ ("info_hash", toQueryValue reqInfoHash) -- TODO use 'paramName' + , ("peer_id" , toQueryValue reqPeerId) + , ("port" , toQueryValue reqPort) + , ("ip" , toQueryValue reqIP) + , ("numwant" , toQueryValue reqNumWant) + , ("event" , toQueryValue reqEvent) + ] + +-- | Filter @param=value@ pairs with the unset value. +queryToSimpleQuery :: Query -> SimpleQuery +queryToSimpleQuery = catMaybes . L.map f + where + f (_, Nothing) = Nothing + f (a, Just b ) = Just (a, b) + +-- | Encode announce query to query string. +renderAnnounceQuery :: AnnounceQuery -> SimpleQuery +renderAnnounceQuery = queryToSimpleQuery . toQuery + +data QueryParam + -- announce query + = ParamInfoHash + | ParamPeerId + | ParamPort + | ParamUploaded + | ParamLeft + | ParamDownloaded + | ParamIP + | ParamNumWant + | ParamEvent + -- announce query ext + | ParamCompact + | ParamNoPeerId + deriving (Show, Eq, Ord, Enum) + +paramName :: QueryParam -> BS.ByteString +paramName ParamInfoHash = "info_hash" +paramName ParamPeerId = "peer_id" +paramName ParamPort = "port" +paramName ParamUploaded = "uploaded" +paramName ParamLeft = "left" +paramName ParamDownloaded = "downloaded" +paramName ParamIP = "ip" +paramName ParamNumWant = "numwant" +paramName ParamEvent = "event" +paramName ParamCompact = "compact" +paramName ParamNoPeerId = "no_peer_id" +{-# INLINE paramName #-} + +class FromParam a where + fromParam :: BS.ByteString -> Maybe a + +instance FromParam Bool where + fromParam "0" = Just False + fromParam "1" = Just True + fromParam _ = Nothing + +instance FromParam InfoHash where + fromParam = either (const Nothing) pure . safeConvert + +instance FromParam PeerId where + fromParam = either (const Nothing) pure . safeConvert + +instance FromParam Word32 where + fromParam = readMaybe . BC.unpack + +instance FromParam Word64 where + fromParam = readMaybe . BC.unpack + +instance FromParam Int where + fromParam = readMaybe . BC.unpack + +instance FromParam PortNumber where + fromParam bs = fromIntegral <$> (fromParam bs :: Maybe Word32) + +instance FromParam AnnounceEvent where + fromParam bs = do + (x, xs) <- BC.uncons bs + readMaybe $ BC.unpack $ BC.cons (Char.toUpper x) xs + +-- | 'ParamParseFailure' represent errors can occur while parsing HTTP +-- tracker requests. In case of failure, this can be used to provide +-- more informative 'statusCode' and 'statusMessage' in tracker +-- responses. +-- +data ParamParseFailure + = Missing QueryParam -- ^ param not found in query string; + | Invalid QueryParam BS.ByteString -- ^ param present but not valid. + deriving (Show, Eq) + +type ParseResult = Either ParamParseFailure + +withError :: ParamParseFailure -> Maybe a -> ParseResult a +withError e = maybe (Left e) Right + +reqParam :: FromParam a => QueryParam -> SimpleQuery -> ParseResult a +reqParam param xs = do + val <- withError (Missing param) $ L.lookup (paramName param) xs + withError (Invalid param val) (fromParam val) + +optParam :: FromParam a => QueryParam -> SimpleQuery -> ParseResult (Maybe a) +optParam param ps + | Just x <- L.lookup (paramName param) ps + = pure <$> withError (Invalid param x) (fromParam x) + | otherwise = pure Nothing + +parseProgress :: SimpleQuery -> ParseResult Progress +parseProgress params = Progress + <$> reqParam ParamDownloaded params + <*> reqParam ParamLeft params + <*> reqParam ParamUploaded params + +-- | Parse announce request from a query string. +parseAnnounceQuery :: SimpleQuery -> ParseResult AnnounceQuery +parseAnnounceQuery params = AnnounceQuery + <$> reqParam ParamInfoHash params + <*> reqParam ParamPeerId params + <*> reqParam ParamPort params + <*> parseProgress params + <*> optParam ParamIP params + <*> optParam ParamNumWant params + <*> optParam ParamEvent params + +{----------------------------------------------------------------------- +-- Announce Info +-----------------------------------------------------------------------} +-- TODO check if announceinterval/complete/incomplete is positive ints + +-- | Tracker can return peer list in either compact(BEP23) or not +-- compact form. +-- +-- For more info see: +-- +data PeerList ip + = PeerList [PeerAddr] + | CompactPeerList [PeerAddr] + deriving (Show, Eq, Typeable, Functor) + +-- | The empty non-compact peer list. +instance Default (PeerList IP) where + def = PeerList [] + {-# INLINE def #-} + +getPeerList :: PeerList IP -> [PeerAddr] +getPeerList (PeerList xs) = xs +getPeerList (CompactPeerList xs) = xs + +instance BEncode (PeerList a) where + toBEncode (PeerList xs) = toBEncode xs + toBEncode (CompactPeerList xs) = toBEncode $ runPut (mapM_ put xs) + + fromBEncode (BList l ) = PeerList <$> fromBEncode (BList l) + fromBEncode (BString s ) = CompactPeerList <$> runGet (many get) s + fromBEncode _ = decodingError "PeerList: should be a BString or BList" + +-- | The tracker response includes a peer list that helps the client +-- participate in the torrent. The most important is 'respPeer' list +-- used to join the swarm. +-- +data AnnounceInfo = + Failure !Text -- ^ Failure reason in human readable form. + | AnnounceInfo { + -- | Number of peers completed the torrent. (seeders) + respComplete :: !(Maybe Int) + + -- | Number of peers downloading the torrent. (leechers) + , respIncomplete :: !(Maybe Int) + + -- | Recommended interval to wait between requests, in seconds. + , respInterval :: !Int + + -- | Minimal amount of time between requests, in seconds. A + -- peer /should/ make timeout with at least 'respMinInterval' + -- value, otherwise tracker might not respond. If not specified + -- the same applies to 'respInterval'. + , respMinInterval :: !(Maybe Int) + + -- | Peers that must be contacted. + , respPeers :: !(PeerList IP) + + -- | Human readable warning. + , respWarning :: !(Maybe Text) + } deriving (Show, Eq, Typeable) + +-- | Empty peer list with default reannounce interval. +instance Default AnnounceInfo where + def = AnnounceInfo + { respComplete = Nothing + , respIncomplete = Nothing + , respInterval = defaultReannounceInterval + , respMinInterval = Nothing + , respPeers = def + , respWarning = Nothing + } + +-- | HTTP tracker protocol compatible encoding. +instance BEncode AnnounceInfo where + toBEncode (Failure t) = toDict $ + "failure reason" .=! t + .: endDict + + toBEncode AnnounceInfo {..} = toDict $ + "complete" .=? respComplete + .: "incomplete" .=? respIncomplete + .: "interval" .=! respInterval + .: "min interval" .=? respMinInterval + .: "peers" .=! peers + .: "peers6" .=? peers6 + .: "warning message" .=? respWarning + .: endDict + where + (peers, peers6) = prttn respPeers + + prttn :: PeerList IP -> (PeerList IPv4, Maybe (PeerList IPv6)) + prttn (PeerList xs) = (PeerList xs, Nothing) + prttn (CompactPeerList xs) = mk $ partitionEithers $ toEither <$> xs + where + mk (v4s, v6s) + | L.null v6s = (CompactPeerList v4s, Nothing) + | otherwise = (CompactPeerList v4s, Just (CompactPeerList v6s)) + + toEither :: PeerAddr -> Either PeerAddr PeerAddr + toEither PeerAddr {..} = case peerHost of + ipv4@IPv4{} -> Left $ PeerAddr peerId ipv4 peerPort + ipv6@IPv6{} -> Right $ PeerAddr peerId ipv6 peerPort + + fromBEncode (BDict d) + | Just t <- BE.lookup "failure reason" d = Failure <$> fromBEncode t + | otherwise = (`fromDict` (BDict d)) $ + AnnounceInfo + <$>? "complete" + <*>? "incomplete" + <*>! "interval" + <*>? "min interval" + <*> (uncurry merge =<< (,) <$>! "peers" <*>? "peers6") + <*>? "warning message" + where + merge :: PeerList IPv4 -> Maybe (PeerList IPv6) -> BE.Get (PeerList IP) + merge (PeerList ips) Nothing = pure (PeerList ips) + merge (PeerList _ ) (Just _) + = fail "PeerList: non-compact peer list provided, \ + \but the `peers6' field present" + + merge (CompactPeerList ipv4s) Nothing + = pure $ CompactPeerList ipv4s + + merge (CompactPeerList _ ) (Just (PeerList _)) + = fail "PeerList: the `peers6' field value \ + \should contain *compact* peer list" + + merge (CompactPeerList ipv4s) (Just (CompactPeerList ipv6s)) + = pure $ CompactPeerList $ + ipv4s <> ipv6s + + fromBEncode _ = decodingError "Announce info" + +-- | UDP tracker protocol compatible encoding. +instance Serialize AnnounceInfo where + put (Failure msg) = put $ encodeUtf8 msg + put AnnounceInfo {..} = do + putWord32be $ fromIntegral respInterval + putWord32be $ fromIntegral $ fromMaybe 0 respIncomplete + putWord32be $ fromIntegral $ fromMaybe 0 respComplete + forM_ (getPeerList respPeers) put + + get = do + interval <- getWord32be + leechers <- getWord32be + seeders <- getWord32be + peers <- many $ isolate 6 get -- isolated to specify IPv4. + + return $ AnnounceInfo { + respWarning = Nothing + , respInterval = fromIntegral interval + , respMinInterval = Nothing + , respIncomplete = Just $ fromIntegral leechers + , respComplete = Just $ fromIntegral seeders + , respPeers = PeerList peers + } + +-- | Decodes announce response from bencoded string, for debugging only. +instance IsString AnnounceInfo where + fromString str = either (error . format) id $ BE.decode (fromString str) + where + format msg = "fromString: unable to decode AnnounceInfo: " ++ msg + +-- | Above 25, new peers are highly unlikely to increase download +-- speed. Even 30 peers is /plenty/, the official client version 3 +-- in fact only actively forms new connections if it has less than +-- 30 peers and will refuse connections if it has 55. +-- +-- +-- +defaultNumWant :: Int +defaultNumWant = 50 + +-- | Reasonable upper bound of numwant parameter. +defaultMaxNumWant :: Int +defaultMaxNumWant = 200 + +-- | Widely used reannounce interval. Note: tracker clients should not +-- use this value! +defaultReannounceInterval :: Int +defaultReannounceInterval = 30 * 60 + +{----------------------------------------------------------------------- + Scrape message +-----------------------------------------------------------------------} + +-- | Scrape query used to specify a set of torrent to scrape. +-- If list is empty then tracker should return scrape info about each +-- torrent. +type ScrapeQuery = [InfoHash] + +-- TODO +-- data ScrapeQuery +-- = ScrapeAll +-- | ScrapeSingle InfoHash +-- | ScrapeMulti (HashSet InfoHash) +-- deriving (Show) +-- +-- data ScrapeInfo +-- = ScrapeAll (HashMap InfoHash ScrapeEntry) +-- | ScrapeSingle InfoHash ScrapeEntry +-- | ScrapeMulti (HashMap InfoHash ScrapeEntry) +-- + +scrapeParam :: BS.ByteString +scrapeParam = "info_hash" + +isScrapeParam :: BS.ByteString -> Bool +isScrapeParam = (==) scrapeParam + +-- | Parse scrape query to query string. +parseScrapeQuery :: SimpleQuery -> ScrapeQuery +parseScrapeQuery + = catMaybes . L.map (fromParam . snd) . L.filter (isScrapeParam . fst) + +-- | Render scrape query to query string. +renderScrapeQuery :: ScrapeQuery -> SimpleQuery +renderScrapeQuery = queryToSimpleQuery . L.map mkPair + where + mkPair ih = (scrapeParam, toQueryValue ih) + +-- | Overall information about particular torrent. +data ScrapeEntry = ScrapeEntry { + -- | Number of seeders - peers with the entire file. + siComplete :: {-# UNPACK #-} !Int + + -- | Total number of times the tracker has registered a completion. + , siDownloaded :: {-# UNPACK #-} !Int + + -- | Number of leechers. + , siIncomplete :: {-# UNPACK #-} !Int + + -- | Name of the torrent file, as specified by the "name" + -- file in the info section of the .torrent file. + , siName :: !(Maybe Text) + } deriving (Show, Eq, Typeable) + +-- | HTTP tracker protocol compatible encoding. +instance BEncode ScrapeEntry where + toBEncode ScrapeEntry {..} = toDict $ + "complete" .=! siComplete + .: "downloaded" .=! siDownloaded + .: "incomplete" .=! siIncomplete + .: "name" .=? siName + .: endDict + + fromBEncode = fromDict $ ScrapeEntry + <$>! "complete" + <*>! "downloaded" + <*>! "incomplete" + <*>? "name" + +-- | UDP tracker protocol compatible encoding. +instance Serialize ScrapeEntry where + put ScrapeEntry {..} = do + putWord32be $ fromIntegral siComplete + putWord32be $ fromIntegral siDownloaded + putWord32be $ fromIntegral siIncomplete + + get = ScrapeEntry + <$> (fromIntegral <$> getWord32be) + <*> (fromIntegral <$> getWord32be) + <*> (fromIntegral <$> getWord32be) + <*> pure Nothing + +-- | Scrape info about a set of torrents. +type ScrapeInfo = [(InfoHash, ScrapeEntry)] + +{----------------------------------------------------------------------- +-- HTTP specific +-----------------------------------------------------------------------} + +-- | Some HTTP trackers allow to choose prefered representation of the +-- 'AnnounceInfo'. It's optional for trackers to honor any of this +-- options. +data AnnouncePrefs = AnnouncePrefs + { -- | If specified, "compact" parameter is used to advise the + -- tracker to send peer id list as: + -- + -- * bencoded list (extCompact = Just False); + -- * or more compact binary string (extCompact = Just True). + -- + -- The later is prefered since compact peer list will reduce the + -- size of tracker responses. Hovewer, if tracker do not support + -- this extension then it can return peer list in either form. + -- + -- For more info see: + -- + extCompact :: !(Maybe Bool) + + -- | If specified, "no_peer_id" parameter is used advise tracker + -- to either send or not to send peer id in tracker response. + -- Tracker may not support this extension as well. + -- + -- For more info see: + -- + -- + , extNoPeerId :: !(Maybe Bool) + } deriving (Show, Eq, Typeable) + +instance Default AnnouncePrefs where + def = AnnouncePrefs Nothing Nothing + +instance QueryLike AnnouncePrefs where + toQuery AnnouncePrefs {..} = + [ ("compact", toQueryFlag <$> extCompact) -- TODO use 'paramName' + , ("no_peer_id", toQueryFlag <$> extNoPeerId) + ] + where + toQueryFlag False = "0" + toQueryFlag True = "1" + +-- | Parse announce query extended part from query string. +parseAnnouncePrefs :: SimpleQuery -> AnnouncePrefs +parseAnnouncePrefs params = either (const def) id $ + AnnouncePrefs + <$> optParam ParamCompact params + <*> optParam ParamNoPeerId params + +-- | Render announce preferences to query string. +renderAnnouncePrefs :: AnnouncePrefs -> SimpleQuery +renderAnnouncePrefs = queryToSimpleQuery . toQuery + +-- | HTTP tracker request with preferences. +data AnnounceRequest = AnnounceRequest + { announceQuery :: AnnounceQuery -- ^ Request query params. + , announcePrefs :: AnnouncePrefs -- ^ Optional advises to the tracker. + } deriving (Show, Eq, Typeable) + +instance QueryLike AnnounceRequest where + toQuery AnnounceRequest{..} = + toQuery announcePrefs <> + toQuery announceQuery + +-- | Parse announce request from query string. +parseAnnounceRequest :: SimpleQuery -> ParseResult AnnounceRequest +parseAnnounceRequest params = AnnounceRequest + <$> parseAnnounceQuery params + <*> pure (parseAnnouncePrefs params) + +-- | Render announce request to query string. +renderAnnounceRequest :: AnnounceRequest -> SimpleQuery +renderAnnounceRequest = queryToSimpleQuery . toQuery + +type PathPiece = BS.ByteString + +defaultAnnouncePath :: PathPiece +defaultAnnouncePath = "announce" + +defaultScrapePath :: PathPiece +defaultScrapePath = "scrape" + +missingOffset :: Int +missingOffset = 101 + +invalidOffset :: Int +invalidOffset = 150 + +parseFailureCode :: ParamParseFailure -> Int +parseFailureCode (Missing param ) = missingOffset + fromEnum param +parseFailureCode (Invalid param _) = invalidOffset + fromEnum param + +parseFailureMessage :: ParamParseFailure -> BS.ByteString +parseFailureMessage e = BS.concat $ case e of + Missing p -> ["Missing parameter: ", paramName p] + Invalid p v -> ["Invalid parameter: ", paramName p, " = ", v] + +-- | HTTP response /content type/ for announce info. +announceType :: ByteString +announceType = "text/plain" + +-- | HTTP response /content type/ for scrape info. +scrapeType :: ByteString +scrapeType = "text/plain" + +-- | Get HTTP response status from a announce params parse failure. +-- +-- For more info see: +-- +-- +parseFailureStatus :: ParamParseFailure -> Status +parseFailureStatus = mkStatus <$> parseFailureCode <*> parseFailureMessage + +{----------------------------------------------------------------------- +-- UDP specific message types +-----------------------------------------------------------------------} + +genToken :: IO Word64 +genToken = do + bs <- getEntropy 8 + either err return $ runGet getWord64be bs + where + err = error "genToken: impossible happen" + +-- | Connection Id is used for entire tracker session. +newtype ConnectionId = ConnectionId Word64 + deriving (Eq, Serialize) + +instance Show ConnectionId where + showsPrec _ (ConnectionId cid) = showString "0x" <> showHex cid + +initialConnectionId :: ConnectionId +initialConnectionId = ConnectionId 0x41727101980 + +-- | Transaction Id is used within a UDP RPC. +newtype TransactionId = TransactionId Word32 + deriving (Eq, Ord, Enum, Bounded, Serialize) + +instance Show TransactionId where + showsPrec _ (TransactionId tid) = showString "0x" <> showHex tid + +genTransactionId :: IO TransactionId +genTransactionId = (TransactionId . fromIntegral) <$> genToken + +data Request + = Connect + | Announce AnnounceQuery + | Scrape ScrapeQuery + deriving Show + +data Response + = Connected ConnectionId + | Announced AnnounceInfo + | Scraped [ScrapeEntry] + | Failed Text + deriving Show + +responseName :: Response -> String +responseName (Connected _) = "connected" +responseName (Announced _) = "announced" +responseName (Scraped _) = "scraped" +responseName (Failed _) = "failed" + +data family Transaction a +data instance Transaction Request = TransactionQ + { connIdQ :: {-# UNPACK #-} !ConnectionId + , transIdQ :: {-# UNPACK #-} !TransactionId + , request :: !Request + } deriving Show +data instance Transaction Response = TransactionR + { transIdR :: {-# UNPACK #-} !TransactionId + , response :: !Response + } deriving Show + +-- TODO newtype +newtype MessageId = MessageId Word32 + deriving (Show, Eq, Num, Serialize) + +connectId, announceId, scrapeId, errorId :: MessageId +connectId = 0 +announceId = 1 +scrapeId = 2 +errorId = 3 + +instance Serialize (Transaction Request) where + put TransactionQ {..} = do + case request of + Connect -> do + put initialConnectionId + put connectId + put transIdQ + + Announce ann -> do + put connIdQ + put announceId + put transIdQ + put ann + + Scrape hashes -> do + put connIdQ + put scrapeId + put transIdQ + forM_ hashes put + + get = do + cid <- get + mid <- get + TransactionQ cid <$> S.get <*> getBody mid + where + getBody :: MessageId -> S.Get Request + getBody msgId + | msgId == connectId = pure Connect + | msgId == announceId = Announce <$> get + | msgId == scrapeId = Scrape <$> many get + | otherwise = fail errMsg + where + errMsg = "unknown request: " ++ show msgId + +instance Serialize (Transaction Response) where + put TransactionR {..} = do + case response of + Connected conn -> do + put connectId + put transIdR + put conn + + Announced info -> do + put announceId + put transIdR + put info + + Scraped infos -> do + put scrapeId + put transIdR + forM_ infos put + + Failed info -> do + put errorId + put transIdR + put (encodeUtf8 info) + + + get = do + mid <- get + TransactionR <$> get <*> getBody mid + where + getBody :: MessageId -> S.Get Response + getBody msgId + | msgId == connectId = Connected <$> get + | msgId == announceId = Announced <$> get + | msgId == scrapeId = Scraped <$> many get + | msgId == errorId = (Failed . decodeUtf8) <$> get + | otherwise = fail msg + where + msg = "unknown response: " ++ show msgId diff --git a/dht/bittorrent/src/Network/BitTorrent/Tracker/RPC.hs b/dht/bittorrent/src/Network/BitTorrent/Tracker/RPC.hs new file mode 100644 index 00000000..45fef05e --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Tracker/RPC.hs @@ -0,0 +1,175 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- This module provides unified RPC interface to BitTorrent +-- trackers. The tracker is an UDP/HTTP/HTTPS service used to +-- discovery peers for a particular existing torrent and keep +-- statistics about the swarm. This module also provides a way to +-- request scrape info for a particular torrent list. +-- +{-# LANGUAGE DeriveDataTypeable #-} +module Network.BitTorrent.Tracker.RPC + ( PeerInfo (..) + + -- * Manager + , Options (..) + , Manager + , newManager + , closeManager + , withManager + + -- * RPC + , SAnnounceQuery (..) + , RpcException (..) + , Network.BitTorrent.Tracker.RPC.announce + , scrape + ) where + +import Control.Exception +import Data.Default +import Data.Typeable +import Network +import Network.URI +import Network.Socket (HostAddress) + +import Data.Torrent +import Network.Address +import Network.BitTorrent.Internal.Progress +import Network.BitTorrent.Tracker.Message +import qualified Network.BitTorrent.Tracker.RPC.HTTP as HTTP +import qualified Network.BitTorrent.Tracker.RPC.UDP as UDP + + +{----------------------------------------------------------------------- +-- Simplified announce +-----------------------------------------------------------------------} + +-- | Info to advertise to trackers. +data PeerInfo = PeerInfo + { peerId :: !PeerId + , peerIP :: !(Maybe HostAddress) + , peerPort :: !PortNumber + } deriving (Show, Eq) + +instance Default PeerInfo where + def = PeerInfo def Nothing 6881 + +-- | Simplified announce query. +data SAnnounceQuery = SAnnounceQuery + { sInfoHash :: InfoHash + , sProgress :: Progress + , sNumWant :: Maybe Int + , sEvent :: Maybe AnnounceEvent + } + +fillAnnounceQuery :: PeerInfo -> SAnnounceQuery -> AnnounceQuery +fillAnnounceQuery PeerInfo{..} SAnnounceQuery {..} = AnnounceQuery + { reqInfoHash = sInfoHash + , reqPeerId = peerId + , reqPort = peerPort + , reqProgress = sProgress + , reqIP = peerIP + , reqNumWant = sNumWant + , reqEvent = sEvent + } + +{----------------------------------------------------------------------- +-- RPC manager +-----------------------------------------------------------------------} + +-- | Tracker manager settings. +data Options = Options + { -- | HTTP tracker protocol specific options. + optHttpRPC :: !HTTP.Options + + -- | UDP tracker protocol specific options. + , optUdpRPC :: !UDP.Options + + -- | Whether to use multitracker extension. + , optMultitracker :: !Bool + } + +instance Default Options where + def = Options + { optHttpRPC = def + , optUdpRPC = def + , optMultitracker = True + } + +-- | Tracker RPC Manager. +data Manager = Manager + { options :: !Options + , peerInfo :: !PeerInfo + , httpMgr :: !HTTP.Manager + , udpMgr :: !UDP.Manager + } + +-- | Create a new 'Manager'. You /must/ manually 'closeManager' +-- otherwise resource leakage is possible. Normally, a bittorrent +-- client need a single RPC manager only. +-- +-- This function can throw 'IOException' on invalid 'Options'. +-- +newManager :: Options -> PeerInfo -> IO Manager +newManager opts info = do + h <- HTTP.newManager (optHttpRPC opts) + u <- UDP.newManager (optUdpRPC opts) `onException` HTTP.closeManager h + return $ Manager opts info h u + +-- | Close all pending RPCs. Behaviour of currently in-flight RPCs can +-- differ depending on underlying protocol used. No rpc calls should +-- be performed after manager becomes closed. +closeManager :: Manager -> IO () +closeManager Manager {..} = do + UDP.closeManager udpMgr `finally` HTTP.closeManager httpMgr + +-- | Normally you need to use 'Control.Monad.Trans.Resource.allocate'. +withManager :: Options -> PeerInfo -> (Manager -> IO a) -> IO a +withManager opts info = bracket (newManager opts info) closeManager + +{----------------------------------------------------------------------- +-- Exceptions +-----------------------------------------------------------------------} +-- TODO Catch IO exceptions on rpc calls (?) + +data RpcException + = UdpException UDP.RpcException -- ^ UDP RPC driver failure; + | HttpException HTTP.RpcException -- ^ HTTP RPC driver failure; + | UnrecognizedScheme String -- ^ unsupported scheme in announce URI; + | GenericException String -- ^ for furter extensibility. + deriving (Show, Typeable) + +instance Exception RpcException + +packException :: Exception e => (e -> RpcException) -> IO a -> IO a +packException f m = try m >>= either (throwIO . f) return +{-# INLINE packException #-} + +{----------------------------------------------------------------------- +-- RPC calls +-----------------------------------------------------------------------} + +dispatch :: URI -> IO a -> IO a -> IO a +dispatch URI {..} http udp + | uriScheme == "http:" || + uriScheme == "https:" = packException HttpException http + | uriScheme == "udp:" = packException UdpException udp + | otherwise = throwIO $ UnrecognizedScheme uriScheme + +announce :: Manager -> URI -> SAnnounceQuery -> IO AnnounceInfo +announce Manager {..} uri simpleQuery + = dispatch uri + (HTTP.announce httpMgr uri annQ) + ( UDP.announce udpMgr uri annQ) + where + annQ = fillAnnounceQuery peerInfo simpleQuery + +scrape :: Manager -> URI -> ScrapeQuery -> IO ScrapeInfo +scrape Manager {..} uri q + = dispatch uri + (HTTP.scrape httpMgr uri q) + ( UDP.scrape udpMgr uri q) diff --git a/dht/bittorrent/src/Network/BitTorrent/Tracker/RPC/HTTP.hs b/dht/bittorrent/src/Network/BitTorrent/Tracker/RPC/HTTP.hs new file mode 100644 index 00000000..6f7a53bf --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Tracker/RPC/HTTP.hs @@ -0,0 +1,191 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD +-- Maintainer : pxqr.sta@gmail.com +-- Stability : provisional +-- Portability : portable +-- +-- This module implement HTTP tracker protocol. +-- +-- For more information see: +-- +-- +{-# LANGUAGE DeriveDataTypeable #-} +module Network.BitTorrent.Tracker.RPC.HTTP + ( -- * Manager + Options (..) + , Manager + , newManager + , closeManager + , withManager + + -- * RPC + , RpcException (..) + , announce + , scrape + , scrapeOne + ) where + +import Control.Applicative +import Control.Exception +import Control.Monad +import Control.Monad.Trans.Resource +import Data.BEncode as BE +import Data.ByteString as BS +import Data.ByteString.Char8 as BC +import Data.ByteString.Lazy as BL +import Data.Default +import Data.List as L +import Data.Monoid +import Data.Typeable hiding (Proxy) +import Network.URI +import Network.HTTP.Conduit hiding + (Manager, newManager, closeManager, withManager) +import Network.HTTP.Client (defaultManagerSettings) +import Network.HTTP.Client.Internal (setUri) +import qualified Network.HTTP.Conduit as HTTP +import Network.HTTP.Types.Header (hUserAgent) +import Network.HTTP.Types.URI (SimpleQuery, renderSimpleQuery) + +import Data.Torrent (InfoHash) +import Network.Address (libUserAgent) +import Network.BitTorrent.Tracker.Message hiding (Request, Response) + +{----------------------------------------------------------------------- +-- Exceptions +-----------------------------------------------------------------------} + +data RpcException + = RequestFailed HttpException -- ^ failed HTTP request. + | ParserFailure String -- ^ unable to decode tracker response; + | ScrapelessTracker -- ^ tracker do not support scraping; + | BadScrape -- ^ unable to find info hash in response dict; + deriving (Show, Typeable) + +instance Exception RpcException + +packHttpException :: IO a -> IO a +packHttpException m = try m >>= either (throwIO . RequestFailed) return + +{----------------------------------------------------------------------- +-- Manager +-----------------------------------------------------------------------} + +-- | HTTP tracker specific RPC options. +data Options = Options + { -- | Global HTTP announce query preferences. + optAnnouncePrefs :: !AnnouncePrefs + + -- | Whether to use HTTP proxy for HTTP tracker requests. + , optHttpProxy :: !(Maybe Proxy) + + -- | Value to put in HTTP user agent header. + , optUserAgent :: !BS.ByteString + + -- | HTTP manager options. + , optHttpOptions :: !ManagerSettings + } + +instance Default Options where + def = Options + { optAnnouncePrefs = def + , optHttpProxy = Nothing + , optUserAgent = BC.pack libUserAgent + , optHttpOptions = defaultManagerSettings + } + +-- | HTTP tracker manager. +data Manager = Manager + { options :: !Options + , httpMgr :: !HTTP.Manager + } + +-- | +newManager :: Options -> IO Manager +newManager opts = Manager opts <$> HTTP.newManager (optHttpOptions opts) + +-- | +closeManager :: Manager -> IO () +closeManager Manager {..} = HTTP.closeManager httpMgr + +-- | Normally you need to use 'Control.Monad.Trans.Resource.allocate'. +withManager :: Options -> (Manager -> IO a) -> IO a +withManager opts = bracket (newManager opts) closeManager + +{----------------------------------------------------------------------- +-- Queries +-----------------------------------------------------------------------} + +fillRequest :: Options -> SimpleQuery -> Request -> Request +fillRequest Options {..} q r = r + { queryString = joinQuery (queryString r) (renderSimpleQuery False q) + , requestHeaders = (hUserAgent, optUserAgent) : requestHeaders r + , proxy = optHttpProxy + } + where + joinQuery a b + | BS.null a = b + | otherwise = a <> "&" <> b + +httpTracker :: BEncode a => Manager -> URI -> SimpleQuery -> IO a +httpTracker Manager {..} uri q = packHttpException $ do + request <- fillRequest options q <$> setUri defaultRequest uri + response <- runResourceT $ httpLbs request httpMgr + case BE.decode $ BL.toStrict $ responseBody response of + Left msg -> throwIO (ParserFailure msg) + Right info -> return info + +{----------------------------------------------------------------------- +-- RPC +-----------------------------------------------------------------------} + +-- | Send request and receive response from the tracker specified in +-- announce list. +-- +-- This function can throw 'RpcException'. +-- +announce :: Manager -> URI -> AnnounceQuery -> IO AnnounceInfo +announce mgr uri q = httpTracker mgr uri (renderAnnounceRequest uriQ) + where + uriQ = AnnounceRequest + { announceQuery = q + , announcePrefs = optAnnouncePrefs (options mgr) + } + +-- | Trying to convert /announce/ URL to /scrape/ URL. If 'scrapeURL' +-- gives 'Nothing' then tracker do not support scraping. +-- +scrapeURL :: URI -> Maybe URI +scrapeURL uri = do + newPath <- replace (BC.pack (uriPath uri)) + return uri { uriPath = BC.unpack newPath } + where + replace p = do + let ps = BC.splitWith (== '/') p + guard (not (L.null ps)) + guard ("announce" `BS.isPrefixOf` L.last ps) + let newSuff = "scrape" <> BS.drop (BS.length "announce") (L.last ps) + return (BS.intercalate "/" (L.init ps ++ [newSuff])) + +-- | For each 'InfoHash' of torrents request scrape info from the tracker. +-- However if the info hash list is 'null', the tracker should list +-- all available torrents. +-- +-- This function can throw 'RpcException'. +-- +scrape :: Manager -> URI -> ScrapeQuery -> IO ScrapeInfo +scrape m u q = do + case scrapeURL u of + Nothing -> throwIO ScrapelessTracker + Just uri -> httpTracker m uri (renderScrapeQuery q) + +-- | More particular version of 'scrape', just for one torrent. +-- +-- This function can throw 'RpcException'. +-- +scrapeOne :: Manager -> URI -> InfoHash -> IO ScrapeEntry +scrapeOne m uri ih = do + xs <- scrape m uri [ih] + case L.lookup ih xs of + Nothing -> throwIO BadScrape + Just a -> return a diff --git a/dht/bittorrent/src/Network/BitTorrent/Tracker/RPC/UDP.hs b/dht/bittorrent/src/Network/BitTorrent/Tracker/RPC/UDP.hs new file mode 100644 index 00000000..31b6b870 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Tracker/RPC/UDP.hs @@ -0,0 +1,454 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013-2014 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : provisional +-- Portability : portable +-- +-- This module implement UDP tracker protocol. +-- +-- For protocol details and uri scheme see: +-- , +-- +-- +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE DeriveDataTypeable #-} +module Network.BitTorrent.Tracker.RPC.UDP + ( -- * Manager + Options (..) + , Manager + , newManager + , closeManager + , withManager + + -- * RPC + , RpcException (..) + , announce + , scrape + ) where + +import Control.Applicative +import Control.Concurrent +import Control.Exception +import Control.Monad +import Data.Default +import Data.IORef +import Data.List as L +import Data.Map as M +import Data.Maybe +import Data.Serialize +import Data.Text as T +import Data.Time +import Data.Time.Clock.POSIX +import Data.Traversable +import Data.Typeable +import Text.Read (readMaybe) +import Network.Socket hiding (Connected, connect, listen) +import Network.Socket.ByteString as BS +import Network.URI +import System.Timeout + +import Network.BitTorrent.Tracker.Message + +{----------------------------------------------------------------------- +-- Options +-----------------------------------------------------------------------} + +-- | 'System.Timeout.timeout' specific. +sec :: Int +sec = 1000000 + +-- | See +defMinTimeout :: Int +defMinTimeout = 15 + +-- | See +defMaxTimeout :: Int +defMaxTimeout = 15 * 2 ^ (8 :: Int) + +-- | See: +defMultiplier :: Int +defMultiplier = 2 + +-- TODO why 98? +defMaxPacketSize :: Int +defMaxPacketSize = 98 + +-- | Manager configuration. +data Options = Options + { -- | Max size of a /response/ packet. + -- + -- 'optMaxPacketSize' /must/ be a positive value. + -- + optMaxPacketSize :: {-# UNPACK #-} !Int + + -- | Starting timeout interval in seconds. If a response is not + -- received after 'optMinTimeout' then 'Manager' repeat RPC with + -- timeout interval multiplied by 'optMultiplier' and so on until + -- timeout interval reach 'optMaxTimeout'. + -- + -- 'optMinTimeout' /must/ be a positive value. + -- + , optMinTimeout :: {-# UNPACK #-} !Int + + -- | Final timeout interval in seconds. After 'optMaxTimeout' + -- reached and tracker still not responding both 'announce' and + -- 'scrape' functions will throw 'TimeoutExpired' exception. + -- + -- 'optMaxTimeout' /must/ be greater than 'optMinTimeout'. + -- + , optMaxTimeout :: {-# UNPACK #-} !Int + + -- | 'optMultiplier' /must/ be a positive value. + , optMultiplier :: {-# UNPACK #-} !Int + } deriving (Show, Eq) + +-- | Options suitable for bittorrent client. +instance Default Options where + def = Options + { optMaxPacketSize = defMaxPacketSize + , optMinTimeout = defMinTimeout + , optMaxTimeout = defMaxTimeout + , optMultiplier = defMultiplier + } + +checkOptions :: Options -> IO () +checkOptions Options {..} = do + unless (optMaxPacketSize > 0) $ do + throwIO $ userError "optMaxPacketSize must be positive" + + unless (optMinTimeout > 0) $ do + throwIO $ userError "optMinTimeout must be positive" + + unless (optMaxTimeout > 0) $ do + throwIO $ userError "optMaxTimeout must be positive" + + unless (optMultiplier > 0) $ do + throwIO $ userError "optMultiplier must be positive" + + unless (optMaxTimeout > optMinTimeout) $ do + throwIO $ userError "optMaxTimeout must be greater than optMinTimeout" + + +{----------------------------------------------------------------------- +-- Manager state +-----------------------------------------------------------------------} + +type ConnectionCache = Map SockAddr Connection + +type PendingResponse = MVar (Either RpcException Response) +type PendingTransactions = Map TransactionId PendingResponse +type PendingQueries = Map SockAddr PendingTransactions + +-- | UDP tracker manager. +data Manager = Manager + { options :: !Options + , sock :: !Socket +-- , dnsCache :: !(IORef (Map URI SockAddr)) + , connectionCache :: !(IORef ConnectionCache) + , pendingResps :: !(MVar PendingQueries) + , listenerThread :: !(MVar ThreadId) + } + +initManager :: Options -> IO Manager +initManager opts = Manager opts + <$> socket AF_INET Datagram defaultProtocol + <*> newIORef M.empty + <*> newMVar M.empty + <*> newEmptyMVar + +unblockAll :: PendingQueries -> IO () +unblockAll m = traverse (traverse unblockCall) m >> return () + where + unblockCall ares = putMVar ares (Left ManagerClosed) + +resetState :: Manager -> IO () +resetState Manager {..} = do + writeIORef connectionCache err + m <- swapMVar pendingResps err + unblockAll m + mtid <- tryTakeMVar listenerThread + case mtid of + Nothing -> return () -- thread killed by 'closeManager' + Just _ -> return () -- thread killed by exception from 'listen' + return () + where + err = error "UDP tracker manager closed" + +-- | This function will throw 'IOException' on invalid 'Options'. +newManager :: Options -> IO Manager +newManager opts = do + checkOptions opts + mgr <- initManager opts + tid <- forkIO (listen mgr `finally` resetState mgr) + putMVar (listenerThread mgr) tid + return mgr + +-- | Unblock all RPCs by throwing 'ManagerClosed' exception. No rpc +-- calls should be performed after manager becomes closed. +closeManager :: Manager -> IO () +closeManager Manager {..} = do + close sock + mtid <- tryTakeMVar listenerThread + case mtid of + Nothing -> return () + Just tid -> killThread tid + +-- | Normally you need to use 'Control.Monad.Trans.Resource.allocate'. +withManager :: Options -> (Manager -> IO a) -> IO a +withManager opts = bracket (newManager opts) closeManager + +{----------------------------------------------------------------------- +-- Exceptions +-----------------------------------------------------------------------} + +data RpcException + -- | Unable to lookup hostname; + = HostUnknown + + -- | Unable to lookup hostname; + | HostLookupFailed + + -- | Expecting 'udp:', but some other scheme provided. + | UnrecognizedScheme String + + -- | Tracker exists but not responding for specific number of seconds. + | TimeoutExpired Int + + -- | Tracker responded with unexpected message type. + | UnexpectedResponse + { expectedMsg :: String + , actualMsg :: String + } + + -- | RPC succeed, but tracker responded with error code. + | QueryFailed Text + + -- | RPC manager closed while waiting for response. + | ManagerClosed + deriving (Eq, Show, Typeable) + +instance Exception RpcException + +{----------------------------------------------------------------------- +-- Host Addr resolution +-----------------------------------------------------------------------} + +setPort :: PortNumber -> SockAddr -> SockAddr +setPort p (SockAddrInet _ h) = SockAddrInet p h +setPort p (SockAddrInet6 _ f h s) = SockAddrInet6 p f h s +setPort _ addr = addr + +resolveURI :: URI -> IO SockAddr +resolveURI URI { uriAuthority = Just (URIAuth {..}) } = do + infos <- getAddrInfo Nothing (Just uriRegName) Nothing + let port = fromMaybe 0 (readMaybe (L.drop 1 uriPort) :: Maybe Int) + case infos of + AddrInfo {..} : _ -> return $ setPort (fromIntegral port) addrAddress + _ -> throwIO HostLookupFailed +resolveURI _ = throwIO HostUnknown + +-- TODO caching? +getTrackerAddr :: Manager -> URI -> IO SockAddr +getTrackerAddr _ uri + | uriScheme uri == "udp:" = resolveURI uri + | otherwise = throwIO (UnrecognizedScheme (uriScheme uri)) + +{----------------------------------------------------------------------- + Connection +-----------------------------------------------------------------------} + +connectionLifetime :: NominalDiffTime +connectionLifetime = 60 + +data Connection = Connection + { connectionId :: ConnectionId + , connectionTimestamp :: UTCTime + } deriving Show + +-- placeholder for the first 'connect' +initialConnection :: Connection +initialConnection = Connection initialConnectionId (posixSecondsToUTCTime 0) + +establishedConnection :: ConnectionId -> IO Connection +establishedConnection cid = Connection cid <$> getCurrentTime + +isExpired :: Connection -> IO Bool +isExpired Connection {..} = do + currentTime <- getCurrentTime + let timeDiff = diffUTCTime currentTime connectionTimestamp + return $ timeDiff > connectionLifetime + +{----------------------------------------------------------------------- +-- Transactions +-----------------------------------------------------------------------} + +-- | Sometimes 'genTransactionId' may return already used transaction +-- id. We use a good entropy source but the issue /still/ (with very +-- small probabality) may happen. If the collision happen then this +-- function tries to find nearest unused slot, otherwise pending +-- transactions table is full. +firstUnused :: SockAddr -> TransactionId -> PendingQueries -> TransactionId +firstUnused addr rid m = do + case M.splitLookup rid <$> M.lookup addr m of + Nothing -> rid + Just (_ , Nothing, _ ) -> rid + Just (lt, Just _ , gt) -> + case backwardHole (keys lt) rid <|> forwardHole rid (keys gt) of + Nothing -> error "firstUnused: table is full" -- impossible + Just tid -> tid + where + forwardHole a [] + | a == maxBound = Nothing + | otherwise = Just (succ a) + forwardHole a (b : xs) + | succ a == b = forwardHole b xs + | otherwise = Just (succ a) + + backwardHole [] a + | a == minBound = Nothing + | otherwise = Just (pred a) + backwardHole (b : xs) a + | b == pred a = backwardHole xs b + | otherwise = Just (pred a) + +register :: SockAddr -> TransactionId -> PendingResponse + -> PendingQueries -> PendingQueries +register addr tid ares = M.alter insertId addr + where + insertId Nothing = Just (M.singleton tid ares) + insertId (Just m) = Just (M.insert tid ares m) + +unregister :: SockAddr -> TransactionId + -> PendingQueries -> PendingQueries +unregister addr tid = M.update deleteId addr + where + deleteId m + | M.null m' = Nothing + | otherwise = Just m' + where + m' = M.delete tid m + +-- | Generate a new unused transaction id and register as pending. +allocTransaction :: Manager -> SockAddr -> PendingResponse -> IO TransactionId +allocTransaction Manager {..} addr ares = + modifyMVar pendingResps $ \ m -> do + rndId <- genTransactionId + let tid = firstUnused addr rndId m + return (register addr tid ares m, tid) + +-- | Wake up blocked thread and return response back. +commitTransaction :: Manager -> SockAddr -> TransactionId -> Response -> IO () +commitTransaction Manager {..} addr tid resp = + modifyMVarMasked_ pendingResps $ \ m -> do + case M.lookup tid =<< M.lookup addr m of + Nothing -> return m -- tracker responded after 'cancelTransaction' fired + Just ares -> do + putMVar ares (Right resp) + return $ unregister addr tid m + +-- | Abort transaction forcefully. +cancelTransaction :: Manager -> SockAddr -> TransactionId -> IO () +cancelTransaction Manager {..} addr tid = + modifyMVarMasked_ pendingResps $ \m -> + return $ unregister addr tid m + +-- | Handle responses from trackers. +listen :: Manager -> IO () +listen mgr @ Manager {..} = do + forever $ do + (bs, addr) <- BS.recvFrom sock (optMaxPacketSize options) + case decode bs of + Left _ -> return () -- parser failed, ignoring + Right (TransactionR {..}) -> commitTransaction mgr addr transIdR response + +-- | Perform RPC transaction. If the action interrupted transaction +-- will be aborted. +transaction :: Manager -> SockAddr -> Connection -> Request -> IO Response +transaction mgr @ Manager {..} addr conn request = do + ares <- newEmptyMVar + tid <- allocTransaction mgr addr ares + performTransaction tid ares + `onException` cancelTransaction mgr addr tid + where + performTransaction tid ares = do + let trans = TransactionQ (connectionId conn) tid request + BS.sendAllTo sock (encode trans) addr + takeMVar ares >>= either throwIO return + +{----------------------------------------------------------------------- +-- Connection cache +-----------------------------------------------------------------------} + +connect :: Manager -> SockAddr -> Connection -> IO ConnectionId +connect m addr conn = do + resp <- transaction m addr conn Connect + case resp of + Connected cid -> return cid + Failed msg -> throwIO $ QueryFailed msg + _ -> throwIO $ UnexpectedResponse "connected" (responseName resp) + +newConnection :: Manager -> SockAddr -> IO Connection +newConnection m addr = do + connId <- connect m addr initialConnection + establishedConnection connId + +refreshConnection :: Manager -> SockAddr -> Connection -> IO Connection +refreshConnection mgr addr conn = do + expired <- isExpired conn + if expired + then do + connId <- connect mgr addr conn + establishedConnection connId + else do + return conn + +withCache :: Manager -> SockAddr + -> (Maybe Connection -> IO Connection) -> IO Connection +withCache mgr addr action = do + cache <- readIORef (connectionCache mgr) + conn <- action (M.lookup addr cache) + writeIORef (connectionCache mgr) (M.insert addr conn cache) + return conn + +getConnection :: Manager -> SockAddr -> IO Connection +getConnection mgr addr = withCache mgr addr $ + maybe (newConnection mgr addr) (refreshConnection mgr addr) + +{----------------------------------------------------------------------- +-- RPC +-----------------------------------------------------------------------} + +retransmission :: Options -> IO a -> IO a +retransmission Options {..} action = go optMinTimeout + where + go curTimeout + | curTimeout > optMaxTimeout = throwIO $ TimeoutExpired curTimeout + | otherwise = do + r <- timeout (curTimeout * sec) action + maybe (go (optMultiplier * curTimeout)) return r + +queryTracker :: Manager -> URI -> Request -> IO Response +queryTracker mgr uri req = do + addr <- getTrackerAddr mgr uri + retransmission (options mgr) $ do + conn <- getConnection mgr addr + transaction mgr addr conn req + +-- | This function can throw 'RpcException'. +announce :: Manager -> URI -> AnnounceQuery -> IO AnnounceInfo +announce mgr uri q = do + resp <- queryTracker mgr uri (Announce q) + case resp of + Announced info -> return info + _ -> throwIO $ UnexpectedResponse "announce" (responseName resp) + +-- | This function can throw 'RpcException'. +scrape :: Manager -> URI -> ScrapeQuery -> IO ScrapeInfo +scrape mgr uri ihs = do + resp <- queryTracker mgr uri (Scrape ihs) + case resp of + Scraped info -> return $ L.zip ihs info + _ -> throwIO $ UnexpectedResponse "scrape" (responseName resp) diff --git a/dht/bittorrent/src/Network/BitTorrent/Tracker/Session.hs b/dht/bittorrent/src/Network/BitTorrent/Tracker/Session.hs new file mode 100644 index 00000000..db6ebaff --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Tracker/Session.hs @@ -0,0 +1,306 @@ +-- | +-- Copyright : (c) Sam Truzjan 2014 +-- License : BSD +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- Multitracker sessions. +-- +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE TypeSynonymInstances #-} +{-# LANGUAGE TemplateHaskell #-} +module Network.BitTorrent.Tracker.Session + ( -- * Session + Session + , Event (..) + , newSession + , closeSession + , withSession + + -- * Client send notifications + , notify + , askPeers + + -- * Session state + -- ** Status + , Status (..) + , getStatus + + -- ** Single tracker sessions + , LastScrape (..) + , TrackerSession + , trackerPeers + , trackerScrape + , getSessionState + + -- * Tracker Exchange + -- | BEP28: + , addTracker + , removeTracker + , getTrustedTrackers + ) where + +import Control.Applicative +import Control.Exception +import Control.Concurrent +import Control.Concurrent.Chan.Split as CS +import Control.Monad +import Data.Default +import Data.Fixed +import Data.Foldable as F +import Data.IORef +import Data.List as L +import Data.Maybe +import Data.Time +import Data.Traversable +import Network.URI + +import Data.Torrent +import Network.Address +import Network.BitTorrent.Internal.Cache +import Network.BitTorrent.Internal.Types +import Network.BitTorrent.Tracker.List as TL +import Network.BitTorrent.Tracker.Message +import Network.BitTorrent.Tracker.RPC as RPC + +{----------------------------------------------------------------------- +-- Single tracker session +-----------------------------------------------------------------------} + +-- | Status of this client. +data Status + = Running -- ^ This client is announced and listenning for incoming + -- connections. + | Paused -- ^ This client does not expecting incoming connections. + deriving (Show, Eq, Bounded, Enum) + +-- | Client starting in the paused state. +instance Default Status where + def = Paused + +-- | Tracker session starts with scrape unknown. +instance Default LastScrape where + def = LastScrape Nothing Nothing + +data LastScrape = LastScrape + { -- | Count of leechers the tracker aware of. + scrapeLeechers :: Maybe Int + + -- | Count of seeders the tracker aware of. + , scrapeSeeders :: Maybe Int + } deriving (Show, Eq) + +-- | Single tracker session. +data TrackerSession = TrackerSession + { -- | Used to notify 'Stopped' and 'Completed' events. + statusSent :: !(Maybe Status) + + -- | Can be used to retrieve peer set. + , trackerPeers :: Cached [PeerAddr] + + -- | Can be used to show brief swarm stats in client GUI. + , trackerScrape :: Cached LastScrape + } + +-- | Not contacted. +instance Default TrackerSession where + def = TrackerSession Nothing def def + +-- | Do we need to notify this /specific/ tracker? +needNotify :: AnnounceEvent -> Maybe Status -> Maybe Bool +needNotify Started Nothing = Just True +needNotify Stopped Nothing = Just False +needNotify Completed Nothing = Just False +needNotify Started (Just Running) = Nothing +needNotify Stopped (Just Running) = Just True +needNotify Completed (Just Running) = Just True +needNotify Started (Just Paused ) = Just True +needNotify Stopped (Just Paused ) = Just False +needNotify Completed (Just Paused ) = Just True + +-- | Client status after event announce succeed. +nextStatus :: AnnounceEvent -> Maybe Status +nextStatus Started = Just Running +nextStatus Stopped = Just Paused +nextStatus Completed = Nothing -- must keep previous status + +seconds :: Int -> NominalDiffTime +seconds n = realToFrac (toEnum n :: Uni) + +cachePeers :: AnnounceInfo -> IO (Cached [PeerAddr]) +cachePeers AnnounceInfo {..} = + newCached (seconds respInterval) + (seconds (fromMaybe respInterval respMinInterval)) + (getPeerList respPeers) + +cacheScrape :: AnnounceInfo -> IO (Cached LastScrape) +cacheScrape AnnounceInfo {..} = + newCached (seconds respInterval) + (seconds (fromMaybe respInterval respMinInterval)) + LastScrape + { scrapeSeeders = respComplete + , scrapeLeechers = respIncomplete + } + +-- | Make announce request to specific tracker returning new state. +notifyTo :: Manager -> Session -> AnnounceEvent + -> TierEntry TrackerSession -> IO TrackerSession +notifyTo mgr s @ Session {..} event (uri, entry @ TrackerSession {..}) = do + let shouldNotify = needNotify event statusSent + mustNotify <- maybe (isExpired trackerPeers) return shouldNotify + if not mustNotify + then return entry + else do + let q = SAnnounceQuery sessionTopic def Nothing (Just event) + res <- RPC.announce mgr uri q + when (statusSent == Nothing) $ do + send sessionEvents (TrackerConfirmed uri) + send sessionEvents (AnnouncedTo uri) + let status' = nextStatus event <|> statusSent + TrackerSession status' <$> cachePeers res <*> cacheScrape res + +{----------------------------------------------------------------------- +-- Multitracker Session +-----------------------------------------------------------------------} + +-- | Multitracker session. +data Session = Session + { -- | Infohash to announce at each 'announce' request. + sessionTopic :: !InfoHash + + -- | Current status of this client is used to filter duplicated + -- notifications, for e.g. we don't want to notify a tracker with + -- ['Stopped', 'Stopped'], the last should be ignored. + , sessionStatus :: !(IORef Status) + + -- | A set of single-tracker sessions. Any request to a tracker + -- must take a lock. + , sessionTrackers :: !(MVar (TrackerList TrackerSession)) + + , sessionEvents :: !(SendPort (Event Session)) + } + +instance EventSource Session where + data Event Session + = TrackerAdded URI + | TrackerConfirmed URI + | TrackerRemoved URI + | AnnouncedTo URI + | SessionClosed + + listen Session {..} = CS.listen sessionEvents + + +-- | Create a new multitracker session in paused state. Tracker list +-- must contant only /trusted/ tracker uris. To start announcing +-- client presence use 'notify'. +newSession :: InfoHash -> TrackerList () -> IO Session +newSession ih origUris = do + urisList <- shuffleTiers origUris + statusRef <- newIORef def + entriesVar <- newMVar (fmap (const def) urisList) + eventStream <- newSendPort + return Session + { sessionTopic = ih + , sessionStatus = statusRef + , sessionTrackers = entriesVar + , sessionEvents = eventStream + } + +-- | Release scarce resources associated with the given session. This +-- function block until all trackers tied with this peer notified with +-- 'Stopped' event. +closeSession :: Manager -> Session -> IO () +closeSession m s @ Session {..} = do + notify m s Stopped + send sessionEvents SessionClosed + +{----------------------------------------------------------------------- +-- Operations +-----------------------------------------------------------------------} + +-- | Normally you need to use 'Control.Monad.Trans.Resource.alloc'. +withSession :: Manager -> InfoHash -> TrackerList () + -> (Session -> IO ()) -> IO () +withSession m ih uris = bracket (newSession ih uris) (closeSession m) + +-- | Get last announced status. The only action can alter this status +-- is 'notify'. +getStatus :: Session -> IO Status +getStatus Session {..} = readIORef sessionStatus + +getSessionState :: Session -> IO [[TierEntry TrackerSession]] +getSessionState Session {..} = TL.toList <$> readMVar sessionTrackers + +-- | Do we need to sent this event to a first working tracker or to +-- the all known good trackers? +allNotify :: AnnounceEvent -> Bool +allNotify Started = False +allNotify Stopped = True +allNotify Completed = True + +notifyAll :: Manager -> Session -> AnnounceEvent -> IO () +notifyAll mgr s @ Session {..} event = do + modifyMVar_ sessionTrackers $ + (traversal (notifyTo mgr s event)) + where + traversal + | allNotify event = traverseAll + | otherwise = traverseTiers + +-- TODO send notifications to tracker periodically. +-- | +-- +-- This function /may/ block until tracker query proceed. +notify :: Manager -> Session -> AnnounceEvent -> IO () +notify mgr ses event = do + prevStatus <- atomicModifyIORef (sessionStatus ses) $ \ s -> + (fromMaybe s (nextStatus event), s) + when (needNotify event (Just prevStatus) == Just True) $ do + notifyAll mgr ses event + +-- TODO run announce if sesion have no peers +-- | The returned list of peers can have duplicates. +-- This function /may/ block. Use async if needed. +askPeers :: Manager -> Session -> IO [PeerAddr] +askPeers _mgr ses = do + list <- readMVar (sessionTrackers ses) + L.concat <$> collect (tryTakeData . trackerPeers) list + +collect :: (a -> IO (Maybe b)) -> TrackerList a -> IO [b] +collect f lst = (catMaybes . F.toList) <$> traverse f lst + +--sourcePeers :: Session -> Source (PeerAddr IP) +--sourcePeers + +{----------------------------------------------------------------------- +-- Tracker exchange +-----------------------------------------------------------------------} + +-- Trackers discovered through this protocol SHOULD be treated with a +-- certain amount of suspicion. Since the source of a tracker exchange +-- message cannot be trusted, an implementation SHOULD have a lower +-- number of retries before giving up entirely. + +addTracker :: Session -> URI -> IO () +addTracker Session {..} uri = do + undefined + send sessionEvents (TrackerAdded uri) + +removeTracker :: Manager -> Session -> URI -> IO () +removeTracker m Session {..} uri = do + send sessionEvents (TrackerRemoved uri) + +-- Also, as specified under the definitions section, a tracker that +-- has not worked should never be propagated to other peers over the +-- tracker exchange protocol. + +-- | Return all known trackers. +getTrackers :: Session -> IO [URI] +getTrackers = undefined + +-- | Return trackers from torrent file and +getTrustedTrackers :: Session -> IO [URI] +getTrustedTrackers = undefined -- cgit v1.2.3