blob: c24e2b0b0a0ad7a17428e67c54a63a4e69b0666f (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE DeriveDataTypeable #-}
module Network.BitTorrent.Exchange.Session
(
) where
import Control.Concurrent.STM
import Control.Exception
import Control.Lens
import Data.Function
import Data.IORef
import Data.Ord
import Data.Typeable
import Text.PrettyPrint
import Data.Torrent.Bitfield
import Data.Torrent.InfoHash
import Network.BitTorrent.Core
import Network.BitTorrent.Exchange.Message
import Network.BitTorrent.Exchange.Status
-- | An extension that both the peer and the client may support (see
-- 'enabledExtensions'). It is an alias for the unit type, so a value
-- carries no information.
--
-- NOTE(review): looks like a placeholder until real extensions are
-- modelled; confirm.
type Extension = ()
-- | Errors related to the exchange of pieces and blocks. Each
-- constructor carries the index of the piece or block it refers to.
--
-- NOTE(review): the precise meaning of each constructor is inferred
-- from its name only; confirm against the call sites.
--
-- NOTE(review): this module imports "Control.Exception" and
-- "Data.Typeable", but no 'Exception' instance is declared for this
-- type in the visible code; confirm whether one is intended.
data ExchangeError
  = InvalidPieceIx PieceIx -- ^ Refers to a piece by its index.
  | InvalidBlock   BlockIx -- ^ Refers to a block by its index.
  | CorruptedPiece PieceIx -- ^ Refers to a piece by its index.
-- | A peer session contains all the data necessary for peer-to-peer
-- communication.
data ExchangeSession = ExchangeSession
  { connectedPeerAddr :: !PeerAddr
    -- ^ Used as the unique identifier of the session.
  , enabledExtensions :: [Extension]
    -- ^ Extensions that both the peer and the client support.
  , pendingMessages   :: !(TChan Message)
    -- ^ Broadcast messages waiting to be sent to the peer.
  , sessionState      :: !(IORef SessionState)
    -- ^ Dynamic P2P data.
  }
-- | Two sessions are equal exactly when their 'connectedPeerAddr'
-- fields are equal; no other field is inspected.
instance Eq ExchangeSession where
  lhs == rhs = connectedPeerAddr lhs == connectedPeerAddr rhs
  {-# INLINE (==) #-}
-- | Sessions are ordered by their 'connectedPeerAddr' field alone,
-- which keeps the ordering consistent with the 'Eq' instance.
instance Ord ExchangeSession where
  compare lhs rhs = compare (connectedPeerAddr lhs) (connectedPeerAddr rhs)
  {-# INLINE compare #-}
-- | Append a broadcast message to the session's 'pendingMessages'
-- queue. The write is performed in its own STM transaction.
--
-- NOTE(review): this replaces an @undefined@ stub that crashed on the
-- first call. It assumes 'pendingMessages' is the queue the name
-- \"broadcast\" refers to (its field documentation says so); confirm.
enqueueBroadcast :: ExchangeSession -> Message -> IO ()
enqueueBroadcast session msg =
  atomically $ writeTChan (pendingMessages session) msg
-- | Remove and return the oldest message from the session's
-- 'pendingMessages' queue. Blocks (the STM transaction retries) until
-- a message is available.
--
-- NOTE(review): this replaces an @undefined@ stub that crashed on the
-- first call. It assumes 'pendingMessages' is the queue the name
-- \"broadcast\" refers to (its field documentation says so); confirm.
dequeueBroadcast :: ExchangeSession -> IO Message
dequeueBroadcast session =
  atomically $ readTChan (pendingMessages session)
{-----------------------------------------------------------------------
-- Session state
-----------------------------------------------------------------------}
-- | The part of a session that is stored behind the 'IORef' in the
-- 'sessionState' field of 'ExchangeSession'.
data SessionState = SessionState
  { -- | The other peer's Have bitfield.
    _bitfield :: !Bitfield
    -- | Status of both peers.
  , _status   :: !SessionStatus
  } deriving (Show, Eq)
-- Template Haskell splice: 'makeLenses' generates a lens for each
-- underscore-prefixed field of 'SessionState' ('_bitfield' and
-- '_status'), named after the field without the leading underscore.
-- It must come after the declaration of 'SessionState'.
$(makeLenses ''SessionState)
--initialSessionState :: PieceCount -> SessionState
--initialSessionState pc = SessionState (haveNone pc) def
--getSessionState :: PeerSession -> IO SessionState
--getSessionState PeerSession {..} = readIORef sessionState
{-
{-----------------------------------------------------------------------
-- Broadcasting: Have, Cancel, Bitfield, SuggestPiece
-----------------------------------------------------------------------}
{-
Here we should enqueue broadcast messages and keep in mind that:
* We should enqueue broadcast events as they appear.
* We should yield broadcast messages as fast as we get them.
These two phases might differ in time significantly.
**TODO**: do this, but only once it is clear which other broadcast
messages & events we should send.
1. Update client have bitfield --\____ in one transaction;
2. Update downloaded stats --/
3. Signal all the other peers about this.
-}
available :: Bitfield -> SwarmSession -> STM ()
available bf SwarmSession {..} = {-# SCC available #-} do
updateProgress >> broadcast
where
updateProgress = do
let piLen = ciPieceLength $ tInfo $ torrentMeta
let bytes = piLen * BF.haveCount bf
modifyTVar' (currentProgress clientSession) (downloadedProgress bytes)
broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf)
-- TODO compute size of messages: if it's faster to send Bitfield
-- instead of many Have messages, do that.
-- Also, if there is a single Have message in the queue then the
-- corresponding piece is likely still in memory or disc cache,
-- so we can send SuggestPiece.
readAvail :: TChan a -> STM [a]
readAvail chan = do
m <- tryReadTChan chan
case m of
Just a -> (:) <$> pure a <*> readAvail chan
Nothing -> return []
-- | Get the queue of pending messages that appeared as a result of
-- asynchronous changes to the client state. The resulting queue should
-- be sent to the peer immediately.
--
getPending :: PeerSession -> IO [Message]
getPending PeerSession {..} = {-# SCC getPending #-} do
atomically (readAvail pendingMessages)
-}
|