summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Exchange/Session.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Session.hs')
-rw-r--r--src/Network/BitTorrent/Exchange/Session.hs285
1 files changed, 197 insertions, 88 deletions
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs
index 565ef7ab..0d4f3d02 100644
--- a/src/Network/BitTorrent/Exchange/Session.hs
+++ b/src/Network/BitTorrent/Exchange/Session.hs
@@ -1,7 +1,9 @@
1{-# LANGUAGE TemplateHaskell #-} 1{-# LANGUAGE FlexibleInstances #-}
2{-# LANGUAGE DeriveDataTypeable #-} 2{-# LANGUAGE TemplateHaskell #-}
3{-# LANGUAGE DeriveDataTypeable #-}
3module Network.BitTorrent.Exchange.Session 4module Network.BitTorrent.Exchange.Session
4 ( Session 5 ( Session
6 , LogFun
5 , newSession 7 , newSession
6 , closeSession 8 , closeSession
7 9
@@ -12,76 +14,141 @@ import Control.Applicative
12import Control.Concurrent 14import Control.Concurrent
13import Control.Exception 15import Control.Exception
14import Control.Lens 16import Control.Lens
17import Control.Monad.Logger
15import Control.Monad.Reader 18import Control.Monad.Reader
16import Control.Monad.State 19import Control.Monad.State
20import Data.ByteString as BS
21import Data.ByteString.Lazy as BL
22import Data.Conduit
17import Data.Function 23import Data.Function
18import Data.IORef 24import Data.IORef
25import Data.List as L
19import Data.Maybe 26import Data.Maybe
20import Data.Map as M 27import Data.Map as M
28import Data.Monoid
21import Data.Ord 29import Data.Ord
30import Data.Set as S
31import Data.Text as T
22import Data.Typeable 32import Data.Typeable
23import Text.PrettyPrint 33import Text.PrettyPrint hiding ((<>))
34import Text.PrettyPrint.Class
35import System.Log.FastLogger (LogStr, ToLogStr (..))
24 36
25import Data.Torrent (InfoDict (..)) 37import Data.Torrent (InfoDict (..))
26import Data.Torrent.Bitfield as BF 38import Data.Torrent.Bitfield as BF
27import Data.Torrent.InfoHash 39import Data.Torrent.InfoHash
40import Data.Torrent.Piece (pieceData, piPieceLength)
41import qualified Data.Torrent.Piece as Torrent (Piece (Piece))
28import Network.BitTorrent.Core 42import Network.BitTorrent.Core
29import Network.BitTorrent.Exchange.Assembler 43import Network.BitTorrent.Exchange.Assembler
30import Network.BitTorrent.Exchange.Block 44import Network.BitTorrent.Exchange.Block as Block
31import Network.BitTorrent.Exchange.Message 45import Network.BitTorrent.Exchange.Message
46import Network.BitTorrent.Exchange.Session.Status as SS
32import Network.BitTorrent.Exchange.Status 47import Network.BitTorrent.Exchange.Status
33import Network.BitTorrent.Exchange.Wire 48import Network.BitTorrent.Exchange.Wire
34import System.Torrent.Storage 49import System.Torrent.Storage
35 50
51{-----------------------------------------------------------------------
52-- Exceptions
53-----------------------------------------------------------------------}
54
55data ExchangeError
56 = InvalidRequest BlockIx StorageFailure
57 | CorruptedPiece PieceIx
58 deriving (Show, Typeable)
59
60instance Exception ExchangeError
61
62packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a
63packException f m = try m >>= either (throwIO . f) return
64
65{-----------------------------------------------------------------------
66-- Session
67-----------------------------------------------------------------------}
68
69data ConnectionEntry = ConnectionEntry
70 { initiatedBy :: !ChannelSide
71 , connection :: !(Connection Session)
72 }
36 73
37data Session = Session 74data Session = Session
38 { tpeerId :: PeerId 75 { tpeerId :: PeerId
39 , infohash :: InfoHash 76 , infohash :: InfoHash
40 , bitfield :: Bitfield 77 , storage :: Storage
41 , assembler :: Assembler 78 , status :: MVar SessionStatus
42 , storage :: Storage 79 , unchoked :: [PeerAddr IP]
43 , unchoked :: [PeerAddr IP] 80 , connections :: MVar (Map (PeerAddr IP) ConnectionEntry)
44 , connections :: MVar (Map (PeerAddr IP) (Connection Session)) 81 , broadcast :: Chan Message
45 , broadcast :: Chan Message 82 , logger :: LogFun
46 } 83 }
47 84
48newSession :: PeerAddr (Maybe IP) -- ^ /external/ address of this peer; 85-- | Logger function.
86type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO ()
87
88newSession :: LogFun
89 -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer;
49 -> FilePath -- ^ root directory for content files; 90 -> FilePath -- ^ root directory for content files;
50 -> InfoDict -- ^ torrent info dictionary; 91 -> InfoDict -- ^ torrent info dictionary;
51 -> IO Session -- ^ 92 -> IO Session -- ^
52newSession addr rootPath dict = do 93newSession logFun addr rootPath dict = do
53 connVar <- newMVar M.empty 94 connVar <- newMVar M.empty
54 store <- openInfoDict ReadWriteEx rootPath dict 95 store <- openInfoDict ReadWriteEx rootPath dict
55 chan <- newChan 96 statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces store))
97 (piPieceLength (idPieceInfo dict))
98 chan <- newChan
56 return Session 99 return Session
57 { tpeerId = fromMaybe (error "newSession: impossible") (peerId addr) 100 { tpeerId = fromMaybe (error "newSession: impossible") (peerId addr)
58 , infohash = idInfoHash dict 101 , infohash = idInfoHash dict
59 , bitfield = BF.haveNone (totalPieces store) 102 , status = statusVar
60 , assembler = error "newSession"
61 , storage = store 103 , storage = store
62 , unchoked = [] 104 , unchoked = []
63 , connections = connVar 105 , connections = connVar
64 , broadcast = chan 106 , broadcast = chan
107 , logger = logFun
65 } 108 }
66 109
67closeSession :: Session -> IO () 110closeSession :: Session -> IO ()
68closeSession = undefined 111closeSession = undefined
69 112
113instance MonadIO m => MonadLogger (Connected Session m) where
114 monadLoggerLog loc src lvl msg = do
115 conn <- ask
116 ses <- asks connSession
117 addr <- asks connRemoteAddr
118 let addrSrc = src <> " @ " <> T.pack (render (pretty addr))
119 liftIO $ logger ses loc addrSrc lvl (toLogStr msg)
120
121logMessage :: Message -> Wire Session ()
122logMessage msg = logDebugN $ T.pack (render (pretty msg))
123
124logEvent :: Text -> Wire Session ()
125logEvent = logInfoN
126
127{-----------------------------------------------------------------------
128-- Connections
129-----------------------------------------------------------------------}
130-- TODO unmap storage on zero connections
131
70insert :: PeerAddr IP 132insert :: PeerAddr IP
71 -> {- Maybe Socket 133 -> {- Maybe Socket
72 -> -} Session -> IO () 134 -> -} Session -> IO ()
73insert addr ses @ Session {..} = do 135insert addr ses @ Session {..} = do
74 forkIO $ do 136 forkIO $ do
75 let caps = def 137 action `finally` runStatusUpdates status (resetPending addr)
76 let ecaps = def 138 return ()
77 let hs = Handshake def caps infohash tpeerId 139 where
78 chan <- dupChan broadcast 140 action = do
79 connectWire ses hs addr ecaps chan $ do 141 let caps = def
80 conn <- getConnection 142 let ecaps = def
143 let hs = Handshake def caps infohash tpeerId
144 chan <- dupChan broadcast
145 connectWire ses hs addr ecaps chan $ do
146 conn <- getConnection
81-- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn 147-- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn
82 exchange 148 resizeBitfield (totalPieces storage)
149 logEvent "Connection established"
150 exchange
83-- liftIO $ modifyMVar_ connections $ pure . M.delete addr 151-- liftIO $ modifyMVar_ connections $ pure . M.delete addr
84 return ()
85 152
86delete :: PeerAddr IP -> Session -> IO () 153delete :: PeerAddr IP -> Session -> IO ()
87delete = undefined 154delete = undefined
@@ -90,81 +157,125 @@ deleteAll :: Session -> IO ()
90deleteAll = undefined 157deleteAll = undefined
91 158
92{----------------------------------------------------------------------- 159{-----------------------------------------------------------------------
93-- Query 160-- Helpers
94-----------------------------------------------------------------------} 161-----------------------------------------------------------------------}
95 162
163withStatusUpdates :: StatusUpdates a -> Wire Session a
164withStatusUpdates m = do
165 Session {..} <- getSession
166 liftIO $ runStatusUpdates status m
167
96getThisBitfield :: Wire Session Bitfield 168getThisBitfield :: Wire Session Bitfield
97getThisBitfield = undefined 169getThisBitfield = do
170 ses <- getSession
171 liftIO $ SS.getBitfield (status ses)
172
173readBlock :: BlockIx -> Storage -> IO (Block BL.ByteString)
174readBlock bix @ BlockIx {..} s = do
175 p <- packException (InvalidRequest bix) $ do readPiece ixPiece s
176 let chunk = BL.take (fromIntegral ixLength) $
177 BL.drop (fromIntegral ixOffset) (pieceData p)
178 if BL.length chunk == fromIntegral ixLength
179 then return $ Block ixPiece ixOffset chunk
180 else throwIO $ InvalidRequest bix (InvalidSize ixLength)
181
182sendBroadcast :: PeerMessage msg => msg -> Wire Session ()
183sendBroadcast msg = do
184 Session {..} <- getSession
185 ecaps <- getExtCaps
186 liftIO $ writeChan broadcast (envelop ecaps msg)
187
188{-----------------------------------------------------------------------
189-- Triggers
190-----------------------------------------------------------------------}
98 191
99{- 192fillRequestQueue :: Wire Session ()
100data PendingSet = PendingSet (Map (PeerAddr IP) [BlockIx]) 193fillRequestQueue = do
194 maxN <- getAdvertisedQueueLength
195 rbf <- getRemoteBitfield
196 addr <- connRemoteAddr <$> getConnection
197 blks <- withStatusUpdates $ do
198 n <- getRequestQueueLength addr
199 scheduleBlocks addr rbf (maxN - n)
200 mapM_ (sendMessage . Request) blks
101 201
102empty :: PendingSet 202tryFillRequestQueue :: Wire Session ()
103empty = undefined 203tryFillRequestQueue = do
204 allowed <- canDownload <$> getStatus
205 when allowed $ do
206 fillRequestQueue
104 207
105member :: PeerAddr IP -> BlockIx -> PendingSet -> Bool 208interesting :: Wire Session ()
106member addr bix = undefined 209interesting = do
210 addr <- connRemoteAddr <$> getConnection
211 logMessage (Status (Interested True))
212 sendMessage (Interested True)
213 logMessage (Status (Choking False))
214 sendMessage (Choking False)
215 tryFillRequestQueue
107 216
108insert :: PeerAddr IP -> BlockIx -> PendingSet -> PendingSet
109insert addr bix = undefined
110-}
111{----------------------------------------------------------------------- 217{-----------------------------------------------------------------------
112-- Event loop 218-- Incoming message handling
113-----------------------------------------------------------------------} 219-----------------------------------------------------------------------}
114{-
115data ExchangeError
116 = InvalidRequest BlockIx StorageFailure
117 | CorruptedPiece PieceIx
118 220
119packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a 221handleStatus :: StatusUpdate -> Wire Session ()
120packException f m = try >>= either (throwIO . f) m 222handleStatus s = do
223 updateConnStatus RemotePeer s
224 case s of
225 Interested _ -> return ()
226 Choking True -> do
227 addr <- connRemoteAddr <$> getConnection
228 withStatusUpdates (resetPending addr)
229 Choking False -> tryFillRequestQueue
121 230
122readBlock :: BlockIx -> Storage -> IO (Block ByteString) 231handleAvailable :: Available -> Wire Session ()
123readBlock bix @ BlockIx {..} s = do 232handleAvailable msg = do
124 p <- packException (InvalidRequest bix) $ do readPiece ixPiece storage 233 updateRemoteBitfield $ case msg of
125 let chunk = BS.take ixLength $ BS.drop ixOffset p 234 Have ix -> BF.insert ix
126 if BS.length chunk == ixLength 235 Bitfield bf -> const bf
127 then return chunk 236
128 else throwIO $ InvalidRequest bix (InvalidSize ixLength) 237 thisBf <- getThisBitfield
129-} 238 case msg of
239 Have ix
240 | ix `BF.member` thisBf -> return ()
241 | otherwise -> interesting
242 Bitfield bf
243 | bf `BF.isSubsetOf` thisBf -> return ()
244 | otherwise -> interesting
130 245
131handleTransfer :: Transfer -> Wire Session () 246handleTransfer :: Transfer -> Wire Session ()
132handleTransfer (Request bix) = do 247handleTransfer (Request bix) = do
133-- Session {..} <- getSession 248 Session {..} <- getSession
134-- addr <- getRemoteAddr 249 bitfield <- getThisBitfield
135-- when (addr `elem` unchoked && ixPiece bix `BF.member` bitfield) $ do 250 upload <- canUpload <$> getStatus
136-- blk <- liftIO $ readBlock bix storage 251 when (upload && ixPiece bix `BF.member` bitfield) $ do
137-- sendMsg (Piece blk) 252 blk <- liftIO $ readBlock bix storage
138 return () 253 sendMessage (Piece blk)
139 254
140handleTransfer (Piece blk) = do 255handleTransfer (Piece blk) = do
141{- 256 Session {..} <- getSession
142 Session {..} <- getSession 257 isSuccess <- withStatusUpdates (pushBlock blk storage)
143 when (blockIx blk `PS.member` pendingSet) $ do 258 case isSuccess of
144 insert blk stalledSet 259 Nothing -> liftIO $ throwIO $ userError "block is not requested"
145 sendBroadcast have 260 Just isCompleted -> do
146 maybe send not interested 261 when isCompleted $ do
147-} 262 sendBroadcast (Have (blkPiece blk))
148 return () 263-- maybe send not interested
264 tryFillRequestQueue
149 265
150handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) 266handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix))
151 where 267 where
152 transferResponse bix (Transfer (Piece blk)) = blockIx blk == bix 268 transferResponse bix (Transfer (Piece blk)) = blockIx blk == bix
153 transferResponse _ _ = False 269 transferResponse _ _ = False
154 270
271{-----------------------------------------------------------------------
272-- Event loop
273-----------------------------------------------------------------------}
274
155handleMessage :: Message -> Wire Session () 275handleMessage :: Message -> Wire Session ()
156handleMessage KeepAlive = return () 276handleMessage KeepAlive = return ()
157handleMessage (Status s) = undefined 277handleMessage (Status s) = handleStatus s
158handleMessage (Available msg) = do 278handleMessage (Available msg) = handleAvailable msg
159 thisBf <- getThisBitfield
160 case msg of
161 Have ix
162 | ix `BF.member` thisBf -> return ()
163 | otherwise -> undefined
164 Bitfield bf
165 | bf `BF.isSubsetOf` thisBf -> return ()
166 | otherwise -> undefined
167
168handleMessage (Transfer msg) = handleTransfer msg 279handleMessage (Transfer msg) = handleTransfer msg
169handleMessage (Port n) = undefined 280handleMessage (Port n) = undefined
170handleMessage (Fast _) = undefined 281handleMessage (Fast _) = undefined
@@ -172,19 +283,17 @@ handleMessage (Extended _) = undefined
172 283
173exchange :: Wire Session () 284exchange :: Wire Session ()
174exchange = do 285exchange = do
175 e <- recvMessage 286 bf <- getThisBitfield
176 liftIO $ print e 287 sendMessage (Bitfield bf)
177 288 awaitForever $ \ msg -> do
178type Exchange = StateT Session (ReaderT (Connection Session) IO) 289 logMessage msg
179 290 handleMessage msg
180--runExchange :: Exchange () -> [PeerAddr] -> IO ()
181--runExchange exchange peers = do
182-- forM_ peers $ \ peer -> do
183-- forkIO $ runReaderT (runStateT exchange session )
184 291
185data Event = NewMessage (PeerAddr IP) Message 292data Event = NewMessage (PeerAddr IP) Message
186 | Timeout -- for scheduling 293 | Timeout -- for scheduling
187 294
295type Exchange a = Wire Session a
296
188awaitEvent :: Exchange Event 297awaitEvent :: Exchange Event
189awaitEvent = undefined 298awaitEvent = undefined
190 299