diff options
Diffstat (limited to 'src/Network/BitTorrent/Exchange.hs')
-rw-r--r-- | src/Network/BitTorrent/Exchange.hs | 233 |
1 files changed, 131 insertions, 102 deletions
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index dda7d304..65ec0eb7 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs | |||
@@ -5,12 +5,19 @@ | |||
5 | -- Stability : experimental | 5 | -- Stability : experimental |
6 | -- Portability : portable | 6 | -- Portability : portable |
7 | -- | 7 | -- |
8 | {-# LANGUAGE DoAndIfThenElse #-} | 8 | {-# LANGUAGE OverloadedStrings #-} |
9 | {-# LANGUAGE DoAndIfThenElse #-} | ||
9 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | 10 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} |
10 | {-# LANGUAGE MultiParamTypeClasses #-} | 11 | {-# LANGUAGE MultiParamTypeClasses #-} |
11 | {-# LANGUAGE RecordWildCards #-} | 12 | {-# LANGUAGE RecordWildCards #-} |
12 | module Network.BitTorrent.Exchange | 13 | module Network.BitTorrent.Exchange |
13 | ( P2P, withPeer | 14 | ( -- * Block |
15 | Block(..), BlockIx(..) | ||
16 | |||
17 | -- * Event | ||
18 | , Event(..) | ||
19 | |||
20 | , P2P, withPeer | ||
14 | , awaitEvent, signalEvent | 21 | , awaitEvent, signalEvent |
15 | ) where | 22 | ) where |
16 | 23 | ||
@@ -18,6 +25,7 @@ import Control.Applicative | |||
18 | import Control.Concurrent | 25 | import Control.Concurrent |
19 | import Control.Concurrent.STM | 26 | import Control.Concurrent.STM |
20 | import Control.Exception | 27 | import Control.Exception |
28 | import Control.Lens | ||
21 | import Control.Monad.Reader | 29 | import Control.Monad.Reader |
22 | import Control.Monad.State | 30 | import Control.Monad.State |
23 | 31 | ||
@@ -30,6 +38,7 @@ import Data.Conduit as C | |||
30 | import Data.Conduit.Cereal | 38 | import Data.Conduit.Cereal |
31 | import Data.Conduit.Network | 39 | import Data.Conduit.Network |
32 | import Data.Serialize as S | 40 | import Data.Serialize as S |
41 | import Text.PrettyPrint as PP hiding (($$)) | ||
33 | 42 | ||
34 | import Network | 43 | import Network |
35 | 44 | ||
@@ -43,8 +52,8 @@ import Data.Torrent | |||
43 | 52 | ||
44 | 53 | ||
45 | data Event = Available Bitfield | 54 | data Event = Available Bitfield |
46 | | Want | 55 | | Want BlockIx |
47 | | Block | 56 | | Fragment Block |
48 | deriving Show | 57 | deriving Show |
49 | 58 | ||
50 | {----------------------------------------------------------------------- | 59 | {----------------------------------------------------------------------- |
@@ -53,88 +62,132 @@ data Event = Available Bitfield | |||
53 | 62 | ||
54 | type PeerWire = ConduitM Message Message IO | 63 | type PeerWire = ConduitM Message Message IO |
55 | 64 | ||
56 | waitMessage :: PeerSession -> PeerWire Message | 65 | runConduit :: Socket -> PeerWire () -> IO () |
57 | waitMessage se = do | 66 | runConduit sock p2p = |
58 | mmsg <- await | 67 | sourceSocket sock $= |
59 | case mmsg of | 68 | conduitGet S.get $= |
60 | Nothing -> waitMessage se | 69 | forever p2p $= |
61 | Just msg -> do | 70 | conduitPut S.put $$ |
62 | liftIO $ updateIncoming se | 71 | sinkSocket sock |
63 | liftIO $ print msg | 72 | |
64 | return msg | 73 | waitMessage :: P2P Message |
65 | 74 | waitMessage = P2P (ReaderT go) | |
66 | signalMessage :: PeerSession -> Message -> PeerWire () | 75 | where |
67 | signalMessage se msg = do | 76 | go se = do |
77 | mmsg <- await | ||
78 | case mmsg of | ||
79 | Nothing -> go se | ||
80 | Just msg -> do | ||
81 | liftIO $ updateIncoming se | ||
82 | liftIO $ print msg | ||
83 | return msg | ||
84 | |||
85 | signalMessage :: Message -> P2P () | ||
86 | signalMessage msg = P2P $ ReaderT $ \se -> do | ||
68 | C.yield msg | 87 | C.yield msg |
69 | liftIO $ updateOutcoming se | 88 | liftIO $ updateOutcoming se |
70 | 89 | ||
71 | 90 | ||
72 | getPieceCount :: PeerSession -> IO PieceCount | 91 | peerWant :: P2P Bitfield |
73 | getPieceCount = undefined | 92 | peerWant = BF.difference <$> getPeerBF <*> use bitfield |
93 | |||
94 | clientWant :: P2P Bitfield | ||
95 | clientWant = BF.difference <$> use bitfield <*> getPeerBF | ||
74 | 96 | ||
75 | canOffer :: PeerSession -> PeerWire Bitfield | 97 | peerOffer :: P2P Bitfield |
76 | canOffer PeerSession {..} = liftIO $ do | 98 | peerOffer = do |
77 | pbf <- readIORef $ peerBitfield | 99 | sessionStatus <- use status |
78 | cbf <- readIORef $ clientBitfield $ swarmSession | 100 | if canDownload sessionStatus then clientWant else emptyBF |
79 | return $ BF.difference pbf cbf | ||
80 | 101 | ||
81 | revise :: PeerSession -> PeerWire () | 102 | clientOffer :: P2P Bitfield |
82 | revise se @ PeerSession {..} = do | 103 | clientOffer = do |
83 | isInteresting <- (not . BF.null) <$> canOffer se | 104 | sessionStatus <- use status |
84 | SessionStatus {..} <- liftIO $ readIORef peerSessionStatus | 105 | if canUpload sessionStatus then peerWant else emptyBF |
85 | 106 | ||
86 | when (isInteresting /= _interested seClientStatus) $ | 107 | revise :: P2P () |
87 | signalMessage se $ if isInteresting then Interested else NotInterested | 108 | revise = do |
109 | peerInteresting <- (not . BF.null) <$> clientWant | ||
110 | clientInterested <- use (status.clientStatus.interested) | ||
88 | 111 | ||
112 | when (clientInterested /= peerInteresting) $ | ||
113 | signalMessage $ if peerInteresting then Interested else NotInterested | ||
89 | 114 | ||
90 | nextEvent :: PeerSession -> PeerWire Event | 115 | requireExtension :: Extension -> P2P () |
91 | nextEvent se @ PeerSession {..} = waitMessage se >>= go | 116 | requireExtension required = do |
117 | enabled <- asks enabledExtensions | ||
118 | unless (required `elem` enabled) $ | ||
119 | sessionError $ ppExtension required <+> "not enabled" | ||
120 | |||
121 | peerHave :: P2P Event | ||
122 | peerHave = undefined | ||
123 | |||
124 | -- haveMessage bf = do | ||
125 | -- cbf <- undefined -- liftIO $ readIORef $ clientBitfield swarmSession | ||
126 | -- if undefined -- ix `member` bf | ||
127 | -- then nextEvent se | ||
128 | -- else undefined -- return $ Available diff | ||
129 | |||
130 | |||
131 | -- | | ||
132 | -- properties: | ||
133 | -- | ||
134 | -- forall (Fragment block). isPiece block == True | ||
135 | -- | ||
136 | awaitEvent :: P2P Event | ||
137 | awaitEvent = waitMessage >>= go | ||
92 | where | 138 | where |
93 | go KeepAlive = nextEvent se | 139 | go KeepAlive = awaitEvent |
94 | go Choke = do | 140 | go Choke = do |
95 | SessionStatus {..} <- liftIO $ readIORef peerSessionStatus | 141 | status.peerStatus.choking .= True |
96 | if _choking sePeerStatus | 142 | awaitEvent |
97 | then nextEvent se | ||
98 | else undefined | ||
99 | 143 | ||
100 | go Unchoke = do | 144 | go Unchoke = do |
101 | SessionStatus {..} <- liftIO $ readIORef peerSessionStatus | 145 | status.clientStatus.choking .= False |
102 | if not (_choking sePeerStatus) | 146 | awaitEvent |
103 | then nextEvent se | 147 | |
104 | else if undefined | 148 | go Interested = do |
105 | then undefined | 149 | status.peerStatus.interested .= True |
106 | else undefined | 150 | awaitEvent |
107 | --return $ Available BF.difference | 151 | |
108 | 152 | go NotInterested = do | |
109 | go Interested = return undefined | 153 | status.peerStatus.interested .= False |
110 | go NotInterested = return undefined | 154 | awaitEvent |
111 | 155 | ||
112 | go (Have ix) = do | 156 | -- go (Have ix) = peerHave =<< singletonBF ix |
113 | pc <- liftIO $ getPieceCount se | 157 | -- go (Bitfield bf) = peerHave =<< adjustBF bf |
114 | haveMessage $ have ix (haveNone pc) -- TODO singleton | ||
115 | |||
116 | go (Bitfield bf) = undefined | ||
117 | go (Request bix) = do | 158 | go (Request bix) = do |
118 | undefined | 159 | bf <- use bitfield |
160 | if ixPiece bix `BF.member` bf | ||
161 | then return (Want bix) | ||
162 | else do | ||
163 | signalMessage (RejectRequest bix) | ||
164 | awaitEvent | ||
165 | |||
166 | go (Piece blk) = undefined | ||
119 | 167 | ||
120 | go msg @ (Piece blk) = undefined | 168 | {- |
121 | go msg @ (Port _) | 169 | go msg @ (Port _) |
122 | = checkExtension msg ExtDHT $ do | 170 | = checkExtension msg ExtDHT $ do |
123 | undefined | 171 | undefined |
124 | 172 | ||
125 | go msg @ HaveAll | 173 | go HaveAll = do |
126 | = checkExtension msg ExtFast $ do | 174 | requireExtension ExtFast |
127 | pc <- liftIO $ getPieceCount se | 175 | bitfield <~ fullBF |
128 | haveMessage (haveAll pc) | 176 | revise |
129 | 177 | awaitEvent | |
130 | go msg @ HaveNone | 178 | |
131 | = checkExtension msg ExtFast $ do | 179 | go HaveNone = do |
132 | pc <- liftIO $ getPieceCount se | 180 | requireExtension ExtFast |
133 | haveMessage (haveNone pc) | 181 | bitfield <~ emptyBF |
134 | 182 | revise | |
135 | go msg @ (SuggestPiece ix) | 183 | awaitEvent |
136 | = checkExtension msg ExtFast $ do | 184 | |
137 | undefined | 185 | go (SuggestPiece ix) = do |
186 | requireExtension ExtFast | ||
187 | bf <- use bitfield | ||
188 | if ix `BF.notMember` bf | ||
189 | then Available <$> singletonBF ix | ||
190 | else awaitEvent | ||
138 | 191 | ||
139 | go msg @ (RejectRequest ix) | 192 | go msg @ (RejectRequest ix) |
140 | = checkExtension msg ExtFast $ do | 193 | = checkExtension msg ExtFast $ do |
@@ -143,20 +196,16 @@ nextEvent se @ PeerSession {..} = waitMessage se >>= go | |||
143 | go msg @ (AllowedFast pix) | 196 | go msg @ (AllowedFast pix) |
144 | = checkExtension msg ExtFast $ do | 197 | = checkExtension msg ExtFast $ do |
145 | undefined | 198 | undefined |
199 | -} | ||
146 | 200 | ||
147 | haveMessage bf = do | 201 | signalEvent :: Event -> P2P () |
148 | cbf <- liftIO $ readIORef $ clientBitfield swarmSession | 202 | signalEvent (Available bf) = undefined |
149 | if undefined -- ix `member` bf | 203 | signalEvent _ = undefined |
150 | then nextEvent se | ||
151 | else undefined -- return $ Available diff | ||
152 | 204 | ||
153 | checkExtension msg requredExtension action | 205 | --flushBroadcast :: P2P () |
154 | | requredExtension `elem` enabledExtensions = action | 206 | --flushBroadcast = nextBroadcast >>= maybe (return ()) go |
155 | | otherwise = liftIO $ throwIO $ userError errorMsg | 207 | -- where |
156 | where | 208 | -- go = undefined |
157 | errorMsg = show (ppExtension requredExtension) | ||
158 | ++ "not enabled, but peer sent" | ||
159 | ++ show (ppMessage msg) | ||
160 | 209 | ||
161 | {----------------------------------------------------------------------- | 210 | {----------------------------------------------------------------------- |
162 | P2P monad | 211 | P2P monad |
@@ -164,29 +213,9 @@ nextEvent se @ PeerSession {..} = waitMessage se >>= go | |||
164 | 213 | ||
165 | newtype P2P a = P2P { | 214 | newtype P2P a = P2P { |
166 | runP2P :: ReaderT PeerSession PeerWire a | 215 | runP2P :: ReaderT PeerSession PeerWire a |
167 | } deriving (Monad, MonadReader PeerSession, MonadIO) | 216 | } deriving (Functor, Applicative, Monad, MonadReader PeerSession, MonadIO) |
168 | |||
169 | instance MonadState SessionStatus P2P where | ||
170 | get = asks peerSessionStatus >>= liftIO . readIORef | ||
171 | put x = asks peerSessionStatus >>= liftIO . (`writeIORef` x) | ||
172 | |||
173 | |||
174 | runConduit :: Socket -> Conduit Message IO Message -> IO () | ||
175 | runConduit sock p2p = | ||
176 | sourceSocket sock $= | ||
177 | conduitGet S.get $= | ||
178 | forever p2p $= | ||
179 | conduitPut S.put $$ | ||
180 | sinkSocket sock | ||
181 | 217 | ||
182 | withPeer :: SwarmSession -> PeerAddr -> P2P () -> IO () | 218 | withPeer :: SwarmSession -> PeerAddr -> P2P () -> IO () |
183 | withPeer se addr p2p = | 219 | withPeer se addr p2p = |
184 | withPeerSession se addr $ \(sock, pses) -> do | 220 | withPeerSession se addr $ \(sock, pses) -> do |
185 | runConduit sock (runReaderT (runP2P p2p) pses) | 221 | runConduit sock (runReaderT (runP2P p2p) pses) |
186 | |||
187 | |||
188 | awaitEvent :: P2P Event | ||
189 | awaitEvent = P2P (ReaderT nextEvent) | ||
190 | |||
191 | signalEvent :: Event -> P2P () | ||
192 | signalEvent = undefined | ||