diff options
author | Sam Truzjan <pxqr.sta@gmail.com> | 2014-02-24 14:21:10 +0400 |
---|---|---|
committer | Sam Truzjan <pxqr.sta@gmail.com> | 2014-02-24 14:21:10 +0400 |
commit | be9c82cd6d3351d02e5f944f041837709d97fa39 (patch) | |
tree | a77798caf6bce0e232a2e044050d952469ef46aa /src/Network/BitTorrent/Exchange/Session.hs | |
parent | 42eaee8dbcd1cfb922d94e974043d8d564dbd353 (diff) |
Implement acceptWire function
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Session.hs')
-rw-r--r-- | src/Network/BitTorrent/Exchange/Session.hs | 111 |
1 files changed, 83 insertions, 28 deletions
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index 88554807..1537efe1 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs | |||
@@ -7,11 +7,16 @@ module Network.BitTorrent.Exchange.Session | |||
7 | , newSession | 7 | , newSession |
8 | , closeSession | 8 | , closeSession |
9 | 9 | ||
10 | -- * Connections | ||
10 | , Network.BitTorrent.Exchange.Session.insert | 11 | , Network.BitTorrent.Exchange.Session.insert |
12 | , Network.BitTorrent.Exchange.Session.attach | ||
13 | , Network.BitTorrent.Exchange.Session.delete | ||
14 | , Network.BitTorrent.Exchange.Session.deleteAll | ||
11 | ) where | 15 | ) where |
12 | 16 | ||
13 | import Control.Applicative | 17 | import Control.Applicative |
14 | import Control.Concurrent | 18 | import Control.Concurrent |
19 | import Control.Concurrent.STM | ||
15 | import Control.Exception hiding (Handler) | 20 | import Control.Exception hiding (Handler) |
16 | import Control.Lens | 21 | import Control.Lens |
17 | import Control.Monad.Logger | 22 | import Control.Monad.Logger |
@@ -23,6 +28,7 @@ import Data.Conduit.List as CL (iterM) | |||
23 | import Data.Maybe | 28 | import Data.Maybe |
24 | import Data.Map as M | 29 | import Data.Map as M |
25 | import Data.Monoid | 30 | import Data.Monoid |
31 | import Data.Set as S | ||
26 | import Data.Text as T | 32 | import Data.Text as T |
27 | import Data.Typeable | 33 | import Data.Typeable |
28 | import Text.PrettyPrint hiding ((<>)) | 34 | import Text.PrettyPrint hiding ((<>)) |
@@ -70,11 +76,6 @@ data Cached a = Cached | |||
70 | cache :: BEncode a => a -> Cached a | 76 | cache :: BEncode a => a -> Cached a |
71 | cache s = Cached s (BE.encode s) | 77 | cache s = Cached s (BE.encode s) |
72 | 78 | ||
73 | data ConnectionEntry = ConnectionEntry | ||
74 | { initiatedBy :: !ChannelSide | ||
75 | , connection :: !(Connection Session) | ||
76 | } | ||
77 | |||
78 | data Session = Session | 79 | data Session = Session |
79 | { tpeerId :: PeerId | 80 | { tpeerId :: PeerId |
80 | , infohash :: InfoHash | 81 | , infohash :: InfoHash |
@@ -82,12 +83,15 @@ data Session = Session | |||
82 | , storage :: Storage | 83 | , storage :: Storage |
83 | , status :: MVar SessionStatus | 84 | , status :: MVar SessionStatus |
84 | , unchoked :: [PeerAddr IP] | 85 | , unchoked :: [PeerAddr IP] |
85 | , connections :: MVar (Map (PeerAddr IP) ConnectionEntry) | 86 | , pendingConnections :: TVar (Set (PeerAddr IP)) |
87 | , establishedConnections :: TVar (Map (PeerAddr IP) (Connection Session)) | ||
86 | , broadcast :: Chan Message | 88 | , broadcast :: Chan Message |
87 | , logger :: LogFun | 89 | , logger :: LogFun |
88 | , infodict :: MVar (Cached InfoDict) | 90 | , infodict :: MVar (Cached InfoDict) |
89 | } | 91 | } |
90 | 92 | ||
93 | instance Ord IP | ||
94 | |||
91 | -- | Logger function. | 95 | -- | Logger function. |
92 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () | 96 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () |
93 | 97 | ||
@@ -97,18 +101,21 @@ newSession :: LogFun | |||
97 | -> InfoDict -- ^ torrent info dictionary; | 101 | -> InfoDict -- ^ torrent info dictionary; |
98 | -> IO Session -- ^ | 102 | -> IO Session -- ^ |
99 | newSession logFun addr rootPath dict = do | 103 | newSession logFun addr rootPath dict = do |
100 | connVar <- newMVar M.empty | 104 | pconnVar <- newTVarIO S.empty |
105 | econnVar <- newTVarIO M.empty | ||
101 | store <- openInfoDict ReadWriteEx rootPath dict | 106 | store <- openInfoDict ReadWriteEx rootPath dict |
102 | statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces store)) | 107 | statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces store)) |
103 | (piPieceLength (idPieceInfo dict)) | 108 | (piPieceLength (idPieceInfo dict)) |
104 | chan <- newChan | 109 | chan <- newChan |
105 | return Session | 110 | return Session |
106 | { tpeerId = fromMaybe (error "newSession: impossible") (peerId addr) | 111 | { tpeerId = fromMaybe (error "newSession: impossible") |
107 | , infohash = idInfoHash dict | 112 | (peerId addr) |
108 | , status = statusVar | 113 | , infohash = idInfoHash dict |
109 | , storage = store | 114 | , status = statusVar |
110 | , unchoked = [] | 115 | , storage = store |
111 | , connections = connVar | 116 | , unchoked = [] |
117 | , pendingConnections = pconnVar | ||
118 | , establishedConnections = econnVar | ||
112 | , broadcast = chan | 119 | , broadcast = chan |
113 | , logger = logFun | 120 | , logger = logFun |
114 | } | 121 | } |
@@ -137,31 +144,79 @@ logEvent :: MonadLogger m => Text -> m () | |||
137 | logEvent = logInfoN | 144 | logEvent = logInfoN |
138 | 145 | ||
139 | {----------------------------------------------------------------------- | 146 | {----------------------------------------------------------------------- |
147 | -- Connection slots | ||
148 | -----------------------------------------------------------------------} | ||
149 | --- pending -> established -> closed | ||
150 | --- | /|\ | ||
151 | --- \-------------------------| | ||
152 | |||
153 | pendingConnection :: PeerAddr IP -> Session -> IO Bool | ||
154 | pendingConnection addr Session {..} = atomically $ do | ||
155 | pSet <- readTVar pendingConnections | ||
156 | eSet <- readTVar establishedConnections | ||
157 | if (addr `S.member` pSet) || (addr `M.member` eSet) | ||
158 | then return False | ||
159 | else do | ||
160 | modifyTVar' pendingConnections (S.insert addr) | ||
161 | return True | ||
162 | |||
163 | establishedConnection :: Connected Session () | ||
164 | establishedConnection = undefined --atomically $ do | ||
165 | -- pSet <- readTVar pendingConnections | ||
166 | -- eSet <- readTVar | ||
167 | undefined | ||
168 | |||
169 | finishedConnection :: Connected Session () | ||
170 | finishedConnection = return () | ||
171 | |||
172 | -- | There are no state for this connection, remove it. | ||
173 | closedConnection :: PeerAddr IP -> Session -> IO () | ||
174 | closedConnection addr Session {..} = atomically $ do | ||
175 | modifyTVar pendingConnections $ S.delete addr | ||
176 | modifyTVar establishedConnections $ M.delete addr | ||
177 | |||
178 | {----------------------------------------------------------------------- | ||
140 | -- Connections | 179 | -- Connections |
141 | -----------------------------------------------------------------------} | 180 | -----------------------------------------------------------------------} |
142 | -- TODO unmap storage on zero connections | 181 | -- TODO unmap storage on zero connections |
143 | 182 | ||
144 | insert :: PeerAddr IP | 183 | mainWire :: Wire Session () |
145 | -> {- Maybe Socket | 184 | mainWire = do |
146 | -> -} Session -> IO () | 185 | lift establishedConnection |
186 | Session {..} <- asks connSession | ||
187 | lift $ resizeBitfield (totalPieces storage) | ||
188 | logEvent "Connection established" | ||
189 | iterM logMessage =$= exchange =$= iterM logMessage | ||
190 | lift finishedConnection | ||
191 | |||
192 | getConnectionConfig :: Session -> IO (ConnectionConfig Session) | ||
193 | getConnectionConfig s @ Session {..} = undefined --ConnectionConfig | ||
194 | -- let caps = def | ||
195 | -- let ecaps = def | ||
196 | -- let hs = Handshake def caps infohash tpeerId | ||
197 | -- chan <- dupChan broadcast | ||
198 | |||
199 | -- { cfgPrefs = undefined | ||
200 | -- , cfgSession = ConnectionSession undefined undefined s | ||
201 | -- , cfgWire = mainWire | ||
202 | -- } | ||
203 | |||
204 | insert :: PeerAddr IP -> Session -> IO () | ||
147 | insert addr ses @ Session {..} = void $ forkIO (action `finally` cleanup) | 205 | insert addr ses @ Session {..} = void $ forkIO (action `finally` cleanup) |
148 | where | 206 | where |
207 | action = do | ||
208 | pendingConnection addr ses | ||
209 | cfg <- getConnectionConfig ses | ||
210 | connectWire addr cfg | ||
211 | |||
149 | cleanup = do | 212 | cleanup = do |
150 | runStatusUpdates status (SS.resetPending addr) | 213 | runStatusUpdates status (SS.resetPending addr) |
151 | -- TODO Metata.resetPending addr | 214 | -- TODO Metata.resetPending addr |
215 | closedConnection addr ses | ||
152 | 216 | ||
153 | action = do | 217 | -- TODO closePending on error |
154 | let caps = def | 218 | attach :: PendingConnection -> Session -> IO () |
155 | let ecaps = def | 219 | attach = undefined |
156 | let hs = Handshake def caps infohash tpeerId | ||
157 | chan <- dupChan broadcast | ||
158 | connectWire ses hs addr ecaps chan $ do | ||
159 | conn <- ask | ||
160 | -- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn | ||
161 | lift $ resizeBitfield (totalPieces storage) | ||
162 | logEvent "Connection established" | ||
163 | iterM logMessage =$= exchange =$= iterM logMessage | ||
164 | -- liftIO $ modifyMVar_ connections $ pure . M.delete addr | ||
165 | 220 | ||
166 | delete :: PeerAddr IP -> Session -> IO () | 221 | delete :: PeerAddr IP -> Session -> IO () |
167 | delete = undefined | 222 | delete = undefined |