-- path: root/src/Network/BitTorrent/Exchange/Bus.hs
-- blob: 4800c4a007235f32a9a5f2913c1ae925e8751155
module Network.BitTorrent.Exchange.Bus ( ) where

-- | Conduit running over 'IO' whose upstream input and downstream
--   output are both 'Message' values.
type PeerWire = ConduitM Message Message IO

-- | Run a 'PeerWire' action on top of a socket.
--
--   Bytes read from @sock@ are decoded with @S.conduitGet S.get@ and
--   fed to @action@; everything @action@ yields is encoded with
--   @S.conduitPut S.put@ and written back to the same @sock@.
--
--   Note: an alternative @B.conduitDecode@ / @B.conduitEncode@ codec
--   pair was previously left here commented out.
runPeerWire :: Socket -> PeerWire () -> IO ()
runPeerWire sock action = outgoing $$ sinkSocket sock
  where
    -- Decoded messages arriving from the peer.
    incoming = sourceSocket sock $= S.conduitGet S.get

    -- Encoded bytes produced by running the action over 'incoming'.
    outgoing = incoming $= action $= S.conduitPut S.put

-- | Wait for the next message from upstream.
--
--   When upstream is exhausted ('await' gives 'Nothing'),
--   'PeerDisconnected' is thrown with 'monadThrow'. The session
--   environment is not consulted.
awaitMessage :: P2P Message
awaitMessage = P2P (ReaderT (\_ -> {-# SCC awaitMessage #-} receive))
  where
    receive = do
      next <- await
      case next of
        Nothing  -> monadThrow PeerDisconnected
        Just msg -> return msg
{-# INLINE awaitMessage #-}

-- | Send a single message downstream with 'C.yield'. The session
--   environment is not consulted.
yieldMessage :: Message -> P2P ()
yieldMessage msg =
  P2P (ReaderT (\_ -> {-# SCC yieldMessage #-} C.yield msg))
{-# INLINE yieldMessage #-}

-- | Obtain the pending messages of the current session through
--   'getPending' and yield each of them, in order, with
--   'yieldMessage'.
--
--   TODO: send vectored.
flushPending :: P2P ()
flushPending = {-# SCC flushPending #-}
  ask >>= liftIO . getPending >>= mapM_ yieldMessage

{-----------------------------------------------------------------------
    P2P monad
-----------------------------------------------------------------------}

-- | Computation performed within a single peer session: a reader of
--   'PeerSession' layered over 'PeerWire'.
--
--   Exceptions:
--
--     * SessionException is visible only within one peer session.
--     Throw it to terminate the P2P session without terminating the
--     swarm session.
--
newtype P2P a = P2P { unP2P :: ReaderT PeerSession PeerWire a }
  deriving
    ( Functor
    , Applicative
    , Monad
    , MonadIO
    , MonadThrow
    , MonadActive
    , MonadReader PeerSession
    )

-- | The session state is kept in the 'IORef' found through
--   'sessionState' in the reader environment: 'get' reads that
--   reference and 'put' overwrites it.
instance MonadState SessionState P2P where
  get = do
    ref <- asks sessionState
    liftIO (readIORef ref)
  {-# INLINE get #-}
  -- The bang pattern forces the new state to WHNF before it is stored.
  put !s = do
    ref <- asks sessionState
    liftIO (writeIORef ref s)
  {-# INLINE put #-}

-- | Run a 'P2P' action for the given socket and peer session.
--
--   The action is unwrapped with 'unP2P', supplied with the session
--   through 'runReaderT' and executed by 'runPeerWire' on the socket.
--   Any 'IOException' raised while it runs is caught and discarded,
--   so in that case the call simply returns.
runP2P :: (Socket, PeerSession) -> P2P () -> IO ()
runP2P (sock, ses) action = handle discardIOException wire
  where
    wire :: IO ()
    wire = runPeerWire sock (runReaderT (unP2P action) ses)

    -- Deliberately ignore the exception value.
    discardIOException :: IOException -> IO ()
    discardIOException _ = return ()