From 11987749fc6e6d3e53ea737d46d5ab13a16faeb8 Mon Sep 17 00:00:00 2001 From: James Crayne Date: Sat, 28 Sep 2019 13:43:29 -0400 Subject: Factor out some new libraries word64-map: Data.Word64Map network-addr: Network.Address tox-crypto: Crypto.Tox lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ minmax-psq: Data.MinMaxPSQ tasks: Control.Concurrent.Tasks kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search --- .../src/Network/BitTorrent/Exchange/Bitfield.hs | 405 +++++++ .../src/Network/BitTorrent/Exchange/Block.hs | 369 ++++++ .../src/Network/BitTorrent/Exchange/Connection.hs | 1012 ++++++++++++++++ .../src/Network/BitTorrent/Exchange/Download.hs | 296 +++++ .../src/Network/BitTorrent/Exchange/Manager.hs | 62 + .../src/Network/BitTorrent/Exchange/Message.hs | 1237 ++++++++++++++++++++ .../src/Network/BitTorrent/Exchange/Session.hs | 586 ++++++++++ 7 files changed, 3967 insertions(+) create mode 100644 dht/bittorrent/src/Network/BitTorrent/Exchange/Bitfield.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Exchange/Block.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Exchange/Download.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Exchange/Manager.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Exchange/Message.hs create mode 100644 dht/bittorrent/src/Network/BitTorrent/Exchange/Session.hs (limited to 'dht/bittorrent/src/Network/BitTorrent/Exchange') diff --git a/dht/bittorrent/src/Network/BitTorrent/Exchange/Bitfield.hs b/dht/bittorrent/src/Network/BitTorrent/Exchange/Bitfield.hs new file mode 100644 index 00000000..1be9f970 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Exchange/Bitfield.hs @@ -0,0 +1,405 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- This modules provides all necessary machinery to work with +-- bitfields. Bitfields are used to keep track indices of complete +-- pieces either this peer have or remote peer have. +-- +-- There are also commonly used piece selection algorithms +-- which used to find out which one next piece to download. +-- Selectors considered to be used in the following order: +-- +-- * 'randomFirst' - at the start of download. +-- +-- * 'rarestFirst' - performed to avoid situation when +-- rarest piece is unaccessible. +-- +-- * 'endGame' - performed after a peer has requested all +-- the subpieces of the content. +-- +-- Note that BitTorrent protocol recommend (TODO link?) the +-- 'strictFirst' priority policy for /subpiece/ or /blocks/ +-- selection. +-- +{-# LANGUAGE CPP #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE RecordWildCards #-} +module Network.BitTorrent.Exchange.Bitfield + ( -- * Bitfield + PieceIx + , PieceCount + , Bitfield + + -- * Construction + , haveAll + , haveNone + , have + , singleton + , interval + , adjustSize + + -- * Query + -- ** Cardinality + , Network.BitTorrent.Exchange.Bitfield.null + , Network.BitTorrent.Exchange.Bitfield.full + , haveCount + , totalCount + , completeness + + -- ** Membership + , member + , notMember + , findMin + , findMax + , isSubsetOf + + -- ** Availability + , complement + , Frequency + , frequencies + , rarest + + -- * Combine + , insert + , union + , intersection + , difference + + -- * Conversion + , toList + , fromList + + -- * Serialization + , fromBitmap + , toBitmap + + -- * Piece selection + , Selector + , selector + , strategyClass + + , strictFirst + , strictLast + , rarestFirst + , randomFirst + , endGame + ) where + +import Control.Monad +import Control.Monad.ST +import Data.ByteString (ByteString) +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as Lazy +import Data.Vector.Unboxed (Vector) +import qualified Data.Vector.Unboxed as V +import qualified Data.Vector.Unboxed.Mutable as VM +import Data.IntervalSet (IntSet) +import qualified Data.IntervalSet as S +import qualified Data.IntervalSet.ByteString as S +import Data.List (foldl') +import Data.Monoid +import Data.Ratio + +import Data.Torrent + +-- TODO cache some operations + +-- | Bitfields are represented just as integer sets but with a restriction: +-- each integer in the set should be within the given interval. The greatest +-- lower bound of the interval must be zero, so intervals may be specified by +-- providing a maximum set size. For example, a bitfield of size 10 might +-- contain only indices in interval [0..9]. +-- +-- By convention, we use the following aliases for Int: +-- +-- [ PieceIx ] an Int member of the Bitfield. +-- +-- [ PieceCount ] maximum set size for a Bitfield. +data Bitfield = Bitfield { + bfSize :: !PieceCount + , bfSet :: !IntSet + } deriving (Show, Read, Eq) + +-- Invariants: all elements of bfSet lie in [0..bfSize - 1]; + +instance Monoid Bitfield where + {-# SPECIALIZE instance Monoid Bitfield #-} + mempty = haveNone 0 + mappend = union + mconcat = unions + +{----------------------------------------------------------------------- + Construction +-----------------------------------------------------------------------} + +-- | The empty bitfield of the given size. +haveNone :: PieceCount -> Bitfield +haveNone s = Bitfield s S.empty + +-- | The full bitfield containing all piece indices for the given size. +haveAll :: PieceCount -> Bitfield +haveAll s = Bitfield s (S.interval 0 (s - 1)) + +-- | Insert the index in the set ignoring out of range indices. +have :: PieceIx -> Bitfield -> Bitfield +have ix Bitfield {..} + | 0 <= ix && ix < bfSize = Bitfield bfSize (S.insert ix bfSet) + | otherwise = Bitfield bfSize bfSet + +singleton :: PieceIx -> PieceCount -> Bitfield +singleton ix pc = have ix (haveNone pc) + +-- | Assign new size to bitfield. FIXME Normally, size should be only +-- decreased, otherwise exception raised. +adjustSize :: PieceCount -> Bitfield -> Bitfield +adjustSize s Bitfield {..} = Bitfield s bfSet + +-- | NOTE: for internal use only +interval :: PieceCount -> PieceIx -> PieceIx -> Bitfield +interval pc a b = Bitfield pc (S.interval a b) + +{----------------------------------------------------------------------- + Query +-----------------------------------------------------------------------} + +-- | Test if bitifield have no one index: peer do not have anything. +null :: Bitfield -> Bool +null Bitfield {..} = S.null bfSet + +-- | Test if bitfield have all pieces. +full :: Bitfield -> Bool +full Bitfield {..} = S.size bfSet == bfSize + +-- | Count of peer have pieces. +haveCount :: Bitfield -> PieceCount +haveCount = S.size . bfSet + +-- | Total count of pieces and its indices. +totalCount :: Bitfield -> PieceCount +totalCount = bfSize + +-- | Ratio of /have/ piece count to the /total/ piece count. +-- +-- > forall bf. 0 <= completeness bf <= 1 +-- +completeness :: Bitfield -> Ratio PieceCount +completeness b = haveCount b % totalCount b + +inRange :: PieceIx -> Bitfield -> Bool +inRange ix Bitfield {..} = 0 <= ix && ix < bfSize + +member :: PieceIx -> Bitfield -> Bool +member ix bf @ Bitfield {..} + | ix `inRange` bf = ix `S.member` bfSet + | otherwise = False + +notMember :: PieceIx -> Bitfield -> Bool +notMember ix bf @ Bitfield {..} + | ix `inRange` bf = ix `S.notMember` bfSet + | otherwise = True + +-- | Find first available piece index. +findMin :: Bitfield -> PieceIx +findMin = S.findMin . bfSet +{-# INLINE findMin #-} + +-- | Find last available piece index. +findMax :: Bitfield -> PieceIx +findMax = S.findMax . bfSet +{-# INLINE findMax #-} + +-- | Check if all pieces from first bitfield present if the second bitfield +isSubsetOf :: Bitfield -> Bitfield -> Bool +isSubsetOf a b = bfSet a `S.isSubsetOf` bfSet b +{-# INLINE isSubsetOf #-} + +-- | Resulting bitfield includes only missing pieces. +complement :: Bitfield -> Bitfield +complement Bitfield {..} = Bitfield + { bfSet = uni `S.difference` bfSet + , bfSize = bfSize + } + where + Bitfield _ uni = haveAll bfSize +{-# INLINE complement #-} + +{----------------------------------------------------------------------- +-- Availability +-----------------------------------------------------------------------} + +-- | Frequencies are needed in piece selection startegies which use +-- availability quantity to find out the optimal next piece index to +-- download. +type Frequency = Int + +-- TODO rename to availability +-- | How many times each piece index occur in the given bitfield set. +frequencies :: [Bitfield] -> Vector Frequency +frequencies [] = V.fromList [] +frequencies xs = runST $ do + v <- VM.new size + VM.set v 0 + forM_ xs $ \ Bitfield {..} -> do + forM_ (S.toList bfSet) $ \ x -> do + fr <- VM.read v x + VM.write v x (succ fr) + V.unsafeFreeze v + where + size = maximum (map bfSize xs) + +-- TODO it seems like this operation is veeery slow + +-- | Find least available piece index. If no piece available return +-- 'Nothing'. +rarest :: [Bitfield] -> Maybe PieceIx +rarest xs + | V.null freqMap = Nothing + | otherwise + = Just $ fst $ V.ifoldr' minIx (0, freqMap V.! 0) freqMap + where + freqMap = frequencies xs + {-# NOINLINE freqMap #-} + + minIx :: PieceIx -> Frequency + -> (PieceIx, Frequency) + -> (PieceIx, Frequency) + minIx ix fr acc@(_, fra) + | fr < fra && fr > 0 = (ix, fr) + | otherwise = acc + + +{----------------------------------------------------------------------- + Combine +-----------------------------------------------------------------------} + +insert :: PieceIx -> Bitfield -> Bitfield +insert pix bf @ Bitfield {..} + | 0 <= pix && pix < bfSize = Bitfield + { bfSet = S.insert pix bfSet + , bfSize = bfSize + } + | otherwise = bf + +-- | Find indices at least one peer have. +union :: Bitfield -> Bitfield -> Bitfield +union a b = {-# SCC union #-} Bitfield { + bfSize = bfSize a `max` bfSize b + , bfSet = bfSet a `S.union` bfSet b + } + +-- | Find indices both peers have. +intersection :: Bitfield -> Bitfield -> Bitfield +intersection a b = {-# SCC intersection #-} Bitfield { + bfSize = bfSize a `min` bfSize b + , bfSet = bfSet a `S.intersection` bfSet b + } + +-- | Find indices which have first peer but do not have the second peer. +difference :: Bitfield -> Bitfield -> Bitfield +difference a b = {-# SCC difference #-} Bitfield { + bfSize = bfSize a -- FIXME is it reasonable? + , bfSet = bfSet a `S.difference` bfSet b + } + +-- | Find indices the any of the peers have. +unions :: [Bitfield] -> Bitfield +unions = {-# SCC unions #-} foldl' union (haveNone 0) + +{----------------------------------------------------------------------- + Serialization +-----------------------------------------------------------------------} + +-- | List all /have/ indexes. +toList :: Bitfield -> [PieceIx] +toList Bitfield {..} = S.toList bfSet + +-- | Make bitfield from list of /have/ indexes. +fromList :: PieceCount -> [PieceIx] -> Bitfield +fromList s ixs = Bitfield { + bfSize = s + , bfSet = S.splitGT (-1) $ S.splitLT s $ S.fromList ixs + } + +-- | Unpack 'Bitfield' from tightly packed bit array. Note resulting +-- size might be more than real bitfield size, use 'adjustSize'. +fromBitmap :: ByteString -> Bitfield +fromBitmap bs = {-# SCC fromBitmap #-} Bitfield { + bfSize = B.length bs * 8 + , bfSet = S.fromByteString bs + } +{-# INLINE fromBitmap #-} + +-- | Pack a 'Bitfield' to tightly packed bit array. +toBitmap :: Bitfield -> Lazy.ByteString +toBitmap Bitfield {..} = {-# SCC toBitmap #-} Lazy.fromChunks [intsetBM, alignment] + where + byteSize = bfSize `div` 8 + if bfSize `mod` 8 == 0 then 0 else 1 + alignment = B.replicate (byteSize - B.length intsetBM) 0 + intsetBM = S.toByteString bfSet + +{----------------------------------------------------------------------- +-- Piece selection +-----------------------------------------------------------------------} + +type Selector = Bitfield -- ^ Indices of client /have/ pieces. + -> Bitfield -- ^ Indices of peer /have/ pieces. + -> [Bitfield] -- ^ Indices of other peers /have/ pieces. + -> Maybe PieceIx -- ^ Zero-based index of piece to request + -- to, if any. + +selector :: Selector -- ^ Selector to use at the start. + -> Ratio PieceCount + -> Selector -- ^ Selector to use after the client have + -- the C pieces. + -> Selector -- ^ Selector that changes behaviour based + -- on completeness. +selector start pt ready h a xs = + case strategyClass pt h of + SCBeginning -> start h a xs + SCReady -> ready h a xs + SCEnd -> endGame h a xs + +data StartegyClass + = SCBeginning + | SCReady + | SCEnd + deriving (Show, Eq, Ord, Enum, Bounded) + + +strategyClass :: Ratio PieceCount -> Bitfield -> StartegyClass +strategyClass threshold = classify . completeness + where + classify c + | c < threshold = SCBeginning + | c + 1 % numerator c < 1 = SCReady + -- FIXME numerator have is not total count + | otherwise = SCEnd + + +-- | Select the first available piece. +strictFirst :: Selector +strictFirst h a _ = Just $ findMin (difference a h) + +-- | Select the last available piece. +strictLast :: Selector +strictLast h a _ = Just $ findMax (difference a h) + +-- | +rarestFirst :: Selector +rarestFirst h a xs = rarest (map (intersection want) xs) + where + want = difference h a + +-- | In average random first is faster than rarest first strategy but +-- only if all pieces are available. +randomFirst :: Selector +randomFirst = do +-- randomIO + error "TODO: randomFirst" + +endGame :: Selector +endGame = strictLast diff --git a/dht/bittorrent/src/Network/BitTorrent/Exchange/Block.hs b/dht/bittorrent/src/Network/BitTorrent/Exchange/Block.hs new file mode 100644 index 00000000..bc9a3d24 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Exchange/Block.hs @@ -0,0 +1,369 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- Blocks are used to transfer pieces. +-- +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +module Network.BitTorrent.Exchange.Block + ( -- * Block attributes + BlockOffset + , BlockCount + , BlockSize + , defaultTransferSize + + -- * Block index + , BlockIx(..) + , blockIxRange + + -- * Block data + , Block(..) + , blockIx + , blockSize + , blockRange + , isPiece + , leadingBlock + + -- * Block bucket + , Bucket + + -- ** Query + , Network.BitTorrent.Exchange.Block.null + , Network.BitTorrent.Exchange.Block.full + , Network.BitTorrent.Exchange.Block.size + , Network.BitTorrent.Exchange.Block.spans + + -- ** Construction + , Network.BitTorrent.Exchange.Block.empty + , Network.BitTorrent.Exchange.Block.insert + , Network.BitTorrent.Exchange.Block.insertLazy + , Network.BitTorrent.Exchange.Block.merge + , Network.BitTorrent.Exchange.Block.fromList + + -- ** Rendering + , Network.BitTorrent.Exchange.Block.toPiece + + -- ** Debug + , Network.BitTorrent.Exchange.Block.valid + ) where + +import Prelude hiding (span) +import Control.Applicative +import Data.ByteString as BS hiding (span) +import Data.ByteString.Lazy as BL hiding (span) +import Data.ByteString.Lazy.Builder as BS +import Data.Default +import Data.Monoid +import Data.List as L hiding (span) +import Data.Serialize as S +import Data.Typeable +import Numeric +import Text.PrettyPrint as PP hiding ((<>)) +import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) + +import Data.Torrent + +{----------------------------------------------------------------------- +-- Block attributes +-----------------------------------------------------------------------} + +-- | Offset of a block in a piece in bytes. Should be multiple of +-- the choosen block size. +type BlockOffset = Int + +-- | Size of a block in bytes. Should be power of 2. +-- +-- Normally block size is equal to 'defaultTransferSize'. +-- +type BlockSize = Int + +-- | Number of block in a piece of a torrent. Used to distinguish +-- block count from piece count. +type BlockCount = Int + +-- | Widely used semi-official block size. Some clients can ignore if +-- block size of BlockIx in Request message is not equal to this +-- value. +-- +defaultTransferSize :: BlockSize +defaultTransferSize = 16 * 1024 + +{----------------------------------------------------------------------- + Block Index +-----------------------------------------------------------------------} + +-- | BlockIx correspond. +data BlockIx = BlockIx { + -- | Zero-based piece index. + ixPiece :: {-# UNPACK #-} !PieceIx + + -- | Zero-based byte offset within the piece. + , ixOffset :: {-# UNPACK #-} !BlockOffset + + -- | Block size starting from offset. + , ixLength :: {-# UNPACK #-} !BlockSize + } deriving (Show, Eq, Typeable) + +-- | First block in torrent. Useful for debugging. +instance Default BlockIx where + def = BlockIx 0 0 defaultTransferSize + +getInt :: S.Get Int +getInt = fromIntegral <$> S.getWord32be +{-# INLINE getInt #-} + +putInt :: S.Putter Int +putInt = S.putWord32be . fromIntegral +{-# INLINE putInt #-} + +instance Serialize BlockIx where + {-# SPECIALIZE instance Serialize BlockIx #-} + get = BlockIx <$> getInt + <*> getInt + <*> getInt + {-# INLINE get #-} + + put BlockIx {..} = do + putInt ixPiece + putInt ixOffset + putInt ixLength + {-# INLINE put #-} + +instance Pretty BlockIx where + pPrint BlockIx {..} = + ("piece = " <> int ixPiece <> ",") <+> + ("offset = " <> int ixOffset <> ",") <+> + ("length = " <> int ixLength) + +-- | Get location of payload bytes in the torrent content. +blockIxRange :: (Num a, Integral a) => PieceSize -> BlockIx -> (a, a) +blockIxRange piSize BlockIx {..} = (offset, offset + len) + where + offset = fromIntegral piSize * fromIntegral ixPiece + + fromIntegral ixOffset + len = fromIntegral ixLength +{-# INLINE blockIxRange #-} + +{----------------------------------------------------------------------- + Block +-----------------------------------------------------------------------} + +data Block payload = Block { + -- | Zero-based piece index. + blkPiece :: {-# UNPACK #-} !PieceIx + + -- | Zero-based byte offset within the piece. + , blkOffset :: {-# UNPACK #-} !BlockOffset + + -- | Payload bytes. + , blkData :: !payload + } deriving (Show, Eq, Functor, Typeable) + +-- | Payload is ommitted. +instance Pretty (Block BL.ByteString) where + pPrint = pPrint . blockIx + {-# INLINE pPrint #-} + +-- | Get size of block /payload/ in bytes. +blockSize :: Block BL.ByteString -> BlockSize +blockSize = fromIntegral . BL.length . blkData +{-# INLINE blockSize #-} + +-- | Get block index of a block. +blockIx :: Block BL.ByteString -> BlockIx +blockIx = BlockIx <$> blkPiece <*> blkOffset <*> blockSize + +-- | Get location of payload bytes in the torrent content. +blockRange :: (Num a, Integral a) + => PieceSize -> Block BL.ByteString -> (a, a) +blockRange piSize = blockIxRange piSize . blockIx +{-# INLINE blockRange #-} + +-- | Test if a block can be safely turned into a piece. +isPiece :: PieceSize -> Block BL.ByteString -> Bool +isPiece pieceLen blk @ (Block i offset _) = + offset == 0 && blockSize blk == pieceLen && i >= 0 +{-# INLINE isPiece #-} + +-- | First block in the piece. +leadingBlock :: PieceIx -> BlockSize -> BlockIx +leadingBlock pix blockSize = BlockIx + { ixPiece = pix + , ixOffset = 0 + , ixLength = blockSize + } +{-# INLINE leadingBlock #-} + +{----------------------------------------------------------------------- +-- Bucket +-----------------------------------------------------------------------} + +type Pos = Int +type ChunkSize = Int + +-- | A sparse set of blocks used to represent an /in progress/ piece. +data Bucket + = Nil + | Span {-# UNPACK #-} !ChunkSize !Bucket + | Fill {-# UNPACK #-} !ChunkSize !Builder !Bucket + +instance Show Bucket where + showsPrec i Nil = showString "" + showsPrec i (Span s xs) = showString "Span " <> showInt s + <> showString " " <> showsPrec i xs + showsPrec i (Fill s _ xs) = showString "Fill " <> showInt s + <> showString " " <> showsPrec i xs + +-- | INVARIANT: 'Nil' should appear only after 'Span' of 'Fill'. +nilInvFailed :: a +nilInvFailed = error "Nil: bucket invariant failed" + +valid :: Bucket -> Bool +valid = check Nothing + where + check Nothing Nil = False -- see 'nilInvFailed' + check (Just _) _ = True + check prevIsSpan (Span sz xs) = + prevIsSpan /= Just True && -- Span n (NotSpan .. ) invariant + sz > 0 && -- Span is always non-empty + check (Just True) xs + check prevIsSpan (Fill sz b xs) = + prevIsSpan /= Just True && -- Fill n (NotFill .. ) invariant + sz > 0 && -- Fill is always non-empty + check (Just False) xs + +instance Pretty Bucket where + pPrint Nil = nilInvFailed + pPrint bkt = go bkt + where + go Nil = PP.empty + go (Span sz xs) = "Span" <+> PP.int sz <+> go xs + go (Fill sz b xs) = "Fill" <+> PP.int sz <+> go xs + +-- | Smart constructor: use it when some block is /deleted/ from +-- bucket. +span :: ChunkSize -> Bucket -> Bucket +span sz (Span sz' xs) = Span (sz + sz') xs +span sz xxs = Span sz xxs +{-# INLINE span #-} + +-- | Smart constructor: use it when some block is /inserted/ to +-- bucket. +fill :: ChunkSize -> Builder -> Bucket -> Bucket +fill sz b (Fill sz' b' xs) = Fill (sz + sz') (b <> b') xs +fill sz b xxs = Fill sz b xxs +{-# INLINE fill #-} + +{----------------------------------------------------------------------- +-- Bucket queries +-----------------------------------------------------------------------} + +-- | /O(1)/. Test if this bucket is empty. +null :: Bucket -> Bool +null Nil = nilInvFailed +null (Span _ Nil) = True +null _ = False +{-# INLINE null #-} + +-- | /O(1)/. Test if this bucket is complete. +full :: Bucket -> Bool +full Nil = nilInvFailed +full (Fill _ _ Nil) = True +full _ = False +{-# INLINE full #-} + +-- | /O(n)/. Total size of the incompleted piece. +size :: Bucket -> PieceSize +size Nil = nilInvFailed +size bkt = go bkt + where + go Nil = 0 + go (Span sz xs) = sz + go xs + go (Fill sz _ xs) = sz + go xs + +-- | /O(n)/. List incomplete blocks to download. If some block have +-- size more than the specified 'BlockSize' then block is split into +-- smaller blocks to satisfy given 'BlockSize'. Small (for +-- e.g. trailing) blocks is not ignored, but returned in-order. +spans :: BlockSize -> Bucket -> [(BlockOffset, BlockSize)] +spans expectedSize = go 0 + where + go _ Nil = [] + go off (Span sz xs) = listChunks off sz ++ go (off + sz) xs + go off (Fill sz _ xs) = go (off + sz) xs + + listChunks off restSize + | restSize <= 0 = [] + | otherwise = (off, blkSize) + : listChunks (off + blkSize) (restSize - blkSize) + where + blkSize = min expectedSize restSize + +{----------------------------------------------------------------------- +-- Bucket contstruction +-----------------------------------------------------------------------} + +-- | /O(1)/. A new empty bucket capable to alloof specified size. +empty :: PieceSize -> Bucket +empty sz + | sz < 0 = error "empty: Bucket size must be a non-negative value" + | otherwise = Span sz Nil +{-# INLINE empty #-} + +insertSpan :: Pos -> BS.ByteString -> ChunkSize -> Bucket -> Bucket +insertSpan !pos !bs !span_sz !xs = + let pref_len = pos + fill_len = span_sz - pos `min` BS.length bs + suff_len = (span_sz - pos) - fill_len + in mkSpan pref_len $ + fill fill_len (byteString (BS.take fill_len bs)) $ + mkSpan suff_len $ + xs + where + mkSpan 0 xs = xs + mkSpan sz xs = Span sz xs + +-- | /O(n)/. Insert a strict bytestring at specified position. +-- +-- Best case: if blocks are inserted in sequential order, then this +-- operation should take /O(1)/. +-- +insert :: Pos -> BS.ByteString -> Bucket -> Bucket +insert _ _ Nil = nilInvFailed +insert dstPos bs bucket = go 0 bucket + where + intersects curPos sz = dstPos >= curPos && dstPos <= curPos + sz + + go _ Nil = Nil + go curPos (Span sz xs) + | intersects curPos sz = insertSpan (dstPos - curPos) bs sz xs + | otherwise = span sz (go (curPos + sz) xs) + go curPos bkt @ (Fill sz br xs) + | intersects curPos sz = bkt + | otherwise = fill sz br (go (curPos + sz) xs) + +fromList :: PieceSize -> [(Pos, BS.ByteString)] -> Bucket +fromList s = L.foldr (uncurry Network.BitTorrent.Exchange.Block.insert) + (Network.BitTorrent.Exchange.Block.empty s) + +-- TODO zero-copy +insertLazy :: Pos -> BL.ByteString -> Bucket -> Bucket +insertLazy pos bl = Network.BitTorrent.Exchange.Block.insert pos (BL.toStrict bl) + +-- | /O(n)/. +merge :: Bucket -> Bucket -> Bucket +merge = error "Bucket.merge: not implemented" + +-- | /O(1)/. +toPiece :: Bucket -> Maybe BL.ByteString +toPiece Nil = nilInvFailed +toPiece (Fill _ b Nil) = Just (toLazyByteString b) +toPiece _ = Nothing diff --git a/dht/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs b/dht/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs new file mode 100644 index 00000000..6804d0a2 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs @@ -0,0 +1,1012 @@ +-- | +-- Module : Network.BitTorrent.Exchange.Wire +-- Copyright : (c) Sam Truzjan 2013 +-- (c) Daniel Gröber 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- Each peer wire connection is identified by triple @(topic, +-- remote_addr, this_addr)@. This means that connections are the +-- same if and only if their 'ConnectionId' are the same. Of course, +-- you /must/ avoid duplicated connections. +-- +-- This module control /integrity/ of data send and received. +-- +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +module Network.BitTorrent.Exchange.Connection + ( -- * Wire + Connected + , Wire + , ChannelSide (..) + + -- * Connection + , Connection + , connInitiatedBy + + -- ** Identity + , connRemoteAddr + , connTopic + , connRemotePeerId + , connThisPeerId + + -- ** Capabilities + , connProtocol + , connCaps + , connExtCaps + , connRemoteEhs + + -- ** State + , connStatus + , connBitfield + + -- ** Env + , connOptions + , connSession + , connStats + + -- ** Status + , PeerStatus (..) + , ConnectionStatus (..) + , updateStatus + , statusUpdates + , clientStatus + , remoteStatus + , canUpload + , canDownload + , defaultUnchokeSlots + , defaultRechokeInterval + + + -- * Setup + , ConnectionPrefs (..) + , SessionLink (..) + , ConnectionConfig (..) + + -- ** Initiate + , connectWire + + -- ** Accept + , PendingConnection + , newPendingConnection + , pendingPeer + , pendingCaps + , pendingTopic + , closePending + , acceptWire + + -- ** Post setup actions + , resizeBitfield + + -- * Messaging + , recvMessage + , sendMessage + , filterQueue + , getMaxQueueLength + + -- * Exceptions + , ProtocolError (..) + , WireFailure (..) + , peerPenalty + , isWireFailure + , disconnectPeer + + -- * Stats + , ByteStats (..) + , FlowStats (..) + , ConnectionStats (..) + + -- * Flood detection + , FloodDetector (..) + + -- * Options + , Options (..) + ) where + +import Control.Applicative +import Control.Concurrent hiding (yield) +import Control.Exception +import Control.Monad.Reader +import Control.Monad.State +import Control.Monad.Trans.Resource +import Control.Lens +import Data.ByteString as BS +import Data.ByteString.Lazy as BSL +import Data.Conduit as C +import Data.Conduit.Cereal +import Data.Conduit.List +import Data.Conduit.Network +import Data.Default +import Data.IORef +import Data.List as L +import Data.Maybe as M +import Data.Monoid +import Data.Serialize as S +import Data.Typeable +import Network +import Network.Socket hiding (Connected) +import Network.Socket.ByteString as BS +import Text.PrettyPrint as PP hiding ((<>)) +import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) +import Text.Show.Functions () +import System.Log.FastLogger (ToLogStr(..)) +import System.Timeout + +import Data.Torrent +import Network.Address +import Network.BitTorrent.Exchange.Bitfield as BF +import Network.BitTorrent.Exchange.Message as Msg + +-- TODO handle port message? +-- TODO handle limits? +-- TODO filter not requested PIECE messages +-- TODO metadata piece request flood protection +-- TODO piece request flood protection +-- TODO protect against flood attacks +{----------------------------------------------------------------------- +-- Exceptions +-----------------------------------------------------------------------} + +-- | Used to specify initiator of 'ProtocolError'. +data ChannelSide + = ThisPeer + | RemotePeer + deriving (Show, Eq, Enum, Bounded) + +instance Default ChannelSide where + def = ThisPeer + +instance Pretty ChannelSide where + pPrint = PP.text . show + +-- | A protocol errors occur when a peer violates protocol +-- specification. +data ProtocolError + -- | Protocol string should be 'BitTorrent Protocol' but remote + -- peer have sent a different string. + = InvalidProtocol ProtocolName + + -- | Sent and received protocol strings do not match. Can occur + -- in 'connectWire' only. + | UnexpectedProtocol ProtocolName + + -- | /Remote/ peer replied with invalid 'hsInfoHash' which do not + -- match with 'hsInfoHash' /this/ peer have sent. Can occur in + -- 'connectWire' or 'acceptWire' only. + | UnexpectedTopic InfoHash + + -- | Some trackers or DHT can return 'PeerId' of a peer. If a + -- remote peer handshaked with different 'hsPeerId' then this + -- exception is raised. Can occur in 'connectWire' only. + | UnexpectedPeerId PeerId + + -- | Accepted peer have sent unknown torrent infohash in + -- 'hsInfoHash' field. This situation usually happen when /this/ + -- peer have deleted the requested torrent. The error can occur in + -- 'acceptWire' function only. + | UnknownTopic InfoHash + + -- | A remote peer have 'ExtExtended' enabled but did not send an + -- 'ExtendedHandshake' back. + | HandshakeRefused + + -- | 'Network.BitTorrent.Exchange.Message.Bitfield' message MUST + -- be send either once or zero times, but either this peer or + -- remote peer send a bitfield message the second time. + | BitfieldAlreadySent ChannelSide + + -- | Capabilities violation. For example this exception can occur + -- when a peer have sent 'Port' message but 'ExtDHT' is not + -- allowed in 'connCaps'. + | DisallowedMessage + { -- | Who sent invalid message. + violentSender :: ChannelSide + + -- | If the 'violentSender' reconnect with this extension + -- enabled then he can try to send this message. + , extensionRequired :: Extension + } + deriving Show + +instance Pretty ProtocolError where + pPrint = PP.text . show + +errorPenalty :: ProtocolError -> Int +errorPenalty (InvalidProtocol _) = 1 +errorPenalty (UnexpectedProtocol _) = 1 +errorPenalty (UnexpectedTopic _) = 1 +errorPenalty (UnexpectedPeerId _) = 1 +errorPenalty (UnknownTopic _) = 0 +errorPenalty (HandshakeRefused ) = 1 +errorPenalty (BitfieldAlreadySent _) = 1 +errorPenalty (DisallowedMessage _ _) = 1 + +-- | Exceptions used to interrupt the current P2P session. +data WireFailure + = ConnectionRefused IOError + + -- | Force termination of wire connection. + -- + -- Normally you should throw only this exception from event loop + -- using 'disconnectPeer', other exceptions are thrown + -- automatically by functions from this module. + -- + | DisconnectPeer + + -- | A peer not responding and did not send a 'KeepAlive' message + -- for a specified period of time. + | PeerDisconnected + + -- | A remote peer have sent some unknown message we unable to + -- parse. + | DecodingError GetException + + -- | See 'ProtocolError' for more details. + | ProtocolError ProtocolError + + -- | A possible malicious peer have sent too many control messages + -- without making any progress. + | FloodDetected ConnectionStats + deriving (Show, Typeable) + +instance Exception WireFailure + +instance Pretty WireFailure where + pPrint = PP.text . show + +-- TODO +-- data Penalty = Ban | Penalty Int + +peerPenalty :: WireFailure -> Int +peerPenalty DisconnectPeer = 0 +peerPenalty PeerDisconnected = 0 +peerPenalty (DecodingError _) = 1 +peerPenalty (ProtocolError e) = errorPenalty e +peerPenalty (FloodDetected _) = 1 + +-- | Do nothing with exception, used with 'handle' or 'try'. +isWireFailure :: Monad m => WireFailure -> m () +isWireFailure _ = return () + +protocolError :: MonadThrow m => ProtocolError -> m a +protocolError = monadThrow . ProtocolError + +{----------------------------------------------------------------------- +-- Stats +-----------------------------------------------------------------------} + +-- | Message stats in one direction. +data FlowStats = FlowStats + { -- | Number of the messages sent or received. + messageCount :: {-# UNPACK #-} !Int + -- | Sum of byte sequences of all messages. + , messageBytes :: {-# UNPACK #-} !ByteStats + } deriving Show + +instance Pretty FlowStats where + pPrint FlowStats {..} = + PP.int messageCount <+> "messages" $+$ + pPrint messageBytes + +-- | Zeroed stats. +instance Default FlowStats where + def = FlowStats 0 def + +-- | Monoid under addition. +instance Monoid FlowStats where + mempty = def + mappend a b = FlowStats + { messageBytes = messageBytes a <> messageBytes b + , messageCount = messageCount a + messageCount b + } + +-- | Find average length of byte sequences per message. +avgByteStats :: FlowStats -> ByteStats +avgByteStats (FlowStats n ByteStats {..}) = ByteStats + { overhead = overhead `quot` n + , control = control `quot` n + , payload = payload `quot` n + } + +-- | Message stats in both directions. This data can be retrieved +-- using 'getStats' function. +-- +-- Note that this stats is completely different from +-- 'Data.Torrent.Progress.Progress': payload bytes not necessary +-- equal to downloaded\/uploaded bytes since a peer can send a +-- broken block. +-- +data ConnectionStats = ConnectionStats + { -- | Received messages stats. + incomingFlow :: !FlowStats + -- | Sent messages stats. + , outcomingFlow :: !FlowStats + } deriving Show + +instance Pretty ConnectionStats where + pPrint ConnectionStats {..} = vcat + [ "Recv:" <+> pPrint incomingFlow + , "Sent:" <+> pPrint outcomingFlow + , "Both:" <+> pPrint (incomingFlow <> outcomingFlow) + ] + +-- | Zeroed stats. +instance Default ConnectionStats where + def = ConnectionStats def def + +-- | Monoid under addition. +instance Monoid ConnectionStats where + mempty = def + mappend a b = ConnectionStats + { incomingFlow = incomingFlow a <> incomingFlow b + , outcomingFlow = outcomingFlow a <> outcomingFlow b + } + +-- | Aggregate one more message stats in the /specified/ direction. +addStats :: ChannelSide -> ByteStats -> ConnectionStats -> ConnectionStats +addStats ThisPeer x s = s { outcomingFlow = (FlowStats 1 x) <> (outcomingFlow s) } +addStats RemotePeer x s = s { incomingFlow = (FlowStats 1 x) <> (incomingFlow s) } + +-- | Sum of overhead and control bytes in both directions. +wastedBytes :: ConnectionStats -> Int +wastedBytes ConnectionStats {..} = overhead + control + where + FlowStats _ ByteStats {..} = incomingFlow <> outcomingFlow + +-- | Sum of payload bytes in both directions. +payloadBytes :: ConnectionStats -> Int +payloadBytes ConnectionStats {..} = + payload (messageBytes (incomingFlow <> outcomingFlow)) + +-- | Sum of any bytes in both directions. +transmittedBytes :: ConnectionStats -> Int +transmittedBytes ConnectionStats {..} = + byteLength (messageBytes (incomingFlow <> outcomingFlow)) + +{----------------------------------------------------------------------- +-- Flood protection +-----------------------------------------------------------------------} + +defaultFloodFactor :: Int +defaultFloodFactor = 1 + +-- | This is a very permissive value, connection setup usually takes +-- around 10-100KB, including both directions. +defaultFloodThreshold :: Int +defaultFloodThreshold = 2 * 1024 * 1024 + +-- | A flood detection function. +type Detector stats = Int -- ^ Factor; + -> Int -- ^ Threshold; + -> stats -- ^ Stats to analyse; + -> Bool -- ^ Is this a flooded connection? + +defaultDetector :: Detector ConnectionStats +defaultDetector factor threshold s = + transmittedBytes s > threshold && + factor * wastedBytes s > payloadBytes s + +-- | Flood detection is used to protect /this/ peer against a /remote/ +-- malicious peer sending meaningless control messages. +data FloodDetector = FloodDetector + { -- | Max ratio of payload bytes to control bytes. + floodFactor :: {-# UNPACK #-} !Int + + -- | Max count of bytes connection /setup/ can take including + -- 'Handshake', 'ExtendedHandshake', 'Bitfield', 'Have' and 'Port' + -- messages. This value is used to avoid false positives at the + -- connection initialization. + , floodThreshold :: {-# UNPACK #-} !Int + + -- | Flood predicate on the /current/ 'ConnectionStats'. + , floodPredicate :: Detector ConnectionStats + } deriving Show + +instance Eq FloodDetector where + a == b = floodFactor a == floodFactor b + && floodThreshold a == floodThreshold b + +-- | Flood detector with very permissive options. +instance Default FloodDetector where + def = FloodDetector + { floodFactor = defaultFloodFactor + , floodThreshold = defaultFloodThreshold + , floodPredicate = defaultDetector + } + +-- | This peer might drop connection if the detector gives positive answer. +runDetector :: FloodDetector -> ConnectionStats -> Bool +runDetector FloodDetector {..} = floodPredicate floodFactor floodThreshold + +{----------------------------------------------------------------------- +-- Options +-----------------------------------------------------------------------} + +-- | Various connection settings and limits. +data Options = Options + { -- | How often /this/ peer should send 'KeepAlive' messages. + keepaliveInterval :: {-# UNPACK #-} !Int + + -- | /This/ peer will drop connection if a /remote/ peer did not + -- send any message for this period of time. + , keepaliveTimeout :: {-# UNPACK #-} !Int + + , requestQueueLength :: {-# UNPACK #-} !Int + + -- | Used to protect against flood attacks. + , floodDetector :: FloodDetector + + -- | Used to protect against flood attacks in /metadata + -- exchange/. Normally, a requesting peer should request each + -- 'InfoDict' piece only one time, but a malicious peer can + -- saturate wire with 'MetadataRequest' messages thus flooding + -- responding peer. + -- + -- This value set upper bound for number of 'MetadataRequests' + -- for each piece. + -- + , metadataFactor :: {-# UNPACK #-} !Int + + -- | Used to protect against out-of-memory attacks: malicious peer + -- can claim that 'totalSize' is, say, 100TB and send some random + -- data instead of infodict pieces. Since requesting peer unable + -- to check not completed infodict via the infohash, the + -- accumulated pieces will allocate the all available memory. + -- + -- This limit set upper bound for 'InfoDict' size. See + -- 'ExtendedMetadata' for more info. + -- + , maxInfoDictSize :: {-# UNPACK #-} !Int + } deriving (Show, Eq) + +-- | Permissive default parameters, most likely you don't need to +-- change them. +instance Default Options where + def = Options + { keepaliveInterval = defaultKeepAliveInterval + , keepaliveTimeout = defaultKeepAliveTimeout + , requestQueueLength = defaultRequestQueueLength + , floodDetector = def + , metadataFactor = defaultMetadataFactor + , maxInfoDictSize = defaultMaxInfoDictSize + } + +{----------------------------------------------------------------------- +-- Peer status +-----------------------------------------------------------------------} + +-- | Connections contain two bits of state on either end: choked or +-- not, and interested or not. +data PeerStatus = PeerStatus + { -- | Choking is a notification that no data will be sent until + -- unchoking happens. + _choking :: !Bool + + -- | + , _interested :: !Bool + } deriving (Show, Eq, Ord) + +$(makeLenses ''PeerStatus) + +instance Pretty PeerStatus where + pPrint PeerStatus {..} = + pPrint (Choking _choking) <+> "and" <+> pPrint (Interested _interested) + +-- | Connections start out choked and not interested. +instance Default PeerStatus where + def = PeerStatus True False + +instance Monoid PeerStatus where + mempty = def + mappend a b = PeerStatus + { _choking = _choking a && _choking b + , _interested = _interested a || _interested b + } + +-- | Can be used to update remote peer status using incoming 'Status' +-- message. +updateStatus :: StatusUpdate -> PeerStatus -> PeerStatus +updateStatus (Choking b) = choking .~ b +updateStatus (Interested b) = interested .~ b + +-- | Can be used to generate outcoming messages. +statusUpdates :: PeerStatus -> PeerStatus -> [StatusUpdate] +statusUpdates a b = M.catMaybes $ + [ if _choking a == _choking b then Nothing + else Just $ Choking $ _choking b + , if _interested a == _interested b then Nothing + else Just $ Interested $ _interested b + ] + +{----------------------------------------------------------------------- +-- Connection status +-----------------------------------------------------------------------} + +-- | Status of the both endpoints. +data ConnectionStatus = ConnectionStatus + { _clientStatus :: !PeerStatus + , _remoteStatus :: !PeerStatus + } deriving (Show, Eq) + +$(makeLenses ''ConnectionStatus) + +instance Pretty ConnectionStatus where + pPrint ConnectionStatus {..} = + "this " PP.<+> pPrint _clientStatus PP.$$ + "remote" PP.<+> pPrint _remoteStatus + +-- | Connections start out choked and not interested. +instance Default ConnectionStatus where + def = ConnectionStatus def def + +-- | Can the client transfer to the remote peer? +canUpload :: ConnectionStatus -> Bool +canUpload ConnectionStatus {..} + = _interested _remoteStatus && not (_choking _clientStatus) + +-- | Can the client transfer from the remote peer? +canDownload :: ConnectionStatus -> Bool +canDownload ConnectionStatus {..} + = _interested _clientStatus && not (_choking _remoteStatus) + +-- | Indicates how many peers are allowed to download from the client +-- by default. +defaultUnchokeSlots :: Int +defaultUnchokeSlots = 4 + +-- | +defaultRechokeInterval :: Int +defaultRechokeInterval = 10 * 1000 * 1000 + +{----------------------------------------------------------------------- +-- Connection +-----------------------------------------------------------------------} + +data ConnectionState = ConnectionState { + -- | If @not (allowed ExtExtended connCaps)@ then this set is always + -- empty. Otherwise it has the BEP10 extension protocol mandated mapping of + -- 'MessageId' to the message type for the remote peer. + -- + -- Note that this value can change in current session if either + -- this or remote peer will initiate rehandshaking. + -- + _connExtCaps :: !ExtendedCaps + + -- | Current extended handshake information from the remote peer + , _connRemoteEhs :: !ExtendedHandshake + + -- | Various stats about messages sent and received. Stats can be + -- used to protect /this/ peer against flood attacks. + -- + -- Note that this value will change with the next sent or received + -- message. + , _connStats :: !ConnectionStats + + , _connStatus :: !ConnectionStatus + + -- | Bitfield of remote endpoint. + , _connBitfield :: !Bitfield + } + +makeLenses ''ConnectionState + +instance Default ConnectionState where + def = ConnectionState + { _connExtCaps = def + , _connRemoteEhs = def + , _connStats = def + , _connStatus = def + , _connBitfield = BF.haveNone 0 + } + +-- | Connection keep various info about both peers. +data Connection s = Connection + { connInitiatedBy :: !ChannelSide + + , connRemoteAddr :: !(PeerAddr IP) + + -- | /Both/ peers handshaked with this protocol string. The only + -- value is \"Bittorrent Protocol\" but this can be changed in + -- future. + , connProtocol :: !ProtocolName + + -- | Set of enabled core extensions, i.e. the pre BEP10 extension + -- mechanism. This value is used to check if a message is allowed + -- to be sent or received. + , connCaps :: !Caps + + -- | /Both/ peers handshaked with this infohash. A connection can + -- handle only one topic, use 'reconnect' to change the current + -- topic. + , connTopic :: !InfoHash + + -- | Typically extracted from handshake. + , connRemotePeerId :: !PeerId + + -- | Typically extracted from handshake. + , connThisPeerId :: !PeerId + + -- | + , connOptions :: !Options + + -- | Mutable connection state, see 'ConnectionState' + , connState :: !(IORef ConnectionState) + +-- -- | Max request queue length. +-- , connMaxQueueLen :: !Int + + -- | Environment data. + , connSession :: !s + + , connChan :: !(Chan Message) + } + +instance Pretty (Connection s) where + pPrint Connection {..} = "Connection" + +instance ToLogStr (Connection s) where + toLogStr Connection {..} = mconcat + [ toLogStr (show connRemoteAddr) + , toLogStr (show connProtocol) + , toLogStr (show connCaps) + , toLogStr (show connTopic) + , toLogStr (show connRemotePeerId) + , toLogStr (show connThisPeerId) + , toLogStr (show connOptions) + ] + +-- TODO check extended messages too +isAllowed :: Connection s -> Message -> Bool +isAllowed Connection {..} msg + | Just ext <- requires msg = ext `allowed` connCaps + | otherwise = True + +{----------------------------------------------------------------------- +-- Hanshaking +-----------------------------------------------------------------------} + +sendHandshake :: Socket -> Handshake -> IO () +sendHandshake sock hs = sendAll sock (S.encode hs) + +recvHandshake :: Socket -> IO Handshake +recvHandshake sock = do + header <- BS.recv sock 1 + unless (BS.length header == 1) $ + throw $ userError "Unable to receive handshake header." + + let protocolLen = BS.head header + let restLen = handshakeSize protocolLen - 1 + + body <- BS.recv sock restLen + let resp = BS.cons protocolLen body + either (throwIO . userError) return $ S.decode resp + +-- | Handshaking with a peer specified by the second argument. +-- +-- It's important to send handshake first because /accepting/ peer +-- do not know handshake topic and will wait until /connecting/ peer +-- will send handshake. +-- +initiateHandshake :: Socket -> Handshake -> IO Handshake +initiateHandshake sock hs = do + sendHandshake sock hs + recvHandshake sock + +data HandshakePair = HandshakePair + { handshakeSent :: !Handshake + , handshakeRecv :: !Handshake + } deriving (Show, Eq) + +validatePair :: HandshakePair -> PeerAddr IP -> IO () +validatePair (HandshakePair hs hs') addr = Prelude.mapM_ checkProp + [ (def == hsProtocol hs', InvalidProtocol $ hsProtocol hs') + , (hsProtocol hs == hsProtocol hs', UnexpectedProtocol $ hsProtocol hs') + , (hsInfoHash hs == hsInfoHash hs', UnexpectedTopic $ hsInfoHash hs') + , (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerId addr) + , UnexpectedPeerId $ hsPeerId hs') + ] + where + checkProp (t, e) = unless t $ throwIO $ ProtocolError e + +-- | Connection state /right/ after handshaking. +establishedStats :: HandshakePair -> ConnectionStats +establishedStats HandshakePair {..} = ConnectionStats + { outcomingFlow = FlowStats 1 $ handshakeStats handshakeSent + , incomingFlow = FlowStats 1 $ handshakeStats handshakeRecv + } + +{----------------------------------------------------------------------- +-- Wire +-----------------------------------------------------------------------} + +-- | do not expose this so we can change it without breaking api +newtype Connected s a = Connected { runConnected :: (ReaderT (Connection s) IO a) } + deriving (Functor, Applicative, Monad + , MonadIO, MonadReader (Connection s), MonadThrow + ) + +instance MonadState ConnectionState (Connected s) where + get = Connected (asks connState) >>= liftIO . readIORef + put x = Connected (asks connState) >>= liftIO . flip writeIORef x + +-- | A duplex channel connected to a remote peer which keep tracks +-- connection parameters. +type Wire s a = ConduitM Message Message (Connected s) a + +{----------------------------------------------------------------------- +-- Wrapper +-----------------------------------------------------------------------} + +putStats :: ChannelSide -> Message -> Connected s () +putStats side msg = connStats %= addStats side (stats msg) + +validate :: ChannelSide -> Message -> Connected s () +validate side msg = do + caps <- asks connCaps + case requires msg of + Nothing -> return () + Just ext + | ext `allowed` caps -> return () + | otherwise -> protocolError $ DisallowedMessage side ext + +trackFlow :: ChannelSide -> Wire s () +trackFlow side = iterM $ do + validate side + putStats side + +{----------------------------------------------------------------------- +-- Setup +-----------------------------------------------------------------------} + +-- System.Timeout.timeout multiplier +seconds :: Int +seconds = 1000000 + +sinkChan :: MonadIO m => Chan Message -> Sink Message m () +sinkChan chan = await >>= maybe (return ()) (liftIO . writeChan chan) + +sourceChan :: MonadIO m => Int -> Chan Message -> Source m Message +sourceChan interval chan = do + mmsg <- liftIO $ timeout (interval * seconds) $ readChan chan + yield $ fromMaybe Msg.KeepAlive mmsg + +-- | Normally you should use 'connectWire' or 'acceptWire'. +runWire :: Wire s () -> Socket -> Chan Message -> Connection s -> IO () +runWire action sock chan conn = flip runReaderT conn $ runConnected $ + sourceSocket sock $= + conduitGet S.get $= + trackFlow RemotePeer $= + action $= + trackFlow ThisPeer C.$$ + sinkChan chan + +-- | This function will block until a peer send new message. You can +-- also use 'await'. +recvMessage :: Wire s Message +recvMessage = await >>= maybe (monadThrow PeerDisconnected) return + +-- | You can also use 'yield'. +sendMessage :: PeerMessage msg => msg -> Wire s () +sendMessage msg = do + ecaps <- use connExtCaps + yield $ envelop ecaps msg + +getMaxQueueLength :: Connected s Int +getMaxQueueLength = do + advertisedLen <- ehsQueueLength <$> use connRemoteEhs + defaultLen <- asks (requestQueueLength . connOptions) + return $ fromMaybe defaultLen advertisedLen + +-- | Filter pending messages from send buffer. +filterQueue :: (Message -> Bool) -> Wire s () +filterQueue p = lift $ do + chan <- asks connChan + liftIO $ getChanContents chan >>= writeList2Chan chan . L.filter p + +-- | Forcefully terminate wire session and close socket. +disconnectPeer :: Wire s a +disconnectPeer = monadThrow DisconnectPeer + +extendedHandshake :: ExtendedCaps -> Wire s () +extendedHandshake caps = do + -- TODO add other params to the handshake + sendMessage $ nullExtendedHandshake caps + msg <- recvMessage + case msg of + Extended (EHandshake remoteEhs@(ExtendedHandshake {..})) -> do + connExtCaps .= (ehsCaps <> caps) + connRemoteEhs .= remoteEhs + _ -> protocolError HandshakeRefused + +rehandshake :: ExtendedCaps -> Wire s () +rehandshake caps = error "rehandshake" + +reconnect :: Wire s () +reconnect = error "reconnect" + +data ConnectionId = ConnectionId + { topic :: !InfoHash + , remoteAddr :: !(PeerAddr IP) + , thisAddr :: !(PeerAddr (Maybe IP)) -- ^ foreign address of this node. + } + +-- | /Preffered/ settings of wire. To get the real use 'ask'. +data ConnectionPrefs = ConnectionPrefs + { prefOptions :: !Options + , prefProtocol :: !ProtocolName + , prefCaps :: !Caps + , prefExtCaps :: !ExtendedCaps + } deriving (Show, Eq) + +instance Default ConnectionPrefs where + def = ConnectionPrefs + { prefOptions = def + , prefProtocol = def + , prefCaps = def + , prefExtCaps = def + } + +normalize :: ConnectionPrefs -> ConnectionPrefs +normalize = error "normalize" + +-- | Bridge between 'Connection' and 'Network.BitTorrent.Exchange.Session'. +data SessionLink s = SessionLink + { linkTopic :: !(InfoHash) + , linkPeerId :: !(PeerId) + , linkMetadataSize :: !(Maybe Int) + , linkOutputChan :: !(Maybe (Chan Message)) + , linkSession :: !(s) + } + +data ConnectionConfig s = ConnectionConfig + { cfgPrefs :: !(ConnectionPrefs) + , cfgSession :: !(SessionLink s) + , cfgWire :: !(Wire s ()) + } + +configHandshake :: ConnectionConfig s -> Handshake +configHandshake ConnectionConfig {..} = Handshake + { hsProtocol = prefProtocol cfgPrefs + , hsReserved = prefCaps cfgPrefs + , hsInfoHash = linkTopic cfgSession + , hsPeerId = linkPeerId cfgSession + } + +{----------------------------------------------------------------------- +-- Pending connections +-----------------------------------------------------------------------} + +-- | Connection in half opened state. A normal usage scenario: +-- +-- * Opened using 'newPendingConnection', usually in the listener +-- loop; +-- +-- * Closed using 'closePending' if 'pendingPeer' is banned, +-- 'pendingCaps' is prohibited or pendingTopic is unknown; +-- +-- * Accepted using 'acceptWire' otherwise. +-- +data PendingConnection = PendingConnection + { pendingSock :: Socket + , pendingPeer :: PeerAddr IP -- ^ 'peerId' is always non empty; + , pendingCaps :: Caps -- ^ advertised by the peer; + , pendingTopic :: InfoHash -- ^ possible non-existent topic. + } + +-- | Reconstruct handshake sent by the remote peer. +pendingHandshake :: PendingConnection -> Handshake +pendingHandshake PendingConnection {..} = Handshake + { hsProtocol = def + , hsReserved = pendingCaps + , hsInfoHash = pendingTopic + , hsPeerId = fromMaybe (error "pendingHandshake: impossible") + (peerId pendingPeer) + } + +-- | +-- +-- This function can throw 'WireFailure' exception. +-- +newPendingConnection :: Socket -> PeerAddr IP -> IO PendingConnection +newPendingConnection sock addr = do + Handshake {..} <- recvHandshake sock + unless (hsProtocol == def) $ do + throwIO $ ProtocolError $ InvalidProtocol hsProtocol + return PendingConnection + { pendingSock = sock + , pendingPeer = addr { peerId = Just hsPeerId } + , pendingCaps = hsReserved + , pendingTopic = hsInfoHash + } + +-- | Release all resources associated with the given connection. Note +-- that you /must not/ 'closePending' if you 'acceptWire'. +closePending :: PendingConnection -> IO () +closePending PendingConnection {..} = do + close pendingSock + +{----------------------------------------------------------------------- +-- Connection setup +-----------------------------------------------------------------------} + +chanToSock :: Int -> Chan Message -> Socket -> IO () +chanToSock ka chan sock = + sourceChan ka chan $= conduitPut S.put C.$$ sinkSocket sock + +afterHandshaking :: ChannelSide -> PeerAddr IP -> Socket -> HandshakePair + -> ConnectionConfig s -> IO () +afterHandshaking initiator addr sock + hpair @ (HandshakePair hs hs') + (ConnectionConfig + { cfgPrefs = ConnectionPrefs {..} + , cfgSession = SessionLink {..} + , cfgWire = wire + }) = do + let caps = hsReserved hs <> hsReserved hs' + cstate <- newIORef def { _connStats = establishedStats hpair } + chan <- maybe newChan return linkOutputChan + let conn = Connection { + connInitiatedBy = initiator + , connRemoteAddr = addr + , connProtocol = hsProtocol hs + , connCaps = caps + , connTopic = hsInfoHash hs + , connRemotePeerId = hsPeerId hs' + , connThisPeerId = hsPeerId hs + , connOptions = def + , connState = cstate + , connSession = linkSession + , connChan = chan + } + + -- TODO make KA interval configurable + let kaInterval = defaultKeepAliveInterval + wire' = if ExtExtended `allowed` caps + then extendedHandshake prefExtCaps >> wire + else wire + + bracket (forkIO (chanToSock kaInterval chan sock)) + (killThread) + (\ _ -> runWire wire' sock chan conn) + +-- | Initiate 'Wire' connection and handshake with a peer. This function will +-- also do the BEP10 extension protocol handshake if 'ExtExtended' is enabled on +-- both sides. +-- +-- This function can throw 'WireFailure' exception. +-- +connectWire :: PeerAddr IP -> ConnectionConfig s -> IO () +connectWire addr cfg = do + let catchRefusal m = try m >>= either (throwIO . ConnectionRefused) return + bracket (catchRefusal (peerSocket Stream addr)) close $ \ sock -> do + let hs = configHandshake cfg + hs' <- initiateHandshake sock hs + let hpair = HandshakePair hs hs' + validatePair hpair addr + afterHandshaking ThisPeer addr sock hpair cfg + +-- | Accept 'Wire' connection using already 'Network.Socket.accept'ed +-- socket. For peer listener loop the 'acceptSafe' should be +-- prefered against 'accept'. The socket will be closed at exit. +-- +-- This function can throw 'WireFailure' exception. +-- +acceptWire :: PendingConnection -> ConnectionConfig s -> IO () +acceptWire pc @ PendingConnection {..} cfg = do + bracket (return pendingSock) close $ \ _ -> do + unless (linkTopic (cfgSession cfg) == pendingTopic) $ do + throwIO (ProtocolError (UnexpectedTopic pendingTopic)) + + let hs = configHandshake cfg + sendHandshake pendingSock hs + let hpair = HandshakePair hs (pendingHandshake pc) + + afterHandshaking RemotePeer pendingPeer pendingSock hpair cfg + +-- | Used when size of bitfield becomes known. +resizeBitfield :: Int -> Connected s () +resizeBitfield n = connBitfield %= adjustSize n diff --git a/dht/bittorrent/src/Network/BitTorrent/Exchange/Download.hs b/dht/bittorrent/src/Network/BitTorrent/Exchange/Download.hs new file mode 100644 index 00000000..981db2fb --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Exchange/Download.hs @@ -0,0 +1,296 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- +-- +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE FunctionalDependencies #-} +{-# LANGUAGE TemplateHaskell #-} +module Network.BitTorrent.Exchange.Download + ( -- * Downloading + Download (..) + , Updates + , runDownloadUpdates + + -- ** Metadata + -- $metadata-download + , MetadataDownload + , metadataDownload + + -- ** Content + -- $content-download + , ContentDownload + , contentDownload + ) where + +import Control.Applicative +import Control.Concurrent +import Control.Lens +import Control.Monad.State +import Data.BEncode as BE +import Data.ByteString as BS +import Data.ByteString.Lazy as BL +import Data.Default +import Data.List as L +import Data.Maybe +import Data.Map as M +import Data.Tuple + +import Data.Torrent as Torrent +import Network.Address +import Network.BitTorrent.Exchange.Bitfield as BF +import Network.BitTorrent.Exchange.Block as Block +import Network.BitTorrent.Exchange.Message as Msg +import System.Torrent.Storage (Storage, writePiece) + + +{----------------------------------------------------------------------- +-- Class +-----------------------------------------------------------------------} + +type Updates s a = StateT s IO a + +runDownloadUpdates :: MVar s -> Updates s a -> IO a +runDownloadUpdates var m = modifyMVar var (fmap swap . runStateT m) + +class Download s chunk | s -> chunk where + scheduleBlocks :: Int -> PeerAddr IP -> Bitfield -> Updates s [BlockIx] + + -- | + scheduleBlock :: PeerAddr IP -> Bitfield -> Updates s (Maybe BlockIx) + scheduleBlock addr bf = listToMaybe <$> scheduleBlocks 1 addr bf + + -- | Get number of sent requests to this peer. + getRequestQueueLength :: PeerAddr IP -> Updates s Int + + -- | Remove all pending block requests to the remote peer. May be used + -- when: + -- + -- * a peer closes connection; + -- + -- * remote peer choked this peer; + -- + -- * timeout expired. + -- + resetPending :: PeerAddr IP -> Updates s () + + -- | MAY write to storage, if a new piece have been completed. + -- + -- You should check if a returned by peer block is actually have + -- been requested and in-flight. This is needed to avoid "I send + -- random corrupted block" attacks. + pushBlock :: PeerAddr IP -> chunk -> Updates s (Maybe Bool) + +{----------------------------------------------------------------------- +-- Metadata download +-----------------------------------------------------------------------} +-- $metadata-download +-- TODO + +data MetadataDownload = MetadataDownload + { _pendingPieces :: [(PeerAddr IP, PieceIx)] + , _bucket :: Bucket + , _topic :: InfoHash + } + +makeLenses ''MetadataDownload + +-- | Create a new scheduler for infodict of the given size. +metadataDownload :: Int -> InfoHash -> MetadataDownload +metadataDownload ps = MetadataDownload [] (Block.empty ps) + +instance Default MetadataDownload where + def = error "instance Default MetadataDownload" + +--cancelPending :: PieceIx -> Updates () +cancelPending pix = pendingPieces %= L.filter ((pix ==) . snd) + +instance Download MetadataDownload (Piece BS.ByteString) where + scheduleBlock addr bf = do + bkt <- use bucket + case spans metadataPieceSize bkt of + [] -> return Nothing + ((off, _ ) : _) -> do + let pix = off `div` metadataPieceSize + pendingPieces %= ((addr, pix) :) + return (Just (BlockIx pix 0 metadataPieceSize)) + + resetPending addr = pendingPieces %= L.filter ((addr ==) . fst) + + pushBlock addr Torrent.Piece {..} = do + p <- use pendingPieces + when ((addr, pieceIndex) `L.notElem` p) $ + error "not requested" + cancelPending pieceIndex + + bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData + b <- use bucket + case toPiece b of + Nothing -> return Nothing + Just chunks -> do + t <- use topic + case parseInfoDict (BL.toStrict chunks) t of + Right x -> do + pendingPieces .= [] + return undefined -- (Just x) + Left e -> do + pendingPieces .= [] + bucket .= Block.empty (Block.size b) + return undefined -- Nothing + where + -- todo use incremental parsing to avoid BS.concat call + parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict + parseInfoDict chunk topic = + case BE.decode chunk of + Right (infodict @ InfoDict {..}) + | topic == idInfoHash -> return infodict + | otherwise -> Left "broken infodict" + Left err -> Left $ "unable to parse infodict " ++ err + +{----------------------------------------------------------------------- +-- Content download +-----------------------------------------------------------------------} +-- $content-download +-- +-- A block can have one of the following status: +-- +-- 1) /not allowed/: Piece is not in download set. +-- +-- 2) /waiting/: (allowed?) Block have been allowed to download, +-- but /this/ peer did not send any 'Request' message for this +-- block. To allow some piece use +-- 'Network.BitTorrent.Exchange.Selector' and then 'allowedSet' +-- and 'allowPiece'. +-- +-- 3) /inflight/: (pending?) Block have been requested but +-- /remote/ peer did not send any 'Piece' message for this block. +-- Related functions 'markInflight' +-- +-- 4) /pending/: (stalled?) Block have have been downloaded +-- Related functions 'insertBlock'. +-- +-- Piece status: +-- +-- 1) /assembled/: (downloaded?) All blocks in piece have been +-- downloaded but the piece did not verified yet. +-- +-- * Valid: go to completed; +-- +-- * Invalid: go to waiting. +-- +-- 2) /corrupted/: +-- +-- 3) /downloaded/: (verified?) A piece have been successfully +-- verified via the hash. Usually the piece should be stored to +-- the 'System.Torrent.Storage' and /this/ peer should send 'Have' +-- messages to the /remote/ peers. +-- + +data PieceEntry = PieceEntry + { pending :: [(PeerAddr IP, BlockIx)] + , stalled :: Bucket + } + +pieceEntry :: PieceSize -> PieceEntry +pieceEntry s = PieceEntry [] (Block.empty s) + +isEmpty :: PieceEntry -> Bool +isEmpty PieceEntry {..} = L.null pending && Block.null stalled + +_holes :: PieceIx -> PieceEntry -> [BlockIx] +_holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled) + where + mkBlockIx (off, sz) = BlockIx pix off sz + +data ContentDownload = ContentDownload + { inprogress :: !(Map PieceIx PieceEntry) + , bitfield :: !Bitfield + , pieceSize :: !PieceSize + , contentStorage :: Storage + } + +contentDownload :: Bitfield -> PieceSize -> Storage -> ContentDownload +contentDownload = ContentDownload M.empty + +--modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> DownloadUpdates () +modifyEntry pix f = modify $ \ s @ ContentDownload {..} -> s + { inprogress = alter (g pieceSize) pix inprogress } + where + g s = h . f . fromMaybe (pieceEntry s) + h e + | isEmpty e = Nothing + | otherwise = Just e + +instance Download ContentDownload (Block BL.ByteString) where + scheduleBlocks n addr maskBF = do + ContentDownload {..} <- get + let wantPieces = maskBF `BF.difference` bitfield + let wantBlocks = L.concat $ M.elems $ M.mapWithKey _holes $ + M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) + inprogress + + bixs <- if L.null wantBlocks + then do + mpix <- choosePiece wantPieces + case mpix of -- TODO return 'n' blocks + Nothing -> return [] + Just pix -> return [leadingBlock pix defaultTransferSize] + else chooseBlocks wantBlocks n + + forM_ bixs $ \ bix -> do + modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e + { pending = (addr, bix) : pending } + + return bixs + where + -- TODO choose block nearest to pending or stalled sets to reduce disk + -- seeks on remote machines + --chooseBlocks :: [BlockIx] -> Int -> DownloadUpdates [BlockIx] + chooseBlocks xs n = return (L.take n xs) + + -- TODO use selection strategies from Exchange.Selector + --choosePiece :: Bitfield -> DownloadUpdates (Maybe PieceIx) + choosePiece bf + | BF.null bf = return $ Nothing + | otherwise = return $ Just $ BF.findMin bf + + getRequestQueueLength addr = do + m <- gets (M.map (L.filter ((==) addr . fst) . pending) . inprogress) + return $ L.sum $ L.map L.length $ M.elems m + + resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) } + where + reset = fmap $ \ e -> e + { pending = L.filter (not . (==) addr . fst) (pending e) } + + pushBlock addr blk @ Block {..} = do + mpe <- gets (M.lookup blkPiece . inprogress) + case mpe of + Nothing -> return Nothing + Just (pe @ PieceEntry {..}) + | blockIx blk `L.notElem` fmap snd pending -> return Nothing + | otherwise -> do + let bkt' = Block.insertLazy blkOffset blkData stalled + case toPiece bkt' of + Nothing -> do + modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e + { pending = L.filter ((==) (blockIx blk) . snd) pending + , stalled = bkt' + } + return (Just False) + + Just pieceData -> do + -- TODO verify + storage <- gets contentStorage + liftIO $ writePiece (Torrent.Piece blkPiece pieceData) storage + modify $ \ s @ ContentDownload {..} -> s + { inprogress = M.delete blkPiece inprogress + , bitfield = BF.insert blkPiece bitfield + } + return (Just True) diff --git a/dht/bittorrent/src/Network/BitTorrent/Exchange/Manager.hs b/dht/bittorrent/src/Network/BitTorrent/Exchange/Manager.hs new file mode 100644 index 00000000..30a6a607 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Exchange/Manager.hs @@ -0,0 +1,62 @@ +module Network.BitTorrent.Exchange.Manager + ( Options (..) + , Manager + , Handler + , newManager + , closeManager + ) where + +import Control.Concurrent +import Control.Exception hiding (Handler) +import Control.Monad +import Data.Default +import Network.Socket + +import Data.Torrent +import Network.Address +import Network.BitTorrent.Exchange.Connection hiding (Options) +import Network.BitTorrent.Exchange.Session + + +data Options = Options + { optBacklog :: Int + , optPeerAddr :: PeerAddr IP + } deriving (Show, Eq) + +instance Default Options where + def = Options + { optBacklog = maxListenQueue + , optPeerAddr = def + } + +data Manager = Manager + { listener :: !ThreadId + } + +type Handler = InfoHash -> IO Session + +handleNewConn :: Socket -> PeerAddr IP -> Handler -> IO () +handleNewConn sock addr handler = do + conn <- newPendingConnection sock addr + ses <- handler (pendingTopic conn) `onException` closePending conn + establish conn ses + +listenIncoming :: Options -> Handler -> IO () +listenIncoming Options {..} handler = do + bracket (socket AF_INET Stream defaultProtocol) close $ \ sock -> do + bind sock (toSockAddr optPeerAddr) + listen sock optBacklog + forever $ do + (conn, sockAddr) <- accept sock + case fromSockAddr sockAddr of + Nothing -> return () + Just addr -> void $ forkIO $ handleNewConn sock addr handler + +newManager :: Options -> Handler -> IO Manager +newManager opts handler = do + tid <- forkIO $ listenIncoming opts handler + return (Manager tid) + +closeManager :: Manager -> IO () +closeManager Manager {..} = do + killThread listener \ No newline at end of file diff --git a/dht/bittorrent/src/Network/BitTorrent/Exchange/Message.hs b/dht/bittorrent/src/Network/BitTorrent/Exchange/Message.hs new file mode 100644 index 00000000..5c096523 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Exchange/Message.hs @@ -0,0 +1,1237 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- Normally peer to peer communication consisting of the following +-- steps: +-- +-- * In order to establish the connection between peers we should +-- send 'Handshake' message. The 'Handshake' is a required message +-- and must be the first message transmitted by the peer to the +-- another peer. Another peer should reply with a handshake as well. +-- +-- * Next peer might sent bitfield message, but might not. In the +-- former case we should update bitfield peer have. Again, if we +-- have some pieces we should send bitfield. Normally bitfield +-- message should sent after the handshake message. +-- +-- * Regular exchange messages. TODO docs +-- +-- For more high level API see "Network.BitTorrent.Exchange" module. +-- +-- For more infomation see: +-- +-- +{-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE CPP #-} +{-# OPTIONS -fno-warn-orphans #-} +module Network.BitTorrent.Exchange.Message + ( -- * Capabilities + Capabilities (..) + , Extension (..) + , Caps + + -- * Handshake + , ProtocolName + , Handshake(..) + , defaultHandshake + , handshakeSize + , handshakeMaxSize + , handshakeStats + + -- * Stats + , ByteCount + , ByteStats (..) + , byteLength + + -- * Messages + , Message (..) + , defaultKeepAliveTimeout + , defaultKeepAliveInterval + , PeerMessage (..) + + -- ** Core messages + , StatusUpdate (..) + , Available (..) + , Transfer (..) + , defaultRequestQueueLength + + -- ** Fast extension + , FastMessage (..) + + -- ** Extension protocol + , ExtendedMessage (..) + + -- *** Capabilities + , ExtendedExtension (..) + , ExtendedCaps (..) + + -- *** Handshake + , ExtendedHandshake (..) + , defaultQueueLength + , nullExtendedHandshake + + -- *** Metadata + , ExtendedMetadata (..) + , metadataPieceSize + , defaultMetadataFactor + , defaultMaxInfoDictSize + , isLastPiece + , isValidPiece + ) where + +import Control.Applicative +import Control.Arrow ((&&&), (***)) +import Control.Monad (when) +import Data.Attoparsec.ByteString.Char8 as BS +import Data.BEncode as BE +import Data.BEncode.BDict as BE +import Data.BEncode.Internal as BE (ppBEncode, parser) +import Data.BEncode.Types (BDict) +import Data.Bits +import Data.ByteString as BS +import Data.ByteString.Char8 as BC +import Data.ByteString.Lazy as BL +import Data.Default +import Data.List as L +import Data.Map.Strict as M +import Data.Maybe +import Data.Monoid +import Data.Ord +import Data.Serialize as S +import Data.String +import Data.Text as T +import Data.Typeable +import Data.Word +#if MIN_VERSION_iproute(1,7,4) +import Data.IP hiding (fromSockAddr) +#else +import Data.IP +#endif +import Network +import Network.Socket hiding (KeepAlive) +import Text.PrettyPrint as PP hiding ((<>)) +import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) + +import Data.Torrent hiding (Piece (..)) +import qualified Data.Torrent as P (Piece (..)) +import Network.Address +import Network.BitTorrent.Exchange.Bitfield +import Network.BitTorrent.Exchange.Block + +{----------------------------------------------------------------------- +-- Capabilities +-----------------------------------------------------------------------} + +-- | +class Capabilities caps where + type Ext caps :: * + + -- | Pack extensions to caps. + toCaps :: [Ext caps] -> caps + + -- | Unpack extensions from caps. + fromCaps :: caps -> [Ext caps] + + -- | Check if an extension is a member of the specified set. + allowed :: Ext caps -> caps -> Bool + +ppCaps :: Capabilities caps => Pretty (Ext caps) => caps -> Doc +ppCaps = hcat . punctuate ", " . L.map pPrint . fromCaps + +{----------------------------------------------------------------------- +-- Extensions +-----------------------------------------------------------------------} + +-- | Enumeration of message extension protocols. +-- +-- For more info see: +-- +data Extension + = ExtDHT -- ^ BEP 5: allow to send PORT messages. + | ExtFast -- ^ BEP 6: allow to send FAST messages. + | ExtExtended -- ^ BEP 10: allow to send the extension protocol messages. + deriving (Show, Eq, Ord, Enum, Bounded) + +-- | Full extension names, suitable for logging. +instance Pretty Extension where + pPrint ExtDHT = "Distributed Hash Table Protocol" + pPrint ExtFast = "Fast Extension" + pPrint ExtExtended = "Extension Protocol" + +-- | Extension bitmask as specified by BEP 4. +extMask :: Extension -> Word64 +extMask ExtDHT = 0x01 +extMask ExtFast = 0x04 +extMask ExtExtended = 0x100000 + +{----------------------------------------------------------------------- +-- Capabilities +-----------------------------------------------------------------------} + +-- | Capabilities is a set of 'Extension's usually sent in 'Handshake' +-- messages. +newtype Caps = Caps Word64 + deriving (Show, Eq) + +-- | Render set of extensions as comma separated list. +instance Pretty Caps where + pPrint = ppCaps + {-# INLINE pPrint #-} + +-- | The empty set. +instance Default Caps where + def = Caps 0 + {-# INLINE def #-} + +-- | Monoid under intersection. 'mempty' includes all known extensions. +instance Monoid Caps where + mempty = toCaps [minBound .. maxBound] + {-# INLINE mempty #-} + + mappend (Caps a) (Caps b) = Caps (a .&. b) + {-# INLINE mappend #-} + +-- | 'Handshake' compatible encoding. +instance Serialize Caps where + put (Caps caps) = S.putWord64be caps + {-# INLINE put #-} + + get = Caps <$> S.getWord64be + {-# INLINE get #-} + +instance Capabilities Caps where + type Ext Caps = Extension + + allowed e (Caps caps) = (extMask e .&. caps) /= 0 + {-# INLINE allowed #-} + + toCaps = Caps . L.foldr (.|.) 0 . L.map extMask + fromCaps caps = L.filter (`allowed` caps) [minBound..maxBound] + +{----------------------------------------------------------------------- + Handshake +-----------------------------------------------------------------------} + +maxProtocolNameSize :: Word8 +maxProtocolNameSize = maxBound + +-- | The protocol name is used to identify to the local peer which +-- version of BTP the remote peer uses. +newtype ProtocolName = ProtocolName BS.ByteString + deriving (Eq, Ord, Typeable) + +-- | In BTP/1.0 the name is 'BitTorrent protocol'. If this string is +-- different from the local peers own protocol name, then the +-- connection is to be dropped. +instance Default ProtocolName where + def = ProtocolName "BitTorrent protocol" + +instance Show ProtocolName where + show (ProtocolName bs) = show bs + +instance Pretty ProtocolName where + pPrint (ProtocolName bs) = PP.text $ BC.unpack bs + +instance IsString ProtocolName where + fromString str + | L.length str <= fromIntegral maxProtocolNameSize + = ProtocolName (fromString str) + | otherwise = error $ "fromString: ProtocolName too long: " ++ str + +instance Serialize ProtocolName where + put (ProtocolName bs) = do + putWord8 $ fromIntegral $ BS.length bs + putByteString bs + + get = do + len <- getWord8 + bs <- getByteString $ fromIntegral len + return (ProtocolName bs) + +-- | Handshake message is used to exchange all information necessary +-- to establish connection between peers. +-- +data Handshake = Handshake { + -- | Identifier of the protocol. This is usually equal to 'def'. + hsProtocol :: ProtocolName + + -- | Reserved bytes used to specify supported BEP's. + , hsReserved :: Caps + + -- | Info hash of the info part of the metainfo file. that is + -- transmitted in tracker requests. Info hash of the initiator + -- handshake and response handshake should match, otherwise + -- initiator should break the connection. + -- + , hsInfoHash :: InfoHash + + -- | Peer id of the initiator. This is usually the same peer id + -- that is transmitted in tracker requests. + -- + , hsPeerId :: PeerId + + } deriving (Show, Eq) + +instance Serialize Handshake where + put Handshake {..} = do + put hsProtocol + put hsReserved + put hsInfoHash + put hsPeerId + get = Handshake <$> get <*> get <*> get <*> get + +-- | Show handshake protocol string, caps and fingerprint. +instance Pretty Handshake where + pPrint Handshake {..} + = pPrint hsProtocol $$ + pPrint hsReserved $$ + pPrint (fingerprint hsPeerId) + +-- | Get handshake message size in bytes from the length of protocol +-- string. +handshakeSize :: Word8 -> Int +handshakeSize n = 1 + fromIntegral n + 8 + 20 + 20 + +-- | Maximum size of handshake message in bytes. +handshakeMaxSize :: Int +handshakeMaxSize = handshakeSize maxProtocolNameSize + +-- | Handshake with default protocol string and reserved bitmask. +defaultHandshake :: InfoHash -> PeerId -> Handshake +defaultHandshake = Handshake def def + +handshakeStats :: Handshake -> ByteStats +handshakeStats (Handshake (ProtocolName bs) _ _ _) + = ByteStats 1 (BS.length bs + 8 + 20 + 20) 0 + +{----------------------------------------------------------------------- +-- Stats +-----------------------------------------------------------------------} + +-- | Number of bytes. +type ByteCount = Int + +-- | Summary of encoded message byte layout can be used to collect +-- stats about message flow in both directions. This data can be +-- retrieved using 'stats' function. +data ByteStats = ByteStats + { -- | Number of bytes used to help encode 'control' and 'payload' + -- bytes: message size, message ID's, etc + overhead :: {-# UNPACK #-} !ByteCount + + -- | Number of bytes used to exchange peers state\/options: piece + -- and block indexes, infohash, port numbers, peer ID\/IP, etc. + , control :: {-# UNPACK #-} !ByteCount + + -- | Number of payload bytes: torrent data blocks and infodict + -- metadata. + , payload :: {-# UNPACK #-} !ByteCount + } deriving Show + +instance Pretty ByteStats where + pPrint s @ ByteStats {..} = fsep + [ PP.int overhead, "overhead" + , PP.int control, "control" + , PP.int payload, "payload" + , "bytes" + ] $+$ fsep + [ PP.int (byteLength s), "total bytes" + ] + +-- | Empty byte sequences. +instance Default ByteStats where + def = ByteStats 0 0 0 + +-- | Monoid under addition. +instance Monoid ByteStats where + mempty = def + mappend a b = ByteStats + { overhead = overhead a + overhead b + , control = control a + control b + , payload = payload a + payload b + } + +-- | Sum of the all byte sequences. +byteLength :: ByteStats -> Int +byteLength ByteStats {..} = overhead + control + payload + +{----------------------------------------------------------------------- +-- Regular messages +-----------------------------------------------------------------------} + +-- | Messages which can be sent after handshaking. Minimal complete +-- definition: 'envelop'. +class PeerMessage a where + -- | Construct a message to be /sent/. Note that if 'ExtendedCaps' + -- do not contain mapping for this message the default + -- 'ExtendedMessageId' is used. + envelop :: ExtendedCaps -- ^ The /receiver/ extended capabilities; + -> a -- ^ An regular message; + -> Message -- ^ Enveloped message to sent. + + -- | Find out the extension this message belong to. Can be used to + -- check if this message is allowed to send\/recv in current + -- session. + requires :: a -> Maybe Extension + requires _ = Nothing + + -- | Get sizes of overhead\/control\/payload byte sequences of + -- binary message representation without encoding message to binary + -- bytestring. + -- + -- This function should obey one law: + -- + -- * 'byteLength' ('stats' msg) == 'BL.length' ('encode' msg) + -- + stats :: a -> ByteStats + stats _ = ByteStats 4 0 0 + +{----------------------------------------------------------------------- +-- Status messages +-----------------------------------------------------------------------} + +-- | Notification that the sender have updated its +-- 'Network.BitTorrent.Exchange.Status.PeerStatus'. +data StatusUpdate + -- | Notification that the sender will not upload data to the + -- receiver until unchoking happen. + = Choking !Bool + + -- | Notification that the sender is interested (or not interested) + -- in any of the receiver's data pieces. + | Interested !Bool + deriving (Show, Eq, Ord, Typeable) + +instance Pretty StatusUpdate where + pPrint (Choking False) = "not choking" + pPrint (Choking True ) = "choking" + pPrint (Interested False) = "not interested" + pPrint (Interested True ) = "interested" + +instance PeerMessage StatusUpdate where + envelop _ = Status + {-# INLINE envelop #-} + + stats _ = ByteStats 4 1 0 + {-# INLINE stats #-} + +{----------------------------------------------------------------------- +-- Available messages +-----------------------------------------------------------------------} + +-- | Messages used to inform receiver which pieces of the torrent +-- sender have. +data Available = + -- | Zero-based index of a piece that has just been successfully + -- downloaded and verified via the hash. + Have ! PieceIx + + -- | The bitfield message may only be sent immediately after the + -- handshaking sequence is complete, and before any other message + -- are sent. If client have no pieces then bitfield need not to be + -- sent. + | Bitfield !Bitfield + deriving (Show, Eq) + +instance Pretty Available where + pPrint (Have ix ) = "Have" <+> int ix + pPrint (Bitfield _ ) = "Bitfield" + +instance PeerMessage Available where + envelop _ = Available + {-# INLINE envelop #-} + + stats (Have _) = ByteStats (4 + 1) 4 0 + stats (Bitfield bf) = ByteStats (4 + 1) (q + trailing) 0 + where + trailing = if r == 0 then 0 else 1 + (q, r) = quotRem (totalCount bf) 8 + +{----------------------------------------------------------------------- +-- Transfer messages +-----------------------------------------------------------------------} + +-- | Messages used to transfer 'Block's. +data Transfer + -- | Request for a particular block. If a client is requested a + -- block that another peer do not have the peer might not answer + -- at all. + = Request ! BlockIx + + -- | Response to a request for a block. + | Piece !(Block BL.ByteString) + + -- | Used to cancel block requests. It is typically used during + -- "End Game". + | Cancel !BlockIx + deriving (Show, Eq) + +instance Pretty Transfer where + pPrint (Request ix ) = "Request" <+> pPrint ix + pPrint (Piece blk) = "Piece" <+> pPrint blk + pPrint (Cancel i ) = "Cancel" <+> pPrint i + +instance PeerMessage Transfer where + envelop _ = Transfer + {-# INLINE envelop #-} + + stats (Request _ ) = ByteStats (4 + 1) (3 * 4) 0 + stats (Piece p ) = ByteStats (4 + 1) (4 + 4 + blockSize p) 0 + stats (Cancel _ ) = ByteStats (4 + 1) (3 * 4) 0 + +-- TODO increase +-- | Max number of pending 'Request's inflight. +defaultRequestQueueLength :: Int +defaultRequestQueueLength = 1 + +{----------------------------------------------------------------------- +-- Fast messages +-----------------------------------------------------------------------} + +-- | BEP6 messages. +data FastMessage = + -- | If a peer have all pieces it might send the 'HaveAll' message + -- instead of 'Bitfield' message. Used to save bandwidth. + HaveAll + + -- | If a peer have no pieces it might send 'HaveNone' message + -- intead of 'Bitfield' message. Used to save bandwidth. + | HaveNone + + -- | This is an advisory message meaning "you might like to + -- download this piece." Used to avoid excessive disk seeks and + -- amount of IO. + | SuggestPiece !PieceIx + + -- | Notifies a requesting peer that its request will not be + -- satisfied. + | RejectRequest !BlockIx + + -- | This is an advisory messsage meaning \"if you ask for this + -- piece, I'll give it to you even if you're choked.\" Used to + -- shorten starting phase. + | AllowedFast !PieceIx + deriving (Show, Eq) + +instance Pretty FastMessage where + pPrint (HaveAll ) = "Have all" + pPrint (HaveNone ) = "Have none" + pPrint (SuggestPiece pix) = "Suggest" <+> int pix + pPrint (RejectRequest bix) = "Reject" <+> pPrint bix + pPrint (AllowedFast pix) = "Allowed fast" <+> int pix + +instance PeerMessage FastMessage where + envelop _ = Fast + {-# INLINE envelop #-} + + requires _ = Just ExtFast + {-# INLINE requires #-} + + stats HaveAll = ByteStats 4 1 0 + stats HaveNone = ByteStats 4 1 0 + stats (SuggestPiece _) = ByteStats 5 4 0 + stats (RejectRequest _) = ByteStats 5 12 0 + stats (AllowedFast _) = ByteStats 5 4 0 + +{----------------------------------------------------------------------- +-- Extension protocol +-----------------------------------------------------------------------} + +{----------------------------------------------------------------------- +-- Extended capabilities +-----------------------------------------------------------------------} + +data ExtendedExtension + = ExtMetadata -- ^ BEP 9: Extension for Peers to Send Metadata Files + deriving (Show, Eq, Ord, Enum, Bounded, Typeable) + +instance IsString ExtendedExtension where + fromString = fromMaybe (error msg) . fromKey . fromString + where + msg = "fromString: could not parse ExtendedExtension" + +instance Pretty ExtendedExtension where + pPrint ExtMetadata = "Extension for Peers to Send Metadata Files" + +fromKey :: BKey -> Maybe ExtendedExtension +fromKey "ut_metadata" = Just ExtMetadata +fromKey _ = Nothing +{-# INLINE fromKey #-} + +toKey :: ExtendedExtension -> BKey +toKey ExtMetadata = "ut_metadata" +{-# INLINE toKey #-} + +type ExtendedMessageId = Word8 + +extId :: ExtendedExtension -> ExtendedMessageId +extId ExtMetadata = 1 +{-# INLINE extId #-} + +type ExtendedMap = Map ExtendedExtension ExtendedMessageId + +-- | The extension IDs must be stored for every peer, because every +-- peer may have different IDs for the same extension. +-- +newtype ExtendedCaps = ExtendedCaps { extendedCaps :: ExtendedMap } + deriving (Show, Eq) + +instance Pretty ExtendedCaps where + pPrint = ppCaps + {-# INLINE pPrint #-} + +-- | The empty set. +instance Default ExtendedCaps where + def = ExtendedCaps M.empty + +-- | Monoid under intersection: +-- +-- * The 'mempty' caps includes all known extensions; +-- +-- * the 'mappend' operation is NOT commutative: it return message +-- id from the first caps for the extensions existing in both caps. +-- +instance Monoid ExtendedCaps where + mempty = toCaps [minBound..maxBound] + mappend (ExtendedCaps a) (ExtendedCaps b) = + ExtendedCaps (M.intersection a b) + +appendBDict :: BDict -> ExtendedMap -> ExtendedMap +appendBDict (Cons key val xs) caps + | Just ext <- fromKey key + , Right eid <- fromBEncode val = M.insert ext eid (appendBDict xs caps) + | otherwise = appendBDict xs caps +appendBDict Nil caps = caps + +-- | Handshake compatible encoding. +instance BEncode ExtendedCaps where + toBEncode = BDict . BE.fromAscList . L.sortBy (comparing fst) + . L.map (toKey *** toBEncode) . M.toList . extendedCaps + + fromBEncode (BDict bd) = pure $ ExtendedCaps $ appendBDict bd M.empty + fromBEncode _ = decodingError "ExtendedCaps" + +instance Capabilities ExtendedCaps where + type Ext ExtendedCaps = ExtendedExtension + + toCaps = ExtendedCaps . M.fromList . L.map (id &&& extId) + + fromCaps = M.keys . extendedCaps + {-# INLINE fromCaps #-} + + allowed e (ExtendedCaps caps) = M.member e caps + {-# INLINE allowed #-} + +remoteMessageId :: ExtendedExtension -> ExtendedCaps -> ExtendedMessageId +remoteMessageId ext = fromMaybe (extId ext) . M.lookup ext . extendedCaps + +{----------------------------------------------------------------------- +-- Extended handshake +-----------------------------------------------------------------------} + +-- | This message should be sent immediately after the standard +-- bittorrent handshake to any peer that supports this extension +-- protocol. Extended handshakes can be sent more than once, however +-- an implementation may choose to ignore subsequent handshake +-- messages. +-- +data ExtendedHandshake = ExtendedHandshake + { -- | If this peer has an IPv4 interface, this is the compact + -- representation of that address. + ehsIPv4 :: Maybe HostAddress + + -- | If this peer has an IPv6 interface, this is the compact + -- representation of that address. + , ehsIPv6 :: Maybe HostAddress6 + + -- | Dictionary of supported extension messages which maps names + -- of extensions to an extended message ID for each extension + -- message. + , ehsCaps :: ExtendedCaps + + -- | Size of 'Data.Torrent.InfoDict' in bytes. This field should + -- be added if 'ExtMetadata' is enabled in current session /and/ + -- peer have the torrent file. + , ehsMetadataSize :: Maybe Int + + -- | Local TCP /listen/ port. Allows each side to learn about the + -- TCP port number of the other side. + , ehsPort :: Maybe PortNumber + + -- | Request queue the number of outstanding 'Request' messages + -- this client supports without dropping any. + , ehsQueueLength :: Maybe Int + + -- | Client name and version. + , ehsVersion :: Maybe Text + + -- | IP of the remote end + , ehsYourIp :: Maybe IP + } deriving (Show, Eq, Typeable) + +extHandshakeId :: ExtendedMessageId +extHandshakeId = 0 + +-- | Default 'Request' queue size. +defaultQueueLength :: Int +defaultQueueLength = 1 + +-- | All fields are empty. +instance Default ExtendedHandshake where + def = ExtendedHandshake def def def def def def def def + +instance Monoid ExtendedHandshake where + mempty = def { ehsCaps = mempty } + mappend old new = ExtendedHandshake { + ehsCaps = ehsCaps old <> ehsCaps new, + ehsIPv4 = ehsIPv4 old `mergeOld` ehsIPv4 new, + ehsIPv6 = ehsIPv6 old `mergeOld` ehsIPv6 new, + ehsMetadataSize = ehsMetadataSize old `mergeNew` ehsMetadataSize new, + ehsPort = ehsPort old `mergeOld` ehsPort new, + ehsQueueLength = ehsQueueLength old `mergeNew` ehsQueueLength new, + ehsVersion = ehsVersion old `mergeOld` ehsVersion new, + ehsYourIp = ehsYourIp old `mergeOld` ehsYourIp new + } + where + mergeOld mold mnew = mold <|> mnew + mergeNew mold mnew = mnew <|> mold + + +instance BEncode ExtendedHandshake where + toBEncode ExtendedHandshake {..} = toDict $ + "ipv4" .=? (S.encode <$> ehsIPv4) + .: "ipv6" .=? (S.encode <$> ehsIPv6) + .: "m" .=! ehsCaps + .: "metadata_size" .=? ehsMetadataSize + .: "p" .=? ehsPort + .: "reqq" .=? ehsQueueLength + .: "v" .=? ehsVersion + .: "yourip" .=? (runPut <$> either put put <$> toEither <$> ehsYourIp) + .: endDict + where + toEither (IPv4 v4) = Left v4 + toEither (IPv6 v6) = Right v6 + + fromBEncode = fromDict $ ExtendedHandshake + <$>? "ipv4" + <*>? "ipv6" + <*>! "m" + <*>? "metadata_size" + <*>? "p" + <*>? "reqq" + <*>? "v" + <*> (opt "yourip" >>= getYourIp) + +getYourIp :: Maybe BValue -> BE.Get (Maybe IP) +getYourIp f = + return $ do + BString ip <- f + either (const Nothing) Just $ + case BS.length ip of + 4 -> IPv4 <$> S.decode ip + 16 -> IPv6 <$> S.decode ip + _ -> fail "" + +instance Pretty ExtendedHandshake where + pPrint = PP.text . show + +-- | NOTE: Approximated 'stats'. +instance PeerMessage ExtendedHandshake where + envelop c = envelop c . EHandshake + {-# INLINE envelop #-} + + requires _ = Just ExtExtended + {-# INLINE requires #-} + + stats _ = ByteStats (4 + 1 + 1) 100 {- is it ok? -} 0 -- FIXME + {-# INLINE stats #-} + +-- | Set default values and the specified 'ExtendedCaps'. +nullExtendedHandshake :: ExtendedCaps -> ExtendedHandshake +nullExtendedHandshake caps = ExtendedHandshake + { ehsIPv4 = Nothing + , ehsIPv6 = Nothing + , ehsCaps = caps + , ehsMetadataSize = Nothing + , ehsPort = Nothing + , ehsQueueLength = Just defaultQueueLength + , ehsVersion = Just $ T.pack $ render $ pPrint libFingerprint + , ehsYourIp = Nothing + } + +{----------------------------------------------------------------------- +-- Metadata exchange extension +-----------------------------------------------------------------------} + +-- | A peer MUST verify that any piece it sends passes the info-hash +-- verification. i.e. until the peer has the entire metadata, it +-- cannot run SHA-1 to verify that it yields the same hash as the +-- info-hash. +-- +data ExtendedMetadata + -- | This message requests the a specified metadata piece. The + -- response to this message, from a peer supporting the extension, + -- is either a 'MetadataReject' or a 'MetadataData' message. + = MetadataRequest PieceIx + + -- | If sender requested a valid 'PieceIx' and receiver have the + -- corresponding piece then receiver should respond with this + -- message. + | MetadataData + { -- | A piece of 'Data.Torrent.InfoDict'. + piece :: P.Piece BS.ByteString + + -- | This key has the same semantics as the 'ehsMetadataSize' in + -- the 'ExtendedHandshake' — it is size of the torrent info + -- dict. + , totalSize :: Int + } + + -- | Peers that do not have the entire metadata MUST respond with + -- a reject message to any metadata request. + -- + -- Clients MAY implement flood protection by rejecting request + -- messages after a certain number of them have been + -- served. Typically the number of pieces of metadata times a + -- factor. + | MetadataReject PieceIx + + -- | Reserved. By specification we should ignore unknown metadata + -- messages. + | MetadataUnknown BValue + deriving (Show, Eq, Typeable) + +-- | Extended metadata message id used in 'msg_type_key'. +type MetadataId = Int + +msg_type_key, piece_key, total_size_key :: BKey +msg_type_key = "msg_type" +piece_key = "piece" +total_size_key = "total_size" + +-- | BEP9 compatible encoding. +instance BEncode ExtendedMetadata where + toBEncode (MetadataRequest pix) = toDict $ + msg_type_key .=! (0 :: MetadataId) + .: piece_key .=! pix + .: endDict + toBEncode (MetadataData (P.Piece pix _) totalSize) = toDict $ + msg_type_key .=! (1 :: MetadataId) + .: piece_key .=! pix + .: total_size_key .=! totalSize + .: endDict + toBEncode (MetadataReject pix) = toDict $ + msg_type_key .=! (2 :: MetadataId) + .: piece_key .=! pix + .: endDict + toBEncode (MetadataUnknown bval) = bval + + fromBEncode bval = (`fromDict` bval) $ do + mid <- field $ req msg_type_key + case mid :: MetadataId of + 0 -> MetadataRequest <$>! piece_key + 1 -> metadataData <$>! piece_key <*>! total_size_key + 2 -> MetadataReject <$>! piece_key + _ -> pure (MetadataUnknown bval) + where + metadataData pix s = MetadataData (P.Piece pix BS.empty) s + +-- | Piece data bytes are omitted. +instance Pretty ExtendedMetadata where + pPrint (MetadataRequest pix ) = "Request" <+> PP.int pix + pPrint (MetadataData p t) = "Data" <+> pPrint p <+> PP.int t + pPrint (MetadataReject pix ) = "Reject" <+> PP.int pix + pPrint (MetadataUnknown bval ) = "Unknown" <+> ppBEncode bval + +-- | NOTE: Approximated 'stats'. +instance PeerMessage ExtendedMetadata where + envelop c = envelop c . EMetadata (remoteMessageId ExtMetadata c) + {-# INLINE envelop #-} + + requires _ = Just ExtExtended + {-# INLINE requires #-} + + stats (MetadataRequest _) = ByteStats (4 + 1 + 1) {- ~ -} 25 0 + stats (MetadataData p _) = ByteStats (4 + 1 + 1) {- ~ -} 41 $ + BS.length (P.pieceData p) + stats (MetadataReject _) = ByteStats (4 + 1 + 1) {- ~ -} 25 0 + stats (MetadataUnknown _) = ByteStats (4 + 1 + 1) {- ? -} 0 0 + +-- | All 'Piece's in 'MetadataData' messages MUST have size equal to +-- this value. The last trailing piece can be shorter. +metadataPieceSize :: PieceSize +metadataPieceSize = 16 * 1024 + +isLastPiece :: P.Piece a -> Int -> Bool +isLastPiece P.Piece {..} total = succ pieceIndex == pcnt + where + pcnt = q + if r > 0 then 1 else 0 + (q, r) = quotRem total metadataPieceSize + +-- TODO we can check if the piece payload bytestring have appropriate +-- length; otherwise serialization MUST fail. +isValidPiece :: P.Piece BL.ByteString -> Int -> Bool +isValidPiece p @ P.Piece {..} total + | isLastPiece p total = pieceSize p <= metadataPieceSize + | otherwise = pieceSize p == metadataPieceSize + +setMetadataPayload :: BS.ByteString -> ExtendedMetadata -> ExtendedMetadata +setMetadataPayload bs (MetadataData (P.Piece pix _) t) = + MetadataData (P.Piece pix bs) t +setMetadataPayload _ msg = msg + +getMetadataPayload :: ExtendedMetadata -> Maybe BS.ByteString +getMetadataPayload (MetadataData (P.Piece _ bs) _) = Just bs +getMetadataPayload _ = Nothing + +-- | Metadata BDict usually contain only 'msg_type_key', 'piece_key' +-- and 'total_size_key' fields so it normally should take less than +-- 100 bytes. This limit is two order of magnitude larger to be +-- permissive to 'MetadataUnknown' messages. +-- +-- See 'maxMessageSize' for further explanation. +-- +maxMetadataBDictSize :: Int +maxMetadataBDictSize = 16 * 1024 + +maxMetadataSize :: Int +maxMetadataSize = maxMetadataBDictSize + metadataPieceSize + +-- to make MetadataData constructor fields a little bit prettier we +-- cheat here: first we read empty 'pieceData' from bdict, but then we +-- fill that field with the actual piece data — trailing bytes of +-- the message +getMetadata :: Int -> S.Get ExtendedMetadata +getMetadata len + | len > maxMetadataSize = fail $ parseError "size exceeded limit" + | otherwise = do + bs <- getByteString len + parseRes $ BS.parse BE.parser bs + where + parseError reason = "unable to parse metadata message: " ++ reason + + parseRes (BS.Fail _ _ m) = fail $ parseError $ "bdict: " ++ m + parseRes (BS.Partial _) = fail $ parseError "bdict: not enough bytes" + parseRes (BS.Done piece bvalueBS) + | BS.length piece > metadataPieceSize + = fail "infodict piece: size exceeded limit" + | otherwise = do + metadata <- either (fail . parseError) pure $ fromBEncode bvalueBS + return $ setMetadataPayload piece metadata + +putMetadata :: ExtendedMetadata -> BL.ByteString +putMetadata msg + | Just bs <- getMetadataPayload msg = BE.encode msg <> BL.fromStrict bs + | otherwise = BE.encode msg + +-- | Allows a requesting peer to send 2 'MetadataRequest's for the +-- each piece. +-- +-- See 'Network.BitTorrent.Wire.Options.metadataFactor' for +-- explanation why do we need this limit. +defaultMetadataFactor :: Int +defaultMetadataFactor = 2 + +-- | Usually torrent size do not exceed 1MB. This value limit torrent +-- /content/ size to about 8TB. +-- +-- See 'Network.BitTorrent.Wire.Options.maxInfoDictSize' for +-- explanation why do we need this limit. +defaultMaxInfoDictSize :: Int +defaultMaxInfoDictSize = 10 * 1024 * 1024 + +{----------------------------------------------------------------------- +-- Extension protocol messages +-----------------------------------------------------------------------} + +-- | For more info see +data ExtendedMessage + = EHandshake ExtendedHandshake + | EMetadata ExtendedMessageId ExtendedMetadata + | EUnknown ExtendedMessageId BS.ByteString + deriving (Show, Eq, Typeable) + +instance Pretty ExtendedMessage where + pPrint (EHandshake ehs) = pPrint ehs + pPrint (EMetadata _ msg) = "Metadata" <+> pPrint msg + pPrint (EUnknown mid _ ) = "Unknown" <+> PP.text (show mid) + +instance PeerMessage ExtendedMessage where + envelop _ = Extended + {-# INLINE envelop #-} + + requires _ = Just ExtExtended + {-# INLINE requires #-} + + stats (EHandshake hs) = stats hs + stats (EMetadata _ msg) = stats msg + stats (EUnknown _ msg) = ByteStats (4 + 1 + 1) (BS.length msg) 0 + +{----------------------------------------------------------------------- +-- The message datatype +-----------------------------------------------------------------------} + +type MessageId = Word8 + +-- | Messages used in communication between peers. +-- +-- Note: If some extensions are disabled (not present in extension +-- mask) and client receive message used by the disabled +-- extension then the client MUST close the connection. +-- +data Message + -- | Peers may close the TCP connection if they have not received + -- any messages for a given period of time, generally 2 + -- minutes. Thus, the KeepAlive message is sent to keep the + -- connection between two peers alive, if no /other/ message has + -- been sent in a given period of time. + = KeepAlive + | Status !StatusUpdate -- ^ Messages used to update peer status. + | Available !Available -- ^ Messages used to inform availability. + | Transfer !Transfer -- ^ Messages used to transfer 'Block's. + + -- | Peer receiving a handshake indicating the remote peer + -- supports the 'ExtDHT' should send a 'Port' message. Peers that + -- receive this message should attempt to ping the node on the + -- received port and IP address of the remote peer. + | Port !PortNumber + | Fast !FastMessage + | Extended !ExtendedMessage + deriving (Show, Eq) + +instance Default Message where + def = KeepAlive + {-# INLINE def #-} + +-- | Payload bytes are omitted. +instance Pretty Message where + pPrint (KeepAlive ) = "Keep alive" + pPrint (Status m) = "Status" <+> pPrint m + pPrint (Available m) = pPrint m + pPrint (Transfer m) = pPrint m + pPrint (Port p) = "Port" <+> int (fromEnum p) + pPrint (Fast m) = pPrint m + pPrint (Extended m) = pPrint m + +instance PeerMessage Message where + envelop _ = id + {-# INLINE envelop #-} + + requires KeepAlive = Nothing + requires (Status _) = Nothing + requires (Available _) = Nothing + requires (Transfer _) = Nothing + requires (Port _) = Just ExtDHT + requires (Fast _) = Just ExtFast + requires (Extended _) = Just ExtExtended + + stats KeepAlive = ByteStats 4 0 0 + stats (Status m) = stats m + stats (Available m) = stats m + stats (Transfer m) = stats m + stats (Port _) = ByteStats 5 2 0 + stats (Fast m) = stats m + stats (Extended m) = stats m + +-- | PORT message. +instance PeerMessage PortNumber where + envelop _ = Port + {-# INLINE envelop #-} + + requires _ = Just ExtDHT + {-# INLINE requires #-} + +-- | How long /this/ peer should wait before dropping connection, in +-- seconds. +defaultKeepAliveTimeout :: Int +defaultKeepAliveTimeout = 2 * 60 + +-- | How often /this/ peer should send 'KeepAlive' messages, in +-- seconds. +defaultKeepAliveInterval :: Int +defaultKeepAliveInterval = 60 + +getInt :: S.Get Int +getInt = fromIntegral <$> S.getWord32be +{-# INLINE getInt #-} + +putInt :: S.Putter Int +putInt = S.putWord32be . fromIntegral +{-# INLINE putInt #-} + +-- | This limit should protect against "out-of-memory" attacks: if a +-- malicious peer have sent a long varlength message then receiver can +-- accumulate too long bytestring in the 'Get'. +-- +-- Normal messages should never exceed this limits. +-- +-- See also 'maxBitfieldSize', 'maxBlockSize' limits. +-- +maxMessageSize :: Int +maxMessageSize = 20 + 1024 * 1024 + +-- | This also limit max torrent size to: +-- +-- max_bitfield_size * piece_ix_per_byte * max_piece_size = +-- 2 ^ 20 * 8 * 1MB = +-- 8TB +-- +maxBitfieldSize :: Int +maxBitfieldSize = 1024 * 1024 + +getBitfield :: Int -> S.Get Bitfield +getBitfield len + | len > maxBitfieldSize = fail "BITFIELD message size exceeded limit" + | otherwise = fromBitmap <$> getByteString len + +maxBlockSize :: Int +maxBlockSize = 4 * defaultTransferSize + +getBlock :: Int -> S.Get (Block BL.ByteString) +getBlock len + | len > maxBlockSize = fail "BLOCK message size exceeded limit" + | otherwise = Block <$> getInt <*> getInt + <*> getLazyByteString (fromIntegral len) +{-# INLINE getBlock #-} + +instance Serialize Message where + get = do + len <- getInt + + when (len > maxMessageSize) $ do + fail "message body size exceeded the limit" + + if len == 0 then return KeepAlive + else do + mid <- S.getWord8 + case mid of + 0x00 -> return $ Status (Choking True) + 0x01 -> return $ Status (Choking False) + 0x02 -> return $ Status (Interested True) + 0x03 -> return $ Status (Interested False) + 0x04 -> (Available . Have) <$> getInt + 0x05 -> (Available . Bitfield) <$> getBitfield (pred len) + 0x06 -> (Transfer . Request) <$> S.get + 0x07 -> (Transfer . Piece) <$> getBlock (len - 9) + 0x08 -> (Transfer . Cancel) <$> S.get + 0x09 -> Port <$> S.get + 0x0D -> (Fast . SuggestPiece) <$> getInt + 0x0E -> return $ Fast HaveAll + 0x0F -> return $ Fast HaveNone + 0x10 -> (Fast . RejectRequest) <$> S.get + 0x11 -> (Fast . AllowedFast) <$> getInt + 0x14 -> Extended <$> getExtendedMessage (pred len) + _ -> do + rm <- S.remaining >>= S.getBytes + fail $ "unknown message ID: " ++ show mid ++ "\n" + ++ "remaining available bytes: " ++ show rm + + put KeepAlive = putInt 0 + put (Status msg) = putStatus msg + put (Available msg) = putAvailable msg + put (Transfer msg) = putTransfer msg + put (Port p ) = putPort p + put (Fast msg) = putFast msg + put (Extended m ) = putExtendedMessage m + +statusUpdateId :: StatusUpdate -> MessageId +statusUpdateId (Choking choking) = fromIntegral (0 + fromEnum choking) +statusUpdateId (Interested choking) = fromIntegral (2 + fromEnum choking) + +putStatus :: Putter StatusUpdate +putStatus su = do + putInt 1 + putWord8 (statusUpdateId su) + +putAvailable :: Putter Available +putAvailable (Have i) = do + putInt 5 + putWord8 0x04 + putInt i +putAvailable (Bitfield (toBitmap -> bs)) = do + putInt $ 1 + fromIntegral (BL.length bs) + putWord8 0x05 + putLazyByteString bs + +putBlock :: Putter (Block BL.ByteString) +putBlock Block {..} = do + putInt blkPiece + putInt blkOffset + putLazyByteString blkData + +putTransfer :: Putter Transfer +putTransfer (Request blk) = putInt 13 >> S.putWord8 0x06 >> S.put blk +putTransfer (Piece blk) = do + putInt (9 + blockSize blk) + putWord8 0x07 + putBlock blk +putTransfer (Cancel blk) = putInt 13 >> S.putWord8 0x08 >> S.put blk + +putPort :: Putter PortNumber +putPort p = do + putInt 3 + putWord8 0x09 + put p + +putFast :: Putter FastMessage +putFast HaveAll = putInt 1 >> putWord8 0x0E +putFast HaveNone = putInt 1 >> putWord8 0x0F +putFast (SuggestPiece pix) = putInt 5 >> putWord8 0x0D >> putInt pix +putFast (RejectRequest i ) = putInt 13 >> putWord8 0x10 >> put i +putFast (AllowedFast i ) = putInt 5 >> putWord8 0x11 >> putInt i + +maxEHandshakeSize :: Int +maxEHandshakeSize = 16 * 1024 + +getExtendedHandshake :: Int -> S.Get ExtendedHandshake +getExtendedHandshake messageSize + | messageSize > maxEHandshakeSize + = fail "extended handshake size exceeded limit" + | otherwise = do + bs <- getByteString messageSize + either fail pure $ BE.decode bs + +maxEUnknownSize :: Int +maxEUnknownSize = 64 * 1024 + +getExtendedUnknown :: Int -> S.Get BS.ByteString +getExtendedUnknown len + | len > maxEUnknownSize = fail "unknown extended message size exceeded limit" + | otherwise = getByteString len + +getExtendedMessage :: Int -> S.Get ExtendedMessage +getExtendedMessage messageSize = do + msgId <- getWord8 + let msgBodySize = messageSize - 1 + case msgId of + 0 -> EHandshake <$> getExtendedHandshake msgBodySize + 1 -> EMetadata msgId <$> getMetadata msgBodySize + _ -> EUnknown msgId <$> getExtendedUnknown msgBodySize + +-- | By spec. +extendedMessageId :: MessageId +extendedMessageId = 20 + +putExt :: ExtendedMessageId -> BL.ByteString -> Put +putExt mid lbs = do + putWord32be $ fromIntegral (1 + 1 + BL.length lbs) + putWord8 extendedMessageId + putWord8 mid + putLazyByteString lbs + +-- NOTE: in contrast to getExtendedMessage this function put length +-- and message id too! +putExtendedMessage :: Putter ExtendedMessage +putExtendedMessage (EHandshake hs) = putExt extHandshakeId $ BE.encode hs +putExtendedMessage (EMetadata mid msg) = putExt mid $ putMetadata msg +putExtendedMessage (EUnknown mid bs) = putExt mid $ BL.fromStrict bs diff --git a/dht/bittorrent/src/Network/BitTorrent/Exchange/Session.hs b/dht/bittorrent/src/Network/BitTorrent/Exchange/Session.hs new file mode 100644 index 00000000..38a3c3a6 --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Exchange/Session.hs @@ -0,0 +1,586 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE TypeFamilies #-} +module Network.BitTorrent.Exchange.Session + ( -- * Session + Session + , Event (..) + , LogFun + , sessionLogger + + -- * Construction + , newSession + , closeSession + , withSession + + -- * Connection Set + , connect + , connectSink + , establish + + -- * Query + , waitMetadata + , takeMetadata + ) where + +import Control.Applicative +import Control.Concurrent +import Control.Concurrent.Chan.Split as CS +import Control.Concurrent.STM +import Control.Exception hiding (Handler) +import Control.Lens +import Control.Monad as M +import Control.Monad.Logger +import Control.Monad.Reader +import Data.ByteString as BS +import Data.ByteString.Lazy as BL +import Data.Conduit as C (Sink, awaitForever, (=$=), ($=)) +import qualified Data.Conduit as C +import Data.Conduit.List as C +import Data.Map as M +import Data.Monoid +import Data.Set as S +import Data.Text as T +import Data.Typeable +import Text.PrettyPrint hiding ((<>)) +import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) +import System.Log.FastLogger (LogStr, ToLogStr (..)) + +import Data.BEncode as BE +import Data.Torrent as Torrent +import Network.BitTorrent.Internal.Types +import Network.Address +import Network.BitTorrent.Exchange.Bitfield as BF +import Network.BitTorrent.Exchange.Block as Block +import Network.BitTorrent.Exchange.Connection +import Network.BitTorrent.Exchange.Download as D +import Network.BitTorrent.Exchange.Message as Message +import System.Torrent.Storage + +#if !MIN_VERSION_iproute(1,2,12) +deriving instance Ord IP +#endif + +{----------------------------------------------------------------------- +-- Exceptions +-----------------------------------------------------------------------} + +data ExchangeError + = InvalidRequest BlockIx StorageFailure + | CorruptedPiece PieceIx + deriving (Show, Typeable) + +instance Exception ExchangeError + +packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a +packException f m = try m >>= either (throwIO . f) return + +{----------------------------------------------------------------------- +-- Session state +-----------------------------------------------------------------------} +-- TODO unmap storage on zero connections + +data Cached a = Cached + { cachedValue :: !a + , cachedData :: BL.ByteString -- keep lazy + } + +cache :: BEncode a => a -> Cached a +cache s = Cached s (BE.encode s) + +-- | Logger function. +type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () + +--data SessionStatus = Seeder | Leecher + +data SessionState + = WaitingMetadata + { metadataDownload :: MVar MetadataDownload + , metadataCompleted :: MVar InfoDict -- ^ used to unblock waiters + , contentRootPath :: FilePath + } + | HavingMetadata + { metadataCache :: Cached InfoDict + , contentDownload :: MVar ContentDownload + , contentStorage :: Storage + } + +newSessionState :: FilePath -> Either InfoHash InfoDict -> IO SessionState +newSessionState rootPath (Left ih ) = do + WaitingMetadata <$> newMVar def <*> newEmptyMVar <*> pure rootPath +newSessionState rootPath (Right dict) = do + storage <- openInfoDict ReadWriteEx rootPath dict + download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage)) + (piPieceLength (idPieceInfo dict)) + storage + return $ HavingMetadata (cache dict) download storage + +closeSessionState :: SessionState -> IO () +closeSessionState WaitingMetadata {..} = return () +closeSessionState HavingMetadata {..} = close contentStorage + +haveMetadata :: InfoDict -> SessionState -> IO SessionState +haveMetadata dict WaitingMetadata {..} = do + storage <- openInfoDict ReadWriteEx contentRootPath dict + download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage)) + (piPieceLength (idPieceInfo dict)) + storage + return HavingMetadata + { metadataCache = cache dict + , contentDownload = download + , contentStorage = storage + } +haveMetadata _ s = return s + +{----------------------------------------------------------------------- +-- Session +-----------------------------------------------------------------------} + +data Session = Session + { sessionPeerId :: !(PeerId) + , sessionTopic :: !(InfoHash) + , sessionLogger :: !(LogFun) + , sessionEvents :: !(SendPort (Event Session)) + + , sessionState :: !(MVar SessionState) + +------------------------------------------------------------------------ + , connectionsPrefs :: !ConnectionPrefs + + -- | Connections either waiting for TCP/uTP 'connect' or waiting + -- for BT handshake. + , connectionsPending :: !(TVar (Set (PeerAddr IP))) + + -- | Connections successfully handshaked and data transfer can + -- take place. + , connectionsEstablished :: !(TVar (Map (PeerAddr IP) (Connection Session))) + + -- | TODO implement choking mechanism + , connectionsUnchoked :: [PeerAddr IP] + + -- | Messages written to this channel will be sent to the all + -- connections, including pending connections (but right after + -- handshake). + , connectionsBroadcast :: !(Chan Message) + } + +instance EventSource Session where + data Event Session + = ConnectingTo (PeerAddr IP) + | ConnectionEstablished (PeerAddr IP) + | ConnectionAborted + | ConnectionClosed (PeerAddr IP) + | SessionClosed + deriving Show + + listen Session {..} = CS.listen sessionEvents + +newSession :: LogFun + -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer; + -> FilePath -- ^ root directory for content files; + -> Either InfoHash InfoDict -- ^ torrent info dictionary; + -> IO Session +newSession logFun addr rootPath source = do + let ih = either id idInfoHash source + pid <- maybe genPeerId return (peerId addr) + eventStream <- newSendPort + sState <- newSessionState rootPath source + sStateVar <- newMVar sState + pSetVar <- newTVarIO S.empty + eSetVar <- newTVarIO M.empty + chan <- newChan + return Session + { sessionPeerId = pid + , sessionTopic = ih + , sessionLogger = logFun + , sessionEvents = eventStream + , sessionState = sStateVar + , connectionsPrefs = def + , connectionsPending = pSetVar + , connectionsEstablished = eSetVar + , connectionsUnchoked = [] + , connectionsBroadcast = chan + } + +closeSession :: Session -> IO () +closeSession Session {..} = do + s <- readMVar sessionState + closeSessionState s +{- + hSet <- atomically $ do + pSet <- swapTVar connectionsPending S.empty + eSet <- swapTVar connectionsEstablished S.empty + return pSet + mapM_ kill hSet +-} + +withSession :: () +withSession = error "withSession" + +{----------------------------------------------------------------------- +-- Logging +-----------------------------------------------------------------------} + +instance MonadLogger (Connected Session) where + monadLoggerLog loc src lvl msg = do + conn <- ask + ses <- asks connSession + addr <- asks connRemoteAddr + let addrSrc = src <> " @ " <> T.pack (render (pPrint addr)) + liftIO $ sessionLogger ses loc addrSrc lvl (toLogStr msg) + +logMessage :: MonadLogger m => Message -> m () +logMessage msg = logDebugN $ T.pack (render (pPrint msg)) + +logEvent :: MonadLogger m => Text -> m () +logEvent = logInfoN + +{----------------------------------------------------------------------- +-- Connection set +-----------------------------------------------------------------------} +--- Connection status transition: +--- +--- pending -> established -> finished -> closed +--- | \|/ /|\ +--- \-------------------------------------| +--- +--- Purpose of slots: +--- 1) to avoid duplicates +--- 2) connect concurrently +--- + +-- | Add connection to the pending set. +pendingConnection :: PeerAddr IP -> Session -> STM Bool +pendingConnection addr Session {..} = do + pSet <- readTVar connectionsPending + eSet <- readTVar connectionsEstablished + if (addr `S.member` pSet) || (addr `M.member` eSet) + then return False + else do + modifyTVar' connectionsPending (S.insert addr) + return True + +-- | Pending connection successfully established, add it to the +-- established set. +establishedConnection :: Connected Session () +establishedConnection = do + conn <- ask + addr <- asks connRemoteAddr + Session {..} <- asks connSession + liftIO $ atomically $ do + modifyTVar connectionsPending (S.delete addr) + modifyTVar connectionsEstablished (M.insert addr conn) + +-- | Either this or remote peer decided to finish conversation +-- (conversation is alread /established/ connection), remote it from +-- the established set. +finishedConnection :: Connected Session () +finishedConnection = do + Session {..} <- asks connSession + addr <- asks connRemoteAddr + liftIO $ atomically $ do + modifyTVar connectionsEstablished $ M.delete addr + +-- | There are no state for this connection, remove it from the all +-- sets. +closedConnection :: PeerAddr IP -> Session -> STM () +closedConnection addr Session {..} = do + modifyTVar connectionsPending $ S.delete addr + modifyTVar connectionsEstablished $ M.delete addr + +getConnectionConfig :: Session -> IO (ConnectionConfig Session) +getConnectionConfig s @ Session {..} = do + chan <- dupChan connectionsBroadcast + let sessionLink = SessionLink { + linkTopic = sessionTopic + , linkPeerId = sessionPeerId + , linkMetadataSize = Nothing + , linkOutputChan = Just chan + , linkSession = s + } + return ConnectionConfig + { cfgPrefs = connectionsPrefs + , cfgSession = sessionLink + , cfgWire = mainWire + } + +type Finalizer = IO () +type Runner = (ConnectionConfig Session -> IO ()) + +runConnection :: Runner -> Finalizer -> PeerAddr IP -> Session -> IO () +runConnection runner finalize addr set @ Session {..} = do + _ <- forkIO (action `finally` cleanup) + return () + where + action = do + notExist <- atomically $ pendingConnection addr set + when notExist $ do + cfg <- getConnectionConfig set + runner cfg + + cleanup = do + finalize +-- runStatusUpdates status (SS.resetPending addr) + -- TODO Metata.resetPending addr + atomically $ closedConnection addr set + +-- | Establish connection from scratch. If this endpoint is already +-- connected, no new connections is created. This function do not block. +connect :: PeerAddr IP -> Session -> IO () +connect addr = runConnection (connectWire addr) (return ()) addr + +-- | Establish connection with already pre-connected endpoint. If this +-- endpoint is already connected, no new connections is created. This +-- function do not block. +-- +-- 'PendingConnection' will be closed automatically, you do not need +-- to call 'closePending'. +establish :: PendingConnection -> Session -> IO () +establish conn = runConnection (acceptWire conn) (closePending conn) + (pendingPeer conn) + +-- | Conduit version of 'connect'. +connectSink :: MonadIO m => Session -> Sink [PeerAddr IPv4] m () +connectSink s = C.mapM_ (liftIO . connectBatch) + where + connectBatch = M.mapM_ (\ addr -> connect (IPv4 <$> addr) s) + +-- | Why do we need this message? +type BroadcastMessage = ExtendedCaps -> Message + +broadcast :: BroadcastMessage -> Session -> IO () +broadcast = error "broadcast" + +{----------------------------------------------------------------------- +-- Helpers +-----------------------------------------------------------------------} + +waitMVar :: MVar a -> IO () +waitMVar m = withMVar m (const (return ())) + +-- This function appear in new GHC "out of box". (moreover it is atomic) +tryReadMVar :: MVar a -> IO (Maybe a) +tryReadMVar m = do + ma <- tryTakeMVar m + maybe (return ()) (putMVar m) ma + return ma + +readBlock :: BlockIx -> Storage -> IO (Block BL.ByteString) +readBlock bix @ BlockIx {..} s = do + p <- packException (InvalidRequest bix) $ do readPiece ixPiece s + let chunk = BL.take (fromIntegral ixLength) $ + BL.drop (fromIntegral ixOffset) (pieceData p) + if BL.length chunk == fromIntegral ixLength + then return $ Block ixPiece ixOffset chunk + else throwIO $ InvalidRequest bix (InvalidSize ixLength) + +-- | +tryReadMetadataBlock :: PieceIx + -> Connected Session (Maybe (Torrent.Piece BS.ByteString, Int)) +tryReadMetadataBlock pix = do + Session {..} <- asks connSession + s <- liftIO (readMVar sessionState) + case s of + WaitingMetadata {..} -> error "tryReadMetadataBlock" + HavingMetadata {..} -> error "tryReadMetadataBlock" + +sendBroadcast :: PeerMessage msg => msg -> Wire Session () +sendBroadcast msg = do + Session {..} <- asks connSession + error "sendBroadcast" +-- liftIO $ msg `broadcast` sessionConnections + +waitMetadata :: Session -> IO InfoDict +waitMetadata Session {..} = do + s <- readMVar sessionState + case s of + WaitingMetadata {..} -> readMVar metadataCompleted + HavingMetadata {..} -> return (cachedValue metadataCache) + +takeMetadata :: Session -> IO (Maybe InfoDict) +takeMetadata Session {..} = do + s <- readMVar sessionState + case s of + WaitingMetadata {..} -> return Nothing + HavingMetadata {..} -> return (Just (cachedValue metadataCache)) + +{----------------------------------------------------------------------- +-- Triggers +-----------------------------------------------------------------------} + +-- | Trigger is the reaction of a handler at some event. +type Trigger = Wire Session () + +interesting :: Trigger +interesting = do + addr <- asks connRemoteAddr + sendMessage (Interested True) + sendMessage (Choking False) + tryFillRequestQueue + +fillRequestQueue :: Trigger +fillRequestQueue = do + maxN <- lift getMaxQueueLength + rbf <- use connBitfield + addr <- asks connRemoteAddr +-- blks <- withStatusUpdates $ do +-- n <- getRequestQueueLength addr +-- scheduleBlocks addr rbf (maxN - n) +-- mapM_ (sendMessage . Request) blks + return () + +tryFillRequestQueue :: Trigger +tryFillRequestQueue = do + allowed <- canDownload <$> use connStatus + when allowed $ do + fillRequestQueue + +{----------------------------------------------------------------------- +-- Incoming message handling +-----------------------------------------------------------------------} + +type Handler msg = msg -> Wire Session () + +handleStatus :: Handler StatusUpdate +handleStatus s = do + connStatus %= over remoteStatus (updateStatus s) + case s of + Interested _ -> return () + Choking True -> do + addr <- asks connRemoteAddr +-- withStatusUpdates (SS.resetPending addr) + return () + Choking False -> tryFillRequestQueue + +handleAvailable :: Handler Available +handleAvailable msg = do + connBitfield %= case msg of + Have ix -> BF.insert ix + Bitfield bf -> const bf + + --thisBf <- getThisBitfield + thisBf <- undefined + case msg of + Have ix + | ix `BF.member` thisBf -> return () + | otherwise -> interesting + Bitfield bf + | bf `BF.isSubsetOf` thisBf -> return () + | otherwise -> interesting + +handleTransfer :: Handler Transfer +handleTransfer (Request bix) = do + Session {..} <- asks connSession + s <- liftIO $ readMVar sessionState + case s of + WaitingMetadata {..} -> return () + HavingMetadata {..} -> do + bitfield <- undefined -- getThisBitfield + upload <- canUpload <$> use connStatus + when (upload && ixPiece bix `BF.member` bitfield) $ do + blk <- liftIO $ readBlock bix contentStorage + sendMessage (Message.Piece blk) + +handleTransfer (Message.Piece blk) = do + Session {..} <- asks connSession + s <- liftIO $ readMVar sessionState + case s of + WaitingMetadata {..} -> return () -- TODO (?) break connection + HavingMetadata {..} -> do + isSuccess <- undefined -- withStatusUpdates (SS.pushBlock blk storage) + case isSuccess of + Nothing -> liftIO $ throwIO $ userError "block is not requested" + Just isCompleted -> do + when isCompleted $ do + sendBroadcast (Have (blkPiece blk)) +-- maybe send not interested + tryFillRequestQueue + +handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) + where + transferResponse bix (Transfer (Message.Piece blk)) = blockIx blk == bix + transferResponse _ _ = False + +{----------------------------------------------------------------------- +-- Metadata exchange +-----------------------------------------------------------------------} +-- TODO introduce new metadata exchange specific exceptions + +waitForMetadata :: Trigger +waitForMetadata = do + Session {..} <- asks connSession + needFetch <- undefined --liftIO (isEmptyMVar infodict) + when needFetch $ do + canFetch <- allowed ExtMetadata <$> use connExtCaps + if canFetch + then tryRequestMetadataBlock + else undefined -- liftIO (waitMVar infodict) + +tryRequestMetadataBlock :: Trigger +tryRequestMetadataBlock = do + mpix <- lift $ undefined --withMetadataUpdates Metadata.scheduleBlock + case mpix of + Nothing -> error "tryRequestMetadataBlock" + Just pix -> sendMessage (MetadataRequest pix) + +handleMetadata :: Handler ExtendedMetadata +handleMetadata (MetadataRequest pix) = + lift (tryReadMetadataBlock pix) >>= sendMessage . mkResponse + where + mkResponse Nothing = MetadataReject pix + mkResponse (Just (piece, total)) = MetadataData piece total + +handleMetadata (MetadataData {..}) = do + ih <- asks connTopic + mdict <- lift $ undefined --withMetadataUpdates (Metadata.pushBlock piece ih) + case mdict of + Nothing -> tryRequestMetadataBlock -- not completed, need all blocks + Just dict -> do -- complete, wake up payload fetch + Session {..} <- asks connSession + liftIO $ modifyMVar_ sessionState (haveMetadata dict) + +handleMetadata (MetadataReject pix) = do + lift $ undefined -- withMetadataUpdates (Metadata.cancelPending pix) + +handleMetadata (MetadataUnknown _ ) = do + logInfoN "Unknown metadata message" + +{----------------------------------------------------------------------- +-- Main entry point +-----------------------------------------------------------------------} + +acceptRehandshake :: ExtendedHandshake -> Trigger +acceptRehandshake ehs = error "acceptRehandshake" + +handleExtended :: Handler ExtendedMessage +handleExtended (EHandshake ehs) = acceptRehandshake ehs +handleExtended (EMetadata _ msg) = handleMetadata msg +handleExtended (EUnknown _ _ ) = logWarnN "Unknown extension message" + +handleMessage :: Handler Message +handleMessage KeepAlive = return () +handleMessage (Status s) = handleStatus s +handleMessage (Available msg) = handleAvailable msg +handleMessage (Transfer msg) = handleTransfer msg +handleMessage (Port n) = error "handleMessage" +handleMessage (Fast _) = error "handleMessage" +handleMessage (Extended msg) = handleExtended msg + +exchange :: Wire Session () +exchange = do + waitForMetadata + bf <- undefined --getThisBitfield + sendMessage (Bitfield bf) + awaitForever handleMessage + +mainWire :: Wire Session () +mainWire = do + lift establishedConnection + Session {..} <- asks connSession +-- lift $ resizeBitfield (totalPieces storage) + logEvent "Connection established" + iterM logMessage =$= exchange =$= iterM logMessage + lift finishedConnection -- cgit v1.2.3