summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-07-12 19:54:44 -0400
committerjoe <joe@jerkface.net>2017-07-12 19:54:44 -0400
commit58db745cc60428ee7a959599b2e83ff7504d5b57 (patch)
treeb174e62fae6d5b640d1ca242b5fc90bce0b6f393
parent883545312ff1e354302f8b4733d77b0abc8a4025 (diff)
Polymorphic implementation of a query/response protocol.
-rw-r--r--src/Network/QueryResponse.hs332
1 files changed, 332 insertions, 0 deletions
diff --git a/src/Network/QueryResponse.hs b/src/Network/QueryResponse.hs
new file mode 100644
index 00000000..5b8bcda4
--- /dev/null
+++ b/src/Network/QueryResponse.hs
@@ -0,0 +1,332 @@
1-- | This module can implement any query\/response protocol. It was written
2-- with Kademlia implementations in mind.
3
4{-# LANGUAGE CPP #-}
5{-# LANGUAGE RankNTypes #-}
6{-# LANGUAGE LambdaCase #-}
7{-# LANGUAGE PartialTypeSignatures #-}
8{-# LANGUAGE GADTs #-}
9module Network.QueryResponse where
10
11#ifdef THREAD_DEBUG
12import Control.Concurrent.Lifted.Instrument
13#else
14import GHC.Conc (labelThread)
15import Control.Concurrent
16#endif
17import Control.Concurrent.STM
18import System.Timeout
19import Data.Function
20import Control.Exception
21import Control.Monad
22import qualified Data.ByteString as B
23 ;import Data.ByteString (ByteString)
24import Network.Socket
25import Network.Socket.ByteString as B
26import System.IO.Error
27import Data.Maybe
28
29-- * Using a query\/response 'Client'.
30
31-- | Fork a thread that handles inbound packets. The returned action may be used
32-- to terminate the thread and clean up any related state.
33--
34-- Example usage:
35--
36-- > -- Start client.
37-- > quitServer <- forkListener client
38-- > -- Send a query q, recieve a response r.
39-- > r <- sendQuery client method q
40-- > -- Quit client.
41-- > quitServer
42forkListener :: Client err tbl meth tid addr x -> IO (IO ())
43forkListener client = do
44 thread_id <- fork $ do
45 myThreadId >>= flip labelThread "listener"
46 fix $ handleMessage client
47 return $ do
48 closeTransport (clientNet client)
49 killThread thread_id
50
51-- | Send a query to a remote peer. Note that this funciton will always time
52-- out if 'forkListener' was never invoked to spawn a thread receive and
53-- dispatch the response.
54sendQuery ::
55 forall err a b tbl x meth tid addr.
56 Client err tbl meth tid addr x -- ^ A query/response implementation.
57 -> Method addr x meth a b -- ^ Information for marshalling the query.
58 -> a -- ^ The outbound query.
59 -> addr -- ^ Destination address of query.
60 -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out.
61sendQuery (Client net d err pending whoami) meth q addr = do
62 mvar <- newEmptyMVar
63 tid <- atomically $ do
64 tbl <- readTVar pending
65 let (tid, tbl') = dispatchRegister (tableMethods d) mvar tbl
66 writeTVar pending tbl'
67 return tid
68 self <- whoami
69 sendMessage net addr (wrapQuery meth self addr q)
70 mres <- timeout (methodTimeout meth) $ takeMVar mvar
71 case mres of
72 Just x -> return $ Just $ unwrapResponse meth x
73 Nothing -> do
74 atomically $ modifyTVar' pending (dispatchCancel (tableMethods d) tid)
75 reportTimeout err (method meth) tid addr
76 return Nothing
77
78-- * Implementing a query\/response 'Client'.
79
80-- | All inputs required to implement a query\/response client.
81data Client err tbl meth tid addr x = Client
82 { -- | The 'Transport' used to dispatch and receive packets.
83 clientNet :: Transport err addr x
84 -- | Methods for handling inbound packets.
85 , clientDispatcher :: DispatchMethods tbl err meth tid addr x
86 -- | Methods for reporting various conditions.
87 , clientErrorReporter :: ErrorReporter addr x meth tid err
88 -- | State necessary for routing inbound responses and assigning unique
89 -- /tid/ values for outgoing queries.
90 , clientPending :: TVar tbl
91 -- | An action yielding this client\'s own address. It is invoked once on
92 -- each outbound and inbound packet. It is valid for this to always
93 -- return the same value.
94 , clientMyAddress :: IO addr
95 }
96
97-- | An incomming message can be classified into three cases.
98data MessageClass err meth tid
99 = IsQuery meth -- ^ An unsolicited query is handled based on it's /meth/ value.
100 | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value.
101 | IsUnknown err -- ^ None of the above.
102
103-- | Handler for an inbound query of type _x_ from an address of type _addr_.
104data MethodHandler err addr x = forall a b. MethodHandler
105 { -- | Parse the query into a more specific type for this method.
106 methodParse :: x -> Either err a
107 -- | Serialize the response type for transmission. Origin and destination
108 -- addresses for the packet are supplied in case they are required.
109 , methodSerialize :: addr -> addr -> b -> x
110 -- | Fully typed action to perform upon the query. The remote origin
111 -- address of the query is provided to the handler.
112 , methodAction :: addr -> a -> IO b
113 }
114
115-- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the
116-- parse is successful, the returned IO action will construct our reply.
117-- Otherwise, a parse err is returned.
118dispatchQuery :: MethodHandler err addr x -- ^ Handler to invoke.
119 -> addr -- ^ Our own address, to which the query was sent.
120 -> x -- ^ The query packet.
121 -> addr -- ^ The origin address of the query.
122 -> Either err (IO x)
123dispatchQuery (MethodHandler unwrapQ wrapR f) self x addr =
124 fmap (\a -> wrapR self addr <$> f addr a) $ unwrapQ x
125
126-- | These four parameters are required to implement an ougoing query. A
127-- peer-to-peer algorithm will define a 'Method' for every 'MethodHandler' that
128-- might be returned by 'lookupHandler'.
129data Method addr x meth a b = Method
130 { -- | Seconds to wait for a response.
131 methodTimeout :: Int
132 -- | A method identifier used for error reporting. This needn't be the
133 -- same as the /meth/ argument to 'MethodHandler', but it is suggested.
134 , method :: meth
135 -- | Serialize the outgoing query /a/ into a transmitable packet /x/.
136 -- The /addr/ arguments are, respectively, our own origin address and the
137 -- destination of the request.
138 , wrapQuery :: addr -> addr -> a -> x
139 -- | Parse an inbound packet /x/ into a response /b/ for this query.
140 , unwrapResponse :: x -> b
141 }
142
143
144-- | Three methods are required to implement a datagram based query\/response protocol.
145data Transport err addr x = Transport
146 { -- | Blocks until an inbound packet is available. Returns 'Nothing' when
147 -- no more packets are expected due to a shutdown or close event.
148 -- Otherwise, the packet will be parsed as type /x/ and an origin address
149 -- /addr/. Parse failure is indicated by the type 'err'.
150 awaitMessage :: IO (Maybe (Either err (x, addr)))
151 -- | Send an /x/ packet to the given destination /addr/.
152 , sendMessage :: addr -> x -> IO ()
153 -- | Shutdown and clean up any state related to this 'Transport'.
154 , closeTransport :: IO ()
155 }
156
157-- | This function modifies a 'Transport' to use higher-level addresses and
158-- packet representations. It could be used to change UDP 'ByteString's into
159-- bencoded syntax trees or to add an encryption layer in which addresses have
160-- associated public keys.
161layerTransport ::
162 (x -> addr -> Either err (x', addr'))
163 -- ^ Function that attempts to transform a low-level address/packet
164 -- pair into a higher level representation.
165 -> (x' -> addr' -> (x, addr))
166 -- ^ Function to encode a high-level address/packet into a lower level
167 -- representation.
168 -> Transport err addr x
169 -- ^ The low-level transport to be transformed.
170 -> Transport err addr' x'
171layerTransport parse encode tr =
172 tr { awaitMessage = do
173 m <- awaitMessage tr
174 return $ fmap (>>= uncurry parse) m
175 , sendMessage = \addr' msg' -> do
176 let (msg,addr) = encode msg' addr'
177 sendMessage tr addr msg
178 }
179
180
181-- | To dipatch responses to our outbound queries, we require three primitives.
182-- See the 'transactionTableMethods' function to create these primitives out of a
183-- lookup table and a generator for transaction ids.
184--
185-- The type variable /d/ is used to represent the current state of the
186-- transaction generator and the table of pending transactions.
187data TableMethods d tid x = TableMethods
188 {
189 -- | Before a query is sent, this function stores an 'MVar' to which the
190 -- response will be written too. The returned _tid_ is a transaction id
191 -- that can be used to forget the 'MVar' if the remote peer is not
192 -- responding.
193 dispatchRegister :: MVar x -> d -> (tid, d)
194 -- | This method is invoked when an incomming packet _x_ indicates it is
195 -- a response to the transaction with id _tid_. The returned IO action
196 -- is will write the packet to the correct 'MVar' thus completing the
197 -- dispatch.
198 , dispatchResponse :: tid -> x -> d -> (d, IO ())
199 -- | When a timeout interval elapses, this method is called to remove the
200 -- transaction from the table.
201 , dispatchCancel :: tid -> d -> d
202 }
203
204-- | Construct 'TableMethods' methods out of 3 lookup table primitives and a
205-- function for generating unique transaction ids.
206transactionTableMethods ::
207 (forall a. tid -> a -> t a -> t a)
208 -- ^ Insert a new _tid_ entry into the transaction table.
209 -> (forall a. tid -> t a -> t a)
210 -- ^ Delete transaction _tid_ from the transaction table.
211 -> (forall a. tid -> t a -> Maybe a)
212 -- ^ Lookup the value associated with transaction _tid_.
213 -> (g -> (tid,g))
214 -- ^ Generate a new unique _tid_ value and update the generator state _g_.
215 -> TableMethods (g,t (MVar x)) tid x
216transactionTableMethods insert delete lookup generate = TableMethods
217 { dispatchCancel = \tid (g,t) -> (g, delete tid t)
218 , dispatchRegister = \v (g,t) ->
219 let (tid,g') = generate g
220 t' = insert tid v t
221 in ( tid, (g',t') )
222 , dispatchResponse = \tid x (g,t) ->
223 case lookup tid t of
224 Just v -> let t' = delete tid t
225 in ((g,t'),void $ tryPutMVar v x)
226 Nothing -> ((g,t), return ())
227 }
228
229-- | A set of methods neccessary for dispatching incomming packets.
230data DispatchMethods tbl err meth tid addr x = DispatchMethods
231 { -- | Clasify an inbound packet as a query or response.
232 classifyInbound :: x -> MessageClass err meth tid
233 -- | Lookup the handler for a inbound query.
234 , lookupHandler :: meth -> Maybe (MethodHandler err addr x)
235 -- | Methods for handling incomming responses.
236 , tableMethods :: TableMethods tbl tid x
237 }
238
239-- | These methods indicate what should be done upon various conditions. Write
240-- to a log file, make debug prints, or simply ignore them.
241--
242-- [ /addr/ ] Address of remote peer.
243--
244-- [ /x/ ] Incomming or outgoing packet.
245--
246-- [ /meth/ ] Method id of incomming or outgoing request.
247--
248-- [ /tid/ ] Transaction id for outgoing packet.
249--
250-- [ /err/ ] Error information, typically a 'String'.
251data ErrorReporter addr x meth tid err = ErrorReporter
252 { -- | Incomming: failed to parse packet.
253 reportParseError :: err -> IO ()
254 -- | Incomming: no handler for request.
255 , reportMissingHandler :: meth -> addr -> x -> IO ()
256 -- | Incomming: unable to identify request.
257 , reportUnknown :: addr -> x -> err -> IO ()
258 -- | Outgoing: remote peer is not responding.
259 , reportTimeout :: meth -> tid -> addr -> IO ()
260 }
261
262-- | Handle a single inbound packet and then invoke the given continuation.
263-- The 'forkListener' function is implemeneted by passing this function to
264-- 'fix' in a forked thread that loops until 'awaitMessage' returns 'Nothing'
265-- or throws an exception.
266handleMessage ::
267 Client err tbl meth tid addr x
268 -> IO ()
269 -> IO ()
270handleMessage (Client net d err pending whoami) again = do
271 awaitMessage net >>= \case
272 Just (Left e) -> do reportParseError err e
273 again
274 Just (Right (plain, addr)) -> do
275 case classifyInbound d plain of
276 IsQuery meth -> case lookupHandler d meth of
277 Nothing -> reportMissingHandler err meth addr plain
278 Just m -> do
279 self <- whoami
280 either (reportParseError err)
281 (>>= sendMessage net addr)
282 (dispatchQuery m self plain addr)
283 IsResponse tid -> do
284 action <- atomically $ do
285 ts0 <- readTVar pending
286 let (ts, action) = dispatchResponse (tableMethods d) tid plain ts0
287 writeTVar pending ts
288 return action
289 action
290 IsUnknown e -> reportUnknown err addr plain e
291 again
292 Nothing -> return ()
293
294-- * UDP Datagrams.
295
296-- | Access the address family of a given 'SockAddr'. This convenient accessor
297-- is missing from 'Network.Socket', so I implemented it here.
298sockAddrFamily :: SockAddr -> Family
299sockAddrFamily (SockAddrInet _ _ ) = AF_INET
300sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6
301sockAddrFamily (SockAddrUnix _ ) = AF_UNIX
302sockAddrFamily (SockAddrCan _ ) = AF_CAN
303
304-- | Packets with an empty payload may trigger eof exception.
305-- 'udpTransport' uses this function to avoid throwing in that
306-- case.
307ignoreEOF def e | isEOFError e = pure def
308 | otherwise = throwIO e
309
310-- | Hardcoded maximum packet size for incomming udp packets received via
311-- 'udpTransport'.
312udpBufferSize :: Int
313udpBufferSize = 65536
314
315-- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The
316-- argument is the listen-address for incomming packets. This is a useful
317-- low-level 'Transport' that can be transformed for higher-level protocols
318-- using 'layerTransport'.
319udpTransport :: SockAddr -> IO (Transport err SockAddr ByteString)
320udpTransport bind_address = do
321 let family = sockAddrFamily bind_address
322 sock <- socket family Datagram defaultProtocol
323 when (family == AF_INET6) $ do
324 setSocketOption sock IPv6Only 0
325 bind sock bind_address
326 return Transport
327 { awaitMessage = handle (ignoreEOF $ Just $ Right (B.empty, SockAddrInet 0 0)) $ do
328 r <- B.recvFrom sock udpBufferSize
329 return $ Just $ Right r
330 , sendMessage = \addr bs -> void $ B.sendTo sock bs addr
331 , closeTransport = close sock
332 }