1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE DeriveDataTypeable #-}
module Network.BitTorrent.Exchange.Session
(
) where
import Control.Concurrent.STM
import Control.Exception
import Control.Lens
import Data.Function
import Data.IORef
import Data.Ord
import Data.Typeable
import Text.PrettyPrint
import Data.Torrent.Bitfield
import Data.Torrent.InfoHash
import Network.BitTorrent.Core.PeerAddr
import Network.BitTorrent.Exchange.Message
import Network.BitTorrent.Exchange.Status
type Extension = ()
-- | Peer session contain all data necessary for peer to peer
-- communication.
data ExchangeSession = ExchangeSession
{ -- | Used as unique identifier of the session.
connectedPeerAddr :: !PeerAddr
-- | Extensions such that both peer and client support.
, enabledExtensions :: [Extension]
-- | Broadcast messages waiting to be sent to peer.
, pendingMessages :: !(TChan Message)
-- | Dymanic P2P data.
, sessionState :: !(IORef SessionState)
}
instance Eq ExchangeSession where
(==) = (==) `on` connectedPeerAddr
{-# INLINE (==) #-}
instance Ord ExchangeSession where
compare = comparing connectedPeerAddr
{-# INLINE compare #-}
enqueueBroadcast :: ExchangeSession -> Message -> IO ()
enqueueBroadcast = undefined
dequeueBroadcast :: ExchangeSession -> IO Message
dequeueBroadcast = undefined
{-----------------------------------------------------------------------
-- Session state
-----------------------------------------------------------------------}
data SessionState = SessionState
{ _bitfield :: !Bitfield -- ^ Other peer Have bitfield.
, _status :: !SessionStatus -- ^ Status of both peers.
} deriving (Show, Eq)
$(makeLenses ''SessionState)
--initialSessionState :: PieceCount -> SessionState
--initialSessionState pc = SessionState (haveNone pc) def
--getSessionState :: PeerSession -> IO SessionState
--getSessionState PeerSession {..} = readIORef sessionState
{-----------------------------------------------------------------------
-- Exceptions
-----------------------------------------------------------------------}
-- | Exceptions used to interrupt the current P2P session. This
-- exceptions will NOT affect other P2P sessions, DHT, peer <->
-- tracker, or any other session.
--
data ExchangeFailure
= PeerDisconnected
| ProtocolError Doc
| UnknownTorrent InfoHash
deriving (Show, Typeable)
instance Exception ExchangeFailure
-- | Do nothing with exception, used with 'handle' or 'try'.
isSessionException :: Monad m => ExchangeFailure -> m ()
isSessionException _ = return ()
-- | The same as 'isSessionException' but output to stdout the catched
-- exception, for debugging purposes only.
putSessionException :: ExchangeFailure -> IO ()
putSessionException = print
{-
{-----------------------------------------------------------------------
-- Broadcasting: Have, Cancel, Bitfield, SuggestPiece
-----------------------------------------------------------------------}
{-
Here we should enqueue broadcast messages and keep in mind that:
* We should enqueue broadcast events as they are appear.
* We should yield broadcast messages as fast as we get them.
these 2 phases might differ in time significantly
**TODO**: do this; but only when it'll be clean which other broadcast
messages & events we should send.
1. Update client have bitfield --\____ in one transaction;
2. Update downloaded stats --/
3. Signal to the all other peer about this.
-}
available :: Bitfield -> SwarmSession -> STM ()
available bf SwarmSession {..} = {-# SCC available #-} do
updateProgress >> broadcast
where
updateProgress = do
let piLen = ciPieceLength $ tInfo $ torrentMeta
let bytes = piLen * BF.haveCount bf
modifyTVar' (currentProgress clientSession) (downloadedProgress bytes)
broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf)
-- TODO compute size of messages: if it's faster to send Bitfield
-- instead many Have do that
-- Also if there is single Have message in queue then the
-- corresponding piece is likely still in memory or disc cache,
-- when we can send SuggestPiece.
readAvail :: TChan a -> STM [a]
readAvail chan = do
m <- tryReadTChan chan
case m of
Just a -> (:) <$> pure a <*> readAvail chan
Nothing -> return []
-- | Get pending messages queue appeared in result of asynchronously
-- changed client state. Resulting queue should be sent to a peer
-- immediately.
--
getPending :: PeerSession -> IO [Message]
getPending PeerSession {..} = {-# SCC getPending #-} do
atomically (readAvail pendingMessages)
-}
|