summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Exchange/Bus.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Bus.hs')
-rw-r--r--src/Network/BitTorrent/Exchange/Bus.hs63
1 files changed, 63 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/Exchange/Bus.hs b/src/Network/BitTorrent/Exchange/Bus.hs
new file mode 100644
index 00000000..4800c4a0
--- /dev/null
+++ b/src/Network/BitTorrent/Exchange/Bus.hs
@@ -0,0 +1,63 @@
1module Network.BitTorrent.Exchange.Bus ( ) where
2
3type PeerWire = ConduitM Message Message IO
4
5runPeerWire :: Socket -> PeerWire () -> IO ()
6runPeerWire sock action =
7 sourceSocket sock $=
8 S.conduitGet S.get $=
9-- B.conduitDecode $=
10 action $=
11 S.conduitPut S.put $$
12-- B.conduitEncode $$
13 sinkSocket sock
14
15awaitMessage :: P2P Message
16awaitMessage = P2P $ ReaderT $ const $ {-# SCC awaitMessage #-} go
17 where
18 go = await >>= maybe (monadThrow PeerDisconnected) return
19{-# INLINE awaitMessage #-}
20
21yieldMessage :: Message -> P2P ()
22yieldMessage msg = P2P $ ReaderT $ const $ {-# SCC yieldMessage #-} do
23 C.yield msg
24{-# INLINE yieldMessage #-}
25
26-- TODO send vectored
27flushPending :: P2P ()
28flushPending = {-# SCC flushPending #-} do
29 session <- ask
30 queue <- liftIO (getPending session)
31 mapM_ yieldMessage queue
32
33{-----------------------------------------------------------------------
34 P2P monad
35-----------------------------------------------------------------------}
36
37-- |
38-- Exceptions:
39--
40-- * SessionException: is visible only within one peer
41-- session. Use this exception to terminate P2P session, but not
42-- the swarm session.
43--
44newtype P2P a = P2P {
45 unP2P :: ReaderT PeerSession PeerWire a
46 } deriving ( Functor, Applicative, Monad
47 , MonadIO, MonadThrow, MonadActive
48 , MonadReader PeerSession
49 )
50
51instance MonadState SessionState P2P where
52 get = asks sessionState >>= liftIO . readIORef
53 {-# INLINE get #-}
54 put !s = asks sessionState >>= \ref -> liftIO $ writeIORef ref s
55 {-# INLINE put #-}
56
57runP2P :: (Socket, PeerSession) -> P2P () -> IO ()
58runP2P (sock, ses) action =
59 handle isIOException $
60 runPeerWire sock (runReaderT (unP2P action) ses)
61 where
62 isIOException :: IOException -> IO ()
63 isIOException _ = return ()