diff options
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/BitTorrent.hs | 18 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange.hs | 233 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Protocol.hs | 33 | ||||
-rw-r--r-- | src/Network/BitTorrent/Internal.hs | 125 |
4 files changed, 267 insertions, 142 deletions
diff --git a/src/Network/BitTorrent.hs b/src/Network/BitTorrent.hs index 8f0f42ce..e9b5f17d 100644 --- a/src/Network/BitTorrent.hs +++ b/src/Network/BitTorrent.hs | |||
@@ -21,10 +21,9 @@ module Network.BitTorrent | |||
21 | , discover | 21 | , discover |
22 | 22 | ||
23 | -- * Peer to Peer | 23 | -- * Peer to Peer |
24 | , P2P, PeerSession | 24 | , P2P |
25 | ( connectedPeerAddr, enabledExtensions | 25 | , PeerSession ( connectedPeerAddr, enabledExtensions ) |
26 | , peerBitfield, peerSessionStatus | 26 | , Block(..), BlockIx(..) |
27 | ) | ||
28 | 27 | ||
29 | , awaitEvent, signalEvent | 28 | , awaitEvent, signalEvent |
30 | ) where | 29 | ) where |
@@ -43,10 +42,13 @@ import Network.BitTorrent.Tracker | |||
43 | -- thus we can obtain unified interface | 42 | -- thus we can obtain unified interface |
44 | 43 | ||
45 | discover :: SwarmSession -> P2P () -> IO () | 44 | discover :: SwarmSession -> P2P () -> IO () |
46 | discover swarm @ SwarmSession {..} action = do | 45 | discover swarm action = do |
47 | let conn = TConnection (tAnnounce torrentMeta) (tInfoHash torrentMeta) | 46 | let conn = TConnection (tAnnounce (torrentMeta swarm)) |
48 | (clientPeerID clientSession) port | 47 | (tInfoHash (torrentMeta swarm)) |
49 | progress <- readIORef (currentProgress clientSession) | 48 | (clientPeerID (clientSession swarm)) |
49 | port | ||
50 | |||
51 | progress <- getCurrentProgress (clientSession swarm) | ||
50 | withTracker progress conn $ \tses -> do | 52 | withTracker progress conn $ \tses -> do |
51 | forever $ do | 53 | forever $ do |
52 | addr <- getPeerAddr tses | 54 | addr <- getPeerAddr tses |
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index dda7d304..65ec0eb7 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs | |||
@@ -5,12 +5,19 @@ | |||
5 | -- Stability : experimental | 5 | -- Stability : experimental |
6 | -- Portability : portable | 6 | -- Portability : portable |
7 | -- | 7 | -- |
8 | {-# LANGUAGE DoAndIfThenElse #-} | 8 | {-# LANGUAGE OverloadedStrings #-} |
9 | {-# LANGUAGE DoAndIfThenElse #-} | ||
9 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | 10 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} |
10 | {-# LANGUAGE MultiParamTypeClasses #-} | 11 | {-# LANGUAGE MultiParamTypeClasses #-} |
11 | {-# LANGUAGE RecordWildCards #-} | 12 | {-# LANGUAGE RecordWildCards #-} |
12 | module Network.BitTorrent.Exchange | 13 | module Network.BitTorrent.Exchange |
13 | ( P2P, withPeer | 14 | ( -- * Block |
15 | Block(..), BlockIx(..) | ||
16 | |||
17 | -- * Event | ||
18 | , Event(..) | ||
19 | |||
20 | , P2P, withPeer | ||
14 | , awaitEvent, signalEvent | 21 | , awaitEvent, signalEvent |
15 | ) where | 22 | ) where |
16 | 23 | ||
@@ -18,6 +25,7 @@ import Control.Applicative | |||
18 | import Control.Concurrent | 25 | import Control.Concurrent |
19 | import Control.Concurrent.STM | 26 | import Control.Concurrent.STM |
20 | import Control.Exception | 27 | import Control.Exception |
28 | import Control.Lens | ||
21 | import Control.Monad.Reader | 29 | import Control.Monad.Reader |
22 | import Control.Monad.State | 30 | import Control.Monad.State |
23 | 31 | ||
@@ -30,6 +38,7 @@ import Data.Conduit as C | |||
30 | import Data.Conduit.Cereal | 38 | import Data.Conduit.Cereal |
31 | import Data.Conduit.Network | 39 | import Data.Conduit.Network |
32 | import Data.Serialize as S | 40 | import Data.Serialize as S |
41 | import Text.PrettyPrint as PP hiding (($$)) | ||
33 | 42 | ||
34 | import Network | 43 | import Network |
35 | 44 | ||
@@ -43,8 +52,8 @@ import Data.Torrent | |||
43 | 52 | ||
44 | 53 | ||
45 | data Event = Available Bitfield | 54 | data Event = Available Bitfield |
46 | | Want | 55 | | Want BlockIx |
47 | | Block | 56 | | Fragment Block |
48 | deriving Show | 57 | deriving Show |
49 | 58 | ||
50 | {----------------------------------------------------------------------- | 59 | {----------------------------------------------------------------------- |
@@ -53,88 +62,132 @@ data Event = Available Bitfield | |||
53 | 62 | ||
54 | type PeerWire = ConduitM Message Message IO | 63 | type PeerWire = ConduitM Message Message IO |
55 | 64 | ||
56 | waitMessage :: PeerSession -> PeerWire Message | 65 | runConduit :: Socket -> PeerWire () -> IO () |
57 | waitMessage se = do | 66 | runConduit sock p2p = |
58 | mmsg <- await | 67 | sourceSocket sock $= |
59 | case mmsg of | 68 | conduitGet S.get $= |
60 | Nothing -> waitMessage se | 69 | forever p2p $= |
61 | Just msg -> do | 70 | conduitPut S.put $$ |
62 | liftIO $ updateIncoming se | 71 | sinkSocket sock |
63 | liftIO $ print msg | 72 | |
64 | return msg | 73 | waitMessage :: P2P Message |
65 | 74 | waitMessage = P2P (ReaderT go) | |
66 | signalMessage :: PeerSession -> Message -> PeerWire () | 75 | where |
67 | signalMessage se msg = do | 76 | go se = do |
77 | mmsg <- await | ||
78 | case mmsg of | ||
79 | Nothing -> go se | ||
80 | Just msg -> do | ||
81 | liftIO $ updateIncoming se | ||
82 | liftIO $ print msg | ||
83 | return msg | ||
84 | |||
85 | signalMessage :: Message -> P2P () | ||
86 | signalMessage msg = P2P $ ReaderT $ \se -> do | ||
68 | C.yield msg | 87 | C.yield msg |
69 | liftIO $ updateOutcoming se | 88 | liftIO $ updateOutcoming se |
70 | 89 | ||
71 | 90 | ||
72 | getPieceCount :: PeerSession -> IO PieceCount | 91 | peerWant :: P2P Bitfield |
73 | getPieceCount = undefined | 92 | peerWant = BF.difference <$> getPeerBF <*> use bitfield |
93 | |||
94 | clientWant :: P2P Bitfield | ||
95 | clientWant = BF.difference <$> use bitfield <*> getPeerBF | ||
74 | 96 | ||
75 | canOffer :: PeerSession -> PeerWire Bitfield | 97 | peerOffer :: P2P Bitfield |
76 | canOffer PeerSession {..} = liftIO $ do | 98 | peerOffer = do |
77 | pbf <- readIORef $ peerBitfield | 99 | sessionStatus <- use status |
78 | cbf <- readIORef $ clientBitfield $ swarmSession | 100 | if canDownload sessionStatus then clientWant else emptyBF |
79 | return $ BF.difference pbf cbf | ||
80 | 101 | ||
81 | revise :: PeerSession -> PeerWire () | 102 | clientOffer :: P2P Bitfield |
82 | revise se @ PeerSession {..} = do | 103 | clientOffer = do |
83 | isInteresting <- (not . BF.null) <$> canOffer se | 104 | sessionStatus <- use status |
84 | SessionStatus {..} <- liftIO $ readIORef peerSessionStatus | 105 | if canUpload sessionStatus then peerWant else emptyBF |
85 | 106 | ||
86 | when (isInteresting /= _interested seClientStatus) $ | 107 | revise :: P2P () |
87 | signalMessage se $ if isInteresting then Interested else NotInterested | 108 | revise = do |
109 | peerInteresting <- (not . BF.null) <$> clientWant | ||
110 | clientInterested <- use (status.clientStatus.interested) | ||
88 | 111 | ||
112 | when (clientInterested /= peerInteresting) $ | ||
113 | signalMessage $ if peerInteresting then Interested else NotInterested | ||
89 | 114 | ||
90 | nextEvent :: PeerSession -> PeerWire Event | 115 | requireExtension :: Extension -> P2P () |
91 | nextEvent se @ PeerSession {..} = waitMessage se >>= go | 116 | requireExtension required = do |
117 | enabled <- asks enabledExtensions | ||
118 | unless (required `elem` enabled) $ | ||
119 | sessionError $ ppExtension required <+> "not enabled" | ||
120 | |||
121 | peerHave :: P2P Event | ||
122 | peerHave = undefined | ||
123 | |||
124 | -- haveMessage bf = do | ||
125 | -- cbf <- undefined -- liftIO $ readIORef $ clientBitfield swarmSession | ||
126 | -- if undefined -- ix `member` bf | ||
127 | -- then nextEvent se | ||
128 | -- else undefined -- return $ Available diff | ||
129 | |||
130 | |||
131 | -- | | ||
132 | -- properties: | ||
133 | -- | ||
134 | -- forall (Fragment block). isPiece block == True | ||
135 | -- | ||
136 | awaitEvent :: P2P Event | ||
137 | awaitEvent = waitMessage >>= go | ||
92 | where | 138 | where |
93 | go KeepAlive = nextEvent se | 139 | go KeepAlive = awaitEvent |
94 | go Choke = do | 140 | go Choke = do |
95 | SessionStatus {..} <- liftIO $ readIORef peerSessionStatus | 141 | status.peerStatus.choking .= True |
96 | if _choking sePeerStatus | 142 | awaitEvent |
97 | then nextEvent se | ||
98 | else undefined | ||
99 | 143 | ||
100 | go Unchoke = do | 144 | go Unchoke = do |
101 | SessionStatus {..} <- liftIO $ readIORef peerSessionStatus | 145 | status.clientStatus.choking .= False |
102 | if not (_choking sePeerStatus) | 146 | awaitEvent |
103 | then nextEvent se | 147 | |
104 | else if undefined | 148 | go Interested = do |
105 | then undefined | 149 | status.peerStatus.interested .= True |
106 | else undefined | 150 | awaitEvent |
107 | --return $ Available BF.difference | 151 | |
108 | 152 | go NotInterested = do | |
109 | go Interested = return undefined | 153 | status.peerStatus.interested .= False |
110 | go NotInterested = return undefined | 154 | awaitEvent |
111 | 155 | ||
112 | go (Have ix) = do | 156 | -- go (Have ix) = peerHave =<< singletonBF ix |
113 | pc <- liftIO $ getPieceCount se | 157 | -- go (Bitfield bf) = peerHave =<< adjustBF bf |
114 | haveMessage $ have ix (haveNone pc) -- TODO singleton | ||
115 | |||
116 | go (Bitfield bf) = undefined | ||
117 | go (Request bix) = do | 158 | go (Request bix) = do |
118 | undefined | 159 | bf <- use bitfield |
160 | if ixPiece bix `BF.member` bf | ||
161 | then return (Want bix) | ||
162 | else do | ||
163 | signalMessage (RejectRequest bix) | ||
164 | awaitEvent | ||
165 | |||
166 | go (Piece blk) = undefined | ||
119 | 167 | ||
120 | go msg @ (Piece blk) = undefined | 168 | {- |
121 | go msg @ (Port _) | 169 | go msg @ (Port _) |
122 | = checkExtension msg ExtDHT $ do | 170 | = checkExtension msg ExtDHT $ do |
123 | undefined | 171 | undefined |
124 | 172 | ||
125 | go msg @ HaveAll | 173 | go HaveAll = do |
126 | = checkExtension msg ExtFast $ do | 174 | requireExtension ExtFast |
127 | pc <- liftIO $ getPieceCount se | 175 | bitfield <~ fullBF |
128 | haveMessage (haveAll pc) | 176 | revise |
129 | 177 | awaitEvent | |
130 | go msg @ HaveNone | 178 | |
131 | = checkExtension msg ExtFast $ do | 179 | go HaveNone = do |
132 | pc <- liftIO $ getPieceCount se | 180 | requireExtension ExtFast |
133 | haveMessage (haveNone pc) | 181 | bitfield <~ emptyBF |
134 | 182 | revise | |
135 | go msg @ (SuggestPiece ix) | 183 | awaitEvent |
136 | = checkExtension msg ExtFast $ do | 184 | |
137 | undefined | 185 | go (SuggestPiece ix) = do |
186 | requireExtension ExtFast | ||
187 | bf <- use bitfield | ||
188 | if ix `BF.notMember` bf | ||
189 | then Available <$> singletonBF ix | ||
190 | else awaitEvent | ||
138 | 191 | ||
139 | go msg @ (RejectRequest ix) | 192 | go msg @ (RejectRequest ix) |
140 | = checkExtension msg ExtFast $ do | 193 | = checkExtension msg ExtFast $ do |
@@ -143,20 +196,16 @@ nextEvent se @ PeerSession {..} = waitMessage se >>= go | |||
143 | go msg @ (AllowedFast pix) | 196 | go msg @ (AllowedFast pix) |
144 | = checkExtension msg ExtFast $ do | 197 | = checkExtension msg ExtFast $ do |
145 | undefined | 198 | undefined |
199 | -} | ||
146 | 200 | ||
147 | haveMessage bf = do | 201 | signalEvent :: Event -> P2P () |
148 | cbf <- liftIO $ readIORef $ clientBitfield swarmSession | 202 | signalEvent (Available bf) = undefined |
149 | if undefined -- ix `member` bf | 203 | signalEvent _ = undefined |
150 | then nextEvent se | ||
151 | else undefined -- return $ Available diff | ||
152 | 204 | ||
153 | checkExtension msg requredExtension action | 205 | --flushBroadcast :: P2P () |
154 | | requredExtension `elem` enabledExtensions = action | 206 | --flushBroadcast = nextBroadcast >>= maybe (return ()) go |
155 | | otherwise = liftIO $ throwIO $ userError errorMsg | 207 | -- where |
156 | where | 208 | -- go = undefined |
157 | errorMsg = show (ppExtension requredExtension) | ||
158 | ++ "not enabled, but peer sent" | ||
159 | ++ show (ppMessage msg) | ||
160 | 209 | ||
161 | {----------------------------------------------------------------------- | 210 | {----------------------------------------------------------------------- |
162 | P2P monad | 211 | P2P monad |
@@ -164,29 +213,9 @@ nextEvent se @ PeerSession {..} = waitMessage se >>= go | |||
164 | 213 | ||
165 | newtype P2P a = P2P { | 214 | newtype P2P a = P2P { |
166 | runP2P :: ReaderT PeerSession PeerWire a | 215 | runP2P :: ReaderT PeerSession PeerWire a |
167 | } deriving (Monad, MonadReader PeerSession, MonadIO) | 216 | } deriving (Functor, Applicative, Monad, MonadReader PeerSession, MonadIO) |
168 | |||
169 | instance MonadState SessionStatus P2P where | ||
170 | get = asks peerSessionStatus >>= liftIO . readIORef | ||
171 | put x = asks peerSessionStatus >>= liftIO . (`writeIORef` x) | ||
172 | |||
173 | |||
174 | runConduit :: Socket -> Conduit Message IO Message -> IO () | ||
175 | runConduit sock p2p = | ||
176 | sourceSocket sock $= | ||
177 | conduitGet S.get $= | ||
178 | forever p2p $= | ||
179 | conduitPut S.put $$ | ||
180 | sinkSocket sock | ||
181 | 217 | ||
182 | withPeer :: SwarmSession -> PeerAddr -> P2P () -> IO () | 218 | withPeer :: SwarmSession -> PeerAddr -> P2P () -> IO () |
183 | withPeer se addr p2p = | 219 | withPeer se addr p2p = |
184 | withPeerSession se addr $ \(sock, pses) -> do | 220 | withPeerSession se addr $ \(sock, pses) -> do |
185 | runConduit sock (runReaderT (runP2P p2p) pses) | 221 | runConduit sock (runReaderT (runP2P p2p) pses) |
186 | |||
187 | |||
188 | awaitEvent :: P2P Event | ||
189 | awaitEvent = P2P (ReaderT nextEvent) | ||
190 | |||
191 | signalEvent :: Event -> P2P () | ||
192 | signalEvent = undefined | ||
diff --git a/src/Network/BitTorrent/Exchange/Protocol.hs b/src/Network/BitTorrent/Exchange/Protocol.hs index 46e25fa3..718e339d 100644 --- a/src/Network/BitTorrent/Exchange/Protocol.hs +++ b/src/Network/BitTorrent/Exchange/Protocol.hs | |||
@@ -26,7 +26,8 @@ | |||
26 | -- <https://wiki.theory.org/BitTorrentSpecification#Peer_wire_protocol_.28TCP.29> | 26 | -- <https://wiki.theory.org/BitTorrentSpecification#Peer_wire_protocol_.28TCP.29> |
27 | -- | 27 | -- |
28 | {-# LANGUAGE OverloadedStrings #-} | 28 | {-# LANGUAGE OverloadedStrings #-} |
29 | {-# LANGUAGE RecordWildCards #-} | 29 | {-# LANGUAGE RecordWildCards #-} |
30 | {-# LANGUAGE TemplateHaskell #-} | ||
30 | module Network.BitTorrent.Exchange.Protocol | 31 | module Network.BitTorrent.Exchange.Protocol |
31 | ( -- * Inital handshake | 32 | ( -- * Inital handshake |
32 | Handshake(..), ppHandshake | 33 | Handshake(..), ppHandshake |
@@ -50,18 +51,22 @@ module Network.BitTorrent.Exchange.Protocol | |||
50 | , Message(..) | 51 | , Message(..) |
51 | , ppMessage | 52 | , ppMessage |
52 | 53 | ||
53 | -- * Exchange control | 54 | -- * control |
54 | , PeerStatus(..) | 55 | , PeerStatus(..) |
56 | , choking, interested | ||
57 | |||
55 | , SessionStatus(..) | 58 | , SessionStatus(..) |
56 | -- , canUpload, canDownload | 59 | , clientStatus, peerStatus |
60 | , canUpload, canDownload | ||
57 | 61 | ||
58 | -- ** Defaults | 62 | -- ** Defaults |
59 | , defaultUnchokeSlots | 63 | , defaultUnchokeSlots |
60 | ) where | 64 | ) where |
61 | 65 | ||
62 | import Control.Applicative | 66 | import Control.Applicative |
63 | import Control.Monad | ||
64 | import Control.Exception | 67 | import Control.Exception |
68 | import Control.Monad | ||
69 | import Control.Lens | ||
65 | import Data.ByteString (ByteString) | 70 | import Data.ByteString (ByteString) |
66 | import qualified Data.ByteString as B | 71 | import qualified Data.ByteString as B |
67 | import qualified Data.ByteString.Char8 as BC | 72 | import qualified Data.ByteString.Char8 as BC |
@@ -427,29 +432,31 @@ data PeerStatus = PeerStatus { | |||
427 | , _interested :: Bool | 432 | , _interested :: Bool |
428 | } | 433 | } |
429 | 434 | ||
435 | $(makeLenses ''PeerStatus) | ||
436 | |||
430 | instance Default PeerStatus where | 437 | instance Default PeerStatus where |
431 | def = PeerStatus True False | 438 | def = PeerStatus True False |
432 | 439 | ||
433 | -- | | 440 | -- | |
434 | data SessionStatus = SessionStatus { | 441 | data SessionStatus = SessionStatus { |
435 | seClientStatus :: PeerStatus | 442 | _clientStatus :: PeerStatus |
436 | , sePeerStatus :: PeerStatus | 443 | , _peerStatus :: PeerStatus |
437 | } | 444 | } |
438 | 445 | ||
446 | $(makeLenses ''SessionStatus) | ||
447 | |||
439 | instance Default SessionStatus where | 448 | instance Default SessionStatus where |
440 | def = SessionStatus def def | 449 | def = SessionStatus def def |
441 | 450 | ||
442 | {- | 451 | -- | Can the /client/ transfer to the /peer/? |
443 | -- | Can the /client/ to upload to the /peer/? | ||
444 | canUpload :: SessionStatus -> Bool | 452 | canUpload :: SessionStatus -> Bool |
445 | canUpload SessionStatus {..} | 453 | canUpload SessionStatus {..} |
446 | = psInterested sePeerStatus && not (psChoking seClientStatus) | 454 | = _interested _peerStatus && not (_choking _clientStatus) |
447 | 455 | ||
448 | -- | Can the /client/ download from the /peer/? | 456 | -- | Can the /client/ transfer from the /peer/? |
449 | canDownload :: SessionStatus -> Bool | 457 | canDownload :: SessionStatus -> Bool |
450 | canDownload SessionStatus {..} | 458 | canDownload SessionStatus {..} |
451 | = psInterested seClientStatus && not (psChoking sePeerStatus) | 459 | = _interested _clientStatus && not (_choking _peerStatus) |
452 | -} | ||
453 | 460 | ||
454 | -- | Indicates how many peers are allowed to download from the client | 461 | -- | Indicates how many peers are allowed to download from the client |
455 | -- by default. | 462 | -- by default. |
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs index 38087f0d..2f538652 100644 --- a/src/Network/BitTorrent/Internal.hs +++ b/src/Network/BitTorrent/Internal.hs | |||
@@ -10,12 +10,33 @@ | |||
10 | -- Network.BitTorrent.Exchange and modules. To hide some internals | 10 | -- Network.BitTorrent.Exchange and modules. To hide some internals |
11 | -- of this module we detach it from Exchange. | 11 | -- of this module we detach it from Exchange. |
12 | -- | 12 | -- |
13 | {-# LANGUAGE RecordWildCards #-} | 13 | -- Note: expose only static data in data field lists, all dynamic |
14 | -- data should be modified through standalone functions. | ||
15 | -- | ||
16 | {-# LANGUAGE OverloadedStrings #-} | ||
17 | {-# LANGUAGE RecordWildCards #-} | ||
18 | {-# LANGUAGE TemplateHaskell #-} | ||
19 | {-# LANGUAGE FlexibleInstances #-} | ||
20 | {-# LANGUAGE FlexibleContexts #-} | ||
21 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
22 | {-# LANGUAGE UndecidableInstances #-} | ||
14 | module Network.BitTorrent.Internal | 23 | module Network.BitTorrent.Internal |
15 | ( Progress(..), startProgress | 24 | ( Progress(..), startProgress |
16 | , ClientSession(..), newClient | 25 | |
17 | , SwarmSession(..), newLeacher, newSeeder | 26 | , ClientSession (clientPeerID, allowedExtensions) |
18 | , PeerSession(..), withPeerSession | 27 | , newClient, getCurrentProgress |
28 | |||
29 | , SwarmSession(SwarmSession, torrentMeta, clientSession) | ||
30 | , newLeacher, newSeeder | ||
31 | |||
32 | , PeerSession(PeerSession, connectedPeerAddr | ||
33 | , swarmSession, enabledExtensions | ||
34 | ) | ||
35 | , SessionState | ||
36 | , bitfield, status | ||
37 | , emptyBF, fullBF, singletonBF | ||
38 | , getPieceCount, getPeerBF | ||
39 | , sessionError, withPeerSession | ||
19 | 40 | ||
20 | -- * Timeouts | 41 | -- * Timeouts |
21 | , updateIncoming, updateOutcoming | 42 | , updateIncoming, updateOutcoming |
@@ -24,6 +45,9 @@ module Network.BitTorrent.Internal | |||
24 | import Control.Applicative | 45 | import Control.Applicative |
25 | import Control.Concurrent | 46 | import Control.Concurrent |
26 | import Control.Concurrent.STM | 47 | import Control.Concurrent.STM |
48 | import Control.Lens | ||
49 | import Control.Monad.State | ||
50 | import Control.Monad.Reader | ||
27 | import Control.Exception | 51 | import Control.Exception |
28 | 52 | ||
29 | import Data.IORef | 53 | import Data.IORef |
@@ -32,10 +56,8 @@ import Data.Function | |||
32 | import Data.Ord | 56 | import Data.Ord |
33 | import Data.Set as S | 57 | import Data.Set as S |
34 | 58 | ||
35 | import Data.Conduit | 59 | import Data.Serialize hiding (get) |
36 | import Data.Conduit.Cereal | 60 | import Text.PrettyPrint |
37 | import Data.Conduit.Network | ||
38 | import Data.Serialize | ||
39 | 61 | ||
40 | import Network | 62 | import Network |
41 | import Network.Socket | 63 | import Network.Socket |
@@ -70,13 +92,20 @@ startProgress = Progress 0 0 | |||
70 | Client session | 92 | Client session |
71 | -----------------------------------------------------------------------} | 93 | -----------------------------------------------------------------------} |
72 | 94 | ||
73 | -- | In one application you could have many clients. | 95 | -- | In one application we could have many clients with difference |
96 | -- ID's and enabled extensions. | ||
74 | data ClientSession = ClientSession { | 97 | data ClientSession = ClientSession { |
75 | clientPeerID :: PeerID -- ^ | 98 | -- | Our peer ID used in handshaked and discovery mechanism. |
76 | , allowedExtensions :: [Extension] -- ^ | 99 | clientPeerID :: PeerID |
100 | |||
101 | -- | Extensions we should try to use. Hovewer some particular peer | ||
102 | -- might not support some extension, so we keep enableExtension in | ||
103 | -- 'PeerSession'. | ||
104 | , allowedExtensions :: [Extension] | ||
105 | |||
77 | , swarmSessions :: TVar (Set SwarmSession) | 106 | , swarmSessions :: TVar (Set SwarmSession) |
78 | , eventManager :: EventManager | 107 | , eventManager :: EventManager |
79 | , currentProgress :: IORef Progress | 108 | , currentProgress :: TVar Progress |
80 | } | 109 | } |
81 | 110 | ||
82 | instance Eq ClientSession where | 111 | instance Eq ClientSession where |
@@ -85,6 +114,9 @@ instance Eq ClientSession where | |||
85 | instance Ord ClientSession where | 114 | instance Ord ClientSession where |
86 | compare = comparing clientPeerID | 115 | compare = comparing clientPeerID |
87 | 116 | ||
117 | getCurrentProgress :: MonadIO m => ClientSession -> m Progress | ||
118 | getCurrentProgress = liftIO . readTVarIO . currentProgress | ||
119 | |||
88 | newClient :: [Extension] -> IO ClientSession | 120 | newClient :: [Extension] -> IO ClientSession |
89 | newClient exts = do | 121 | newClient exts = do |
90 | mgr <- Ev.new | 122 | mgr <- Ev.new |
@@ -95,7 +127,7 @@ newClient exts = do | |||
95 | <*> pure exts | 127 | <*> pure exts |
96 | <*> newTVarIO S.empty | 128 | <*> newTVarIO S.empty |
97 | <*> pure mgr | 129 | <*> pure mgr |
98 | <*> newIORef (startProgress 0) | 130 | <*> newTVarIO (startProgress 0) |
99 | 131 | ||
100 | {----------------------------------------------------------------------- | 132 | {----------------------------------------------------------------------- |
101 | Swarm session | 133 | Swarm session |
@@ -106,7 +138,9 @@ newClient exts = do | |||
106 | data SwarmSession = SwarmSession { | 138 | data SwarmSession = SwarmSession { |
107 | torrentMeta :: Torrent | 139 | torrentMeta :: Torrent |
108 | , clientSession :: ClientSession | 140 | , clientSession :: ClientSession |
109 | , clientBitfield :: IORef Bitfield | 141 | |
142 | -- | Modify this carefully updating global progress. | ||
143 | , clientBitfield :: TVar Bitfield | ||
110 | , connectedPeers :: TVar (Set PeerSession) | 144 | , connectedPeers :: TVar (Set PeerSession) |
111 | } | 145 | } |
112 | 146 | ||
@@ -120,7 +154,7 @@ newSwarmSession :: Bitfield -> ClientSession -> Torrent -> IO SwarmSession | |||
120 | newSwarmSession bf cs @ ClientSession {..} t @ Torrent {..} | 154 | newSwarmSession bf cs @ ClientSession {..} t @ Torrent {..} |
121 | = SwarmSession <$> pure t | 155 | = SwarmSession <$> pure t |
122 | <*> pure cs | 156 | <*> pure cs |
123 | <*> newIORef bf | 157 | <*> newTVarIO bf |
124 | <*> newTVarIO S.empty | 158 | <*> newTVarIO S.empty |
125 | 159 | ||
126 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession | 160 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession |
@@ -134,13 +168,29 @@ newLeacher cs t @ Torrent {..} | |||
134 | isLeacher :: SwarmSession -> IO Bool | 168 | isLeacher :: SwarmSession -> IO Bool |
135 | isLeacher = undefined | 169 | isLeacher = undefined |
136 | 170 | ||
171 | getClientBitfield :: MonadIO m => SwarmSession -> m Bitfield | ||
172 | getClientBitfield = liftIO . readTVarIO . clientBitfield | ||
173 | |||
174 | {- | ||
175 | haveDone :: MonadIO m => PieceIx -> SwarmSession -> m () | ||
176 | haveDone ix = | ||
177 | liftIO $ atomically $ do | ||
178 | bf <- readTVar clientBitfield | ||
179 | writeTVar (have ix bf) | ||
180 | currentProgress | ||
181 | -} | ||
137 | {----------------------------------------------------------------------- | 182 | {----------------------------------------------------------------------- |
138 | Peer session | 183 | Peer session |
139 | -----------------------------------------------------------------------} | 184 | -----------------------------------------------------------------------} |
140 | 185 | ||
141 | data PeerSession = PeerSession { | 186 | data PeerSession = PeerSession { |
187 | -- | Used as unique 'PeerSession' identifier within one | ||
188 | -- 'SwarmSession'. | ||
142 | connectedPeerAddr :: PeerAddr | 189 | connectedPeerAddr :: PeerAddr |
190 | |||
143 | , swarmSession :: SwarmSession | 191 | , swarmSession :: SwarmSession |
192 | |||
193 | -- | Extensions such that both peer and client support. | ||
144 | , enabledExtensions :: [Extension] | 194 | , enabledExtensions :: [Extension] |
145 | 195 | ||
146 | -- | To dissconnect from died peers appropriately we should check | 196 | -- | To dissconnect from died peers appropriately we should check |
@@ -163,16 +213,31 @@ data PeerSession = PeerSession { | |||
163 | , outcomingTimeout :: TimeoutKey | 213 | , outcomingTimeout :: TimeoutKey |
164 | 214 | ||
165 | , broadcastMessages :: Chan [Message] | 215 | , broadcastMessages :: Chan [Message] |
166 | , peerBitfield :: IORef Bitfield | 216 | , sessionState :: IORef SessionState |
167 | , peerSessionStatus :: IORef SessionStatus | 217 | } |
218 | |||
219 | data SessionState = SessionState { | ||
220 | _bitfield :: Bitfield | ||
221 | , _status :: SessionStatus | ||
168 | } | 222 | } |
169 | 223 | ||
224 | $(makeLenses ''SessionState) | ||
225 | |||
170 | instance Eq PeerSession where | 226 | instance Eq PeerSession where |
171 | (==) = (==) `on` connectedPeerAddr | 227 | (==) = (==) `on` connectedPeerAddr |
172 | 228 | ||
173 | instance Ord PeerSession where | 229 | instance Ord PeerSession where |
174 | compare = comparing connectedPeerAddr | 230 | compare = comparing connectedPeerAddr |
175 | 231 | ||
232 | instance (MonadIO m, MonadReader PeerSession m) | ||
233 | => MonadState SessionState m where | ||
234 | get = asks sessionState >>= liftIO . readIORef | ||
235 | put s = asks sessionState >>= \ref -> liftIO $ writeIORef ref s | ||
236 | |||
237 | sessionError :: MonadIO m => Doc -> m () | ||
238 | sessionError msg | ||
239 | = liftIO $ throwIO $ userError $ render $ msg <+> "in session" | ||
240 | |||
176 | -- TODO check if it connected yet peer | 241 | -- TODO check if it connected yet peer |
177 | withPeerSession :: SwarmSession -> PeerAddr | 242 | withPeerSession :: SwarmSession -> PeerAddr |
178 | -> ((Socket, PeerSession) -> IO a) | 243 | -> ((Socket, PeerSession) -> IO a) |
@@ -196,12 +261,34 @@ withPeerSession ss @ SwarmSession {..} addr | |||
196 | <*> registerTimeout (eventManager clientSession) | 261 | <*> registerTimeout (eventManager clientSession) |
197 | maxOutcomingTime (sendKA sock) | 262 | maxOutcomingTime (sendKA sock) |
198 | <*> newChan | 263 | <*> newChan |
199 | <*> pure clientBitfield | 264 | <*> do { |
200 | <*> newIORef def | 265 | ; tc <- totalCount <$> readTVarIO clientBitfield |
266 | ; newIORef (SessionState (haveNone tc) def) | ||
267 | } | ||
201 | return (sock, ps) | 268 | return (sock, ps) |
202 | 269 | ||
203 | closeSession = close . fst | 270 | closeSession = close . fst |
204 | 271 | ||
272 | getPieceCount :: (MonadReader PeerSession m) => m PieceCount | ||
273 | getPieceCount = asks (pieceCount . tInfo . torrentMeta . swarmSession) | ||
274 | |||
275 | emptyBF :: (MonadReader PeerSession m) => m Bitfield | ||
276 | emptyBF = liftM haveNone getPieceCount | ||
277 | |||
278 | fullBF :: (MonadReader PeerSession m) => m Bitfield | ||
279 | fullBF = liftM haveAll getPieceCount | ||
280 | |||
281 | singletonBF :: (MonadReader PeerSession m) => PieceIx -> m Bitfield | ||
282 | singletonBF ix = liftM (BF.singleton ix) getPieceCount | ||
283 | |||
284 | getPeerBF :: (MonadIO m, MonadReader PeerSession m) => m Bitfield | ||
285 | getPeerBF = asks swarmSession >>= liftIO . readTVarIO . clientBitfield | ||
286 | |||
287 | --data Signal = | ||
288 | --nextBroadcast :: P2P (Maybe Signal) | ||
289 | --nextBroadcast = | ||
290 | |||
291 | |||
205 | {----------------------------------------------------------------------- | 292 | {----------------------------------------------------------------------- |
206 | Timeouts | 293 | Timeouts |
207 | -----------------------------------------------------------------------} | 294 | -----------------------------------------------------------------------} |