diff options
-rw-r--r-- | src/Network/BitTorrent.hs | 7 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange.hs | 182 | ||||
-rw-r--r-- | src/Network/BitTorrent/Internal.hs | 210 |
3 files changed, 312 insertions, 87 deletions
diff --git a/src/Network/BitTorrent.hs b/src/Network/BitTorrent.hs index 5d6034f6..5fbc5ff6 100644 --- a/src/Network/BitTorrent.hs +++ b/src/Network/BitTorrent.hs | |||
@@ -7,6 +7,7 @@ | |||
7 | -- | 7 | -- |
8 | module Network.BitTorrent | 8 | module Network.BitTorrent |
9 | ( module BT | 9 | ( module BT |
10 | , module Data.Torrent | ||
10 | 11 | ||
11 | -- * Tracker | 12 | -- * Tracker |
12 | 13 | ||
@@ -16,12 +17,12 @@ module Network.BitTorrent | |||
16 | , PeerSession | 17 | , PeerSession |
17 | ) where | 18 | ) where |
18 | 19 | ||
20 | import Data.Torrent | ||
19 | import Network.BitTorrent.Internal | 21 | import Network.BitTorrent.Internal |
20 | |||
21 | import Network.BitTorrent.Extension as BT | 22 | import Network.BitTorrent.Extension as BT |
22 | import Network.BitTorrent.Peer as BT | 23 | import Network.BitTorrent.Peer as BT |
23 | import Network.BitTorrent.Exchange as BT | 24 | import Network.BitTorrent.Exchange as BT |
24 | import Network.BitTorrent.Tracker as BT | 25 | import Network.BitTorrent.Tracker as BT |
25 | 26 | ||
26 | --discover :: SwarmSession -> (Chan PeerAddr -> IO a) -> IO a | 27 | --discover :: SwarmSession -> ([PeerAddr] -> IO a) -> IO a |
27 | --discover = undefined | 28 | --discover = withTracker |
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index 4fe90cda..b23ca667 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs | |||
@@ -11,6 +11,7 @@ | |||
11 | {-# LANGUAGE RecordWildCards #-} | 11 | {-# LANGUAGE RecordWildCards #-} |
12 | module Network.BitTorrent.Exchange | 12 | module Network.BitTorrent.Exchange |
13 | ( P2P, withPeer | 13 | ( P2P, withPeer |
14 | , awaitEvent, signalEvent | ||
14 | ) where | 15 | ) where |
15 | 16 | ||
16 | import Control.Applicative | 17 | import Control.Applicative |
@@ -41,117 +42,130 @@ import Network.BitTorrent.Peer | |||
41 | import Data.Bitfield as BF | 42 | import Data.Bitfield as BF |
42 | import Data.Torrent | 43 | import Data.Torrent |
43 | 44 | ||
44 | {----------------------------------------------------------------------- | ||
45 | P2P monad | ||
46 | -----------------------------------------------------------------------} | ||
47 | |||
48 | type PeerWire = ConduitM Message Message IO | ||
49 | |||
50 | waitMessage :: PeerWire Message | ||
51 | waitMessage = await >>= maybe waitMessage return | ||
52 | |||
53 | signalMessage :: Message -> PeerWire () | ||
54 | signalMessage = C.yield | ||
55 | |||
56 | newtype P2P a = P2P { | ||
57 | runP2P :: ReaderT PeerSession PeerWire a | ||
58 | } deriving (Monad, MonadReader PeerSession, MonadIO) | ||
59 | |||
60 | instance MonadState Bitfield P2P where | ||
61 | |||
62 | runConduit :: Socket -> Conduit Message IO Message -> IO () | ||
63 | runConduit sock p2p = | ||
64 | sourceSocket sock $= | ||
65 | conduitGet S.get $= | ||
66 | forever p2p $= | ||
67 | conduitPut S.put $$ | ||
68 | sinkSocket sock | ||
69 | |||
70 | withPeer :: SwarmSession -> PeerAddr -> P2P () -> IO () | ||
71 | withPeer se addr p2p = | ||
72 | withPeerSession se addr $ \(sock, pses) -> do | ||
73 | runConduit sock (runReaderT (runP2P p2p) pses) | ||
74 | 45 | ||
75 | data Event = Available Bitfield | 46 | data Event = Available Bitfield |
76 | | Want | 47 | | Want |
77 | | Block | 48 | | Block |
78 | 49 | ||
50 | {----------------------------------------------------------------------- | ||
51 | Peer wire | ||
52 | -----------------------------------------------------------------------} | ||
79 | 53 | ||
54 | type PeerWire = ConduitM Message Message IO | ||
80 | 55 | ||
81 | waitForEvent :: P2P Event | 56 | waitMessage :: PeerSession -> PeerWire Message |
82 | waitForEvent = P2P (ReaderT nextEvent) | 57 | waitMessage se = do |
83 | where | 58 | mmsg <- await |
84 | nextEvent se @ PeerSession {..} = waitMessage >>= diff | 59 | case mmsg of |
85 | where | 60 | Nothing -> waitMessage se |
86 | -- diff finds difference between | 61 | Just msg -> do |
87 | diff KeepAlive = do | 62 | liftIO $ updateIncoming se |
88 | signalMessage KeepAlive | 63 | return msg |
89 | nextEvent se | ||
90 | 64 | ||
91 | handleMessage Choke = do | 65 | signalMessage :: Message -> PeerSession -> PeerWire () |
92 | SessionStatus {..} <- liftIO $ readIORef peerSessionStatus | 66 | signalMessage msg se = do |
93 | if psChoking sePeerStatus | 67 | C.yield msg |
94 | then nextEvent se | 68 | liftIO $ updateOutcoming se |
95 | else undefined | ||
96 | 69 | ||
97 | handleMessage Unchoke = return $ Available BF.difference | ||
98 | 70 | ||
99 | handleMessage Interested = return undefined | 71 | getPieceCount :: PeerSession -> IO PieceCount |
100 | handleMessage NotInterested = return undefined | 72 | getPieceCount = undefined |
101 | 73 | ||
102 | handleMessage (Have ix) = do | 74 | nextEvent :: PeerSession -> PeerWire Event |
75 | nextEvent se @ PeerSession {..} = waitMessage se >>= diff | ||
76 | where | ||
77 | -- diff finds difference between | ||
78 | -- diff KeepAlive = nextEvent se | ||
79 | diff msg = do | ||
80 | liftIO $ print (ppMessage msg) | ||
81 | nextEvent se | ||
82 | |||
83 | handleMessage Choke = do | ||
84 | SessionStatus {..} <- liftIO $ readIORef peerSessionStatus | ||
85 | if psChoking sePeerStatus | ||
86 | then nextEvent se | ||
87 | else undefined | ||
88 | |||
89 | handleMessage Unchoke = undefined | ||
90 | --return $ Available BF.difference | ||
91 | |||
92 | handleMessage Interested = return undefined | ||
93 | handleMessage NotInterested = return undefined | ||
94 | handleMessage (Have ix) = do | ||
95 | pc <- liftIO $ getPieceCount se | ||
96 | haveMessage $ have ix (haveNone pc) -- TODO singleton | ||
97 | |||
98 | handleMessage (Bitfield bf) = undefined | ||
99 | handleMessage (Request bix) = do | ||
100 | undefined | ||
101 | |||
102 | handleMessage msg @ (Piece blk) = undefined | ||
103 | handleMessage msg @ (Port _) | ||
104 | = checkExtension msg ExtDHT $ do | ||
105 | undefined | ||
106 | |||
107 | handleMessage msg @ HaveAll | ||
108 | = checkExtension msg ExtFast $ do | ||
103 | pc <- liftIO $ getPieceCount se | 109 | pc <- liftIO $ getPieceCount se |
104 | haveMessage $ have ix (haveNone pc) -- TODO singleton | 110 | haveMessage (haveAll pc) |
105 | |||
106 | handleMessage (Bitfield bf) = undefined | ||
107 | handleMessage (Request bix) = do | ||
108 | undefined | ||
109 | |||
110 | handleMessage (Piece blk) = undefined | ||
111 | handleMessage (Port _) | ||
112 | = checkExtension msg ExtDHT $ do | ||
113 | undefined | ||
114 | 111 | ||
115 | handleMessage msg @ HaveAll | 112 | handleMessage msg @ HaveNone |
116 | = checkExtension msg ExtFast $ do | 113 | = checkExtension msg ExtFast $ do |
117 | pc <- liftIO $ getPieceCount se | 114 | pc <- liftIO $ getPieceCount se |
118 | haveMessage (haveAll pc) | 115 | haveMessage (haveNone pc) |
119 | |||
120 | handleMessage msg @ HaveNone | ||
121 | = checkExtension msg ExtFast $ do | ||
122 | pc <- liftIO $ getPieceCount se | ||
123 | haveMessage (haveNone pc) | ||
124 | 116 | ||
125 | handleMessage msg @ (SuggestPiece ix) | 117 | handleMessage msg @ (SuggestPiece ix) |
126 | = checkExtension msg ExtFast $ do | 118 | = checkExtension msg ExtFast $ do |
127 | undefined | 119 | undefined |
128 | 120 | ||
129 | handleMessage msg @ (RejectRequest ix) | 121 | handleMessage msg @ (RejectRequest ix) |
130 | = checkExtension msg ExtFast $ do | 122 | = checkExtension msg ExtFast $ do |
131 | undefined | 123 | undefined |
132 | 124 | ||
133 | handleMessage msg @ (AllowedFast pix) | 125 | handleMessage msg @ (AllowedFast pix) |
134 | = checkExtension msg ExtFast $ do | 126 | = checkExtension msg ExtFast $ do |
135 | undefined | 127 | undefined |
136 | 128 | ||
137 | haveMessage bf = do | 129 | haveMessage bf = do |
138 | cbf <- liftIO $ readIORef $ clientBitfield swarmSession | 130 | cbf <- liftIO $ readIORef $ clientBitfield swarmSession |
139 | if undefined -- ix `member` bf | 131 | if undefined -- ix `member` bf |
140 | then nextEvent se | 132 | then nextEvent se |
141 | else return $ Available diff | 133 | else undefined -- return $ Available diff |
142 | 134 | ||
143 | checkExtension msg requredExtension action | 135 | checkExtension msg requredExtension action |
144 | | requredExtension `elem` enabledExtensions = action | 136 | | requredExtension `elem` enabledExtensions = action |
145 | | otherwise = liftIO $ throwIO $ userError errorMsg | 137 | | otherwise = liftIO $ throwIO $ userError errorMsg |
146 | where | 138 | where |
147 | errorMsg = show (ppExtension requredExtension) | 139 | errorMsg = show (ppExtension requredExtension) |
148 | ++ "not enabled, but peer sent" | 140 | ++ "not enabled, but peer sent" |
149 | ++ show (ppMessage msg) | 141 | ++ show (ppMessage msg) |
150 | 142 | ||
143 | {----------------------------------------------------------------------- | ||
144 | P2P monad | ||
145 | -----------------------------------------------------------------------} | ||
151 | 146 | ||
147 | newtype P2P a = P2P { | ||
148 | runP2P :: ReaderT PeerSession PeerWire a | ||
149 | } deriving (Monad, MonadReader PeerSession, MonadIO) | ||
152 | 150 | ||
153 | getPieceCount :: PeerSession -> IO PieceCount | 151 | instance MonadState Bitfield P2P where |
154 | getPieceCount = undefined | 152 | |
153 | runConduit :: Socket -> Conduit Message IO Message -> IO () | ||
154 | runConduit sock p2p = | ||
155 | sourceSocket sock $= | ||
156 | conduitGet S.get $= | ||
157 | forever p2p $= | ||
158 | conduitPut S.put $$ | ||
159 | sinkSocket sock | ||
160 | |||
161 | withPeer :: SwarmSession -> PeerAddr -> P2P () -> IO () | ||
162 | withPeer se addr p2p = | ||
163 | withPeerSession se addr $ \(sock, pses) -> do | ||
164 | runConduit sock (runReaderT (runP2P p2p) pses) | ||
165 | |||
166 | |||
167 | awaitEvent :: P2P Event | ||
168 | awaitEvent = P2P (ReaderT nextEvent) | ||
155 | 169 | ||
156 | signalEvent :: Event -> P2P () | 170 | signalEvent :: Event -> P2P () |
157 | signalEvent = undefined | 171 | signalEvent = undefined |
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs new file mode 100644 index 00000000..d34c6236 --- /dev/null +++ b/src/Network/BitTorrent/Internal.hs | |||
@@ -0,0 +1,210 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam T. 2013 | ||
3 | -- License : MIT | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- This module implement opaque broadcast message passing. It | ||
9 | -- provides sessions needed by Network.BitTorrent and | ||
10 | -- Network.BitTorrent.Exchange and modules. To hide some internals | ||
11 | -- of this module we detach it from Exchange. | ||
12 | -- | ||
13 | {-# LANGUAGE RecordWildCards #-} | ||
14 | module Network.BitTorrent.Internal | ||
15 | ( ClientSession(..), newClient | ||
16 | , SwarmSession(..), newLeacher, newSeeder | ||
17 | , PeerSession(..), withPeerSession | ||
18 | |||
19 | -- * Timeouts | ||
20 | , updateIncoming, updateOutcoming | ||
21 | ) where | ||
22 | |||
23 | import Control.Applicative | ||
24 | import Control.Concurrent | ||
25 | import Control.Concurrent.STM | ||
26 | import Control.Exception | ||
27 | |||
28 | import Data.IORef | ||
29 | import Data.Function | ||
30 | import Data.Ord | ||
31 | import Data.Set as S | ||
32 | |||
33 | import Data.Conduit | ||
34 | import Data.Conduit.Cereal | ||
35 | import Data.Conduit.Network | ||
36 | import Data.Serialize | ||
37 | |||
38 | import Network | ||
39 | import Network.Socket | ||
40 | import Network.Socket.ByteString | ||
41 | |||
42 | import GHC.Event as Ev | ||
43 | |||
44 | import Data.Bitfield as BF | ||
45 | import Data.Torrent | ||
46 | import Network.BitTorrent.Extension | ||
47 | import Network.BitTorrent.Peer | ||
48 | import Network.BitTorrent.Exchange.Protocol as BT | ||
49 | |||
50 | |||
51 | |||
52 | {----------------------------------------------------------------------- | ||
53 | Client session | ||
54 | -----------------------------------------------------------------------} | ||
55 | |||
56 | -- | In one application you could have many clients. | ||
57 | data ClientSession = ClientSession { | ||
58 | clientPeerID :: PeerID -- ^ | ||
59 | , allowedExtensions :: [Extension] -- ^ | ||
60 | , swarmSessions :: TVar (Set SwarmSession) | ||
61 | , eventManager :: EventManager | ||
62 | } | ||
63 | |||
64 | instance Eq ClientSession where | ||
65 | (==) = (==) `on` clientPeerID | ||
66 | |||
67 | instance Ord ClientSession where | ||
68 | compare = comparing clientPeerID | ||
69 | |||
70 | newClient :: [Extension] -> IO ClientSession | ||
71 | newClient exts = ClientSession <$> newPeerID | ||
72 | <*> pure exts | ||
73 | <*> newTVarIO S.empty | ||
74 | <*> Ev.new | ||
75 | |||
76 | {----------------------------------------------------------------------- | ||
77 | Swarm session | ||
78 | -----------------------------------------------------------------------} | ||
79 | |||
80 | -- | Extensions are set globally by | ||
81 | -- Swarm session are un | ||
82 | data SwarmSession = SwarmSession { | ||
83 | torrentInfoHash :: InfoHash | ||
84 | , clientSession :: ClientSession | ||
85 | , clientBitfield :: IORef Bitfield | ||
86 | , connectedPeers :: TVar (Set PeerSession) | ||
87 | } | ||
88 | |||
89 | instance Eq SwarmSession where | ||
90 | (==) = (==) `on` torrentInfoHash | ||
91 | |||
92 | instance Ord SwarmSession where | ||
93 | compare = comparing torrentInfoHash | ||
94 | |||
95 | newSwarmSession :: Bitfield -> ClientSession -> Torrent -> IO SwarmSession | ||
96 | newSwarmSession bf cs @ ClientSession {..} Torrent {..} | ||
97 | = SwarmSession <$> pure tInfoHash | ||
98 | <*> pure cs | ||
99 | <*> newIORef bf | ||
100 | <*> newTVarIO S.empty | ||
101 | |||
102 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession | ||
103 | newSeeder cs t @ Torrent {..} | ||
104 | = newSwarmSession (haveAll (pieceCount tInfo)) cs t | ||
105 | |||
106 | newLeacher :: ClientSession -> Torrent -> IO SwarmSession | ||
107 | newLeacher cs t @ Torrent {..} | ||
108 | = newSwarmSession (haveNone (pieceCount tInfo)) cs t | ||
109 | |||
110 | isLeacher :: SwarmSession -> IO Bool | ||
111 | isLeacher = undefined | ||
112 | |||
113 | {----------------------------------------------------------------------- | ||
114 | Peer session | ||
115 | -----------------------------------------------------------------------} | ||
116 | |||
117 | data PeerSession = PeerSession { | ||
118 | connectedPeerAddr :: PeerAddr | ||
119 | , swarmSession :: SwarmSession | ||
120 | , enabledExtensions :: [Extension] | ||
121 | |||
122 | -- | To dissconnect from died peers appropriately we should check | ||
123 | -- if a peer do not sent the KA message within given interval. If | ||
124 | -- yes, we should throw an exception in 'TimeoutCallback' and | ||
125 | -- close session between peers. | ||
126 | -- | ||
127 | -- We should update timeout if we /receive/ any message within | ||
128 | -- timeout interval to keep connection up. | ||
129 | , incomingTimeout :: TimeoutKey | ||
130 | |||
131 | -- | To send KA message appropriately we should know when was last | ||
132 | -- time we sent a message to a peer. To do that we keep registered | ||
133 | -- timeout in event manager and if we do not sent any message to | ||
134 | -- the peer within given interval then we send KA message in | ||
135 | -- 'TimeoutCallback'. | ||
136 | -- | ||
137 | -- We should update timeout if we /send/ any message within timeout | ||
138 | -- to avoid reduntant KA messages. | ||
139 | , outcomingTimeout :: TimeoutKey | ||
140 | |||
141 | , broadcastMessages :: Chan [Message] | ||
142 | , peerBitfield :: IORef Bitfield | ||
143 | , peerSessionStatus :: IORef SessionStatus | ||
144 | } | ||
145 | |||
146 | instance Eq PeerSession where | ||
147 | (==) = (==) `on` connectedPeerAddr | ||
148 | |||
149 | instance Ord PeerSession where | ||
150 | compare = comparing connectedPeerAddr | ||
151 | |||
152 | -- TODO check if it connected yet peer | ||
153 | withPeerSession :: SwarmSession -> PeerAddr | ||
154 | -> ((Socket, PeerSession) -> IO a) | ||
155 | -> IO a | ||
156 | |||
157 | withPeerSession ss @ SwarmSession {..} addr | ||
158 | = bracket openSession closeSession | ||
159 | where | ||
160 | openSession = do | ||
161 | let caps = encodeExts $ allowedExtensions $ clientSession | ||
162 | let pid = clientPeerID $ clientSession | ||
163 | let chs = Handshake defaultBTProtocol caps torrentInfoHash pid | ||
164 | |||
165 | sock <- connectToPeer addr | ||
166 | phs <- handshake sock chs `onException` close sock | ||
167 | |||
168 | let enabled = decodeExts (enabledCaps caps (handshakeCaps phs)) | ||
169 | ps <- PeerSession addr ss enabled | ||
170 | <$> registerTimeout (eventManager clientSession) | ||
171 | maxIncomingTime abortSession | ||
172 | <*> registerTimeout (eventManager clientSession) | ||
173 | maxOutcomingTime (sendKA sock) | ||
174 | <*> newChan | ||
175 | <*> pure clientBitfield | ||
176 | <*> newIORef initSessionStatus | ||
177 | return (sock, ps) | ||
178 | |||
179 | closeSession = close . fst | ||
180 | |||
181 | {----------------------------------------------------------------------- | ||
182 | Timeouts | ||
183 | -----------------------------------------------------------------------} | ||
184 | |||
185 | sec :: Int | ||
186 | sec = 1000 * 1000 | ||
187 | |||
188 | maxIncomingTime :: Int | ||
189 | maxIncomingTime = 120 * sec | ||
190 | |||
191 | maxOutcomingTime :: Int | ||
192 | maxOutcomingTime = 60 * sec | ||
193 | |||
194 | -- | Should be called after we have received any message from a peer. | ||
195 | updateIncoming :: PeerSession -> IO () | ||
196 | updateIncoming PeerSession {..} = do | ||
197 | updateTimeout (eventManager (clientSession swarmSession)) | ||
198 | incomingTimeout maxIncomingTime | ||
199 | |||
200 | -- | Should be called before we have send any message to a peer. | ||
201 | updateOutcoming :: PeerSession -> IO () | ||
202 | updateOutcoming PeerSession {..} = | ||
203 | updateTimeout (eventManager (clientSession swarmSession)) | ||
204 | outcomingTimeout maxOutcomingTime | ||
205 | |||
206 | sendKA :: Socket -> IO () | ||
207 | sendKA sock = sendAll sock (encode BT.KeepAlive) | ||
208 | |||
209 | abortSession :: IO () | ||
210 | abortSession = error "abortSession: not implemented" | ||