summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Internal.hs
diff options
context:
space:
mode:
authorSam T <pxqr.sta@gmail.com>2013-06-11 04:27:12 +0400
committerSam T <pxqr.sta@gmail.com>2013-06-11 04:27:12 +0400
commitcdb75165ee0e4f2c36f5766fba4c7bc4bd31db2b (patch)
tree890a091aceab0b62dd31e697efa2eed8793d31d0 /src/Network/BitTorrent/Internal.hs
parent7313c139eb5a75edf4fca36e5d0f401584ab7502 (diff)
~ Add keepalive timeouts.
Diffstat (limited to 'src/Network/BitTorrent/Internal.hs')
-rw-r--r--src/Network/BitTorrent/Internal.hs210
1 files changed, 210 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs
new file mode 100644
index 00000000..d34c6236
--- /dev/null
+++ b/src/Network/BitTorrent/Internal.hs
@@ -0,0 +1,210 @@
1-- |
2-- Copyright : (c) Sam T. 2013
3-- License : MIT
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- This module implement opaque broadcast message passing. It
9-- provides sessions needed by Network.BitTorrent and
10-- Network.BitTorrent.Exchange and modules. To hide some internals
11-- of this module we detach it from Exchange.
12--
13{-# LANGUAGE RecordWildCards #-}
14module Network.BitTorrent.Internal
15 ( ClientSession(..), newClient
16 , SwarmSession(..), newLeacher, newSeeder
17 , PeerSession(..), withPeerSession
18
19 -- * Timeouts
20 , updateIncoming, updateOutcoming
21 ) where
22
23import Control.Applicative
24import Control.Concurrent
25import Control.Concurrent.STM
26import Control.Exception
27
28import Data.IORef
29import Data.Function
30import Data.Ord
31import Data.Set as S
32
33import Data.Conduit
34import Data.Conduit.Cereal
35import Data.Conduit.Network
36import Data.Serialize
37
38import Network
39import Network.Socket
40import Network.Socket.ByteString
41
42import GHC.Event as Ev
43
44import Data.Bitfield as BF
45import Data.Torrent
46import Network.BitTorrent.Extension
47import Network.BitTorrent.Peer
48import Network.BitTorrent.Exchange.Protocol as BT
49
50
51
52{-----------------------------------------------------------------------
53 Client session
54-----------------------------------------------------------------------}
55
56-- | In one application you could have many clients.
57data ClientSession = ClientSession {
58 clientPeerID :: PeerID -- ^
59 , allowedExtensions :: [Extension] -- ^
60 , swarmSessions :: TVar (Set SwarmSession)
61 , eventManager :: EventManager
62 }
63
64instance Eq ClientSession where
65 (==) = (==) `on` clientPeerID
66
67instance Ord ClientSession where
68 compare = comparing clientPeerID
69
70newClient :: [Extension] -> IO ClientSession
71newClient exts = ClientSession <$> newPeerID
72 <*> pure exts
73 <*> newTVarIO S.empty
74 <*> Ev.new
75
76{-----------------------------------------------------------------------
77 Swarm session
78-----------------------------------------------------------------------}
79
80-- | Extensions are set globally by
81-- Swarm session are un
82data SwarmSession = SwarmSession {
83 torrentInfoHash :: InfoHash
84 , clientSession :: ClientSession
85 , clientBitfield :: IORef Bitfield
86 , connectedPeers :: TVar (Set PeerSession)
87 }
88
89instance Eq SwarmSession where
90 (==) = (==) `on` torrentInfoHash
91
92instance Ord SwarmSession where
93 compare = comparing torrentInfoHash
94
95newSwarmSession :: Bitfield -> ClientSession -> Torrent -> IO SwarmSession
96newSwarmSession bf cs @ ClientSession {..} Torrent {..}
97 = SwarmSession <$> pure tInfoHash
98 <*> pure cs
99 <*> newIORef bf
100 <*> newTVarIO S.empty
101
102newSeeder :: ClientSession -> Torrent -> IO SwarmSession
103newSeeder cs t @ Torrent {..}
104 = newSwarmSession (haveAll (pieceCount tInfo)) cs t
105
106newLeacher :: ClientSession -> Torrent -> IO SwarmSession
107newLeacher cs t @ Torrent {..}
108 = newSwarmSession (haveNone (pieceCount tInfo)) cs t
109
110isLeacher :: SwarmSession -> IO Bool
111isLeacher = undefined
112
113{-----------------------------------------------------------------------
114 Peer session
115-----------------------------------------------------------------------}
116
117data PeerSession = PeerSession {
118 connectedPeerAddr :: PeerAddr
119 , swarmSession :: SwarmSession
120 , enabledExtensions :: [Extension]
121
122 -- | To dissconnect from died peers appropriately we should check
123 -- if a peer do not sent the KA message within given interval. If
124 -- yes, we should throw an exception in 'TimeoutCallback' and
125 -- close session between peers.
126 --
127 -- We should update timeout if we /receive/ any message within
128 -- timeout interval to keep connection up.
129 , incomingTimeout :: TimeoutKey
130
131 -- | To send KA message appropriately we should know when was last
132 -- time we sent a message to a peer. To do that we keep registered
133 -- timeout in event manager and if we do not sent any message to
134 -- the peer within given interval then we send KA message in
135 -- 'TimeoutCallback'.
136 --
137 -- We should update timeout if we /send/ any message within timeout
138 -- to avoid reduntant KA messages.
139 , outcomingTimeout :: TimeoutKey
140
141 , broadcastMessages :: Chan [Message]
142 , peerBitfield :: IORef Bitfield
143 , peerSessionStatus :: IORef SessionStatus
144 }
145
146instance Eq PeerSession where
147 (==) = (==) `on` connectedPeerAddr
148
149instance Ord PeerSession where
150 compare = comparing connectedPeerAddr
151
152-- TODO check if it connected yet peer
153withPeerSession :: SwarmSession -> PeerAddr
154 -> ((Socket, PeerSession) -> IO a)
155 -> IO a
156
157withPeerSession ss @ SwarmSession {..} addr
158 = bracket openSession closeSession
159 where
160 openSession = do
161 let caps = encodeExts $ allowedExtensions $ clientSession
162 let pid = clientPeerID $ clientSession
163 let chs = Handshake defaultBTProtocol caps torrentInfoHash pid
164
165 sock <- connectToPeer addr
166 phs <- handshake sock chs `onException` close sock
167
168 let enabled = decodeExts (enabledCaps caps (handshakeCaps phs))
169 ps <- PeerSession addr ss enabled
170 <$> registerTimeout (eventManager clientSession)
171 maxIncomingTime abortSession
172 <*> registerTimeout (eventManager clientSession)
173 maxOutcomingTime (sendKA sock)
174 <*> newChan
175 <*> pure clientBitfield
176 <*> newIORef initSessionStatus
177 return (sock, ps)
178
179 closeSession = close . fst
180
181{-----------------------------------------------------------------------
182 Timeouts
183-----------------------------------------------------------------------}
184
185sec :: Int
186sec = 1000 * 1000
187
188maxIncomingTime :: Int
189maxIncomingTime = 120 * sec
190
191maxOutcomingTime :: Int
192maxOutcomingTime = 60 * sec
193
194-- | Should be called after we have received any message from a peer.
195updateIncoming :: PeerSession -> IO ()
196updateIncoming PeerSession {..} = do
197 updateTimeout (eventManager (clientSession swarmSession))
198 incomingTimeout maxIncomingTime
199
200-- | Should be called before we have send any message to a peer.
201updateOutcoming :: PeerSession -> IO ()
202updateOutcoming PeerSession {..} =
203 updateTimeout (eventManager (clientSession swarmSession))
204 outcomingTimeout maxOutcomingTime
205
206sendKA :: Socket -> IO ()
207sendKA sock = sendAll sock (encode BT.KeepAlive)
208
209abortSession :: IO ()
210abortSession = error "abortSession: not implemented"