summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Exchange/Session.hs
blob: dce2572f74696aea6b820c99a86aafeff0541f70 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
{-# LANGUAGE TemplateHaskell    #-}
{-# LANGUAGE DeriveDataTypeable #-}
module Network.BitTorrent.Exchange.Session
       ( Session
       , newSession
       , closeSession

       , Network.BitTorrent.Exchange.Session.insert
       ) where

import Control.Applicative
import Control.Concurrent
import Control.Exception
import Control.Lens
import Control.Monad.Reader
import Control.Monad.State
import Data.Function
import Data.IORef
import Data.Maybe
import Data.Map as M
import Data.Ord
import Data.Typeable
import Text.PrettyPrint

import Data.Torrent (InfoDict (..))
import Data.Torrent.Bitfield as BF
import Data.Torrent.InfoHash
import Network.BitTorrent.Core
import Network.BitTorrent.Exchange.Assembler
import Network.BitTorrent.Exchange.Block
import Network.BitTorrent.Exchange.Message
import Network.BitTorrent.Exchange.Status
import Network.BitTorrent.Exchange.Wire
import System.Torrent.Storage


-- | State shared by the peer connections of a single torrent exchange.
--
--   NOTE(review): several field descriptions below are inferred from the
--   field names and from how 'newSession' and 'insert' use them — confirm
--   against the rest of the package.
data Session = Session
  { tpeerId     :: PeerId
    -- ^ peer id placed into the handshake built by 'insert';
  , infohash    :: InfoHash
    -- ^ info hash placed into the handshake built by 'insert';
  , bitfield    :: Bitfield
    -- ^ presumably the pieces this peer already has; 'newSession'
    --   initialises it with 'BF.haveNone';
  , assembler   :: Assembler
    -- ^ not initialised yet: 'newSession' stores an error thunk here;
  , storage     :: Storage
    -- ^ content storage opened by 'newSession';
  , unchoked    :: [PeerAddr IP]
    -- ^ presumably the peers we have unchoked; starts out empty;
  , connections :: MVar (Map (PeerAddr IP) (Connection Session))
    -- ^ connection table keyed by peer address; starts out empty;
  , broadcast   :: Chan Message
    -- ^ channel that 'insert' duplicates for every new connection.
  }

-- | Create a fresh session for one torrent.
--
--   The content storage is opened with 'openInfoDict' in 'ReadWriteEx'
--   mode under the given root directory. The session starts with an
--   empty connection table, an empty unchoked list, a new broadcast
--   channel and a bitfield built by 'BF.haveNone'.
--
--   Throws 'ErrorCall' immediately, before the storage is opened, when
--   the supplied address carries no peer id. Without this check the
--   failure would stay hidden in a lazy thunk and surface only when the
--   peer id is first forced, e.g. inside the thread forked by 'insert'.
--
--   NOTE: the 'assembler' field is still a placeholder that raises an
--   error as soon as it is forced.
newSession :: PeerAddr (Maybe IP) -- ^ /external/ address of this peer;
           -> FilePath            -- ^ root directory for content files;
           -> InfoDict            -- ^ torrent info dictionary;
           -> IO Session          -- ^ freshly initialised session.
newSession addr rootPath dict = do
  -- Validate first so that no resource is acquired for a session that
  -- could never perform a handshake.
  pid     <- maybe (throwIO (ErrorCall "newSession: peer address has no peer id"))
                   return
                   (peerId addr)
  connVar <- newMVar M.empty
  store   <- openInfoDict ReadWriteEx rootPath dict
  chan    <- newChan
  return Session
    { tpeerId     = pid
    , infohash    = idInfoHash dict
    , bitfield    = BF.haveNone (totalPieces store)
    , assembler   = error "newSession: assembler is not initialised"
    , storage     = store
    , unchoked    = []
    , connections = connVar
    , broadcast   = chan
    }

-- | Release a session created by 'newSession'.
--
--   Not implemented yet: using it fails with a descriptive error
--   instead of an anonymous 'undefined'.
closeSession :: Session -> IO ()
closeSession = error "closeSession: not implemented"

-- | Start talking to a remote peer in a background thread.
--
--   A handshake is built from the session's info hash and peer id, the
--   session broadcast channel is duplicated for this connection, and
--   'connectWire' is then given an action that runs 'exchange'. The call
--   itself returns immediately; the 'ThreadId' of the forked thread is
--   discarded, so the caller has no handle on it.
--
--   NOTE(review): registering the connection in the 'connections' table
--   is still commented out below, which is why @conn@ is bound but not
--   used.
insert :: PeerAddr IP
       -> {- Maybe Socket
       -> -} Session -> IO ()
-- The as-pattern is written without spaces around '@': GHC >= 9.0
-- rejects @ses \@ Session {..}@ in pattern position.
insert addr ses@Session {..} = do
  _ <- forkIO $ do
    let caps  = def
    let ecaps = def
    let hs = Handshake def caps infohash tpeerId
    chan <- dupChan broadcast
    connectWire ses hs addr ecaps chan $ do
      conn <- getConnection
--      liftIO $ modifyMVar_ connections $ pure . M.insert addr conn
      exchange
--      liftIO $ modifyMVar_ connections $ pure . M.delete addr
  return ()

-- | Remove a single peer from the session.
--
--   Not implemented yet: using it fails with a descriptive error
--   instead of an anonymous 'undefined'.
delete :: PeerAddr IP -> Session -> IO ()
delete = error "delete: not implemented"

-- | Remove every peer from the session.
--
--   Not implemented yet: using it fails with a descriptive error
--   instead of an anonymous 'undefined'.
deleteAll :: Session -> IO ()
deleteAll = error "deleteAll: not implemented"

{-----------------------------------------------------------------------
--  Query
-----------------------------------------------------------------------}

-- | Fetch the bitfield of this (local) peer from inside a wire action.
--
--   Not implemented yet: running it fails with a descriptive error
--   instead of an anonymous 'undefined'. It is used by 'handleMessage'
--   for every availability message.
getThisBitfield :: Wire Session Bitfield
getThisBitfield = error "getThisBitfield: not implemented"

{-
data PendingSet = PendingSet (Map (PeerAddr IP) [BlockIx])

empty :: PendingSet
empty = undefined

member :: PeerAddr IP -> BlockIx -> PendingSet -> Bool
member addr bix = undefined

insert :: PeerAddr IP -> BlockIx -> PendingSet -> PendingSet
insert addr bix = undefined
-}
{-----------------------------------------------------------------------
--  Event loop
-----------------------------------------------------------------------}
{-
data ExchangeError
  = InvalidRequest BlockIx StorageFailure
  | CorruptedPiece PieceIx

packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a
packException f m = try >>= either (throwIO . f) m

readBlock :: BlockIx -> Storage -> IO (Block ByteString)
readBlock bix @ BlockIx {..} s = do
  p <- packException (InvalidRequest bix) $ do readPiece ixPiece storage
  let chunk = BS.take ixLength $ BS.drop ixOffset p
  if BS.length chunk == ixLength
    then return chunk
    else throwIO $ InvalidRequest bix (InvalidSize ixLength)
-}

-- | React to a single message received from the remote peer.
--
--   Most of the protocol is not implemented yet. What is handled:
--
--   * 'KeepAlive' is ignored;
--
--   * 'Have' \/ 'Bitfield' announcements are ignored when our own
--     bitfield already covers them;
--
--   * 'Request' and 'Piece' are accepted and ignored for now (the
--     intended logic is kept as commented-out code);
--
--   * 'Cancel' filters the message queue so that a queued 'Piece'
--     answer for the cancelled block is rejected by the predicate —
--     presumably this is the pending outgoing queue; TODO confirm
--     against 'filterQueue'.
--
--   Every other case fails with an 'error' that names the missing
--   feature, instead of an anonymous 'undefined'.
handleMessage :: Message -> Wire Session ()
handleMessage KeepAlive       = return ()
handleMessage (Status _)      =
  error "handleMessage: Status messages are not implemented"
handleMessage (Available msg) = do
  thisBf <- getThisBitfield
  case msg of
    Have     ix
      | ix `BF.member`     thisBf -> return ()
      |     otherwise             ->
          error "handleMessage: Have for a piece we lack is not implemented"
    Bitfield bf
      | bf `BF.isSubsetOf` thisBf -> return ()
      |     otherwise             ->
          error "handleMessage: Bitfield with pieces we lack is not implemented"

handleMessage (Transfer  msg) = case msg of
  Request bix -> do
--    Session {..} <- getSession
--    addr <- getRemoteAddr
--    when (addr `elem` unchoked && ixPiece bix `BF.member` bitfield) $ do
--      blk <- liftIO $ readBlock bix storage
--      sendMsg (Piece blk)
    return ()

  Piece   blk -> do
{-
    Session {..} <- getSession
    when (blockIx blk `PS.member` pendingSet) $ do
      insert blk stalledSet
      sendBroadcast have
      maybe send not interested
-}
    return ()

  Cancel  bix -> filterQueue (not . transferResponse bix)
    where
      -- True only for a 'Piece' message that carries the given block.
      transferResponse target (Transfer (Piece blk)) = blockIx blk == target
      transferResponse _      _                      = False

handleMessage (Port      _) =
  error "handleMessage: Port messages are not implemented"
handleMessage (Fast      _) =
  error "handleMessage: Fast extension messages are not implemented"
handleMessage (Extended  _) =
  error "handleMessage: Extended messages are not implemented"

-- | Take whatever 'recvMessage' yields and 'print' it.
--
--   This is a debugging placeholder: it handles a single result and
--   then finishes.
exchange :: Wire Session ()
exchange = recvMessage >>= liftIO . print

-- | Monad stack for exchange code: a 'Session' carried as state, layered
--   over a reader of the current 'Connection', on top of 'IO'.
type Exchange = StateT Session (ReaderT (Connection Session) IO)

--runExchange :: Exchange () -> [PeerAddr] -> IO ()
--runExchange exchange peers = do
--  forM_ peers $ \ peer -> do
--    forkIO $ runReaderT (runStateT exchange session )

-- | Events for the exchange event loop ('awaitEvent' \/ 'yieldEvent').
data Event = NewMessage (PeerAddr IP) Message
             -- ^ a message paired with a peer address — presumably the
             --   peer that sent it; TODO confirm.
           | Timeout -- ^ for scheduling

-- | Obtain the next 'Event' inside the 'Exchange' monad.
--
--   Not implemented yet: running it fails with a descriptive error
--   instead of an anonymous 'undefined'.
awaitEvent :: Exchange Event
awaitEvent = error "awaitEvent: not implemented"

-- | Counterpart of 'awaitEvent' in the 'Exchange' monad.
--
--   Not implemented yet: running it fails with a descriptive error
--   instead of an anonymous 'undefined'.
yieldEvent :: Exchange Event
yieldEvent = error "yieldEvent: not implemented"