From 12cbb3af2413dc28838ed271351dda16df8f7bdb Mon Sep 17 00:00:00 2001 From: joe Date: Fri, 15 Sep 2017 06:22:10 -0400 Subject: Separating dht-client library from bittorrent package. --- .../src/Network/BitTorrent/Exchange/Session.hs | 586 +++++++++++++++++++++ 1 file changed, 586 insertions(+) create mode 100644 bittorrent/src/Network/BitTorrent/Exchange/Session.hs (limited to 'bittorrent/src/Network/BitTorrent/Exchange/Session.hs') diff --git a/bittorrent/src/Network/BitTorrent/Exchange/Session.hs b/bittorrent/src/Network/BitTorrent/Exchange/Session.hs new file mode 100644 index 00000000..38a3c3a6 --- /dev/null +++ b/bittorrent/src/Network/BitTorrent/Exchange/Session.hs @@ -0,0 +1,586 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE TypeFamilies #-} +module Network.BitTorrent.Exchange.Session + ( -- * Session + Session + , Event (..) + , LogFun + , sessionLogger + + -- * Construction + , newSession + , closeSession + , withSession + + -- * Connection Set + , connect + , connectSink + , establish + + -- * Query + , waitMetadata + , takeMetadata + ) where + +import Control.Applicative +import Control.Concurrent +import Control.Concurrent.Chan.Split as CS +import Control.Concurrent.STM +import Control.Exception hiding (Handler) +import Control.Lens +import Control.Monad as M +import Control.Monad.Logger +import Control.Monad.Reader +import Data.ByteString as BS +import Data.ByteString.Lazy as BL +import Data.Conduit as C (Sink, awaitForever, (=$=), ($=)) +import qualified Data.Conduit as C +import Data.Conduit.List as C +import Data.Map as M +import Data.Monoid +import Data.Set as S +import Data.Text as T +import Data.Typeable +import Text.PrettyPrint hiding ((<>)) +import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) +import System.Log.FastLogger (LogStr, ToLogStr (..)) + +import Data.BEncode as BE +import Data.Torrent as Torrent +import Network.BitTorrent.Internal.Types +import Network.Address +import Network.BitTorrent.Exchange.Bitfield as BF +import Network.BitTorrent.Exchange.Block as Block +import Network.BitTorrent.Exchange.Connection +import Network.BitTorrent.Exchange.Download as D +import Network.BitTorrent.Exchange.Message as Message +import System.Torrent.Storage + +#if !MIN_VERSION_iproute(1,2,12) +deriving instance Ord IP +#endif + +{----------------------------------------------------------------------- +-- Exceptions +-----------------------------------------------------------------------} + +data ExchangeError + = InvalidRequest BlockIx StorageFailure + | CorruptedPiece PieceIx + deriving (Show, Typeable) + +instance Exception ExchangeError + +packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a +packException f m = try m >>= either (throwIO . f) return + +{----------------------------------------------------------------------- +-- Session state +-----------------------------------------------------------------------} +-- TODO unmap storage on zero connections + +data Cached a = Cached + { cachedValue :: !a + , cachedData :: BL.ByteString -- keep lazy + } + +cache :: BEncode a => a -> Cached a +cache s = Cached s (BE.encode s) + +-- | Logger function. +type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () + +--data SessionStatus = Seeder | Leecher + +data SessionState + = WaitingMetadata + { metadataDownload :: MVar MetadataDownload + , metadataCompleted :: MVar InfoDict -- ^ used to unblock waiters + , contentRootPath :: FilePath + } + | HavingMetadata + { metadataCache :: Cached InfoDict + , contentDownload :: MVar ContentDownload + , contentStorage :: Storage + } + +newSessionState :: FilePath -> Either InfoHash InfoDict -> IO SessionState +newSessionState rootPath (Left ih ) = do + WaitingMetadata <$> newMVar def <*> newEmptyMVar <*> pure rootPath +newSessionState rootPath (Right dict) = do + storage <- openInfoDict ReadWriteEx rootPath dict + download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage)) + (piPieceLength (idPieceInfo dict)) + storage + return $ HavingMetadata (cache dict) download storage + +closeSessionState :: SessionState -> IO () +closeSessionState WaitingMetadata {..} = return () +closeSessionState HavingMetadata {..} = close contentStorage + +haveMetadata :: InfoDict -> SessionState -> IO SessionState +haveMetadata dict WaitingMetadata {..} = do + storage <- openInfoDict ReadWriteEx contentRootPath dict + download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage)) + (piPieceLength (idPieceInfo dict)) + storage + return HavingMetadata + { metadataCache = cache dict + , contentDownload = download + , contentStorage = storage + } +haveMetadata _ s = return s + +{----------------------------------------------------------------------- +-- Session +-----------------------------------------------------------------------} + +data Session = Session + { sessionPeerId :: !(PeerId) + , sessionTopic :: !(InfoHash) + , sessionLogger :: !(LogFun) + , sessionEvents :: !(SendPort (Event Session)) + + , sessionState :: !(MVar SessionState) + +------------------------------------------------------------------------ + , connectionsPrefs :: !ConnectionPrefs + + -- | Connections either waiting for TCP/uTP 'connect' or waiting + -- for BT handshake. + , connectionsPending :: !(TVar (Set (PeerAddr IP))) + + -- | Connections successfully handshaked and data transfer can + -- take place. + , connectionsEstablished :: !(TVar (Map (PeerAddr IP) (Connection Session))) + + -- | TODO implement choking mechanism + , connectionsUnchoked :: [PeerAddr IP] + + -- | Messages written to this channel will be sent to the all + -- connections, including pending connections (but right after + -- handshake). + , connectionsBroadcast :: !(Chan Message) + } + +instance EventSource Session where + data Event Session + = ConnectingTo (PeerAddr IP) + | ConnectionEstablished (PeerAddr IP) + | ConnectionAborted + | ConnectionClosed (PeerAddr IP) + | SessionClosed + deriving Show + + listen Session {..} = CS.listen sessionEvents + +newSession :: LogFun + -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer; + -> FilePath -- ^ root directory for content files; + -> Either InfoHash InfoDict -- ^ torrent info dictionary; + -> IO Session +newSession logFun addr rootPath source = do + let ih = either id idInfoHash source + pid <- maybe genPeerId return (peerId addr) + eventStream <- newSendPort + sState <- newSessionState rootPath source + sStateVar <- newMVar sState + pSetVar <- newTVarIO S.empty + eSetVar <- newTVarIO M.empty + chan <- newChan + return Session + { sessionPeerId = pid + , sessionTopic = ih + , sessionLogger = logFun + , sessionEvents = eventStream + , sessionState = sStateVar + , connectionsPrefs = def + , connectionsPending = pSetVar + , connectionsEstablished = eSetVar + , connectionsUnchoked = [] + , connectionsBroadcast = chan + } + +closeSession :: Session -> IO () +closeSession Session {..} = do + s <- readMVar sessionState + closeSessionState s +{- + hSet <- atomically $ do + pSet <- swapTVar connectionsPending S.empty + eSet <- swapTVar connectionsEstablished S.empty + return pSet + mapM_ kill hSet +-} + +withSession :: () +withSession = error "withSession" + +{----------------------------------------------------------------------- +-- Logging +-----------------------------------------------------------------------} + +instance MonadLogger (Connected Session) where + monadLoggerLog loc src lvl msg = do + conn <- ask + ses <- asks connSession + addr <- asks connRemoteAddr + let addrSrc = src <> " @ " <> T.pack (render (pPrint addr)) + liftIO $ sessionLogger ses loc addrSrc lvl (toLogStr msg) + +logMessage :: MonadLogger m => Message -> m () +logMessage msg = logDebugN $ T.pack (render (pPrint msg)) + +logEvent :: MonadLogger m => Text -> m () +logEvent = logInfoN + +{----------------------------------------------------------------------- +-- Connection set +-----------------------------------------------------------------------} +--- Connection status transition: +--- +--- pending -> established -> finished -> closed +--- | \|/ /|\ +--- \-------------------------------------| +--- +--- Purpose of slots: +--- 1) to avoid duplicates +--- 2) connect concurrently +--- + +-- | Add connection to the pending set. +pendingConnection :: PeerAddr IP -> Session -> STM Bool +pendingConnection addr Session {..} = do + pSet <- readTVar connectionsPending + eSet <- readTVar connectionsEstablished + if (addr `S.member` pSet) || (addr `M.member` eSet) + then return False + else do + modifyTVar' connectionsPending (S.insert addr) + return True + +-- | Pending connection successfully established, add it to the +-- established set. +establishedConnection :: Connected Session () +establishedConnection = do + conn <- ask + addr <- asks connRemoteAddr + Session {..} <- asks connSession + liftIO $ atomically $ do + modifyTVar connectionsPending (S.delete addr) + modifyTVar connectionsEstablished (M.insert addr conn) + +-- | Either this or remote peer decided to finish conversation +-- (conversation is alread /established/ connection), remote it from +-- the established set. +finishedConnection :: Connected Session () +finishedConnection = do + Session {..} <- asks connSession + addr <- asks connRemoteAddr + liftIO $ atomically $ do + modifyTVar connectionsEstablished $ M.delete addr + +-- | There are no state for this connection, remove it from the all +-- sets. +closedConnection :: PeerAddr IP -> Session -> STM () +closedConnection addr Session {..} = do + modifyTVar connectionsPending $ S.delete addr + modifyTVar connectionsEstablished $ M.delete addr + +getConnectionConfig :: Session -> IO (ConnectionConfig Session) +getConnectionConfig s @ Session {..} = do + chan <- dupChan connectionsBroadcast + let sessionLink = SessionLink { + linkTopic = sessionTopic + , linkPeerId = sessionPeerId + , linkMetadataSize = Nothing + , linkOutputChan = Just chan + , linkSession = s + } + return ConnectionConfig + { cfgPrefs = connectionsPrefs + , cfgSession = sessionLink + , cfgWire = mainWire + } + +type Finalizer = IO () +type Runner = (ConnectionConfig Session -> IO ()) + +runConnection :: Runner -> Finalizer -> PeerAddr IP -> Session -> IO () +runConnection runner finalize addr set @ Session {..} = do + _ <- forkIO (action `finally` cleanup) + return () + where + action = do + notExist <- atomically $ pendingConnection addr set + when notExist $ do + cfg <- getConnectionConfig set + runner cfg + + cleanup = do + finalize +-- runStatusUpdates status (SS.resetPending addr) + -- TODO Metata.resetPending addr + atomically $ closedConnection addr set + +-- | Establish connection from scratch. If this endpoint is already +-- connected, no new connections is created. This function do not block. +connect :: PeerAddr IP -> Session -> IO () +connect addr = runConnection (connectWire addr) (return ()) addr + +-- | Establish connection with already pre-connected endpoint. If this +-- endpoint is already connected, no new connections is created. This +-- function do not block. +-- +-- 'PendingConnection' will be closed automatically, you do not need +-- to call 'closePending'. +establish :: PendingConnection -> Session -> IO () +establish conn = runConnection (acceptWire conn) (closePending conn) + (pendingPeer conn) + +-- | Conduit version of 'connect'. +connectSink :: MonadIO m => Session -> Sink [PeerAddr IPv4] m () +connectSink s = C.mapM_ (liftIO . connectBatch) + where + connectBatch = M.mapM_ (\ addr -> connect (IPv4 <$> addr) s) + +-- | Why do we need this message? +type BroadcastMessage = ExtendedCaps -> Message + +broadcast :: BroadcastMessage -> Session -> IO () +broadcast = error "broadcast" + +{----------------------------------------------------------------------- +-- Helpers +-----------------------------------------------------------------------} + +waitMVar :: MVar a -> IO () +waitMVar m = withMVar m (const (return ())) + +-- This function appear in new GHC "out of box". (moreover it is atomic) +tryReadMVar :: MVar a -> IO (Maybe a) +tryReadMVar m = do + ma <- tryTakeMVar m + maybe (return ()) (putMVar m) ma + return ma + +readBlock :: BlockIx -> Storage -> IO (Block BL.ByteString) +readBlock bix @ BlockIx {..} s = do + p <- packException (InvalidRequest bix) $ do readPiece ixPiece s + let chunk = BL.take (fromIntegral ixLength) $ + BL.drop (fromIntegral ixOffset) (pieceData p) + if BL.length chunk == fromIntegral ixLength + then return $ Block ixPiece ixOffset chunk + else throwIO $ InvalidRequest bix (InvalidSize ixLength) + +-- | +tryReadMetadataBlock :: PieceIx + -> Connected Session (Maybe (Torrent.Piece BS.ByteString, Int)) +tryReadMetadataBlock pix = do + Session {..} <- asks connSession + s <- liftIO (readMVar sessionState) + case s of + WaitingMetadata {..} -> error "tryReadMetadataBlock" + HavingMetadata {..} -> error "tryReadMetadataBlock" + +sendBroadcast :: PeerMessage msg => msg -> Wire Session () +sendBroadcast msg = do + Session {..} <- asks connSession + error "sendBroadcast" +-- liftIO $ msg `broadcast` sessionConnections + +waitMetadata :: Session -> IO InfoDict +waitMetadata Session {..} = do + s <- readMVar sessionState + case s of + WaitingMetadata {..} -> readMVar metadataCompleted + HavingMetadata {..} -> return (cachedValue metadataCache) + +takeMetadata :: Session -> IO (Maybe InfoDict) +takeMetadata Session {..} = do + s <- readMVar sessionState + case s of + WaitingMetadata {..} -> return Nothing + HavingMetadata {..} -> return (Just (cachedValue metadataCache)) + +{----------------------------------------------------------------------- +-- Triggers +-----------------------------------------------------------------------} + +-- | Trigger is the reaction of a handler at some event. +type Trigger = Wire Session () + +interesting :: Trigger +interesting = do + addr <- asks connRemoteAddr + sendMessage (Interested True) + sendMessage (Choking False) + tryFillRequestQueue + +fillRequestQueue :: Trigger +fillRequestQueue = do + maxN <- lift getMaxQueueLength + rbf <- use connBitfield + addr <- asks connRemoteAddr +-- blks <- withStatusUpdates $ do +-- n <- getRequestQueueLength addr +-- scheduleBlocks addr rbf (maxN - n) +-- mapM_ (sendMessage . Request) blks + return () + +tryFillRequestQueue :: Trigger +tryFillRequestQueue = do + allowed <- canDownload <$> use connStatus + when allowed $ do + fillRequestQueue + +{----------------------------------------------------------------------- +-- Incoming message handling +-----------------------------------------------------------------------} + +type Handler msg = msg -> Wire Session () + +handleStatus :: Handler StatusUpdate +handleStatus s = do + connStatus %= over remoteStatus (updateStatus s) + case s of + Interested _ -> return () + Choking True -> do + addr <- asks connRemoteAddr +-- withStatusUpdates (SS.resetPending addr) + return () + Choking False -> tryFillRequestQueue + +handleAvailable :: Handler Available +handleAvailable msg = do + connBitfield %= case msg of + Have ix -> BF.insert ix + Bitfield bf -> const bf + + --thisBf <- getThisBitfield + thisBf <- undefined + case msg of + Have ix + | ix `BF.member` thisBf -> return () + | otherwise -> interesting + Bitfield bf + | bf `BF.isSubsetOf` thisBf -> return () + | otherwise -> interesting + +handleTransfer :: Handler Transfer +handleTransfer (Request bix) = do + Session {..} <- asks connSession + s <- liftIO $ readMVar sessionState + case s of + WaitingMetadata {..} -> return () + HavingMetadata {..} -> do + bitfield <- undefined -- getThisBitfield + upload <- canUpload <$> use connStatus + when (upload && ixPiece bix `BF.member` bitfield) $ do + blk <- liftIO $ readBlock bix contentStorage + sendMessage (Message.Piece blk) + +handleTransfer (Message.Piece blk) = do + Session {..} <- asks connSession + s <- liftIO $ readMVar sessionState + case s of + WaitingMetadata {..} -> return () -- TODO (?) break connection + HavingMetadata {..} -> do + isSuccess <- undefined -- withStatusUpdates (SS.pushBlock blk storage) + case isSuccess of + Nothing -> liftIO $ throwIO $ userError "block is not requested" + Just isCompleted -> do + when isCompleted $ do + sendBroadcast (Have (blkPiece blk)) +-- maybe send not interested + tryFillRequestQueue + +handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) + where + transferResponse bix (Transfer (Message.Piece blk)) = blockIx blk == bix + transferResponse _ _ = False + +{----------------------------------------------------------------------- +-- Metadata exchange +-----------------------------------------------------------------------} +-- TODO introduce new metadata exchange specific exceptions + +waitForMetadata :: Trigger +waitForMetadata = do + Session {..} <- asks connSession + needFetch <- undefined --liftIO (isEmptyMVar infodict) + when needFetch $ do + canFetch <- allowed ExtMetadata <$> use connExtCaps + if canFetch + then tryRequestMetadataBlock + else undefined -- liftIO (waitMVar infodict) + +tryRequestMetadataBlock :: Trigger +tryRequestMetadataBlock = do + mpix <- lift $ undefined --withMetadataUpdates Metadata.scheduleBlock + case mpix of + Nothing -> error "tryRequestMetadataBlock" + Just pix -> sendMessage (MetadataRequest pix) + +handleMetadata :: Handler ExtendedMetadata +handleMetadata (MetadataRequest pix) = + lift (tryReadMetadataBlock pix) >>= sendMessage . mkResponse + where + mkResponse Nothing = MetadataReject pix + mkResponse (Just (piece, total)) = MetadataData piece total + +handleMetadata (MetadataData {..}) = do + ih <- asks connTopic + mdict <- lift $ undefined --withMetadataUpdates (Metadata.pushBlock piece ih) + case mdict of + Nothing -> tryRequestMetadataBlock -- not completed, need all blocks + Just dict -> do -- complete, wake up payload fetch + Session {..} <- asks connSession + liftIO $ modifyMVar_ sessionState (haveMetadata dict) + +handleMetadata (MetadataReject pix) = do + lift $ undefined -- withMetadataUpdates (Metadata.cancelPending pix) + +handleMetadata (MetadataUnknown _ ) = do + logInfoN "Unknown metadata message" + +{----------------------------------------------------------------------- +-- Main entry point +-----------------------------------------------------------------------} + +acceptRehandshake :: ExtendedHandshake -> Trigger +acceptRehandshake ehs = error "acceptRehandshake" + +handleExtended :: Handler ExtendedMessage +handleExtended (EHandshake ehs) = acceptRehandshake ehs +handleExtended (EMetadata _ msg) = handleMetadata msg +handleExtended (EUnknown _ _ ) = logWarnN "Unknown extension message" + +handleMessage :: Handler Message +handleMessage KeepAlive = return () +handleMessage (Status s) = handleStatus s +handleMessage (Available msg) = handleAvailable msg +handleMessage (Transfer msg) = handleTransfer msg +handleMessage (Port n) = error "handleMessage" +handleMessage (Fast _) = error "handleMessage" +handleMessage (Extended msg) = handleExtended msg + +exchange :: Wire Session () +exchange = do + waitForMetadata + bf <- undefined --getThisBitfield + sendMessage (Bitfield bf) + awaitForever handleMessage + +mainWire :: Wire Session () +mainWire = do + lift establishedConnection + Session {..} <- asks connSession +-- lift $ resizeBitfield (totalPieces storage) + logEvent "Connection established" + iterM logMessage =$= exchange =$= iterM logMessage + lift finishedConnection -- cgit v1.2.3