summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent
diff options
context:
space:
mode:
authorSam Truzjan <pxqr.sta@gmail.com>2014-02-14 22:28:15 +0400
committerSam Truzjan <pxqr.sta@gmail.com>2014-02-14 22:28:15 +0400
commit933e7d37aeafac38eae806fb4556d59803a03270 (patch)
treebfe885dbe83fb0b23e456b75d70ab5c4bb88d1ae /src/Network/BitTorrent
parentc3594d389a0caba85ae4b5c3c97339c5705551c0 (diff)
Move piece manager to separate module
Diffstat (limited to 'src/Network/BitTorrent')
-rw-r--r--src/Network/BitTorrent/Client/Handle.hs12
-rw-r--r--src/Network/BitTorrent/Exchange/Session.hs285
-rw-r--r--src/Network/BitTorrent/Exchange/Session/Status.hs175
-rw-r--r--src/Network/BitTorrent/Exchange/Wire.hs45
4 files changed, 418 insertions, 99 deletions
diff --git a/src/Network/BitTorrent/Client/Handle.hs b/src/Network/BitTorrent/Client/Handle.hs
index 7aaaf5aa..f539d53a 100644
--- a/src/Network/BitTorrent/Client/Handle.hs
+++ b/src/Network/BitTorrent/Client/Handle.hs
@@ -22,6 +22,7 @@ import Control.Applicative
22import Control.Concurrent 22import Control.Concurrent
23import Control.Monad 23import Control.Monad
24import Control.Monad.Trans 24import Control.Monad.Trans
25import Data.List as L
25import Data.HashMap.Strict as HM 26import Data.HashMap.Strict as HM
26 27
27import Data.Torrent 28import Data.Torrent
@@ -77,7 +78,8 @@ openTorrent rootPath t @ Torrent {..} = do
77 allocHandle ih $ do 78 allocHandle ih $ do
78 c @ Client {..} <- getClient 79 c @ Client {..} <- getClient
79 tses <- liftIO $ Tracker.newSession ih (trackerList t) 80 tses <- liftIO $ Tracker.newSession ih (trackerList t)
80 eses <- liftIO $ Exchange.newSession (externalAddr c) rootPath tInfoDict 81 eses <- liftIO $ Exchange.newSession clientLogger (externalAddr c) rootPath
82 tInfoDict
81 return $ Handle ih (idPrivate tInfoDict) tses eses 83 return $ Handle ih (idPrivate tInfoDict) tses eses
82 84
83-- | Use 'nullMagnet' to open handle from 'InfoHash'. 85-- | Use 'nullMagnet' to open handle from 'InfoHash'.
@@ -109,9 +111,11 @@ start Handle {..} = do
109 liftIO $ Tracker.notify trackerManager trackers Tracker.Started 111 liftIO $ Tracker.notify trackerManager trackers Tracker.Started
110 unless private $ do 112 unless private $ do
111 liftDHT $ DHT.insert topic undefined 113 liftDHT $ DHT.insert topic undefined
112 peers <- liftIO $ askPeers trackerManager trackers 114 liftIO $ do
113 forM_ peers $ \ peer -> do 115 peers <- askPeers trackerManager trackers
114 liftIO $ Exchange.insert peer exchange 116 print $ "got: " ++ show (L.length peers) ++ " peers"
117 forM_ peers $ \ peer -> do
118 Exchange.insert peer exchange
115 119
116-- | Stop downloading this torrent. 120-- | Stop downloading this torrent.
117pause :: Handle -> BitTorrent () 121pause :: Handle -> BitTorrent ()
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs
index 565ef7ab..0d4f3d02 100644
--- a/src/Network/BitTorrent/Exchange/Session.hs
+++ b/src/Network/BitTorrent/Exchange/Session.hs
@@ -1,7 +1,9 @@
1{-# LANGUAGE TemplateHaskell #-} 1{-# LANGUAGE FlexibleInstances #-}
2{-# LANGUAGE DeriveDataTypeable #-} 2{-# LANGUAGE TemplateHaskell #-}
3{-# LANGUAGE DeriveDataTypeable #-}
3module Network.BitTorrent.Exchange.Session 4module Network.BitTorrent.Exchange.Session
4 ( Session 5 ( Session
6 , LogFun
5 , newSession 7 , newSession
6 , closeSession 8 , closeSession
7 9
@@ -12,76 +14,141 @@ import Control.Applicative
12import Control.Concurrent 14import Control.Concurrent
13import Control.Exception 15import Control.Exception
14import Control.Lens 16import Control.Lens
17import Control.Monad.Logger
15import Control.Monad.Reader 18import Control.Monad.Reader
16import Control.Monad.State 19import Control.Monad.State
20import Data.ByteString as BS
21import Data.ByteString.Lazy as BL
22import Data.Conduit
17import Data.Function 23import Data.Function
18import Data.IORef 24import Data.IORef
25import Data.List as L
19import Data.Maybe 26import Data.Maybe
20import Data.Map as M 27import Data.Map as M
28import Data.Monoid
21import Data.Ord 29import Data.Ord
30import Data.Set as S
31import Data.Text as T
22import Data.Typeable 32import Data.Typeable
23import Text.PrettyPrint 33import Text.PrettyPrint hiding ((<>))
34import Text.PrettyPrint.Class
35import System.Log.FastLogger (LogStr, ToLogStr (..))
24 36
25import Data.Torrent (InfoDict (..)) 37import Data.Torrent (InfoDict (..))
26import Data.Torrent.Bitfield as BF 38import Data.Torrent.Bitfield as BF
27import Data.Torrent.InfoHash 39import Data.Torrent.InfoHash
40import Data.Torrent.Piece (pieceData, piPieceLength)
41import qualified Data.Torrent.Piece as Torrent (Piece (Piece))
28import Network.BitTorrent.Core 42import Network.BitTorrent.Core
29import Network.BitTorrent.Exchange.Assembler 43import Network.BitTorrent.Exchange.Assembler
30import Network.BitTorrent.Exchange.Block 44import Network.BitTorrent.Exchange.Block as Block
31import Network.BitTorrent.Exchange.Message 45import Network.BitTorrent.Exchange.Message
46import Network.BitTorrent.Exchange.Session.Status as SS
32import Network.BitTorrent.Exchange.Status 47import Network.BitTorrent.Exchange.Status
33import Network.BitTorrent.Exchange.Wire 48import Network.BitTorrent.Exchange.Wire
34import System.Torrent.Storage 49import System.Torrent.Storage
35 50
51{-----------------------------------------------------------------------
52-- Exceptions
53-----------------------------------------------------------------------}
54
55data ExchangeError
56 = InvalidRequest BlockIx StorageFailure
57 | CorruptedPiece PieceIx
58 deriving (Show, Typeable)
59
60instance Exception ExchangeError
61
62packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a
63packException f m = try m >>= either (throwIO . f) return
64
65{-----------------------------------------------------------------------
66-- Session
67-----------------------------------------------------------------------}
68
69data ConnectionEntry = ConnectionEntry
70 { initiatedBy :: !ChannelSide
71 , connection :: !(Connection Session)
72 }
36 73
37data Session = Session 74data Session = Session
38 { tpeerId :: PeerId 75 { tpeerId :: PeerId
39 , infohash :: InfoHash 76 , infohash :: InfoHash
40 , bitfield :: Bitfield 77 , storage :: Storage
41 , assembler :: Assembler 78 , status :: MVar SessionStatus
42 , storage :: Storage 79 , unchoked :: [PeerAddr IP]
43 , unchoked :: [PeerAddr IP] 80 , connections :: MVar (Map (PeerAddr IP) ConnectionEntry)
44 , connections :: MVar (Map (PeerAddr IP) (Connection Session)) 81 , broadcast :: Chan Message
45 , broadcast :: Chan Message 82 , logger :: LogFun
46 } 83 }
47 84
48newSession :: PeerAddr (Maybe IP) -- ^ /external/ address of this peer; 85-- | Logger function.
86type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO ()
87
88newSession :: LogFun
89 -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer;
49 -> FilePath -- ^ root directory for content files; 90 -> FilePath -- ^ root directory for content files;
50 -> InfoDict -- ^ torrent info dictionary; 91 -> InfoDict -- ^ torrent info dictionary;
51 -> IO Session -- ^ 92 -> IO Session -- ^
52newSession addr rootPath dict = do 93newSession logFun addr rootPath dict = do
53 connVar <- newMVar M.empty 94 connVar <- newMVar M.empty
54 store <- openInfoDict ReadWriteEx rootPath dict 95 store <- openInfoDict ReadWriteEx rootPath dict
55 chan <- newChan 96 statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces store))
97 (piPieceLength (idPieceInfo dict))
98 chan <- newChan
56 return Session 99 return Session
57 { tpeerId = fromMaybe (error "newSession: impossible") (peerId addr) 100 { tpeerId = fromMaybe (error "newSession: impossible") (peerId addr)
58 , infohash = idInfoHash dict 101 , infohash = idInfoHash dict
59 , bitfield = BF.haveNone (totalPieces store) 102 , status = statusVar
60 , assembler = error "newSession"
61 , storage = store 103 , storage = store
62 , unchoked = [] 104 , unchoked = []
63 , connections = connVar 105 , connections = connVar
64 , broadcast = chan 106 , broadcast = chan
107 , logger = logFun
65 } 108 }
66 109
67closeSession :: Session -> IO () 110closeSession :: Session -> IO ()
68closeSession = undefined 111closeSession = undefined
69 112
113instance MonadIO m => MonadLogger (Connected Session m) where
114 monadLoggerLog loc src lvl msg = do
115 conn <- ask
116 ses <- asks connSession
117 addr <- asks connRemoteAddr
118 let addrSrc = src <> " @ " <> T.pack (render (pretty addr))
119 liftIO $ logger ses loc addrSrc lvl (toLogStr msg)
120
121logMessage :: Message -> Wire Session ()
122logMessage msg = logDebugN $ T.pack (render (pretty msg))
123
124logEvent :: Text -> Wire Session ()
125logEvent = logInfoN
126
127{-----------------------------------------------------------------------
128-- Connections
129-----------------------------------------------------------------------}
130-- TODO unmap storage on zero connections
131
70insert :: PeerAddr IP 132insert :: PeerAddr IP
71 -> {- Maybe Socket 133 -> {- Maybe Socket
72 -> -} Session -> IO () 134 -> -} Session -> IO ()
73insert addr ses @ Session {..} = do 135insert addr ses @ Session {..} = do
74 forkIO $ do 136 forkIO $ do
75 let caps = def 137 action `finally` runStatusUpdates status (resetPending addr)
76 let ecaps = def 138 return ()
77 let hs = Handshake def caps infohash tpeerId 139 where
78 chan <- dupChan broadcast 140 action = do
79 connectWire ses hs addr ecaps chan $ do 141 let caps = def
80 conn <- getConnection 142 let ecaps = def
143 let hs = Handshake def caps infohash tpeerId
144 chan <- dupChan broadcast
145 connectWire ses hs addr ecaps chan $ do
146 conn <- getConnection
81-- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn 147-- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn
82 exchange 148 resizeBitfield (totalPieces storage)
149 logEvent "Connection established"
150 exchange
83-- liftIO $ modifyMVar_ connections $ pure . M.delete addr 151-- liftIO $ modifyMVar_ connections $ pure . M.delete addr
84 return ()
85 152
86delete :: PeerAddr IP -> Session -> IO () 153delete :: PeerAddr IP -> Session -> IO ()
87delete = undefined 154delete = undefined
@@ -90,81 +157,125 @@ deleteAll :: Session -> IO ()
90deleteAll = undefined 157deleteAll = undefined
91 158
92{----------------------------------------------------------------------- 159{-----------------------------------------------------------------------
93-- Query 160-- Helpers
94-----------------------------------------------------------------------} 161-----------------------------------------------------------------------}
95 162
163withStatusUpdates :: StatusUpdates a -> Wire Session a
164withStatusUpdates m = do
165 Session {..} <- getSession
166 liftIO $ runStatusUpdates status m
167
96getThisBitfield :: Wire Session Bitfield 168getThisBitfield :: Wire Session Bitfield
97getThisBitfield = undefined 169getThisBitfield = do
170 ses <- getSession
171 liftIO $ SS.getBitfield (status ses)
172
173readBlock :: BlockIx -> Storage -> IO (Block BL.ByteString)
174readBlock bix @ BlockIx {..} s = do
175 p <- packException (InvalidRequest bix) $ do readPiece ixPiece s
176 let chunk = BL.take (fromIntegral ixLength) $
177 BL.drop (fromIntegral ixOffset) (pieceData p)
178 if BL.length chunk == fromIntegral ixLength
179 then return $ Block ixPiece ixOffset chunk
180 else throwIO $ InvalidRequest bix (InvalidSize ixLength)
181
182sendBroadcast :: PeerMessage msg => msg -> Wire Session ()
183sendBroadcast msg = do
184 Session {..} <- getSession
185 ecaps <- getExtCaps
186 liftIO $ writeChan broadcast (envelop ecaps msg)
187
188{-----------------------------------------------------------------------
189-- Triggers
190-----------------------------------------------------------------------}
98 191
99{- 192fillRequestQueue :: Wire Session ()
100data PendingSet = PendingSet (Map (PeerAddr IP) [BlockIx]) 193fillRequestQueue = do
194 maxN <- getAdvertisedQueueLength
195 rbf <- getRemoteBitfield
196 addr <- connRemoteAddr <$> getConnection
197 blks <- withStatusUpdates $ do
198 n <- getRequestQueueLength addr
199 scheduleBlocks addr rbf (maxN - n)
200 mapM_ (sendMessage . Request) blks
101 201
102empty :: PendingSet 202tryFillRequestQueue :: Wire Session ()
103empty = undefined 203tryFillRequestQueue = do
204 allowed <- canDownload <$> getStatus
205 when allowed $ do
206 fillRequestQueue
104 207
105member :: PeerAddr IP -> BlockIx -> PendingSet -> Bool 208interesting :: Wire Session ()
106member addr bix = undefined 209interesting = do
210 addr <- connRemoteAddr <$> getConnection
211 logMessage (Status (Interested True))
212 sendMessage (Interested True)
213 logMessage (Status (Choking False))
214 sendMessage (Choking False)
215 tryFillRequestQueue
107 216
108insert :: PeerAddr IP -> BlockIx -> PendingSet -> PendingSet
109insert addr bix = undefined
110-}
111{----------------------------------------------------------------------- 217{-----------------------------------------------------------------------
112-- Event loop 218-- Incoming message handling
113-----------------------------------------------------------------------} 219-----------------------------------------------------------------------}
114{-
115data ExchangeError
116 = InvalidRequest BlockIx StorageFailure
117 | CorruptedPiece PieceIx
118 220
119packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a 221handleStatus :: StatusUpdate -> Wire Session ()
120packException f m = try >>= either (throwIO . f) m 222handleStatus s = do
223 updateConnStatus RemotePeer s
224 case s of
225 Interested _ -> return ()
226 Choking True -> do
227 addr <- connRemoteAddr <$> getConnection
228 withStatusUpdates (resetPending addr)
229 Choking False -> tryFillRequestQueue
121 230
122readBlock :: BlockIx -> Storage -> IO (Block ByteString) 231handleAvailable :: Available -> Wire Session ()
123readBlock bix @ BlockIx {..} s = do 232handleAvailable msg = do
124 p <- packException (InvalidRequest bix) $ do readPiece ixPiece storage 233 updateRemoteBitfield $ case msg of
125 let chunk = BS.take ixLength $ BS.drop ixOffset p 234 Have ix -> BF.insert ix
126 if BS.length chunk == ixLength 235 Bitfield bf -> const bf
127 then return chunk 236
128 else throwIO $ InvalidRequest bix (InvalidSize ixLength) 237 thisBf <- getThisBitfield
129-} 238 case msg of
239 Have ix
240 | ix `BF.member` thisBf -> return ()
241 | otherwise -> interesting
242 Bitfield bf
243 | bf `BF.isSubsetOf` thisBf -> return ()
244 | otherwise -> interesting
130 245
131handleTransfer :: Transfer -> Wire Session () 246handleTransfer :: Transfer -> Wire Session ()
132handleTransfer (Request bix) = do 247handleTransfer (Request bix) = do
133-- Session {..} <- getSession 248 Session {..} <- getSession
134-- addr <- getRemoteAddr 249 bitfield <- getThisBitfield
135-- when (addr `elem` unchoked && ixPiece bix `BF.member` bitfield) $ do 250 upload <- canUpload <$> getStatus
136-- blk <- liftIO $ readBlock bix storage 251 when (upload && ixPiece bix `BF.member` bitfield) $ do
137-- sendMsg (Piece blk) 252 blk <- liftIO $ readBlock bix storage
138 return () 253 sendMessage (Piece blk)
139 254
140handleTransfer (Piece blk) = do 255handleTransfer (Piece blk) = do
141{- 256 Session {..} <- getSession
142 Session {..} <- getSession 257 isSuccess <- withStatusUpdates (pushBlock blk storage)
143 when (blockIx blk `PS.member` pendingSet) $ do 258 case isSuccess of
144 insert blk stalledSet 259 Nothing -> liftIO $ throwIO $ userError "block is not requested"
145 sendBroadcast have 260 Just isCompleted -> do
146 maybe send not interested 261 when isCompleted $ do
147-} 262 sendBroadcast (Have (blkPiece blk))
148 return () 263-- maybe send not interested
264 tryFillRequestQueue
149 265
150handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) 266handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix))
151 where 267 where
152 transferResponse bix (Transfer (Piece blk)) = blockIx blk == bix 268 transferResponse bix (Transfer (Piece blk)) = blockIx blk == bix
153 transferResponse _ _ = False 269 transferResponse _ _ = False
154 270
271{-----------------------------------------------------------------------
272-- Event loop
273-----------------------------------------------------------------------}
274
155handleMessage :: Message -> Wire Session () 275handleMessage :: Message -> Wire Session ()
156handleMessage KeepAlive = return () 276handleMessage KeepAlive = return ()
157handleMessage (Status s) = undefined 277handleMessage (Status s) = handleStatus s
158handleMessage (Available msg) = do 278handleMessage (Available msg) = handleAvailable msg
159 thisBf <- getThisBitfield
160 case msg of
161 Have ix
162 | ix `BF.member` thisBf -> return ()
163 | otherwise -> undefined
164 Bitfield bf
165 | bf `BF.isSubsetOf` thisBf -> return ()
166 | otherwise -> undefined
167
168handleMessage (Transfer msg) = handleTransfer msg 279handleMessage (Transfer msg) = handleTransfer msg
169handleMessage (Port n) = undefined 280handleMessage (Port n) = undefined
170handleMessage (Fast _) = undefined 281handleMessage (Fast _) = undefined
@@ -172,19 +283,17 @@ handleMessage (Extended _) = undefined
172 283
173exchange :: Wire Session () 284exchange :: Wire Session ()
174exchange = do 285exchange = do
175 e <- recvMessage 286 bf <- getThisBitfield
176 liftIO $ print e 287 sendMessage (Bitfield bf)
177 288 awaitForever $ \ msg -> do
178type Exchange = StateT Session (ReaderT (Connection Session) IO) 289 logMessage msg
179 290 handleMessage msg
180--runExchange :: Exchange () -> [PeerAddr] -> IO ()
181--runExchange exchange peers = do
182-- forM_ peers $ \ peer -> do
183-- forkIO $ runReaderT (runStateT exchange session )
184 291
185data Event = NewMessage (PeerAddr IP) Message 292data Event = NewMessage (PeerAddr IP) Message
186 | Timeout -- for scheduling 293 | Timeout -- for scheduling
187 294
295type Exchange a = Wire Session a
296
188awaitEvent :: Exchange Event 297awaitEvent :: Exchange Event
189awaitEvent = undefined 298awaitEvent = undefined
190 299
diff --git a/src/Network/BitTorrent/Exchange/Session/Status.hs b/src/Network/BitTorrent/Exchange/Session/Status.hs
new file mode 100644
index 00000000..565c3bf3
--- /dev/null
+++ b/src/Network/BitTorrent/Exchange/Session/Status.hs
@@ -0,0 +1,175 @@
1module Network.BitTorrent.Exchange.Session.Status
2 ( -- * Environment
3 StatusUpdates
4 , runStatusUpdates
5
6 -- * Status
7 , SessionStatus
8 , sessionStatus
9
10 -- * Query
11 , getBitfield
12 , getRequestQueueLength
13
14 -- * Control
15 , scheduleBlocks
16 , resetPending
17 , pushBlock
18 ) where
19
20import Control.Applicative
21import Control.Concurrent
22import Control.Monad.State
23import Data.ByteString.Lazy as BL
24import Data.Default
25import Data.List as L
26import Data.Maybe
27import Data.Map as M
28import Data.Set as S
29import Data.Tuple
30
31import Data.Torrent.Piece
32import Data.Torrent.Bitfield as BF
33import Network.BitTorrent.Core
34import Network.BitTorrent.Exchange.Block as Block
35import System.Torrent.Storage (Storage, writePiece)
36
37
38{-----------------------------------------------------------------------
39-- Piece entry
40-----------------------------------------------------------------------}
41
42data PieceEntry = PieceEntry
43 { pending :: [(PeerAddr IP, BlockIx)]
44 , stalled :: Bucket
45 }
46
47pieceEntry :: PieceSize -> PieceEntry
48pieceEntry s = PieceEntry [] (Block.empty s)
49
50isEmpty :: PieceEntry -> Bool
51isEmpty PieceEntry {..} = L.null pending && Block.null stalled
52
53holes :: PieceIx -> PieceEntry -> [BlockIx]
54holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled)
55 where
56 mkBlockIx (off, sz) = BlockIx pix off sz
57
58{-----------------------------------------------------------------------
59-- Session status
60-----------------------------------------------------------------------}
61
62data SessionStatus = SessionStatus
63 { inprogress :: !(Map PieceIx PieceEntry)
64 , bitfield :: !Bitfield
65 , pieceSize :: !PieceSize
66 }
67
68sessionStatus :: Bitfield -> PieceSize -> SessionStatus
69sessionStatus bf ps = SessionStatus
70 { inprogress = M.empty
71 , bitfield = bf
72 , pieceSize = ps
73 }
74
75type StatusUpdates a = StateT SessionStatus IO a
76
77-- |
78runStatusUpdates :: MVar SessionStatus -> StatusUpdates a -> IO a
79runStatusUpdates var m = modifyMVar var (fmap swap . runStateT m)
80
81getBitfield :: MVar SessionStatus -> IO Bitfield
82getBitfield var = bitfield <$> readMVar var
83
84getRequestQueueLength :: PeerAddr IP -> StatusUpdates Int
85getRequestQueueLength addr = do
86 m <- gets (M.elems . M.map (L.filter ((==) addr . fst) . pending) . inprogress)
87 return $ L.sum $ L.map L.length m
88
89modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> StatusUpdates ()
90modifyEntry pix f = modify $ \ s @ SessionStatus {..} -> s
91 { inprogress = alter (g pieceSize) pix inprogress }
92 where
93 g s = h . f . fromMaybe (pieceEntry s)
94 h e
95 | isEmpty e = Nothing
96 | otherwise = Just e
97
98{-----------------------------------------------------------------------
99-- Piece download
100-----------------------------------------------------------------------}
101
102-- TODO choose block nearest to pending or stalled sets to reduce disk
103-- seeks on remote machines
104chooseBlocks :: [BlockIx] -> Int -> StatusUpdates [BlockIx]
105chooseBlocks xs n = return (L.take n xs)
106
107-- TODO use selection strategies from Exchange.Selector
108choosePiece :: Bitfield -> StatusUpdates (Maybe PieceIx)
109choosePiece bf
110 | BF.null bf = return $ Nothing
111 | otherwise = return $ Just $ BF.findMin bf
112
113scheduleBlocks :: PeerAddr IP -> Bitfield -> Int -> StatusUpdates [BlockIx]
114scheduleBlocks addr maskBF n = do
115 SessionStatus {..} <- get
116 let wantPieces = maskBF `BF.difference` bitfield
117 let wantBlocks = L.concat $ M.elems $ M.mapWithKey holes $
118 M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) inprogress
119
120 bixs <- if L.null wantBlocks
121 then do
122 mpix <- choosePiece wantPieces
123 case mpix of -- TODO return 'n' blocks
124 Nothing -> return []
125 Just pix -> return [leadingBlock pix defaultTransferSize]
126 else chooseBlocks wantBlocks n
127
128 forM_ bixs $ \ bix -> do
129 modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e
130 { pending = (addr, bix) : pending }
131
132 return bixs
133
134
135-- | Remove all pending block requests to the remote peer. May be used
136-- when:
137--
138-- * a peer closes connection;
139--
140-- * remote peer choked this peer;
141--
142-- * timeout expired.
143--
144resetPending :: PeerAddr IP -> StatusUpdates ()
145resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) }
146 where
147 reset = fmap $ \ e -> e
148 { pending = L.filter (not . (==) addr . fst) (pending e) }
149
150-- | MAY write to storage, if a new piece have been completed.
151pushBlock :: Block BL.ByteString -> Storage -> StatusUpdates (Maybe Bool)
152pushBlock blk @ Block {..} storage = do
153 mpe <- gets (M.lookup blkPiece . inprogress)
154 case mpe of
155 Nothing -> return Nothing
156 Just (pe @ PieceEntry {..})
157 | blockIx blk `L.notElem` fmap snd pending -> return Nothing
158 | otherwise -> do
159 let bkt' = Block.insertLazy blkOffset blkData stalled
160 case toPiece bkt' of
161 Nothing -> do
162 modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e
163 { pending = L.filter ((==) (blockIx blk) . snd) pending
164 , stalled = bkt'
165 }
166 return (Just False)
167
168 Just pieceData -> do
169 -- TODO verify
170 liftIO $ writePiece (Piece blkPiece pieceData) storage
171 modify $ \ s @ SessionStatus {..} -> s
172 { inprogress = M.delete blkPiece inprogress
173 , bitfield = BF.insert blkPiece bitfield
174 }
175 return (Just True)
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs
index 1fca6a66..8873546d 100644
--- a/src/Network/BitTorrent/Exchange/Wire.hs
+++ b/src/Network/BitTorrent/Exchange/Wire.hs
@@ -15,7 +15,8 @@
15{-# LANGUAGE GeneralizedNewtypeDeriving #-} 15{-# LANGUAGE GeneralizedNewtypeDeriving #-}
16module Network.BitTorrent.Exchange.Wire 16module Network.BitTorrent.Exchange.Wire
17 ( -- * Wire 17 ( -- * Wire
18 Wire 18 Connected
19 , Wire
19 20
20 -- ** Exceptions 21 -- ** Exceptions
21 , ChannelSide (..) 22 , ChannelSide (..)
@@ -38,12 +39,14 @@ module Network.BitTorrent.Exchange.Wire
38 39
39 -- ** Connection 40 -- ** Connection
40 , Connection 41 , Connection
42 , connRemoteAddr
41 , connProtocol 43 , connProtocol
42 , connCaps 44 , connCaps
43 , connTopic 45 , connTopic
44 , connRemotePeerId 46 , connRemotePeerId
45 , connThisPeerId 47 , connThisPeerId
46 , connOptions 48 , connOptions
49 , connSession
47 50
48 -- ** Setup 51 -- ** Setup
49 , runWire 52 , runWire
@@ -55,6 +58,7 @@ module Network.BitTorrent.Exchange.Wire
55 , recvMessage 58 , recvMessage
56 , sendMessage 59 , sendMessage
57 , filterQueue 60 , filterQueue
61 , getAdvertisedQueueLength
58 62
59 -- ** Query 63 -- ** Query
60 , getConnection 64 , getConnection
@@ -93,6 +97,7 @@ import Network.Socket.ByteString as BS
93import Text.PrettyPrint as PP hiding (($$), (<>)) 97import Text.PrettyPrint as PP hiding (($$), (<>))
94import Text.PrettyPrint.Class 98import Text.PrettyPrint.Class
95import Text.Show.Functions 99import Text.Show.Functions
100import System.Log.FastLogger (ToLogStr(..))
96import System.Timeout 101import System.Timeout
97 102
98import Data.BEncode as BE 103import Data.BEncode as BE
@@ -190,13 +195,15 @@ errorPenalty (DisallowedMessage _ _) = 1
190 195
191-- | Exceptions used to interrupt the current P2P session. 196-- | Exceptions used to interrupt the current P2P session.
192data WireFailure 197data WireFailure
198 = ConnectionRefused IOError
199
193 -- | Force termination of wire connection. 200 -- | Force termination of wire connection.
194 -- 201 --
195 -- Normally you should throw only this exception from event loop 202 -- Normally you should throw only this exception from event loop
196 -- using 'disconnectPeer', other exceptions are thrown 203 -- using 'disconnectPeer', other exceptions are thrown
197 -- automatically by functions from this module. 204 -- automatically by functions from this module.
198 -- 205 --
199 = DisconnectPeer 206 | DisconnectPeer
200 207
201 -- | A peer not responding and did not send a 'KeepAlive' message 208 -- | A peer not responding and did not send a 'KeepAlive' message
202 -- for a specified period of time. 209 -- for a specified period of time.
@@ -464,10 +471,12 @@ makeLenses ''ConnectionState
464 471
465-- | Connection keep various info about both peers. 472-- | Connection keep various info about both peers.
466data Connection s = Connection 473data Connection s = Connection
467 { -- | /Both/ peers handshaked with this protocol string. The only 474 { connRemoteAddr :: !(PeerAddr IP)
475
476 -- | /Both/ peers handshaked with this protocol string. The only
468 -- value is \"Bittorrent Protocol\" but this can be changed in 477 -- value is \"Bittorrent Protocol\" but this can be changed in
469 -- future. 478 -- future.
470 connProtocol :: !ProtocolName 479 , connProtocol :: !ProtocolName
471 480
472 -- | Set of enabled core extensions, i.e. the pre BEP10 extension 481 -- | Set of enabled core extensions, i.e. the pre BEP10 extension
473 -- mechanism. This value is used to check if a message is allowed 482 -- mechanism. This value is used to check if a message is allowed
@@ -503,6 +512,17 @@ data Connection s = Connection
503instance Pretty (Connection s) where 512instance Pretty (Connection s) where
504 pretty Connection {..} = "Connection" 513 pretty Connection {..} = "Connection"
505 514
515instance ToLogStr (Connection s) where
516 toLogStr Connection {..} = mconcat
517 [ toLogStr (show connRemoteAddr)
518 , toLogStr (show connProtocol)
519 , toLogStr (show connCaps)
520 , toLogStr (show connTopic)
521 , toLogStr (show connRemotePeerId)
522 , toLogStr (show connThisPeerId)
523 , toLogStr (show connOptions)
524 ]
525
506-- TODO check extended messages too 526-- TODO check extended messages too
507isAllowed :: Connection s -> Message -> Bool 527isAllowed :: Connection s -> Message -> Bool
508isAllowed Connection {..} msg 528isAllowed Connection {..} msg
@@ -592,6 +612,15 @@ getConnection = lift ask
592getSession :: Wire s s 612getSession :: Wire s s
593getSession = lift (asks connSession) 613getSession = lift (asks connSession)
594 614
615-- TODO configurable
616defQueueLength :: Int
617defQueueLength = 1
618
619getAdvertisedQueueLength :: Wire s Int
620getAdvertisedQueueLength = do
621 ExtendedHandshake {..} <- getRemoteEhs
622 return $ fromMaybe defQueueLength ehsQueueLength
623
595{----------------------------------------------------------------------- 624{-----------------------------------------------------------------------
596-- Wrapper 625-- Wrapper
597-----------------------------------------------------------------------} 626-----------------------------------------------------------------------}
@@ -685,8 +714,9 @@ reconnect = undefined
685-- 714--
686connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Chan Message 715connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Chan Message
687 -> Wire s () -> IO () 716 -> Wire s () -> IO ()
688connectWire session hs addr extCaps chan wire = 717connectWire session hs addr extCaps chan wire = do
689 bracket (peerSocket Stream addr) close $ \ sock -> do 718 let catchRefusal m = try m >>= either (throwIO . ConnectionRefused) return
719 bracket (catchRefusal (peerSocket Stream addr)) close $ \ sock -> do
690 hs' <- initiateHandshake sock hs 720 hs' <- initiateHandshake sock hs
691 721
692 Prelude.mapM_ (\(t,e) -> unless t $ throwIO $ ProtocolError e) [ 722 Prelude.mapM_ (\(t,e) -> unless t $ throwIO $ ProtocolError e) [
@@ -723,7 +753,8 @@ connectWire session hs addr extCaps chan wire =
723 (forkIO $ sourceChan kaInterval chan $= conduitPut S.put $$ sinkSocket sock) 753 (forkIO $ sourceChan kaInterval chan $= conduitPut S.put $$ sinkSocket sock)
724 (killThread) $ \ _ -> 754 (killThread) $ \ _ ->
725 runWire wire' sock chan $ Connection 755 runWire wire' sock chan $ Connection
726 { connProtocol = hsProtocol hs 756 { connRemoteAddr = addr
757 , connProtocol = hsProtocol hs
727 , connCaps = caps 758 , connCaps = caps
728 , connTopic = hsInfoHash hs 759 , connTopic = hsInfoHash hs
729 , connRemotePeerId = hsPeerId hs' 760 , connRemotePeerId = hsPeerId hs'