blob: 7de9118071ed7b7898ebfcdf5bb58cea74ab3108 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
module Network.BitTorrent.Exchange.Bus ( ) where
type PeerWire = ConduitM Message Message IO
runPeerWire :: Socket -> PeerWire () -> IO ()
runPeerWire sock action =
sourceSocket sock $=
S.conduitGet S.get $=
-- B.conduitDecode $=
action $=
S.conduitPut S.put $$
-- B.conduitEncode $$
sinkSocket sock
awaitMessage :: P2P Message
awaitMessage = P2P $ ReaderT $ const $ {-# SCC awaitMessage #-} go
where
go = await >>= maybe (monadThrow PeerDisconnected) return
{-# INLINE awaitMessage #-}
yieldMessage :: Message -> P2P ()
yieldMessage msg = P2P $ ReaderT $ const $ {-# SCC yieldMessage #-} do
C.yield msg
{-# INLINE yieldMessage #-}
-- TODO send vectored
flushPending :: P2P ()
flushPending = {-# SCC flushPending #-} do
session <- ask
queue <- liftIO (getPending session)
mapM_ yieldMessage queue
{-----------------------------------------------------------------------
P2P monad
-----------------------------------------------------------------------}
filterMeaninless :: P2P Message Message
filterMeaninless = undefined
-- |
-- Exceptions:
--
-- * SessionException: is visible only within one peer
-- session. Use this exception to terminate P2P session, but not
-- the swarm session.
--
newtype P2P a = P2P {
unP2P :: ReaderT PeerSession PeerWire a
} deriving ( Functor, Applicative, Monad
, MonadIO, MonadThrow, MonadActive
, MonadReader PeerSession
)
instance MonadState SessionState P2P where
get = asks sessionState >>= liftIO . readIORef
{-# INLINE get #-}
put !s = asks sessionState >>= \ref -> liftIO $ writeIORef ref s
{-# INLINE put #-}
runP2P :: (Socket, PeerSession) -> P2P () -> IO ()
runP2P (sock, ses) action =
handle isIOException $
runPeerWire sock (runReaderT (unP2P action) ses)
where
isIOException :: IOException -> IO ()
isIOException _ = return ()
|