summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent
diff options
context:
space:
mode:
authorSam Truzjan <pxqr.sta@gmail.com>2014-03-05 00:35:47 +0400
committerSam Truzjan <pxqr.sta@gmail.com>2014-03-05 00:35:47 +0400
commit3c7f12c10caca196a970c0cc8b6c46945c9dee58 (patch)
treee5a4f8d2d29828ecee64c18881322a3cb7602b93 /src/Network/BitTorrent
parent6794c6843e625a3b61fec48e54167a13f5fd093b (diff)
Minor refactoring
Diffstat (limited to 'src/Network/BitTorrent')
-rw-r--r--src/Network/BitTorrent/Client/Handle.hs6
-rw-r--r--src/Network/BitTorrent/DHT/ContactInfo.hs77
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs5
-rw-r--r--src/Network/BitTorrent/Exchange.hs14
-rw-r--r--src/Network/BitTorrent/Exchange/Connection.hs6
-rw-r--r--src/Network/BitTorrent/Exchange/Manager.hs2
-rw-r--r--src/Network/BitTorrent/Exchange/Session.hs221
7 files changed, 236 insertions, 95 deletions
diff --git a/src/Network/BitTorrent/Client/Handle.hs b/src/Network/BitTorrent/Client/Handle.hs
index e7ac779c..e7884caa 100644
--- a/src/Network/BitTorrent/Client/Handle.hs
+++ b/src/Network/BitTorrent/Client/Handle.hs
@@ -110,12 +110,12 @@ start Handle {..} = do
110 Client {..} <- getClient 110 Client {..} <- getClient
111 liftIO $ Tracker.notify trackerManager trackers Tracker.Started 111 liftIO $ Tracker.notify trackerManager trackers Tracker.Started
112 unless private $ do 112 unless private $ do
113 liftDHT $ DHT.insert topic undefined 113 liftDHT $ DHT.insert topic (error "start")
114 liftIO $ do 114 liftIO $ do
115 peers <- askPeers trackerManager trackers 115 peers <- askPeers trackerManager trackers
116 print $ "got: " ++ show (L.length peers) ++ " peers" 116 print $ "got: " ++ show (L.length peers) ++ " peers"
117 forM_ peers $ \ peer -> do 117 forM_ peers $ \ peer -> do
118 Exchange.insert peer exchange 118 Exchange.connect peer exchange
119 119
120-- | Stop downloading this torrent. 120-- | Stop downloading this torrent.
121pause :: Handle -> BitTorrent () 121pause :: Handle -> BitTorrent ()
@@ -126,7 +126,7 @@ stop :: Handle -> BitTorrent ()
126stop Handle {..} = do 126stop Handle {..} = do
127 Client {..} <- getClient 127 Client {..} <- getClient
128 unless private $ do 128 unless private $ do
129 liftDHT $ DHT.delete topic undefined 129 liftDHT $ DHT.delete topic (error "stop")
130 liftIO $ Tracker.notify trackerManager trackers Tracker.Stopped 130 liftIO $ Tracker.notify trackerManager trackers Tracker.Stopped
131 131
132{----------------------------------------------------------------------- 132{-----------------------------------------------------------------------
diff --git a/src/Network/BitTorrent/DHT/ContactInfo.hs b/src/Network/BitTorrent/DHT/ContactInfo.hs
index 06d2dac0..028a4214 100644
--- a/src/Network/BitTorrent/DHT/ContactInfo.hs
+++ b/src/Network/BitTorrent/DHT/ContactInfo.hs
@@ -1,4 +1,6 @@
1module Network.BitTorrent.DHT.ContactInfo () where 1module Network.BitTorrent.DHT.ContactInfo
2 ( ) where
3{-
2import Data.HashMap.Strict as HM 4import Data.HashMap.Strict as HM
3 5
4import Data.Torrent.InfoHash 6import Data.Torrent.InfoHash
@@ -8,6 +10,57 @@ import Network.BitTorrent.Core
8-- decrease prefix when table is too small 10-- decrease prefix when table is too small
9-- filter outdated peers 11-- filter outdated peers
10 12
13{-----------------------------------------------------------------------
14-- PeerSet
15-----------------------------------------------------------------------}
16
17type PeerSet a = [(PeerAddr a, NodeInfo a, Timestamp)]
18
19-- compare PSQueue vs Ordered list
20
21takeNewest :: PeerSet a -> [PeerAddr a]
22takeNewest = undefined
23
24dropOld :: Timestamp -> PeerSet a -> PeerSet a
25dropOld = undefined
26
27insert :: PeerAddr a -> Timestamp -> PeerSet a -> PeerSet a
28insert = undefined
29
30type Mask = Int
31type Size = Int
32type Timestamp = Int
33
34{-----------------------------------------------------------------------
35-- InfoHashMap
36-----------------------------------------------------------------------}
37
38-- compare handwritten prefix tree versus IntMap
39
40data Tree a
41 = Nil
42 | Tip !InfoHash !(PeerSet a)
43 | Bin !InfoHash !Mask !Size !Timestamp (Tree a) (Tree a)
44
45insertTree :: InfoHash -> a -> Tree a -> Tree a
46insertTree = undefined
47
48type Prio = Int
49
50--shrink :: ContactInfo ip -> Int
51shrink Nil = Nil
52shrink (Tip _ _) = undefined
53shrink (Bin _ _) = undefined
54
55{-----------------------------------------------------------------------
56-- InfoHashMap
57-----------------------------------------------------------------------}
58
59-- compare new design versus HashMap
60
61data IntMap k p a
62type ContactInfo = Map InfoHash Timestamp (Set (PeerAddr IP) Timestamp)
63
11data ContactInfo ip = PeerStore 64data ContactInfo ip = PeerStore
12 { maxSize :: Int 65 { maxSize :: Int
13 , prefixSize :: Int 66 , prefixSize :: Int
@@ -16,3 +69,25 @@ data ContactInfo ip = PeerStore
16 , count :: Int -- ^ Cached size of the 'peerSet' 69 , count :: Int -- ^ Cached size of the 'peerSet'
17 , peerSet :: HashMap InfoHash [PeerAddr ip] 70 , peerSet :: HashMap InfoHash [PeerAddr ip]
18 } 71 }
72
73size :: ContactInfo ip -> Int
74size = undefined
75
76prefixSize :: ContactInfo ip -> Int
77prefixSize = undefined
78
79lookup :: InfoHash -> ContactInfo ip -> [PeerAddr ip]
80lookup = undefined
81
82insert :: InfoHash -> PeerAddr ip -> ContactInfo ip -> ContactInfo ip
83insert = undefined
84
85-- | Limit in size.
86prune :: NodeId -> Int -> ContactInfo ip -> ContactInfo ip
87prune pref targetSize Nil = Nil
88prune pref targetSize (Tip _ _) = undefined
89
90-- | Remove expired entries.
91splitGT :: Timestamp -> ContactInfo ip -> ContactInfo ip
92splitGT = undefined
93-} \ No newline at end of file
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index 755985fc..e770b1d3 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -434,15 +434,20 @@ insertNode info = fork $ do
434 434
435-- TODO limit dht peer store in size (probably by removing oldest peers) 435-- TODO limit dht peer store in size (probably by removing oldest peers)
436 436
437refreshContacts :: DHT ip ()
438refreshContacts = undefined
439
437-- | Insert peer to peer store. Used to handle announce requests. 440-- | Insert peer to peer store. Used to handle announce requests.
438insertPeer :: Eq ip => InfoHash -> PeerAddr ip -> DHT ip () 441insertPeer :: Eq ip => InfoHash -> PeerAddr ip -> DHT ip ()
439insertPeer ih addr = do 442insertPeer ih addr = do
443 refreshContacts
440 var <- asks contactInfo 444 var <- asks contactInfo
441 liftIO $ atomically $ modifyTVar' var (P.insert ih addr) 445 liftIO $ atomically $ modifyTVar' var (P.insert ih addr)
442 446
443-- | Get peer set for specific swarm. 447-- | Get peer set for specific swarm.
444lookupPeers :: InfoHash -> DHT ip [PeerAddr ip] 448lookupPeers :: InfoHash -> DHT ip [PeerAddr ip]
445lookupPeers ih = do 449lookupPeers ih = do
450 refreshContacts
446 var <- asks contactInfo 451 var <- asks contactInfo
447 liftIO $ P.lookup ih <$> readTVarIO var 452 liftIO $ P.lookup ih <$> readTVarIO var
448 453
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs
index 86e13d58..8dac3c84 100644
--- a/src/Network/BitTorrent/Exchange.hs
+++ b/src/Network/BitTorrent/Exchange.hs
@@ -6,25 +6,23 @@
6-- Portability : portable 6-- Portability : portable
7-- 7--
8module Network.BitTorrent.Exchange 8module Network.BitTorrent.Exchange
9 ( -- * Options 9 ( -- * Manager
10 Options (..) 10 Options (..)
11 , Caps
12 , Extension
13 , toCaps
14
15 -- * Manager
16 , Manager 11 , Manager
17 , Handler 12 , Handler
18 , newManager 13 , newManager
19 , closeManager 14 , closeManager
20 15
21 -- * Session 16 -- * Session
17 , Caps
18 , Extension
19 , toCaps
22 , Session 20 , Session
23 , newSession 21 , newSession
24 , closeSession 22 , closeSession
25 23
26 -- * Session control 24 -- * Connections
27 , insert 25 , connect
28 ) where 26 ) where
29 27
30import Network.BitTorrent.Exchange.Manager 28import Network.BitTorrent.Exchange.Manager
diff --git a/src/Network/BitTorrent/Exchange/Connection.hs b/src/Network/BitTorrent/Exchange/Connection.hs
index b23eb08b..dde9a468 100644
--- a/src/Network/BitTorrent/Exchange/Connection.hs
+++ b/src/Network/BitTorrent/Exchange/Connection.hs
@@ -723,10 +723,10 @@ extendedHandshake caps = do
723 _ -> protocolError HandshakeRefused 723 _ -> protocolError HandshakeRefused
724 724
725rehandshake :: ExtendedCaps -> Wire s () 725rehandshake :: ExtendedCaps -> Wire s ()
726rehandshake caps = undefined 726rehandshake caps = error "rehandshake"
727 727
728reconnect :: Wire s () 728reconnect :: Wire s ()
729reconnect = undefined 729reconnect = error "reconnect"
730 730
731data ConnectionId = ConnectionId 731data ConnectionId = ConnectionId
732 { topic :: !InfoHash 732 { topic :: !InfoHash
@@ -751,7 +751,7 @@ instance Default ConnectionPrefs where
751 } 751 }
752 752
753normalize :: ConnectionPrefs -> ConnectionPrefs 753normalize :: ConnectionPrefs -> ConnectionPrefs
754normalize = undefined 754normalize = error "normalize"
755 755
756-- | Bridge between 'Connection' and 'Network.BitTorrent.Exchange.Session'. 756-- | Bridge between 'Connection' and 'Network.BitTorrent.Exchange.Session'.
757data SessionLink s = SessionLink 757data SessionLink s = SessionLink
diff --git a/src/Network/BitTorrent/Exchange/Manager.hs b/src/Network/BitTorrent/Exchange/Manager.hs
index f7f3cea7..b9aaa818 100644
--- a/src/Network/BitTorrent/Exchange/Manager.hs
+++ b/src/Network/BitTorrent/Exchange/Manager.hs
@@ -39,7 +39,7 @@ handleNewConn :: Socket -> PeerAddr IP -> Handler -> IO ()
39handleNewConn sock addr handler = do 39handleNewConn sock addr handler = do
40 conn <- newPendingConnection sock addr 40 conn <- newPendingConnection sock addr
41 ses <- handler (pendingTopic conn) `onException` closePending conn 41 ses <- handler (pendingTopic conn) `onException` closePending conn
42 attach conn ses 42 establish conn ses
43 43
44listenIncoming :: Options -> Handler -> IO () 44listenIncoming :: Options -> Handler -> IO ()
45listenIncoming Options {..} handler = do 45listenIncoming Options {..} handler = do
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs
index 74d0cc87..8c3d5388 100644
--- a/src/Network/BitTorrent/Exchange/Session.hs
+++ b/src/Network/BitTorrent/Exchange/Session.hs
@@ -2,16 +2,19 @@
2{-# LANGUAGE TemplateHaskell #-} 2{-# LANGUAGE TemplateHaskell #-}
3{-# LANGUAGE DeriveDataTypeable #-} 3{-# LANGUAGE DeriveDataTypeable #-}
4module Network.BitTorrent.Exchange.Session 4module Network.BitTorrent.Exchange.Session
5 ( Session 5 ( -- * Session
6 Session
6 , LogFun 7 , LogFun
8 , sessionLogger
9
10 -- * Construction
7 , newSession 11 , newSession
8 , closeSession 12 , closeSession
13 , withSession
9 14
10 -- * Connections 15 -- * Connection Set
11 , Network.BitTorrent.Exchange.Session.insert 16 , connect
12 , Network.BitTorrent.Exchange.Session.attach 17 , establish
13 , Network.BitTorrent.Exchange.Session.delete
14 , Network.BitTorrent.Exchange.Session.deleteAll
15 18
16 -- * Events 19 -- * Events
17 , waitMetadata 20 , waitMetadata
@@ -70,6 +73,7 @@ packException f m = try m >>= either (throwIO . f) return
70{----------------------------------------------------------------------- 73{-----------------------------------------------------------------------
71-- Session 74-- Session
72-----------------------------------------------------------------------} 75-----------------------------------------------------------------------}
76-- TODO unmap storage on zero connections
73 77
74data Cached a = Cached 78data Cached a = Cached
75 { cachedValue :: !a 79 { cachedValue :: !a
@@ -79,9 +83,13 @@ data Cached a = Cached
79cache :: BEncode a => a -> Cached a 83cache :: BEncode a => a -> Cached a
80cache s = Cached s (BE.encode s) 84cache s = Cached s (BE.encode s)
81 85
86-- | Logger function.
87type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO ()
88
82data Session = Session 89data Session = Session
83 { sessionPeerId :: !(PeerId) 90 { sessionPeerId :: !(PeerId)
84 , sessionTopic :: !(InfoHash) 91 , sessionTopic :: !(InfoHash)
92 , sessionLogger :: !(LogFun)
85 93
86 , metadata :: !(MVar Metadata.Status) 94 , metadata :: !(MVar Metadata.Status)
87 , infodict :: !(MVar (Cached InfoDict)) 95 , infodict :: !(MVar (Cached InfoDict))
@@ -90,16 +98,27 @@ data Session = Session
90 , storage :: !(Storage) 98 , storage :: !(Storage)
91 99
92 , connectionsPrefs :: !ConnectionPrefs 100 , connectionsPrefs :: !ConnectionPrefs
101
102 -- | Connections either waiting for TCP/uTP 'connect' or waiting
103 -- for BT handshake.
93 , connectionsPending :: !(TVar (Set (PeerAddr IP))) 104 , connectionsPending :: !(TVar (Set (PeerAddr IP)))
105
106 -- | Connections successfully handshaked and data transfer can
107 -- take place.
94 , connectionsEstablished :: !(TVar (Map (PeerAddr IP) (Connection Session))) 108 , connectionsEstablished :: !(TVar (Map (PeerAddr IP) (Connection Session)))
109
110 -- | TODO implement choking mechanism
95 , connectionsUnchoked :: [PeerAddr IP] 111 , connectionsUnchoked :: [PeerAddr IP]
96 , broadcast :: !(Chan Message)
97 112
98 , logger :: !(LogFun) 113 -- | Messages written to this channel will be sent to the all
114 -- connections, including pending connections (but right after
115 -- handshake).
116 , connectionsBroadcast :: !(Chan Message)
99 } 117 }
100 118
101-- | Logger function. 119{-----------------------------------------------------------------------
102type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () 120-- Session construction
121-----------------------------------------------------------------------}
103 122
104newSession :: LogFun 123newSession :: LogFun
105 -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer; 124 -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer;
@@ -108,17 +127,20 @@ newSession :: LogFun
108 -> IO Session -- ^ 127 -> IO Session -- ^
109newSession logFun addr rootPath dict = do 128newSession logFun addr rootPath dict = do
110 pid <- maybe genPeerId return (peerId addr) 129 pid <- maybe genPeerId return (peerId addr)
111 pconnVar <- newTVarIO S.empty
112 econnVar <- newTVarIO M.empty
113 store <- openInfoDict ReadWriteEx rootPath dict 130 store <- openInfoDict ReadWriteEx rootPath dict
114 statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces store)) 131 statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces store))
115 (piPieceLength (idPieceInfo dict)) 132 (piPieceLength (idPieceInfo dict))
116 metadataVar <- newMVar undefined 133 metadataVar <- newMVar (error "sessionMetadata")
117 infodictVar <- newMVar (cache dict) 134 infodictVar <- newMVar (cache dict)
135
136 pSetVar <- newTVarIO S.empty
137 eSetVar <- newTVarIO M.empty
118 chan <- newChan 138 chan <- newChan
139
119 return Session 140 return Session
120 { sessionPeerId = pid 141 { sessionPeerId = pid
121 , sessionTopic = idInfoHash dict 142 , sessionTopic = idInfoHash dict
143 , sessionLogger = logFun
122 144
123 , metadata = metadataVar 145 , metadata = metadataVar
124 , infodict = infodictVar 146 , infodict = infodictVar
@@ -127,21 +149,25 @@ newSession logFun addr rootPath dict = do
127 , storage = store 149 , storage = store
128 150
129 , connectionsPrefs = def 151 , connectionsPrefs = def
130 , connectionsPending = pconnVar 152 , connectionsPending = pSetVar
131 , connectionsEstablished = econnVar 153 , connectionsEstablished = eSetVar
132 , connectionsUnchoked = [] 154 , connectionsUnchoked = []
133 , broadcast = chan 155 , connectionsBroadcast = chan
134
135 , logger = logFun
136 } 156 }
137 157
138closeSession :: Session -> IO () 158closeSession :: Session -> IO ()
139closeSession ses = do 159closeSession Session {..} = do
140 deleteAll ses 160 close storage
141 error "closeSession" 161{-
142 162 hSet <- atomically $ do
143waitMetadata :: Session -> IO InfoDict 163 pSet <- swapTVar connectionsPending S.empty
144waitMetadata Session {..} = cachedValue <$> readMVar infodict 164 eSet <- swapTVar connectionsEstablished S.empty
165 return pSet
166 mapM_ kill hSet
167-}
168
169withSession :: ()
170withSession = error "withSession"
145 171
146{----------------------------------------------------------------------- 172{-----------------------------------------------------------------------
147-- Logging 173-- Logging
@@ -153,7 +179,7 @@ instance MonadLogger (Connected Session) where
153 ses <- asks connSession 179 ses <- asks connSession
154 addr <- asks connRemoteAddr 180 addr <- asks connRemoteAddr
155 let addrSrc = src <> " @ " <> T.pack (render (pretty addr)) 181 let addrSrc = src <> " @ " <> T.pack (render (pretty addr))
156 liftIO $ logger ses loc addrSrc lvl (toLogStr msg) 182 liftIO $ sessionLogger ses loc addrSrc lvl (toLogStr msg)
157 183
158logMessage :: MonadLogger m => Message -> m () 184logMessage :: MonadLogger m => Message -> m ()
159logMessage msg = logDebugN $ T.pack (render (pretty msg)) 185logMessage msg = logDebugN $ T.pack (render (pretty msg))
@@ -162,14 +188,22 @@ logEvent :: MonadLogger m => Text -> m ()
162logEvent = logInfoN 188logEvent = logInfoN
163 189
164{----------------------------------------------------------------------- 190{-----------------------------------------------------------------------
165-- Connection slots 191-- Connection set
166-----------------------------------------------------------------------} 192-----------------------------------------------------------------------}
167--- pending -> established -> closed 193--- Connection status transition:
168--- | /|\ 194---
169--- \-------------------------| 195--- pending -> established -> finished -> closed
170 196--- | \|/ /|\
171pendingConnection :: PeerAddr IP -> Session -> IO Bool 197--- \-------------------------------------|
172pendingConnection addr Session {..} = atomically $ do 198---
199--- Purpose of slots:
200--- 1) to avoid duplicates
201--- 2) connect concurrently
202---
203
204-- | Add connection to the pending set.
205pendingConnection :: PeerAddr IP -> Session -> STM Bool
206pendingConnection addr Session {..} = do
173 pSet <- readTVar connectionsPending 207 pSet <- readTVar connectionsPending
174 eSet <- readTVar connectionsEstablished 208 eSet <- readTVar connectionsEstablished
175 if (addr `S.member` pSet) || (addr `M.member` eSet) 209 if (addr `S.member` pSet) || (addr `M.member` eSet)
@@ -178,38 +212,37 @@ pendingConnection addr Session {..} = atomically $ do
178 modifyTVar' connectionsPending (S.insert addr) 212 modifyTVar' connectionsPending (S.insert addr)
179 return True 213 return True
180 214
215-- | Pending connection successfully established, add it to the
216-- established set.
181establishedConnection :: Connected Session () 217establishedConnection :: Connected Session ()
182establishedConnection = undefined --atomically $ do 218establishedConnection = do
183-- pSet <- readTVar pendingConnections 219 conn <- ask
184-- eSet <- readTVar 220 addr <- asks connRemoteAddr
185 undefined 221 Session {..} <- asks connSession
222 liftIO $ atomically $ do
223 modifyTVar connectionsPending (S.delete addr)
224 modifyTVar connectionsEstablished (M.insert addr conn)
186 225
226-- | Either this or remote peer decided to finish conversation
227-- (conversation is alread /established/ connection), remote it from
228-- the established set.
187finishedConnection :: Connected Session () 229finishedConnection :: Connected Session ()
188finishedConnection = return () 230finishedConnection = do
231 Session {..} <- asks connSession
232 addr <- asks connRemoteAddr
233 liftIO $ atomically $ do
234 modifyTVar connectionsEstablished $ M.delete addr
189 235
190-- | There are no state for this connection, remove it. 236-- | There are no state for this connection, remove it from the all
191closedConnection :: PeerAddr IP -> Session -> IO () 237-- sets.
192closedConnection addr Session {..} = atomically $ do 238closedConnection :: PeerAddr IP -> Session -> STM ()
239closedConnection addr Session {..} = do
193 modifyTVar connectionsPending $ S.delete addr 240 modifyTVar connectionsPending $ S.delete addr
194 modifyTVar connectionsEstablished $ M.delete addr 241 modifyTVar connectionsEstablished $ M.delete addr
195 242
196{-----------------------------------------------------------------------
197-- Connections
198-----------------------------------------------------------------------}
199-- TODO unmap storage on zero connections
200
201mainWire :: Wire Session ()
202mainWire = do
203 lift establishedConnection
204 Session {..} <- asks connSession
205 lift $ resizeBitfield (totalPieces storage)
206 logEvent "Connection established"
207 iterM logMessage =$= exchange =$= iterM logMessage
208 lift finishedConnection
209
210getConnectionConfig :: Session -> IO (ConnectionConfig Session) 243getConnectionConfig :: Session -> IO (ConnectionConfig Session)
211getConnectionConfig s @ Session {..} = do 244getConnectionConfig s @ Session {..} = do
212 chan <- dupChan broadcast 245 chan <- dupChan connectionsBroadcast
213 let sessionLink = SessionLink { 246 let sessionLink = SessionLink {
214 linkTopic = sessionTopic 247 linkTopic = sessionTopic
215 , linkPeerId = sessionPeerId 248 , linkPeerId = sessionPeerId
@@ -223,28 +256,46 @@ getConnectionConfig s @ Session {..} = do
223 , cfgWire = mainWire 256 , cfgWire = mainWire
224 } 257 }
225 258
226insert :: PeerAddr IP -> Session -> IO () 259type Finalizer = IO ()
227insert addr ses @ Session {..} = void $ forkIO (action `finally` cleanup) 260type Runner = (ConnectionConfig Session -> IO ())
261
262runConnection :: Runner -> Finalizer -> PeerAddr IP -> Session -> IO ()
263runConnection runner finalize addr set @ Session {..} = do
264 _ <- forkIO (action `finally` cleanup)
265 return ()
228 where 266 where
229 action = do 267 action = do
230 pendingConnection addr ses 268 notExist <- atomically $ pendingConnection addr set
231 cfg <- getConnectionConfig ses 269 when notExist $ do
232 connectWire addr cfg 270 cfg <- getConnectionConfig set
271 runner cfg
233 272
234 cleanup = do 273 cleanup = do
235 runStatusUpdates status (SS.resetPending addr) 274 finalize
275-- runStatusUpdates status (SS.resetPending addr)
236 -- TODO Metata.resetPending addr 276 -- TODO Metata.resetPending addr
237 closedConnection addr ses 277 atomically $ closedConnection addr set
278
279-- | Establish connection from scratch. If this endpoint is already
280-- connected, no new connections is created. This function do not block.
281connect :: PeerAddr IP -> Session -> IO ()
282connect addr = runConnection (connectWire addr) (return ()) addr
238 283
239-- TODO closePending on error 284-- | Establish connection with already pre-connected endpoint. If this
240attach :: PendingConnection -> Session -> IO () 285-- endpoint is already connected, no new connections is created. This
241attach = undefined 286-- function do not block.
287--
288-- 'PendingConnection' will be closed automatically, you do not need
289-- to call 'closePending'.
290establish :: PendingConnection -> Session -> IO ()
291establish conn = runConnection (acceptWire conn) (closePending conn)
292 (pendingPeer conn)
242 293
243delete :: PeerAddr IP -> Session -> IO () 294-- | Why do we need this message?
244delete = undefined 295type BroadcastMessage = ExtendedCaps -> Message
245 296
246deleteAll :: Session -> IO () 297broadcast :: BroadcastMessage -> Session -> IO ()
247deleteAll = undefined 298broadcast = error "broadcast"
248 299
249{----------------------------------------------------------------------- 300{-----------------------------------------------------------------------
250-- Helpers 301-- Helpers
@@ -292,14 +343,17 @@ tryReadMetadataBlock pix = do
292 Session {..} <- asks connSession 343 Session {..} <- asks connSession
293 mcached <- liftIO (tryReadMVar infodict) 344 mcached <- liftIO (tryReadMVar infodict)
294 case mcached of 345 case mcached of
295 Nothing -> undefined 346 Nothing -> error "tryReadMetadataBlock"
296 Just (Cached {..}) -> undefined 347 Just (Cached {..}) -> error "tryReadMetadataBlock"
297 348
298sendBroadcast :: PeerMessage msg => msg -> Wire Session () 349sendBroadcast :: PeerMessage msg => msg -> Wire Session ()
299sendBroadcast msg = do 350sendBroadcast msg = do
300 Session {..} <- asks connSession 351 Session {..} <- asks connSession
301 ecaps <- use connExtCaps 352 error "sendBroadcast"
302 liftIO $ writeChan broadcast (envelop ecaps msg) 353-- liftIO $ msg `broadcast` sessionConnections
354
355waitMetadata :: Session -> IO InfoDict
356waitMetadata Session {..} = cachedValue <$> readMVar infodict
303 357
304{----------------------------------------------------------------------- 358{-----------------------------------------------------------------------
305-- Triggers 359-- Triggers
@@ -406,7 +460,7 @@ tryRequestMetadataBlock :: Trigger
406tryRequestMetadataBlock = do 460tryRequestMetadataBlock = do
407 mpix <- lift $ withMetadataUpdates Metadata.scheduleBlock 461 mpix <- lift $ withMetadataUpdates Metadata.scheduleBlock
408 case mpix of 462 case mpix of
409 Nothing -> undefined 463 Nothing -> error "tryRequestMetadataBlock"
410 Just pix -> sendMessage (MetadataRequest pix) 464 Just pix -> sendMessage (MetadataRequest pix)
411 465
412metadataCompleted :: InfoDict -> Trigger 466metadataCompleted :: InfoDict -> Trigger
@@ -439,7 +493,7 @@ handleMetadata (MetadataUnknown _ ) = do
439-----------------------------------------------------------------------} 493-----------------------------------------------------------------------}
440 494
441acceptRehandshake :: ExtendedHandshake -> Trigger 495acceptRehandshake :: ExtendedHandshake -> Trigger
442acceptRehandshake ehs = undefined 496acceptRehandshake ehs = error "acceptRehandshake"
443 497
444handleExtended :: Handler ExtendedMessage 498handleExtended :: Handler ExtendedMessage
445handleExtended (EHandshake ehs) = acceptRehandshake ehs 499handleExtended (EHandshake ehs) = acceptRehandshake ehs
@@ -451,8 +505,8 @@ handleMessage KeepAlive = return ()
451handleMessage (Status s) = handleStatus s 505handleMessage (Status s) = handleStatus s
452handleMessage (Available msg) = handleAvailable msg 506handleMessage (Available msg) = handleAvailable msg
453handleMessage (Transfer msg) = handleTransfer msg 507handleMessage (Transfer msg) = handleTransfer msg
454handleMessage (Port n) = undefined 508handleMessage (Port n) = error "handleMessage"
455handleMessage (Fast _) = undefined 509handleMessage (Fast _) = error "handleMessage"
456handleMessage (Extended msg) = handleExtended msg 510handleMessage (Extended msg) = handleExtended msg
457 511
458exchange :: Wire Session () 512exchange :: Wire Session ()
@@ -462,13 +516,22 @@ exchange = do
462 sendMessage (Bitfield bf) 516 sendMessage (Bitfield bf)
463 awaitForever handleMessage 517 awaitForever handleMessage
464 518
519mainWire :: Wire Session ()
520mainWire = do
521 lift establishedConnection
522 Session {..} <- asks connSession
523 lift $ resizeBitfield (totalPieces storage)
524 logEvent "Connection established"
525 iterM logMessage =$= exchange =$= iterM logMessage
526 lift finishedConnection
527
465data Event = NewMessage (PeerAddr IP) Message 528data Event = NewMessage (PeerAddr IP) Message
466 | Timeout -- for scheduling 529 | Timeout -- for scheduling
467 530
468type Exchange a = Wire Session a 531type Exchange a = Wire Session a
469 532
470awaitEvent :: Exchange Event 533awaitEvent :: Exchange Event
471awaitEvent = undefined 534awaitEvent = error "awaitEvent"
472 535
473yieldEvent :: Exchange Event 536yieldEvent :: Exchange Event
474yieldEvent = undefined 537yieldEvent = error "yieldEvent"