1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
|
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE DeriveDataTypeable #-}
module Network.BitTorrent.Exchange.Session
( Session
, newSession
, closeSession
, Network.BitTorrent.Exchange.Session.insert
) where
import Control.Applicative
import Control.Concurrent
import Control.Exception
import Control.Lens
import Control.Monad.Reader
import Control.Monad.State
import Data.Function
import Data.IORef
import Data.Maybe
import Data.Map as M
import Data.Ord
import Data.Typeable
import Text.PrettyPrint
import Data.Torrent (InfoDict (..))
import Data.Torrent.Bitfield as BF
import Data.Torrent.InfoHash
import Network.BitTorrent.Core
import Network.BitTorrent.Exchange.Assembler
import Network.BitTorrent.Exchange.Block
import Network.BitTorrent.Exchange.Message
import Network.BitTorrent.Exchange.Status
import Network.BitTorrent.Exchange.Wire
import System.Torrent.Storage
data Session = Session
{ tpeerId :: PeerId
, infohash :: InfoHash
, bitfield :: Bitfield
, assembler :: Assembler
, storage :: Storage
, unchoked :: [PeerAddr IP]
, connections :: MVar (Map (PeerAddr IP) (Connection Session))
, broadcast :: Chan Message
}
newSession :: PeerAddr (Maybe IP) -- ^ /external/ address of this peer;
-> FilePath -- ^ root directory for content files;
-> InfoDict -- ^ torrent info dictionary;
-> IO Session -- ^
newSession addr rootPath dict = do
connVar <- newMVar M.empty
store <- openInfoDict ReadWriteEx rootPath dict
chan <- newChan
return Session
{ tpeerId = fromMaybe (error "newSession: impossible") (peerId addr)
, infohash = idInfoHash dict
, bitfield = BF.haveNone (totalPieces store)
, assembler = error "newSession"
, storage = store
, unchoked = []
, connections = connVar
, broadcast = chan
}
closeSession :: Session -> IO ()
closeSession = undefined
insert :: PeerAddr IP
-> {- Maybe Socket
-> -} Session -> IO ()
insert addr ses @ Session {..} = do
forkIO $ do
let caps = def
let ecaps = def
let hs = Handshake def caps infohash tpeerId
chan <- dupChan broadcast
connectWire ses hs addr ecaps chan $ do
conn <- getConnection
-- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn
exchange
-- liftIO $ modifyMVar_ connections $ pure . M.delete addr
return ()
delete :: PeerAddr IP -> Session -> IO ()
delete = undefined
deleteAll :: Session -> IO ()
deleteAll = undefined
{-----------------------------------------------------------------------
-- Query
-----------------------------------------------------------------------}
getThisBitfield :: Wire Session Bitfield
getThisBitfield = undefined
{-
data PendingSet = PendingSet (Map (PeerAddr IP) [BlockIx])
empty :: PendingSet
empty = undefined
member :: PeerAddr IP -> BlockIx -> PendingSet -> Bool
member addr bix = undefined
insert :: PeerAddr IP -> BlockIx -> PendingSet -> PendingSet
insert addr bix = undefined
-}
{-----------------------------------------------------------------------
-- Event loop
-----------------------------------------------------------------------}
{-
data ExchangeError
= InvalidRequest BlockIx StorageFailure
| CorruptedPiece PieceIx
packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a
packException f m = try >>= either (throwIO . f) m
readBlock :: BlockIx -> Storage -> IO (Block ByteString)
readBlock bix @ BlockIx {..} s = do
p <- packException (InvalidRequest bix) $ do readPiece ixPiece storage
let chunk = BS.take ixLength $ BS.drop ixOffset p
if BS.length chunk == ixLength
then return chunk
else throwIO $ InvalidRequest bix (InvalidSize ixLength)
-}
handleTransfer :: Transfer -> Wire Session ()
handleTransfer (Request bix) = do
-- Session {..} <- getSession
-- addr <- getRemoteAddr
-- when (addr `elem` unchoked && ixPiece bix `BF.member` bitfield) $ do
-- blk <- liftIO $ readBlock bix storage
-- sendMsg (Piece blk)
return ()
handleTransfer (Piece blk) = do
{-
Session {..} <- getSession
when (blockIx blk `PS.member` pendingSet) $ do
insert blk stalledSet
sendBroadcast have
maybe send not interested
-}
return ()
handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix))
where
transferResponse bix (Transfer (Piece blk)) = blockIx blk == bix
transferResponse _ _ = False
handleMessage :: Message -> Wire Session ()
handleMessage KeepAlive = return ()
handleMessage (Status s) = undefined
handleMessage (Available msg) = do
thisBf <- getThisBitfield
case msg of
Have ix
| ix `BF.member` thisBf -> return ()
| otherwise -> undefined
Bitfield bf
| bf `BF.isSubsetOf` thisBf -> return ()
| otherwise -> undefined
handleMessage (Transfer msg) = handleTransfer msg
handleMessage (Port n) = undefined
handleMessage (Fast _) = undefined
handleMessage (Extended _) = undefined
exchange :: Wire Session ()
exchange = do
e <- recvMessage
liftIO $ print e
type Exchange = StateT Session (ReaderT (Connection Session) IO)
--runExchange :: Exchange () -> [PeerAddr] -> IO ()
--runExchange exchange peers = do
-- forM_ peers $ \ peer -> do
-- forkIO $ runReaderT (runStateT exchange session )
data Event = NewMessage (PeerAddr IP) Message
| Timeout -- for scheduling
awaitEvent :: Exchange Event
awaitEvent = undefined
yieldEvent :: Exchange Event
yieldEvent = undefined
|