diff options
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Session.hs')
-rw-r--r-- | src/Network/BitTorrent/Exchange/Session.hs | 285 |
1 files changed, 197 insertions, 88 deletions
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index 565ef7ab..0d4f3d02 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs | |||
@@ -1,7 +1,9 @@ | |||
1 | {-# LANGUAGE TemplateHaskell #-} | 1 | {-# LANGUAGE FlexibleInstances #-} |
2 | {-# LANGUAGE DeriveDataTypeable #-} | 2 | {-# LANGUAGE TemplateHaskell #-} |
3 | {-# LANGUAGE DeriveDataTypeable #-} | ||
3 | module Network.BitTorrent.Exchange.Session | 4 | module Network.BitTorrent.Exchange.Session |
4 | ( Session | 5 | ( Session |
6 | , LogFun | ||
5 | , newSession | 7 | , newSession |
6 | , closeSession | 8 | , closeSession |
7 | 9 | ||
@@ -12,76 +14,141 @@ import Control.Applicative | |||
12 | import Control.Concurrent | 14 | import Control.Concurrent |
13 | import Control.Exception | 15 | import Control.Exception |
14 | import Control.Lens | 16 | import Control.Lens |
17 | import Control.Monad.Logger | ||
15 | import Control.Monad.Reader | 18 | import Control.Monad.Reader |
16 | import Control.Monad.State | 19 | import Control.Monad.State |
20 | import Data.ByteString as BS | ||
21 | import Data.ByteString.Lazy as BL | ||
22 | import Data.Conduit | ||
17 | import Data.Function | 23 | import Data.Function |
18 | import Data.IORef | 24 | import Data.IORef |
25 | import Data.List as L | ||
19 | import Data.Maybe | 26 | import Data.Maybe |
20 | import Data.Map as M | 27 | import Data.Map as M |
28 | import Data.Monoid | ||
21 | import Data.Ord | 29 | import Data.Ord |
30 | import Data.Set as S | ||
31 | import Data.Text as T | ||
22 | import Data.Typeable | 32 | import Data.Typeable |
23 | import Text.PrettyPrint | 33 | import Text.PrettyPrint hiding ((<>)) |
34 | import Text.PrettyPrint.Class | ||
35 | import System.Log.FastLogger (LogStr, ToLogStr (..)) | ||
24 | 36 | ||
25 | import Data.Torrent (InfoDict (..)) | 37 | import Data.Torrent (InfoDict (..)) |
26 | import Data.Torrent.Bitfield as BF | 38 | import Data.Torrent.Bitfield as BF |
27 | import Data.Torrent.InfoHash | 39 | import Data.Torrent.InfoHash |
40 | import Data.Torrent.Piece (pieceData, piPieceLength) | ||
41 | import qualified Data.Torrent.Piece as Torrent (Piece (Piece)) | ||
28 | import Network.BitTorrent.Core | 42 | import Network.BitTorrent.Core |
29 | import Network.BitTorrent.Exchange.Assembler | 43 | import Network.BitTorrent.Exchange.Assembler |
30 | import Network.BitTorrent.Exchange.Block | 44 | import Network.BitTorrent.Exchange.Block as Block |
31 | import Network.BitTorrent.Exchange.Message | 45 | import Network.BitTorrent.Exchange.Message |
46 | import Network.BitTorrent.Exchange.Session.Status as SS | ||
32 | import Network.BitTorrent.Exchange.Status | 47 | import Network.BitTorrent.Exchange.Status |
33 | import Network.BitTorrent.Exchange.Wire | 48 | import Network.BitTorrent.Exchange.Wire |
34 | import System.Torrent.Storage | 49 | import System.Torrent.Storage |
35 | 50 | ||
51 | {----------------------------------------------------------------------- | ||
52 | -- Exceptions | ||
53 | -----------------------------------------------------------------------} | ||
54 | |||
55 | data ExchangeError | ||
56 | = InvalidRequest BlockIx StorageFailure | ||
57 | | CorruptedPiece PieceIx | ||
58 | deriving (Show, Typeable) | ||
59 | |||
60 | instance Exception ExchangeError | ||
61 | |||
62 | packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a | ||
63 | packException f m = try m >>= either (throwIO . f) return | ||
64 | |||
65 | {----------------------------------------------------------------------- | ||
66 | -- Session | ||
67 | -----------------------------------------------------------------------} | ||
68 | |||
69 | data ConnectionEntry = ConnectionEntry | ||
70 | { initiatedBy :: !ChannelSide | ||
71 | , connection :: !(Connection Session) | ||
72 | } | ||
36 | 73 | ||
37 | data Session = Session | 74 | data Session = Session |
38 | { tpeerId :: PeerId | 75 | { tpeerId :: PeerId |
39 | , infohash :: InfoHash | 76 | , infohash :: InfoHash |
40 | , bitfield :: Bitfield | 77 | , storage :: Storage |
41 | , assembler :: Assembler | 78 | , status :: MVar SessionStatus |
42 | , storage :: Storage | 79 | , unchoked :: [PeerAddr IP] |
43 | , unchoked :: [PeerAddr IP] | 80 | , connections :: MVar (Map (PeerAddr IP) ConnectionEntry) |
44 | , connections :: MVar (Map (PeerAddr IP) (Connection Session)) | 81 | , broadcast :: Chan Message |
45 | , broadcast :: Chan Message | 82 | , logger :: LogFun |
46 | } | 83 | } |
47 | 84 | ||
48 | newSession :: PeerAddr (Maybe IP) -- ^ /external/ address of this peer; | 85 | -- | Logger function. |
86 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () | ||
87 | |||
88 | newSession :: LogFun | ||
89 | -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer; | ||
49 | -> FilePath -- ^ root directory for content files; | 90 | -> FilePath -- ^ root directory for content files; |
50 | -> InfoDict -- ^ torrent info dictionary; | 91 | -> InfoDict -- ^ torrent info dictionary; |
51 | -> IO Session -- ^ | 92 | -> IO Session -- ^ |
52 | newSession addr rootPath dict = do | 93 | newSession logFun addr rootPath dict = do |
53 | connVar <- newMVar M.empty | 94 | connVar <- newMVar M.empty |
54 | store <- openInfoDict ReadWriteEx rootPath dict | 95 | store <- openInfoDict ReadWriteEx rootPath dict |
55 | chan <- newChan | 96 | statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces store)) |
97 | (piPieceLength (idPieceInfo dict)) | ||
98 | chan <- newChan | ||
56 | return Session | 99 | return Session |
57 | { tpeerId = fromMaybe (error "newSession: impossible") (peerId addr) | 100 | { tpeerId = fromMaybe (error "newSession: impossible") (peerId addr) |
58 | , infohash = idInfoHash dict | 101 | , infohash = idInfoHash dict |
59 | , bitfield = BF.haveNone (totalPieces store) | 102 | , status = statusVar |
60 | , assembler = error "newSession" | ||
61 | , storage = store | 103 | , storage = store |
62 | , unchoked = [] | 104 | , unchoked = [] |
63 | , connections = connVar | 105 | , connections = connVar |
64 | , broadcast = chan | 106 | , broadcast = chan |
107 | , logger = logFun | ||
65 | } | 108 | } |
66 | 109 | ||
67 | closeSession :: Session -> IO () | 110 | closeSession :: Session -> IO () |
68 | closeSession = undefined | 111 | closeSession = undefined |
69 | 112 | ||
113 | instance MonadIO m => MonadLogger (Connected Session m) where | ||
114 | monadLoggerLog loc src lvl msg = do | ||
115 | conn <- ask | ||
116 | ses <- asks connSession | ||
117 | addr <- asks connRemoteAddr | ||
118 | let addrSrc = src <> " @ " <> T.pack (render (pretty addr)) | ||
119 | liftIO $ logger ses loc addrSrc lvl (toLogStr msg) | ||
120 | |||
121 | logMessage :: Message -> Wire Session () | ||
122 | logMessage msg = logDebugN $ T.pack (render (pretty msg)) | ||
123 | |||
124 | logEvent :: Text -> Wire Session () | ||
125 | logEvent = logInfoN | ||
126 | |||
127 | {----------------------------------------------------------------------- | ||
128 | -- Connections | ||
129 | -----------------------------------------------------------------------} | ||
130 | -- TODO unmap storage on zero connections | ||
131 | |||
70 | insert :: PeerAddr IP | 132 | insert :: PeerAddr IP |
71 | -> {- Maybe Socket | 133 | -> {- Maybe Socket |
72 | -> -} Session -> IO () | 134 | -> -} Session -> IO () |
73 | insert addr ses @ Session {..} = do | 135 | insert addr ses @ Session {..} = do |
74 | forkIO $ do | 136 | forkIO $ do |
75 | let caps = def | 137 | action `finally` runStatusUpdates status (resetPending addr) |
76 | let ecaps = def | 138 | return () |
77 | let hs = Handshake def caps infohash tpeerId | 139 | where |
78 | chan <- dupChan broadcast | 140 | action = do |
79 | connectWire ses hs addr ecaps chan $ do | 141 | let caps = def |
80 | conn <- getConnection | 142 | let ecaps = def |
143 | let hs = Handshake def caps infohash tpeerId | ||
144 | chan <- dupChan broadcast | ||
145 | connectWire ses hs addr ecaps chan $ do | ||
146 | conn <- getConnection | ||
81 | -- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn | 147 | -- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn |
82 | exchange | 148 | resizeBitfield (totalPieces storage) |
149 | logEvent "Connection established" | ||
150 | exchange | ||
83 | -- liftIO $ modifyMVar_ connections $ pure . M.delete addr | 151 | -- liftIO $ modifyMVar_ connections $ pure . M.delete addr |
84 | return () | ||
85 | 152 | ||
86 | delete :: PeerAddr IP -> Session -> IO () | 153 | delete :: PeerAddr IP -> Session -> IO () |
87 | delete = undefined | 154 | delete = undefined |
@@ -90,81 +157,125 @@ deleteAll :: Session -> IO () | |||
90 | deleteAll = undefined | 157 | deleteAll = undefined |
91 | 158 | ||
92 | {----------------------------------------------------------------------- | 159 | {----------------------------------------------------------------------- |
93 | -- Query | 160 | -- Helpers |
94 | -----------------------------------------------------------------------} | 161 | -----------------------------------------------------------------------} |
95 | 162 | ||
163 | withStatusUpdates :: StatusUpdates a -> Wire Session a | ||
164 | withStatusUpdates m = do | ||
165 | Session {..} <- getSession | ||
166 | liftIO $ runStatusUpdates status m | ||
167 | |||
96 | getThisBitfield :: Wire Session Bitfield | 168 | getThisBitfield :: Wire Session Bitfield |
97 | getThisBitfield = undefined | 169 | getThisBitfield = do |
170 | ses <- getSession | ||
171 | liftIO $ SS.getBitfield (status ses) | ||
172 | |||
173 | readBlock :: BlockIx -> Storage -> IO (Block BL.ByteString) | ||
174 | readBlock bix @ BlockIx {..} s = do | ||
175 | p <- packException (InvalidRequest bix) $ do readPiece ixPiece s | ||
176 | let chunk = BL.take (fromIntegral ixLength) $ | ||
177 | BL.drop (fromIntegral ixOffset) (pieceData p) | ||
178 | if BL.length chunk == fromIntegral ixLength | ||
179 | then return $ Block ixPiece ixOffset chunk | ||
180 | else throwIO $ InvalidRequest bix (InvalidSize ixLength) | ||
181 | |||
182 | sendBroadcast :: PeerMessage msg => msg -> Wire Session () | ||
183 | sendBroadcast msg = do | ||
184 | Session {..} <- getSession | ||
185 | ecaps <- getExtCaps | ||
186 | liftIO $ writeChan broadcast (envelop ecaps msg) | ||
187 | |||
188 | {----------------------------------------------------------------------- | ||
189 | -- Triggers | ||
190 | -----------------------------------------------------------------------} | ||
98 | 191 | ||
99 | {- | 192 | fillRequestQueue :: Wire Session () |
100 | data PendingSet = PendingSet (Map (PeerAddr IP) [BlockIx]) | 193 | fillRequestQueue = do |
194 | maxN <- getAdvertisedQueueLength | ||
195 | rbf <- getRemoteBitfield | ||
196 | addr <- connRemoteAddr <$> getConnection | ||
197 | blks <- withStatusUpdates $ do | ||
198 | n <- getRequestQueueLength addr | ||
199 | scheduleBlocks addr rbf (maxN - n) | ||
200 | mapM_ (sendMessage . Request) blks | ||
101 | 201 | ||
102 | empty :: PendingSet | 202 | tryFillRequestQueue :: Wire Session () |
103 | empty = undefined | 203 | tryFillRequestQueue = do |
204 | allowed <- canDownload <$> getStatus | ||
205 | when allowed $ do | ||
206 | fillRequestQueue | ||
104 | 207 | ||
105 | member :: PeerAddr IP -> BlockIx -> PendingSet -> Bool | 208 | interesting :: Wire Session () |
106 | member addr bix = undefined | 209 | interesting = do |
210 | addr <- connRemoteAddr <$> getConnection | ||
211 | logMessage (Status (Interested True)) | ||
212 | sendMessage (Interested True) | ||
213 | logMessage (Status (Choking False)) | ||
214 | sendMessage (Choking False) | ||
215 | tryFillRequestQueue | ||
107 | 216 | ||
108 | insert :: PeerAddr IP -> BlockIx -> PendingSet -> PendingSet | ||
109 | insert addr bix = undefined | ||
110 | -} | ||
111 | {----------------------------------------------------------------------- | 217 | {----------------------------------------------------------------------- |
112 | -- Event loop | 218 | -- Incoming message handling |
113 | -----------------------------------------------------------------------} | 219 | -----------------------------------------------------------------------} |
114 | {- | ||
115 | data ExchangeError | ||
116 | = InvalidRequest BlockIx StorageFailure | ||
117 | | CorruptedPiece PieceIx | ||
118 | 220 | ||
119 | packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a | 221 | handleStatus :: StatusUpdate -> Wire Session () |
120 | packException f m = try >>= either (throwIO . f) m | 222 | handleStatus s = do |
223 | updateConnStatus RemotePeer s | ||
224 | case s of | ||
225 | Interested _ -> return () | ||
226 | Choking True -> do | ||
227 | addr <- connRemoteAddr <$> getConnection | ||
228 | withStatusUpdates (resetPending addr) | ||
229 | Choking False -> tryFillRequestQueue | ||
121 | 230 | ||
122 | readBlock :: BlockIx -> Storage -> IO (Block ByteString) | 231 | handleAvailable :: Available -> Wire Session () |
123 | readBlock bix @ BlockIx {..} s = do | 232 | handleAvailable msg = do |
124 | p <- packException (InvalidRequest bix) $ do readPiece ixPiece storage | 233 | updateRemoteBitfield $ case msg of |
125 | let chunk = BS.take ixLength $ BS.drop ixOffset p | 234 | Have ix -> BF.insert ix |
126 | if BS.length chunk == ixLength | 235 | Bitfield bf -> const bf |
127 | then return chunk | 236 | |
128 | else throwIO $ InvalidRequest bix (InvalidSize ixLength) | 237 | thisBf <- getThisBitfield |
129 | -} | 238 | case msg of |
239 | Have ix | ||
240 | | ix `BF.member` thisBf -> return () | ||
241 | | otherwise -> interesting | ||
242 | Bitfield bf | ||
243 | | bf `BF.isSubsetOf` thisBf -> return () | ||
244 | | otherwise -> interesting | ||
130 | 245 | ||
131 | handleTransfer :: Transfer -> Wire Session () | 246 | handleTransfer :: Transfer -> Wire Session () |
132 | handleTransfer (Request bix) = do | 247 | handleTransfer (Request bix) = do |
133 | -- Session {..} <- getSession | 248 | Session {..} <- getSession |
134 | -- addr <- getRemoteAddr | 249 | bitfield <- getThisBitfield |
135 | -- when (addr `elem` unchoked && ixPiece bix `BF.member` bitfield) $ do | 250 | upload <- canUpload <$> getStatus |
136 | -- blk <- liftIO $ readBlock bix storage | 251 | when (upload && ixPiece bix `BF.member` bitfield) $ do |
137 | -- sendMsg (Piece blk) | 252 | blk <- liftIO $ readBlock bix storage |
138 | return () | 253 | sendMessage (Piece blk) |
139 | 254 | ||
140 | handleTransfer (Piece blk) = do | 255 | handleTransfer (Piece blk) = do |
141 | {- | 256 | Session {..} <- getSession |
142 | Session {..} <- getSession | 257 | isSuccess <- withStatusUpdates (pushBlock blk storage) |
143 | when (blockIx blk `PS.member` pendingSet) $ do | 258 | case isSuccess of |
144 | insert blk stalledSet | 259 | Nothing -> liftIO $ throwIO $ userError "block is not requested" |
145 | sendBroadcast have | 260 | Just isCompleted -> do |
146 | maybe send not interested | 261 | when isCompleted $ do |
147 | -} | 262 | sendBroadcast (Have (blkPiece blk)) |
148 | return () | 263 | -- maybe send not interested |
264 | tryFillRequestQueue | ||
149 | 265 | ||
150 | handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) | 266 | handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) |
151 | where | 267 | where |
152 | transferResponse bix (Transfer (Piece blk)) = blockIx blk == bix | 268 | transferResponse bix (Transfer (Piece blk)) = blockIx blk == bix |
153 | transferResponse _ _ = False | 269 | transferResponse _ _ = False |
154 | 270 | ||
271 | {----------------------------------------------------------------------- | ||
272 | -- Event loop | ||
273 | -----------------------------------------------------------------------} | ||
274 | |||
155 | handleMessage :: Message -> Wire Session () | 275 | handleMessage :: Message -> Wire Session () |
156 | handleMessage KeepAlive = return () | 276 | handleMessage KeepAlive = return () |
157 | handleMessage (Status s) = undefined | 277 | handleMessage (Status s) = handleStatus s |
158 | handleMessage (Available msg) = do | 278 | handleMessage (Available msg) = handleAvailable msg |
159 | thisBf <- getThisBitfield | ||
160 | case msg of | ||
161 | Have ix | ||
162 | | ix `BF.member` thisBf -> return () | ||
163 | | otherwise -> undefined | ||
164 | Bitfield bf | ||
165 | | bf `BF.isSubsetOf` thisBf -> return () | ||
166 | | otherwise -> undefined | ||
167 | |||
168 | handleMessage (Transfer msg) = handleTransfer msg | 279 | handleMessage (Transfer msg) = handleTransfer msg |
169 | handleMessage (Port n) = undefined | 280 | handleMessage (Port n) = undefined |
170 | handleMessage (Fast _) = undefined | 281 | handleMessage (Fast _) = undefined |
@@ -172,19 +283,17 @@ handleMessage (Extended _) = undefined | |||
172 | 283 | ||
173 | exchange :: Wire Session () | 284 | exchange :: Wire Session () |
174 | exchange = do | 285 | exchange = do |
175 | e <- recvMessage | 286 | bf <- getThisBitfield |
176 | liftIO $ print e | 287 | sendMessage (Bitfield bf) |
177 | 288 | awaitForever $ \ msg -> do | |
178 | type Exchange = StateT Session (ReaderT (Connection Session) IO) | 289 | logMessage msg |
179 | 290 | handleMessage msg | |
180 | --runExchange :: Exchange () -> [PeerAddr] -> IO () | ||
181 | --runExchange exchange peers = do | ||
182 | -- forM_ peers $ \ peer -> do | ||
183 | -- forkIO $ runReaderT (runStateT exchange session ) | ||
184 | 291 | ||
185 | data Event = NewMessage (PeerAddr IP) Message | 292 | data Event = NewMessage (PeerAddr IP) Message |
186 | | Timeout -- for scheduling | 293 | | Timeout -- for scheduling |
187 | 294 | ||
295 | type Exchange a = Wire Session a | ||
296 | |||
188 | awaitEvent :: Exchange Event | 297 | awaitEvent :: Exchange Event |
189 | awaitEvent = undefined | 298 | awaitEvent = undefined |
190 | 299 | ||