summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Network/BitTorrent.hs7
-rw-r--r--src/Network/BitTorrent/Exchange.hs182
-rw-r--r--src/Network/BitTorrent/Internal.hs210
3 files changed, 312 insertions, 87 deletions
diff --git a/src/Network/BitTorrent.hs b/src/Network/BitTorrent.hs
index 5d6034f6..5fbc5ff6 100644
--- a/src/Network/BitTorrent.hs
+++ b/src/Network/BitTorrent.hs
@@ -7,6 +7,7 @@
7-- 7--
8module Network.BitTorrent 8module Network.BitTorrent
9 ( module BT 9 ( module BT
10 , module Data.Torrent
10 11
11 -- * Tracker 12 -- * Tracker
12 13
@@ -16,12 +17,12 @@ module Network.BitTorrent
16 , PeerSession 17 , PeerSession
17 ) where 18 ) where
18 19
20import Data.Torrent
19import Network.BitTorrent.Internal 21import Network.BitTorrent.Internal
20
21import Network.BitTorrent.Extension as BT 22import Network.BitTorrent.Extension as BT
22import Network.BitTorrent.Peer as BT 23import Network.BitTorrent.Peer as BT
23import Network.BitTorrent.Exchange as BT 24import Network.BitTorrent.Exchange as BT
24import Network.BitTorrent.Tracker as BT 25import Network.BitTorrent.Tracker as BT
25 26
26--discover :: SwarmSession -> (Chan PeerAddr -> IO a) -> IO a 27--discover :: SwarmSession -> ([PeerAddr] -> IO a) -> IO a
27--discover = undefined 28--discover = withTracker
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs
index 4fe90cda..b23ca667 100644
--- a/src/Network/BitTorrent/Exchange.hs
+++ b/src/Network/BitTorrent/Exchange.hs
@@ -11,6 +11,7 @@
11{-# LANGUAGE RecordWildCards #-} 11{-# LANGUAGE RecordWildCards #-}
12module Network.BitTorrent.Exchange 12module Network.BitTorrent.Exchange
13 ( P2P, withPeer 13 ( P2P, withPeer
14 , awaitEvent, signalEvent
14 ) where 15 ) where
15 16
16import Control.Applicative 17import Control.Applicative
@@ -41,117 +42,130 @@ import Network.BitTorrent.Peer
41import Data.Bitfield as BF 42import Data.Bitfield as BF
42import Data.Torrent 43import Data.Torrent
43 44
44{-----------------------------------------------------------------------
45 P2P monad
46-----------------------------------------------------------------------}
47
48type PeerWire = ConduitM Message Message IO
49
50waitMessage :: PeerWire Message
51waitMessage = await >>= maybe waitMessage return
52
53signalMessage :: Message -> PeerWire ()
54signalMessage = C.yield
55
56newtype P2P a = P2P {
57 runP2P :: ReaderT PeerSession PeerWire a
58 } deriving (Monad, MonadReader PeerSession, MonadIO)
59
60instance MonadState Bitfield P2P where
61
62runConduit :: Socket -> Conduit Message IO Message -> IO ()
63runConduit sock p2p =
64 sourceSocket sock $=
65 conduitGet S.get $=
66 forever p2p $=
67 conduitPut S.put $$
68 sinkSocket sock
69
70withPeer :: SwarmSession -> PeerAddr -> P2P () -> IO ()
71withPeer se addr p2p =
72 withPeerSession se addr $ \(sock, pses) -> do
73 runConduit sock (runReaderT (runP2P p2p) pses)
74 45
75data Event = Available Bitfield 46data Event = Available Bitfield
76 | Want 47 | Want
77 | Block 48 | Block
78 49
50{-----------------------------------------------------------------------
51 Peer wire
52-----------------------------------------------------------------------}
79 53
54type PeerWire = ConduitM Message Message IO
80 55
81waitForEvent :: P2P Event 56waitMessage :: PeerSession -> PeerWire Message
82waitForEvent = P2P (ReaderT nextEvent) 57waitMessage se = do
83 where 58 mmsg <- await
84 nextEvent se @ PeerSession {..} = waitMessage >>= diff 59 case mmsg of
85 where 60 Nothing -> waitMessage se
86 -- diff finds difference between 61 Just msg -> do
87 diff KeepAlive = do 62 liftIO $ updateIncoming se
88 signalMessage KeepAlive 63 return msg
89 nextEvent se
90 64
91 handleMessage Choke = do 65signalMessage :: Message -> PeerSession -> PeerWire ()
92 SessionStatus {..} <- liftIO $ readIORef peerSessionStatus 66signalMessage msg se = do
93 if psChoking sePeerStatus 67 C.yield msg
94 then nextEvent se 68 liftIO $ updateOutcoming se
95 else undefined
96 69
97 handleMessage Unchoke = return $ Available BF.difference
98 70
99 handleMessage Interested = return undefined 71getPieceCount :: PeerSession -> IO PieceCount
100 handleMessage NotInterested = return undefined 72getPieceCount = undefined
101 73
102 handleMessage (Have ix) = do 74nextEvent :: PeerSession -> PeerWire Event
75nextEvent se @ PeerSession {..} = waitMessage se >>= diff
76 where
77 -- diff finds difference between
78-- diff KeepAlive = nextEvent se
79 diff msg = do
80 liftIO $ print (ppMessage msg)
81 nextEvent se
82
83 handleMessage Choke = do
84 SessionStatus {..} <- liftIO $ readIORef peerSessionStatus
85 if psChoking sePeerStatus
86 then nextEvent se
87 else undefined
88
89 handleMessage Unchoke = undefined
90 --return $ Available BF.difference
91
92 handleMessage Interested = return undefined
93 handleMessage NotInterested = return undefined
94 handleMessage (Have ix) = do
95 pc <- liftIO $ getPieceCount se
96 haveMessage $ have ix (haveNone pc) -- TODO singleton
97
98 handleMessage (Bitfield bf) = undefined
99 handleMessage (Request bix) = do
100 undefined
101
102 handleMessage msg @ (Piece blk) = undefined
103 handleMessage msg @ (Port _)
104 = checkExtension msg ExtDHT $ do
105 undefined
106
107 handleMessage msg @ HaveAll
108 = checkExtension msg ExtFast $ do
103 pc <- liftIO $ getPieceCount se 109 pc <- liftIO $ getPieceCount se
104 haveMessage $ have ix (haveNone pc) -- TODO singleton 110 haveMessage (haveAll pc)
105
106 handleMessage (Bitfield bf) = undefined
107 handleMessage (Request bix) = do
108 undefined
109
110 handleMessage (Piece blk) = undefined
111 handleMessage (Port _)
112 = checkExtension msg ExtDHT $ do
113 undefined
114 111
115 handleMessage msg @ HaveAll 112 handleMessage msg @ HaveNone
116 = checkExtension msg ExtFast $ do 113 = checkExtension msg ExtFast $ do
117 pc <- liftIO $ getPieceCount se 114 pc <- liftIO $ getPieceCount se
118 haveMessage (haveAll pc) 115 haveMessage (haveNone pc)
119
120 handleMessage msg @ HaveNone
121 = checkExtension msg ExtFast $ do
122 pc <- liftIO $ getPieceCount se
123 haveMessage (haveNone pc)
124 116
125 handleMessage msg @ (SuggestPiece ix) 117 handleMessage msg @ (SuggestPiece ix)
126 = checkExtension msg ExtFast $ do 118 = checkExtension msg ExtFast $ do
127 undefined 119 undefined
128 120
129 handleMessage msg @ (RejectRequest ix) 121 handleMessage msg @ (RejectRequest ix)
130 = checkExtension msg ExtFast $ do 122 = checkExtension msg ExtFast $ do
131 undefined 123 undefined
132 124
133 handleMessage msg @ (AllowedFast pix) 125 handleMessage msg @ (AllowedFast pix)
134 = checkExtension msg ExtFast $ do 126 = checkExtension msg ExtFast $ do
135 undefined 127 undefined
136 128
137 haveMessage bf = do 129 haveMessage bf = do
138 cbf <- liftIO $ readIORef $ clientBitfield swarmSession 130 cbf <- liftIO $ readIORef $ clientBitfield swarmSession
139 if undefined -- ix `member` bf 131 if undefined -- ix `member` bf
140 then nextEvent se 132 then nextEvent se
141 else return $ Available diff 133 else undefined -- return $ Available diff
142 134
143 checkExtension msg requredExtension action 135 checkExtension msg requredExtension action
144 | requredExtension `elem` enabledExtensions = action 136 | requredExtension `elem` enabledExtensions = action
145 | otherwise = liftIO $ throwIO $ userError errorMsg 137 | otherwise = liftIO $ throwIO $ userError errorMsg
146 where 138 where
147 errorMsg = show (ppExtension requredExtension) 139 errorMsg = show (ppExtension requredExtension)
148 ++ "not enabled, but peer sent" 140 ++ "not enabled, but peer sent"
149 ++ show (ppMessage msg) 141 ++ show (ppMessage msg)
150 142
143{-----------------------------------------------------------------------
144 P2P monad
145-----------------------------------------------------------------------}
151 146
147newtype P2P a = P2P {
148 runP2P :: ReaderT PeerSession PeerWire a
149 } deriving (Monad, MonadReader PeerSession, MonadIO)
152 150
153getPieceCount :: PeerSession -> IO PieceCount 151instance MonadState Bitfield P2P where
154getPieceCount = undefined 152
153runConduit :: Socket -> Conduit Message IO Message -> IO ()
154runConduit sock p2p =
155 sourceSocket sock $=
156 conduitGet S.get $=
157 forever p2p $=
158 conduitPut S.put $$
159 sinkSocket sock
160
161withPeer :: SwarmSession -> PeerAddr -> P2P () -> IO ()
162withPeer se addr p2p =
163 withPeerSession se addr $ \(sock, pses) -> do
164 runConduit sock (runReaderT (runP2P p2p) pses)
165
166
167awaitEvent :: P2P Event
168awaitEvent = P2P (ReaderT nextEvent)
155 169
156signalEvent :: Event -> P2P () 170signalEvent :: Event -> P2P ()
157signalEvent = undefined 171signalEvent = undefined
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs
new file mode 100644
index 00000000..d34c6236
--- /dev/null
+++ b/src/Network/BitTorrent/Internal.hs
@@ -0,0 +1,210 @@
1-- |
2-- Copyright : (c) Sam T. 2013
3-- License : MIT
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- This module implement opaque broadcast message passing. It
9-- provides sessions needed by Network.BitTorrent and
10-- Network.BitTorrent.Exchange and modules. To hide some internals
11-- of this module we detach it from Exchange.
12--
13{-# LANGUAGE RecordWildCards #-}
14module Network.BitTorrent.Internal
15 ( ClientSession(..), newClient
16 , SwarmSession(..), newLeacher, newSeeder
17 , PeerSession(..), withPeerSession
18
19 -- * Timeouts
20 , updateIncoming, updateOutcoming
21 ) where
22
23import Control.Applicative
24import Control.Concurrent
25import Control.Concurrent.STM
26import Control.Exception
27
28import Data.IORef
29import Data.Function
30import Data.Ord
31import Data.Set as S
32
33import Data.Conduit
34import Data.Conduit.Cereal
35import Data.Conduit.Network
36import Data.Serialize
37
38import Network
39import Network.Socket
40import Network.Socket.ByteString
41
42import GHC.Event as Ev
43
44import Data.Bitfield as BF
45import Data.Torrent
46import Network.BitTorrent.Extension
47import Network.BitTorrent.Peer
48import Network.BitTorrent.Exchange.Protocol as BT
49
50
51
52{-----------------------------------------------------------------------
53 Client session
54-----------------------------------------------------------------------}
55
56-- | In one application you could have many clients.
57data ClientSession = ClientSession {
58 clientPeerID :: PeerID -- ^
59 , allowedExtensions :: [Extension] -- ^
60 , swarmSessions :: TVar (Set SwarmSession)
61 , eventManager :: EventManager
62 }
63
64instance Eq ClientSession where
65 (==) = (==) `on` clientPeerID
66
67instance Ord ClientSession where
68 compare = comparing clientPeerID
69
70newClient :: [Extension] -> IO ClientSession
71newClient exts = ClientSession <$> newPeerID
72 <*> pure exts
73 <*> newTVarIO S.empty
74 <*> Ev.new
75
76{-----------------------------------------------------------------------
77 Swarm session
78-----------------------------------------------------------------------}
79
80-- | Extensions are set globally by
81-- Swarm session are un
82data SwarmSession = SwarmSession {
83 torrentInfoHash :: InfoHash
84 , clientSession :: ClientSession
85 , clientBitfield :: IORef Bitfield
86 , connectedPeers :: TVar (Set PeerSession)
87 }
88
89instance Eq SwarmSession where
90 (==) = (==) `on` torrentInfoHash
91
92instance Ord SwarmSession where
93 compare = comparing torrentInfoHash
94
95newSwarmSession :: Bitfield -> ClientSession -> Torrent -> IO SwarmSession
96newSwarmSession bf cs @ ClientSession {..} Torrent {..}
97 = SwarmSession <$> pure tInfoHash
98 <*> pure cs
99 <*> newIORef bf
100 <*> newTVarIO S.empty
101
102newSeeder :: ClientSession -> Torrent -> IO SwarmSession
103newSeeder cs t @ Torrent {..}
104 = newSwarmSession (haveAll (pieceCount tInfo)) cs t
105
106newLeacher :: ClientSession -> Torrent -> IO SwarmSession
107newLeacher cs t @ Torrent {..}
108 = newSwarmSession (haveNone (pieceCount tInfo)) cs t
109
110isLeacher :: SwarmSession -> IO Bool
111isLeacher = undefined
112
113{-----------------------------------------------------------------------
114 Peer session
115-----------------------------------------------------------------------}
116
117data PeerSession = PeerSession {
118 connectedPeerAddr :: PeerAddr
119 , swarmSession :: SwarmSession
120 , enabledExtensions :: [Extension]
121
122 -- | To dissconnect from died peers appropriately we should check
123 -- if a peer do not sent the KA message within given interval. If
124 -- yes, we should throw an exception in 'TimeoutCallback' and
125 -- close session between peers.
126 --
127 -- We should update timeout if we /receive/ any message within
128 -- timeout interval to keep connection up.
129 , incomingTimeout :: TimeoutKey
130
131 -- | To send KA message appropriately we should know when was last
132 -- time we sent a message to a peer. To do that we keep registered
133 -- timeout in event manager and if we do not sent any message to
134 -- the peer within given interval then we send KA message in
135 -- 'TimeoutCallback'.
136 --
137 -- We should update timeout if we /send/ any message within timeout
138 -- to avoid reduntant KA messages.
139 , outcomingTimeout :: TimeoutKey
140
141 , broadcastMessages :: Chan [Message]
142 , peerBitfield :: IORef Bitfield
143 , peerSessionStatus :: IORef SessionStatus
144 }
145
146instance Eq PeerSession where
147 (==) = (==) `on` connectedPeerAddr
148
149instance Ord PeerSession where
150 compare = comparing connectedPeerAddr
151
152-- TODO check if it connected yet peer
153withPeerSession :: SwarmSession -> PeerAddr
154 -> ((Socket, PeerSession) -> IO a)
155 -> IO a
156
157withPeerSession ss @ SwarmSession {..} addr
158 = bracket openSession closeSession
159 where
160 openSession = do
161 let caps = encodeExts $ allowedExtensions $ clientSession
162 let pid = clientPeerID $ clientSession
163 let chs = Handshake defaultBTProtocol caps torrentInfoHash pid
164
165 sock <- connectToPeer addr
166 phs <- handshake sock chs `onException` close sock
167
168 let enabled = decodeExts (enabledCaps caps (handshakeCaps phs))
169 ps <- PeerSession addr ss enabled
170 <$> registerTimeout (eventManager clientSession)
171 maxIncomingTime abortSession
172 <*> registerTimeout (eventManager clientSession)
173 maxOutcomingTime (sendKA sock)
174 <*> newChan
175 <*> pure clientBitfield
176 <*> newIORef initSessionStatus
177 return (sock, ps)
178
179 closeSession = close . fst
180
181{-----------------------------------------------------------------------
182 Timeouts
183-----------------------------------------------------------------------}
184
185sec :: Int
186sec = 1000 * 1000
187
188maxIncomingTime :: Int
189maxIncomingTime = 120 * sec
190
191maxOutcomingTime :: Int
192maxOutcomingTime = 60 * sec
193
194-- | Should be called after we have received any message from a peer.
195updateIncoming :: PeerSession -> IO ()
196updateIncoming PeerSession {..} = do
197 updateTimeout (eventManager (clientSession swarmSession))
198 incomingTimeout maxIncomingTime
199
200-- | Should be called before we have send any message to a peer.
201updateOutcoming :: PeerSession -> IO ()
202updateOutcoming PeerSession {..} =
203 updateTimeout (eventManager (clientSession swarmSession))
204 outcomingTimeout maxOutcomingTime
205
206sendKA :: Socket -> IO ()
207sendKA sock = sendAll sock (encode BT.KeepAlive)
208
209abortSession :: IO ()
210abortSession = error "abortSession: not implemented"