diff options
author | Sam Truzjan <pxqr.sta@gmail.com> | 2014-03-05 00:35:47 +0400 |
---|---|---|
committer | Sam Truzjan <pxqr.sta@gmail.com> | 2014-03-05 00:35:47 +0400 |
commit | 3c7f12c10caca196a970c0cc8b6c46945c9dee58 (patch) | |
tree | e5a4f8d2d29828ecee64c18881322a3cb7602b93 /src | |
parent | 6794c6843e625a3b61fec48e54167a13f5fd093b (diff) |
Minor refactoring
Diffstat (limited to 'src')
-rw-r--r-- | src/Network/BitTorrent/Client/Handle.hs | 6 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/ContactInfo.hs | 77 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 5 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange.hs | 14 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Connection.hs | 6 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Manager.hs | 2 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Session.hs | 221 |
7 files changed, 236 insertions, 95 deletions
diff --git a/src/Network/BitTorrent/Client/Handle.hs b/src/Network/BitTorrent/Client/Handle.hs index e7ac779c..e7884caa 100644 --- a/src/Network/BitTorrent/Client/Handle.hs +++ b/src/Network/BitTorrent/Client/Handle.hs | |||
@@ -110,12 +110,12 @@ start Handle {..} = do | |||
110 | Client {..} <- getClient | 110 | Client {..} <- getClient |
111 | liftIO $ Tracker.notify trackerManager trackers Tracker.Started | 111 | liftIO $ Tracker.notify trackerManager trackers Tracker.Started |
112 | unless private $ do | 112 | unless private $ do |
113 | liftDHT $ DHT.insert topic undefined | 113 | liftDHT $ DHT.insert topic (error "start") |
114 | liftIO $ do | 114 | liftIO $ do |
115 | peers <- askPeers trackerManager trackers | 115 | peers <- askPeers trackerManager trackers |
116 | print $ "got: " ++ show (L.length peers) ++ " peers" | 116 | print $ "got: " ++ show (L.length peers) ++ " peers" |
117 | forM_ peers $ \ peer -> do | 117 | forM_ peers $ \ peer -> do |
118 | Exchange.insert peer exchange | 118 | Exchange.connect peer exchange |
119 | 119 | ||
120 | -- | Stop downloading this torrent. | 120 | -- | Stop downloading this torrent. |
121 | pause :: Handle -> BitTorrent () | 121 | pause :: Handle -> BitTorrent () |
@@ -126,7 +126,7 @@ stop :: Handle -> BitTorrent () | |||
126 | stop Handle {..} = do | 126 | stop Handle {..} = do |
127 | Client {..} <- getClient | 127 | Client {..} <- getClient |
128 | unless private $ do | 128 | unless private $ do |
129 | liftDHT $ DHT.delete topic undefined | 129 | liftDHT $ DHT.delete topic (error "stop") |
130 | liftIO $ Tracker.notify trackerManager trackers Tracker.Stopped | 130 | liftIO $ Tracker.notify trackerManager trackers Tracker.Stopped |
131 | 131 | ||
132 | {----------------------------------------------------------------------- | 132 | {----------------------------------------------------------------------- |
diff --git a/src/Network/BitTorrent/DHT/ContactInfo.hs b/src/Network/BitTorrent/DHT/ContactInfo.hs index 06d2dac0..028a4214 100644 --- a/src/Network/BitTorrent/DHT/ContactInfo.hs +++ b/src/Network/BitTorrent/DHT/ContactInfo.hs | |||
@@ -1,4 +1,6 @@ | |||
1 | module Network.BitTorrent.DHT.ContactInfo () where | 1 | module Network.BitTorrent.DHT.ContactInfo |
2 | ( ) where | ||
3 | {- | ||
2 | import Data.HashMap.Strict as HM | 4 | import Data.HashMap.Strict as HM |
3 | 5 | ||
4 | import Data.Torrent.InfoHash | 6 | import Data.Torrent.InfoHash |
@@ -8,6 +10,57 @@ import Network.BitTorrent.Core | |||
8 | -- decrease prefix when table is too small | 10 | -- decrease prefix when table is too small |
9 | -- filter outdated peers | 11 | -- filter outdated peers |
10 | 12 | ||
13 | {----------------------------------------------------------------------- | ||
14 | -- PeerSet | ||
15 | -----------------------------------------------------------------------} | ||
16 | |||
17 | type PeerSet a = [(PeerAddr a, NodeInfo a, Timestamp)] | ||
18 | |||
19 | -- compare PSQueue vs Ordered list | ||
20 | |||
21 | takeNewest :: PeerSet a -> [PeerAddr a] | ||
22 | takeNewest = undefined | ||
23 | |||
24 | dropOld :: Timestamp -> PeerSet a -> PeerSet a | ||
25 | dropOld = undefined | ||
26 | |||
27 | insert :: PeerAddr a -> Timestamp -> PeerSet a -> PeerSet a | ||
28 | insert = undefined | ||
29 | |||
30 | type Mask = Int | ||
31 | type Size = Int | ||
32 | type Timestamp = Int | ||
33 | |||
34 | {----------------------------------------------------------------------- | ||
35 | -- InfoHashMap | ||
36 | -----------------------------------------------------------------------} | ||
37 | |||
38 | -- compare handwritten prefix tree versus IntMap | ||
39 | |||
40 | data Tree a | ||
41 | = Nil | ||
42 | | Tip !InfoHash !(PeerSet a) | ||
43 | | Bin !InfoHash !Mask !Size !Timestamp (Tree a) (Tree a) | ||
44 | |||
45 | insertTree :: InfoHash -> a -> Tree a -> Tree a | ||
46 | insertTree = undefined | ||
47 | |||
48 | type Prio = Int | ||
49 | |||
50 | --shrink :: ContactInfo ip -> Int | ||
51 | shrink Nil = Nil | ||
52 | shrink (Tip _ _) = undefined | ||
53 | shrink (Bin _ _) = undefined | ||
54 | |||
55 | {----------------------------------------------------------------------- | ||
56 | -- InfoHashMap | ||
57 | -----------------------------------------------------------------------} | ||
58 | |||
59 | -- compare new design versus HashMap | ||
60 | |||
61 | data IntMap k p a | ||
62 | type ContactInfo = Map InfoHash Timestamp (Set (PeerAddr IP) Timestamp) | ||
63 | |||
11 | data ContactInfo ip = PeerStore | 64 | data ContactInfo ip = PeerStore |
12 | { maxSize :: Int | 65 | { maxSize :: Int |
13 | , prefixSize :: Int | 66 | , prefixSize :: Int |
@@ -16,3 +69,25 @@ data ContactInfo ip = PeerStore | |||
16 | , count :: Int -- ^ Cached size of the 'peerSet' | 69 | , count :: Int -- ^ Cached size of the 'peerSet' |
17 | , peerSet :: HashMap InfoHash [PeerAddr ip] | 70 | , peerSet :: HashMap InfoHash [PeerAddr ip] |
18 | } | 71 | } |
72 | |||
73 | size :: ContactInfo ip -> Int | ||
74 | size = undefined | ||
75 | |||
76 | prefixSize :: ContactInfo ip -> Int | ||
77 | prefixSize = undefined | ||
78 | |||
79 | lookup :: InfoHash -> ContactInfo ip -> [PeerAddr ip] | ||
80 | lookup = undefined | ||
81 | |||
82 | insert :: InfoHash -> PeerAddr ip -> ContactInfo ip -> ContactInfo ip | ||
83 | insert = undefined | ||
84 | |||
85 | -- | Limit in size. | ||
86 | prune :: NodeId -> Int -> ContactInfo ip -> ContactInfo ip | ||
87 | prune pref targetSize Nil = Nil | ||
88 | prune pref targetSize (Tip _ _) = undefined | ||
89 | |||
90 | -- | Remove expired entries. | ||
91 | splitGT :: Timestamp -> ContactInfo ip -> ContactInfo ip | ||
92 | splitGT = undefined | ||
93 | -} \ No newline at end of file | ||
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index 755985fc..e770b1d3 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -434,15 +434,20 @@ insertNode info = fork $ do | |||
434 | 434 | ||
435 | -- TODO limit dht peer store in size (probably by removing oldest peers) | 435 | -- TODO limit dht peer store in size (probably by removing oldest peers) |
436 | 436 | ||
437 | refreshContacts :: DHT ip () | ||
438 | refreshContacts = undefined | ||
439 | |||
437 | -- | Insert peer to peer store. Used to handle announce requests. | 440 | -- | Insert peer to peer store. Used to handle announce requests. |
438 | insertPeer :: Eq ip => InfoHash -> PeerAddr ip -> DHT ip () | 441 | insertPeer :: Eq ip => InfoHash -> PeerAddr ip -> DHT ip () |
439 | insertPeer ih addr = do | 442 | insertPeer ih addr = do |
443 | refreshContacts | ||
440 | var <- asks contactInfo | 444 | var <- asks contactInfo |
441 | liftIO $ atomically $ modifyTVar' var (P.insert ih addr) | 445 | liftIO $ atomically $ modifyTVar' var (P.insert ih addr) |
442 | 446 | ||
443 | -- | Get peer set for specific swarm. | 447 | -- | Get peer set for specific swarm. |
444 | lookupPeers :: InfoHash -> DHT ip [PeerAddr ip] | 448 | lookupPeers :: InfoHash -> DHT ip [PeerAddr ip] |
445 | lookupPeers ih = do | 449 | lookupPeers ih = do |
450 | refreshContacts | ||
446 | var <- asks contactInfo | 451 | var <- asks contactInfo |
447 | liftIO $ P.lookup ih <$> readTVarIO var | 452 | liftIO $ P.lookup ih <$> readTVarIO var |
448 | 453 | ||
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index 86e13d58..8dac3c84 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs | |||
@@ -6,25 +6,23 @@ | |||
6 | -- Portability : portable | 6 | -- Portability : portable |
7 | -- | 7 | -- |
8 | module Network.BitTorrent.Exchange | 8 | module Network.BitTorrent.Exchange |
9 | ( -- * Options | 9 | ( -- * Manager |
10 | Options (..) | 10 | Options (..) |
11 | , Caps | ||
12 | , Extension | ||
13 | , toCaps | ||
14 | |||
15 | -- * Manager | ||
16 | , Manager | 11 | , Manager |
17 | , Handler | 12 | , Handler |
18 | , newManager | 13 | , newManager |
19 | , closeManager | 14 | , closeManager |
20 | 15 | ||
21 | -- * Session | 16 | -- * Session |
17 | , Caps | ||
18 | , Extension | ||
19 | , toCaps | ||
22 | , Session | 20 | , Session |
23 | , newSession | 21 | , newSession |
24 | , closeSession | 22 | , closeSession |
25 | 23 | ||
26 | -- * Session control | 24 | -- * Connections |
27 | , insert | 25 | , connect |
28 | ) where | 26 | ) where |
29 | 27 | ||
30 | import Network.BitTorrent.Exchange.Manager | 28 | import Network.BitTorrent.Exchange.Manager |
diff --git a/src/Network/BitTorrent/Exchange/Connection.hs b/src/Network/BitTorrent/Exchange/Connection.hs index b23eb08b..dde9a468 100644 --- a/src/Network/BitTorrent/Exchange/Connection.hs +++ b/src/Network/BitTorrent/Exchange/Connection.hs | |||
@@ -723,10 +723,10 @@ extendedHandshake caps = do | |||
723 | _ -> protocolError HandshakeRefused | 723 | _ -> protocolError HandshakeRefused |
724 | 724 | ||
725 | rehandshake :: ExtendedCaps -> Wire s () | 725 | rehandshake :: ExtendedCaps -> Wire s () |
726 | rehandshake caps = undefined | 726 | rehandshake caps = error "rehandshake" |
727 | 727 | ||
728 | reconnect :: Wire s () | 728 | reconnect :: Wire s () |
729 | reconnect = undefined | 729 | reconnect = error "reconnect" |
730 | 730 | ||
731 | data ConnectionId = ConnectionId | 731 | data ConnectionId = ConnectionId |
732 | { topic :: !InfoHash | 732 | { topic :: !InfoHash |
@@ -751,7 +751,7 @@ instance Default ConnectionPrefs where | |||
751 | } | 751 | } |
752 | 752 | ||
753 | normalize :: ConnectionPrefs -> ConnectionPrefs | 753 | normalize :: ConnectionPrefs -> ConnectionPrefs |
754 | normalize = undefined | 754 | normalize = error "normalize" |
755 | 755 | ||
756 | -- | Bridge between 'Connection' and 'Network.BitTorrent.Exchange.Session'. | 756 | -- | Bridge between 'Connection' and 'Network.BitTorrent.Exchange.Session'. |
757 | data SessionLink s = SessionLink | 757 | data SessionLink s = SessionLink |
diff --git a/src/Network/BitTorrent/Exchange/Manager.hs b/src/Network/BitTorrent/Exchange/Manager.hs index f7f3cea7..b9aaa818 100644 --- a/src/Network/BitTorrent/Exchange/Manager.hs +++ b/src/Network/BitTorrent/Exchange/Manager.hs | |||
@@ -39,7 +39,7 @@ handleNewConn :: Socket -> PeerAddr IP -> Handler -> IO () | |||
39 | handleNewConn sock addr handler = do | 39 | handleNewConn sock addr handler = do |
40 | conn <- newPendingConnection sock addr | 40 | conn <- newPendingConnection sock addr |
41 | ses <- handler (pendingTopic conn) `onException` closePending conn | 41 | ses <- handler (pendingTopic conn) `onException` closePending conn |
42 | attach conn ses | 42 | establish conn ses |
43 | 43 | ||
44 | listenIncoming :: Options -> Handler -> IO () | 44 | listenIncoming :: Options -> Handler -> IO () |
45 | listenIncoming Options {..} handler = do | 45 | listenIncoming Options {..} handler = do |
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index 74d0cc87..8c3d5388 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs | |||
@@ -2,16 +2,19 @@ | |||
2 | {-# LANGUAGE TemplateHaskell #-} | 2 | {-# LANGUAGE TemplateHaskell #-} |
3 | {-# LANGUAGE DeriveDataTypeable #-} | 3 | {-# LANGUAGE DeriveDataTypeable #-} |
4 | module Network.BitTorrent.Exchange.Session | 4 | module Network.BitTorrent.Exchange.Session |
5 | ( Session | 5 | ( -- * Session |
6 | Session | ||
6 | , LogFun | 7 | , LogFun |
8 | , sessionLogger | ||
9 | |||
10 | -- * Construction | ||
7 | , newSession | 11 | , newSession |
8 | , closeSession | 12 | , closeSession |
13 | , withSession | ||
9 | 14 | ||
10 | -- * Connections | 15 | -- * Connection Set |
11 | , Network.BitTorrent.Exchange.Session.insert | 16 | , connect |
12 | , Network.BitTorrent.Exchange.Session.attach | 17 | , establish |
13 | , Network.BitTorrent.Exchange.Session.delete | ||
14 | , Network.BitTorrent.Exchange.Session.deleteAll | ||
15 | 18 | ||
16 | -- * Events | 19 | -- * Events |
17 | , waitMetadata | 20 | , waitMetadata |
@@ -70,6 +73,7 @@ packException f m = try m >>= either (throwIO . f) return | |||
70 | {----------------------------------------------------------------------- | 73 | {----------------------------------------------------------------------- |
71 | -- Session | 74 | -- Session |
72 | -----------------------------------------------------------------------} | 75 | -----------------------------------------------------------------------} |
76 | -- TODO unmap storage on zero connections | ||
73 | 77 | ||
74 | data Cached a = Cached | 78 | data Cached a = Cached |
75 | { cachedValue :: !a | 79 | { cachedValue :: !a |
@@ -79,9 +83,13 @@ data Cached a = Cached | |||
79 | cache :: BEncode a => a -> Cached a | 83 | cache :: BEncode a => a -> Cached a |
80 | cache s = Cached s (BE.encode s) | 84 | cache s = Cached s (BE.encode s) |
81 | 85 | ||
86 | -- | Logger function. | ||
87 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () | ||
88 | |||
82 | data Session = Session | 89 | data Session = Session |
83 | { sessionPeerId :: !(PeerId) | 90 | { sessionPeerId :: !(PeerId) |
84 | , sessionTopic :: !(InfoHash) | 91 | , sessionTopic :: !(InfoHash) |
92 | , sessionLogger :: !(LogFun) | ||
85 | 93 | ||
86 | , metadata :: !(MVar Metadata.Status) | 94 | , metadata :: !(MVar Metadata.Status) |
87 | , infodict :: !(MVar (Cached InfoDict)) | 95 | , infodict :: !(MVar (Cached InfoDict)) |
@@ -90,16 +98,27 @@ data Session = Session | |||
90 | , storage :: !(Storage) | 98 | , storage :: !(Storage) |
91 | 99 | ||
92 | , connectionsPrefs :: !ConnectionPrefs | 100 | , connectionsPrefs :: !ConnectionPrefs |
101 | |||
102 | -- | Connections either waiting for TCP/uTP 'connect' or waiting | ||
103 | -- for BT handshake. | ||
93 | , connectionsPending :: !(TVar (Set (PeerAddr IP))) | 104 | , connectionsPending :: !(TVar (Set (PeerAddr IP))) |
105 | |||
106 | -- | Connections successfully handshaked and data transfer can | ||
107 | -- take place. | ||
94 | , connectionsEstablished :: !(TVar (Map (PeerAddr IP) (Connection Session))) | 108 | , connectionsEstablished :: !(TVar (Map (PeerAddr IP) (Connection Session))) |
109 | |||
110 | -- | TODO implement choking mechanism | ||
95 | , connectionsUnchoked :: [PeerAddr IP] | 111 | , connectionsUnchoked :: [PeerAddr IP] |
96 | , broadcast :: !(Chan Message) | ||
97 | 112 | ||
98 | , logger :: !(LogFun) | 113 | -- | Messages written to this channel will be sent to the all |
114 | -- connections, including pending connections (but right after | ||
115 | -- handshake). | ||
116 | , connectionsBroadcast :: !(Chan Message) | ||
99 | } | 117 | } |
100 | 118 | ||
101 | -- | Logger function. | 119 | {----------------------------------------------------------------------- |
102 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () | 120 | -- Session construction |
121 | -----------------------------------------------------------------------} | ||
103 | 122 | ||
104 | newSession :: LogFun | 123 | newSession :: LogFun |
105 | -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer; | 124 | -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer; |
@@ -108,17 +127,20 @@ newSession :: LogFun | |||
108 | -> IO Session -- ^ | 127 | -> IO Session -- ^ |
109 | newSession logFun addr rootPath dict = do | 128 | newSession logFun addr rootPath dict = do |
110 | pid <- maybe genPeerId return (peerId addr) | 129 | pid <- maybe genPeerId return (peerId addr) |
111 | pconnVar <- newTVarIO S.empty | ||
112 | econnVar <- newTVarIO M.empty | ||
113 | store <- openInfoDict ReadWriteEx rootPath dict | 130 | store <- openInfoDict ReadWriteEx rootPath dict |
114 | statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces store)) | 131 | statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces store)) |
115 | (piPieceLength (idPieceInfo dict)) | 132 | (piPieceLength (idPieceInfo dict)) |
116 | metadataVar <- newMVar undefined | 133 | metadataVar <- newMVar (error "sessionMetadata") |
117 | infodictVar <- newMVar (cache dict) | 134 | infodictVar <- newMVar (cache dict) |
135 | |||
136 | pSetVar <- newTVarIO S.empty | ||
137 | eSetVar <- newTVarIO M.empty | ||
118 | chan <- newChan | 138 | chan <- newChan |
139 | |||
119 | return Session | 140 | return Session |
120 | { sessionPeerId = pid | 141 | { sessionPeerId = pid |
121 | , sessionTopic = idInfoHash dict | 142 | , sessionTopic = idInfoHash dict |
143 | , sessionLogger = logFun | ||
122 | 144 | ||
123 | , metadata = metadataVar | 145 | , metadata = metadataVar |
124 | , infodict = infodictVar | 146 | , infodict = infodictVar |
@@ -127,21 +149,25 @@ newSession logFun addr rootPath dict = do | |||
127 | , storage = store | 149 | , storage = store |
128 | 150 | ||
129 | , connectionsPrefs = def | 151 | , connectionsPrefs = def |
130 | , connectionsPending = pconnVar | 152 | , connectionsPending = pSetVar |
131 | , connectionsEstablished = econnVar | 153 | , connectionsEstablished = eSetVar |
132 | , connectionsUnchoked = [] | 154 | , connectionsUnchoked = [] |
133 | , broadcast = chan | 155 | , connectionsBroadcast = chan |
134 | |||
135 | , logger = logFun | ||
136 | } | 156 | } |
137 | 157 | ||
138 | closeSession :: Session -> IO () | 158 | closeSession :: Session -> IO () |
139 | closeSession ses = do | 159 | closeSession Session {..} = do |
140 | deleteAll ses | 160 | close storage |
141 | error "closeSession" | 161 | {- |
142 | 162 | hSet <- atomically $ do | |
143 | waitMetadata :: Session -> IO InfoDict | 163 | pSet <- swapTVar connectionsPending S.empty |
144 | waitMetadata Session {..} = cachedValue <$> readMVar infodict | 164 | eSet <- swapTVar connectionsEstablished S.empty |
165 | return pSet | ||
166 | mapM_ kill hSet | ||
167 | -} | ||
168 | |||
169 | withSession :: () | ||
170 | withSession = error "withSession" | ||
145 | 171 | ||
146 | {----------------------------------------------------------------------- | 172 | {----------------------------------------------------------------------- |
147 | -- Logging | 173 | -- Logging |
@@ -153,7 +179,7 @@ instance MonadLogger (Connected Session) where | |||
153 | ses <- asks connSession | 179 | ses <- asks connSession |
154 | addr <- asks connRemoteAddr | 180 | addr <- asks connRemoteAddr |
155 | let addrSrc = src <> " @ " <> T.pack (render (pretty addr)) | 181 | let addrSrc = src <> " @ " <> T.pack (render (pretty addr)) |
156 | liftIO $ logger ses loc addrSrc lvl (toLogStr msg) | 182 | liftIO $ sessionLogger ses loc addrSrc lvl (toLogStr msg) |
157 | 183 | ||
158 | logMessage :: MonadLogger m => Message -> m () | 184 | logMessage :: MonadLogger m => Message -> m () |
159 | logMessage msg = logDebugN $ T.pack (render (pretty msg)) | 185 | logMessage msg = logDebugN $ T.pack (render (pretty msg)) |
@@ -162,14 +188,22 @@ logEvent :: MonadLogger m => Text -> m () | |||
162 | logEvent = logInfoN | 188 | logEvent = logInfoN |
163 | 189 | ||
164 | {----------------------------------------------------------------------- | 190 | {----------------------------------------------------------------------- |
165 | -- Connection slots | 191 | -- Connection set |
166 | -----------------------------------------------------------------------} | 192 | -----------------------------------------------------------------------} |
167 | --- pending -> established -> closed | 193 | --- Connection status transition: |
168 | --- | /|\ | 194 | --- |
169 | --- \-------------------------| | 195 | --- pending -> established -> finished -> closed |
170 | 196 | --- | \|/ /|\ | |
171 | pendingConnection :: PeerAddr IP -> Session -> IO Bool | 197 | --- \-------------------------------------| |
172 | pendingConnection addr Session {..} = atomically $ do | 198 | --- |
199 | --- Purpose of slots: | ||
200 | --- 1) to avoid duplicates | ||
201 | --- 2) connect concurrently | ||
202 | --- | ||
203 | |||
204 | -- | Add connection to the pending set. | ||
205 | pendingConnection :: PeerAddr IP -> Session -> STM Bool | ||
206 | pendingConnection addr Session {..} = do | ||
173 | pSet <- readTVar connectionsPending | 207 | pSet <- readTVar connectionsPending |
174 | eSet <- readTVar connectionsEstablished | 208 | eSet <- readTVar connectionsEstablished |
175 | if (addr `S.member` pSet) || (addr `M.member` eSet) | 209 | if (addr `S.member` pSet) || (addr `M.member` eSet) |
@@ -178,38 +212,37 @@ pendingConnection addr Session {..} = atomically $ do | |||
178 | modifyTVar' connectionsPending (S.insert addr) | 212 | modifyTVar' connectionsPending (S.insert addr) |
179 | return True | 213 | return True |
180 | 214 | ||
215 | -- | Pending connection successfully established, add it to the | ||
216 | -- established set. | ||
181 | establishedConnection :: Connected Session () | 217 | establishedConnection :: Connected Session () |
182 | establishedConnection = undefined --atomically $ do | 218 | establishedConnection = do |
183 | -- pSet <- readTVar pendingConnections | 219 | conn <- ask |
184 | -- eSet <- readTVar | 220 | addr <- asks connRemoteAddr |
185 | undefined | 221 | Session {..} <- asks connSession |
222 | liftIO $ atomically $ do | ||
223 | modifyTVar connectionsPending (S.delete addr) | ||
224 | modifyTVar connectionsEstablished (M.insert addr conn) | ||
186 | 225 | ||
226 | -- | Either this or remote peer decided to finish conversation | ||
227 | -- (conversation is alread /established/ connection), remote it from | ||
228 | -- the established set. | ||
187 | finishedConnection :: Connected Session () | 229 | finishedConnection :: Connected Session () |
188 | finishedConnection = return () | 230 | finishedConnection = do |
231 | Session {..} <- asks connSession | ||
232 | addr <- asks connRemoteAddr | ||
233 | liftIO $ atomically $ do | ||
234 | modifyTVar connectionsEstablished $ M.delete addr | ||
189 | 235 | ||
190 | -- | There are no state for this connection, remove it. | 236 | -- | There are no state for this connection, remove it from the all |
191 | closedConnection :: PeerAddr IP -> Session -> IO () | 237 | -- sets. |
192 | closedConnection addr Session {..} = atomically $ do | 238 | closedConnection :: PeerAddr IP -> Session -> STM () |
239 | closedConnection addr Session {..} = do | ||
193 | modifyTVar connectionsPending $ S.delete addr | 240 | modifyTVar connectionsPending $ S.delete addr |
194 | modifyTVar connectionsEstablished $ M.delete addr | 241 | modifyTVar connectionsEstablished $ M.delete addr |
195 | 242 | ||
196 | {----------------------------------------------------------------------- | ||
197 | -- Connections | ||
198 | -----------------------------------------------------------------------} | ||
199 | -- TODO unmap storage on zero connections | ||
200 | |||
201 | mainWire :: Wire Session () | ||
202 | mainWire = do | ||
203 | lift establishedConnection | ||
204 | Session {..} <- asks connSession | ||
205 | lift $ resizeBitfield (totalPieces storage) | ||
206 | logEvent "Connection established" | ||
207 | iterM logMessage =$= exchange =$= iterM logMessage | ||
208 | lift finishedConnection | ||
209 | |||
210 | getConnectionConfig :: Session -> IO (ConnectionConfig Session) | 243 | getConnectionConfig :: Session -> IO (ConnectionConfig Session) |
211 | getConnectionConfig s @ Session {..} = do | 244 | getConnectionConfig s @ Session {..} = do |
212 | chan <- dupChan broadcast | 245 | chan <- dupChan connectionsBroadcast |
213 | let sessionLink = SessionLink { | 246 | let sessionLink = SessionLink { |
214 | linkTopic = sessionTopic | 247 | linkTopic = sessionTopic |
215 | , linkPeerId = sessionPeerId | 248 | , linkPeerId = sessionPeerId |
@@ -223,28 +256,46 @@ getConnectionConfig s @ Session {..} = do | |||
223 | , cfgWire = mainWire | 256 | , cfgWire = mainWire |
224 | } | 257 | } |
225 | 258 | ||
226 | insert :: PeerAddr IP -> Session -> IO () | 259 | type Finalizer = IO () |
227 | insert addr ses @ Session {..} = void $ forkIO (action `finally` cleanup) | 260 | type Runner = (ConnectionConfig Session -> IO ()) |
261 | |||
262 | runConnection :: Runner -> Finalizer -> PeerAddr IP -> Session -> IO () | ||
263 | runConnection runner finalize addr set @ Session {..} = do | ||
264 | _ <- forkIO (action `finally` cleanup) | ||
265 | return () | ||
228 | where | 266 | where |
229 | action = do | 267 | action = do |
230 | pendingConnection addr ses | 268 | notExist <- atomically $ pendingConnection addr set |
231 | cfg <- getConnectionConfig ses | 269 | when notExist $ do |
232 | connectWire addr cfg | 270 | cfg <- getConnectionConfig set |
271 | runner cfg | ||
233 | 272 | ||
234 | cleanup = do | 273 | cleanup = do |
235 | runStatusUpdates status (SS.resetPending addr) | 274 | finalize |
275 | -- runStatusUpdates status (SS.resetPending addr) | ||
236 | -- TODO Metata.resetPending addr | 276 | -- TODO Metata.resetPending addr |
237 | closedConnection addr ses | 277 | atomically $ closedConnection addr set |
278 | |||
279 | -- | Establish connection from scratch. If this endpoint is already | ||
280 | -- connected, no new connections is created. This function do not block. | ||
281 | connect :: PeerAddr IP -> Session -> IO () | ||
282 | connect addr = runConnection (connectWire addr) (return ()) addr | ||
238 | 283 | ||
239 | -- TODO closePending on error | 284 | -- | Establish connection with already pre-connected endpoint. If this |
240 | attach :: PendingConnection -> Session -> IO () | 285 | -- endpoint is already connected, no new connections is created. This |
241 | attach = undefined | 286 | -- function do not block. |
287 | -- | ||
288 | -- 'PendingConnection' will be closed automatically, you do not need | ||
289 | -- to call 'closePending'. | ||
290 | establish :: PendingConnection -> Session -> IO () | ||
291 | establish conn = runConnection (acceptWire conn) (closePending conn) | ||
292 | (pendingPeer conn) | ||
242 | 293 | ||
243 | delete :: PeerAddr IP -> Session -> IO () | 294 | -- | Why do we need this message? |
244 | delete = undefined | 295 | type BroadcastMessage = ExtendedCaps -> Message |
245 | 296 | ||
246 | deleteAll :: Session -> IO () | 297 | broadcast :: BroadcastMessage -> Session -> IO () |
247 | deleteAll = undefined | 298 | broadcast = error "broadcast" |
248 | 299 | ||
249 | {----------------------------------------------------------------------- | 300 | {----------------------------------------------------------------------- |
250 | -- Helpers | 301 | -- Helpers |
@@ -292,14 +343,17 @@ tryReadMetadataBlock pix = do | |||
292 | Session {..} <- asks connSession | 343 | Session {..} <- asks connSession |
293 | mcached <- liftIO (tryReadMVar infodict) | 344 | mcached <- liftIO (tryReadMVar infodict) |
294 | case mcached of | 345 | case mcached of |
295 | Nothing -> undefined | 346 | Nothing -> error "tryReadMetadataBlock" |
296 | Just (Cached {..}) -> undefined | 347 | Just (Cached {..}) -> error "tryReadMetadataBlock" |
297 | 348 | ||
298 | sendBroadcast :: PeerMessage msg => msg -> Wire Session () | 349 | sendBroadcast :: PeerMessage msg => msg -> Wire Session () |
299 | sendBroadcast msg = do | 350 | sendBroadcast msg = do |
300 | Session {..} <- asks connSession | 351 | Session {..} <- asks connSession |
301 | ecaps <- use connExtCaps | 352 | error "sendBroadcast" |
302 | liftIO $ writeChan broadcast (envelop ecaps msg) | 353 | -- liftIO $ msg `broadcast` sessionConnections |
354 | |||
355 | waitMetadata :: Session -> IO InfoDict | ||
356 | waitMetadata Session {..} = cachedValue <$> readMVar infodict | ||
303 | 357 | ||
304 | {----------------------------------------------------------------------- | 358 | {----------------------------------------------------------------------- |
305 | -- Triggers | 359 | -- Triggers |
@@ -406,7 +460,7 @@ tryRequestMetadataBlock :: Trigger | |||
406 | tryRequestMetadataBlock = do | 460 | tryRequestMetadataBlock = do |
407 | mpix <- lift $ withMetadataUpdates Metadata.scheduleBlock | 461 | mpix <- lift $ withMetadataUpdates Metadata.scheduleBlock |
408 | case mpix of | 462 | case mpix of |
409 | Nothing -> undefined | 463 | Nothing -> error "tryRequestMetadataBlock" |
410 | Just pix -> sendMessage (MetadataRequest pix) | 464 | Just pix -> sendMessage (MetadataRequest pix) |
411 | 465 | ||
412 | metadataCompleted :: InfoDict -> Trigger | 466 | metadataCompleted :: InfoDict -> Trigger |
@@ -439,7 +493,7 @@ handleMetadata (MetadataUnknown _ ) = do | |||
439 | -----------------------------------------------------------------------} | 493 | -----------------------------------------------------------------------} |
440 | 494 | ||
441 | acceptRehandshake :: ExtendedHandshake -> Trigger | 495 | acceptRehandshake :: ExtendedHandshake -> Trigger |
442 | acceptRehandshake ehs = undefined | 496 | acceptRehandshake ehs = error "acceptRehandshake" |
443 | 497 | ||
444 | handleExtended :: Handler ExtendedMessage | 498 | handleExtended :: Handler ExtendedMessage |
445 | handleExtended (EHandshake ehs) = acceptRehandshake ehs | 499 | handleExtended (EHandshake ehs) = acceptRehandshake ehs |
@@ -451,8 +505,8 @@ handleMessage KeepAlive = return () | |||
451 | handleMessage (Status s) = handleStatus s | 505 | handleMessage (Status s) = handleStatus s |
452 | handleMessage (Available msg) = handleAvailable msg | 506 | handleMessage (Available msg) = handleAvailable msg |
453 | handleMessage (Transfer msg) = handleTransfer msg | 507 | handleMessage (Transfer msg) = handleTransfer msg |
454 | handleMessage (Port n) = undefined | 508 | handleMessage (Port n) = error "handleMessage" |
455 | handleMessage (Fast _) = undefined | 509 | handleMessage (Fast _) = error "handleMessage" |
456 | handleMessage (Extended msg) = handleExtended msg | 510 | handleMessage (Extended msg) = handleExtended msg |
457 | 511 | ||
458 | exchange :: Wire Session () | 512 | exchange :: Wire Session () |
@@ -462,13 +516,22 @@ exchange = do | |||
462 | sendMessage (Bitfield bf) | 516 | sendMessage (Bitfield bf) |
463 | awaitForever handleMessage | 517 | awaitForever handleMessage |
464 | 518 | ||
519 | mainWire :: Wire Session () | ||
520 | mainWire = do | ||
521 | lift establishedConnection | ||
522 | Session {..} <- asks connSession | ||
523 | lift $ resizeBitfield (totalPieces storage) | ||
524 | logEvent "Connection established" | ||
525 | iterM logMessage =$= exchange =$= iterM logMessage | ||
526 | lift finishedConnection | ||
527 | |||
465 | data Event = NewMessage (PeerAddr IP) Message | 528 | data Event = NewMessage (PeerAddr IP) Message |
466 | | Timeout -- for scheduling | 529 | | Timeout -- for scheduling |
467 | 530 | ||
468 | type Exchange a = Wire Session a | 531 | type Exchange a = Wire Session a |
469 | 532 | ||
470 | awaitEvent :: Exchange Event | 533 | awaitEvent :: Exchange Event |
471 | awaitEvent = undefined | 534 | awaitEvent = error "awaitEvent" |
472 | 535 | ||
473 | yieldEvent :: Exchange Event | 536 | yieldEvent :: Exchange Event |
474 | yieldEvent = undefined | 537 | yieldEvent = error "yieldEvent" |