diff options
-rw-r--r-- | src/Network/QueryResponse.hs | 332 |
1 files changed, 332 insertions, 0 deletions
diff --git a/src/Network/QueryResponse.hs b/src/Network/QueryResponse.hs new file mode 100644 index 00000000..5b8bcda4 --- /dev/null +++ b/src/Network/QueryResponse.hs | |||
@@ -0,0 +1,332 @@ | |||
1 | -- | This module can implement any query\/response protocol. It was written | ||
2 | -- with Kademlia implementations in mind. | ||
3 | |||
4 | {-# LANGUAGE CPP #-} | ||
5 | {-# LANGUAGE RankNTypes #-} | ||
6 | {-# LANGUAGE LambdaCase #-} | ||
7 | {-# LANGUAGE PartialTypeSignatures #-} | ||
8 | {-# LANGUAGE GADTs #-} | ||
9 | module Network.QueryResponse where | ||
10 | |||
11 | #ifdef THREAD_DEBUG | ||
12 | import Control.Concurrent.Lifted.Instrument | ||
13 | #else | ||
14 | import GHC.Conc (labelThread) | ||
15 | import Control.Concurrent | ||
16 | #endif | ||
17 | import Control.Concurrent.STM | ||
18 | import System.Timeout | ||
19 | import Data.Function | ||
20 | import Control.Exception | ||
21 | import Control.Monad | ||
22 | import qualified Data.ByteString as B | ||
23 | ;import Data.ByteString (ByteString) | ||
24 | import Network.Socket | ||
25 | import Network.Socket.ByteString as B | ||
26 | import System.IO.Error | ||
27 | import Data.Maybe | ||
28 | |||
29 | -- * Using a query\/response 'Client'. | ||
30 | |||
31 | -- | Fork a thread that handles inbound packets. The returned action may be used | ||
32 | -- to terminate the thread and clean up any related state. | ||
33 | -- | ||
34 | -- Example usage: | ||
35 | -- | ||
36 | -- > -- Start client. | ||
37 | -- > quitServer <- forkListener client | ||
38 | -- > -- Send a query q, recieve a response r. | ||
39 | -- > r <- sendQuery client method q | ||
40 | -- > -- Quit client. | ||
41 | -- > quitServer | ||
42 | forkListener :: Client err tbl meth tid addr x -> IO (IO ()) | ||
43 | forkListener client = do | ||
44 | thread_id <- fork $ do | ||
45 | myThreadId >>= flip labelThread "listener" | ||
46 | fix $ handleMessage client | ||
47 | return $ do | ||
48 | closeTransport (clientNet client) | ||
49 | killThread thread_id | ||
50 | |||
51 | -- | Send a query to a remote peer. Note that this funciton will always time | ||
52 | -- out if 'forkListener' was never invoked to spawn a thread receive and | ||
53 | -- dispatch the response. | ||
54 | sendQuery :: | ||
55 | forall err a b tbl x meth tid addr. | ||
56 | Client err tbl meth tid addr x -- ^ A query/response implementation. | ||
57 | -> Method addr x meth a b -- ^ Information for marshalling the query. | ||
58 | -> a -- ^ The outbound query. | ||
59 | -> addr -- ^ Destination address of query. | ||
60 | -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out. | ||
61 | sendQuery (Client net d err pending whoami) meth q addr = do | ||
62 | mvar <- newEmptyMVar | ||
63 | tid <- atomically $ do | ||
64 | tbl <- readTVar pending | ||
65 | let (tid, tbl') = dispatchRegister (tableMethods d) mvar tbl | ||
66 | writeTVar pending tbl' | ||
67 | return tid | ||
68 | self <- whoami | ||
69 | sendMessage net addr (wrapQuery meth self addr q) | ||
70 | mres <- timeout (methodTimeout meth) $ takeMVar mvar | ||
71 | case mres of | ||
72 | Just x -> return $ Just $ unwrapResponse meth x | ||
73 | Nothing -> do | ||
74 | atomically $ modifyTVar' pending (dispatchCancel (tableMethods d) tid) | ||
75 | reportTimeout err (method meth) tid addr | ||
76 | return Nothing | ||
77 | |||
78 | -- * Implementing a query\/response 'Client'. | ||
79 | |||
80 | -- | All inputs required to implement a query\/response client. | ||
81 | data Client err tbl meth tid addr x = Client | ||
82 | { -- | The 'Transport' used to dispatch and receive packets. | ||
83 | clientNet :: Transport err addr x | ||
84 | -- | Methods for handling inbound packets. | ||
85 | , clientDispatcher :: DispatchMethods tbl err meth tid addr x | ||
86 | -- | Methods for reporting various conditions. | ||
87 | , clientErrorReporter :: ErrorReporter addr x meth tid err | ||
88 | -- | State necessary for routing inbound responses and assigning unique | ||
89 | -- /tid/ values for outgoing queries. | ||
90 | , clientPending :: TVar tbl | ||
91 | -- | An action yielding this client\'s own address. It is invoked once on | ||
92 | -- each outbound and inbound packet. It is valid for this to always | ||
93 | -- return the same value. | ||
94 | , clientMyAddress :: IO addr | ||
95 | } | ||
96 | |||
97 | -- | An incomming message can be classified into three cases. | ||
98 | data MessageClass err meth tid | ||
99 | = IsQuery meth -- ^ An unsolicited query is handled based on it's /meth/ value. | ||
100 | | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value. | ||
101 | | IsUnknown err -- ^ None of the above. | ||
102 | |||
103 | -- | Handler for an inbound query of type _x_ from an address of type _addr_. | ||
104 | data MethodHandler err addr x = forall a b. MethodHandler | ||
105 | { -- | Parse the query into a more specific type for this method. | ||
106 | methodParse :: x -> Either err a | ||
107 | -- | Serialize the response type for transmission. Origin and destination | ||
108 | -- addresses for the packet are supplied in case they are required. | ||
109 | , methodSerialize :: addr -> addr -> b -> x | ||
110 | -- | Fully typed action to perform upon the query. The remote origin | ||
111 | -- address of the query is provided to the handler. | ||
112 | , methodAction :: addr -> a -> IO b | ||
113 | } | ||
114 | |||
115 | -- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the | ||
116 | -- parse is successful, the returned IO action will construct our reply. | ||
117 | -- Otherwise, a parse err is returned. | ||
118 | dispatchQuery :: MethodHandler err addr x -- ^ Handler to invoke. | ||
119 | -> addr -- ^ Our own address, to which the query was sent. | ||
120 | -> x -- ^ The query packet. | ||
121 | -> addr -- ^ The origin address of the query. | ||
122 | -> Either err (IO x) | ||
123 | dispatchQuery (MethodHandler unwrapQ wrapR f) self x addr = | ||
124 | fmap (\a -> wrapR self addr <$> f addr a) $ unwrapQ x | ||
125 | |||
126 | -- | These four parameters are required to implement an ougoing query. A | ||
127 | -- peer-to-peer algorithm will define a 'Method' for every 'MethodHandler' that | ||
128 | -- might be returned by 'lookupHandler'. | ||
129 | data Method addr x meth a b = Method | ||
130 | { -- | Seconds to wait for a response. | ||
131 | methodTimeout :: Int | ||
132 | -- | A method identifier used for error reporting. This needn't be the | ||
133 | -- same as the /meth/ argument to 'MethodHandler', but it is suggested. | ||
134 | , method :: meth | ||
135 | -- | Serialize the outgoing query /a/ into a transmitable packet /x/. | ||
136 | -- The /addr/ arguments are, respectively, our own origin address and the | ||
137 | -- destination of the request. | ||
138 | , wrapQuery :: addr -> addr -> a -> x | ||
139 | -- | Parse an inbound packet /x/ into a response /b/ for this query. | ||
140 | , unwrapResponse :: x -> b | ||
141 | } | ||
142 | |||
143 | |||
144 | -- | Three methods are required to implement a datagram based query\/response protocol. | ||
145 | data Transport err addr x = Transport | ||
146 | { -- | Blocks until an inbound packet is available. Returns 'Nothing' when | ||
147 | -- no more packets are expected due to a shutdown or close event. | ||
148 | -- Otherwise, the packet will be parsed as type /x/ and an origin address | ||
149 | -- /addr/. Parse failure is indicated by the type 'err'. | ||
150 | awaitMessage :: IO (Maybe (Either err (x, addr))) | ||
151 | -- | Send an /x/ packet to the given destination /addr/. | ||
152 | , sendMessage :: addr -> x -> IO () | ||
153 | -- | Shutdown and clean up any state related to this 'Transport'. | ||
154 | , closeTransport :: IO () | ||
155 | } | ||
156 | |||
157 | -- | This function modifies a 'Transport' to use higher-level addresses and | ||
158 | -- packet representations. It could be used to change UDP 'ByteString's into | ||
159 | -- bencoded syntax trees or to add an encryption layer in which addresses have | ||
160 | -- associated public keys. | ||
161 | layerTransport :: | ||
162 | (x -> addr -> Either err (x', addr')) | ||
163 | -- ^ Function that attempts to transform a low-level address/packet | ||
164 | -- pair into a higher level representation. | ||
165 | -> (x' -> addr' -> (x, addr)) | ||
166 | -- ^ Function to encode a high-level address/packet into a lower level | ||
167 | -- representation. | ||
168 | -> Transport err addr x | ||
169 | -- ^ The low-level transport to be transformed. | ||
170 | -> Transport err addr' x' | ||
171 | layerTransport parse encode tr = | ||
172 | tr { awaitMessage = do | ||
173 | m <- awaitMessage tr | ||
174 | return $ fmap (>>= uncurry parse) m | ||
175 | , sendMessage = \addr' msg' -> do | ||
176 | let (msg,addr) = encode msg' addr' | ||
177 | sendMessage tr addr msg | ||
178 | } | ||
179 | |||
180 | |||
181 | -- | To dipatch responses to our outbound queries, we require three primitives. | ||
182 | -- See the 'transactionTableMethods' function to create these primitives out of a | ||
183 | -- lookup table and a generator for transaction ids. | ||
184 | -- | ||
185 | -- The type variable /d/ is used to represent the current state of the | ||
186 | -- transaction generator and the table of pending transactions. | ||
187 | data TableMethods d tid x = TableMethods | ||
188 | { | ||
189 | -- | Before a query is sent, this function stores an 'MVar' to which the | ||
190 | -- response will be written too. The returned _tid_ is a transaction id | ||
191 | -- that can be used to forget the 'MVar' if the remote peer is not | ||
192 | -- responding. | ||
193 | dispatchRegister :: MVar x -> d -> (tid, d) | ||
194 | -- | This method is invoked when an incomming packet _x_ indicates it is | ||
195 | -- a response to the transaction with id _tid_. The returned IO action | ||
196 | -- is will write the packet to the correct 'MVar' thus completing the | ||
197 | -- dispatch. | ||
198 | , dispatchResponse :: tid -> x -> d -> (d, IO ()) | ||
199 | -- | When a timeout interval elapses, this method is called to remove the | ||
200 | -- transaction from the table. | ||
201 | , dispatchCancel :: tid -> d -> d | ||
202 | } | ||
203 | |||
204 | -- | Construct 'TableMethods' methods out of 3 lookup table primitives and a | ||
205 | -- function for generating unique transaction ids. | ||
206 | transactionTableMethods :: | ||
207 | (forall a. tid -> a -> t a -> t a) | ||
208 | -- ^ Insert a new _tid_ entry into the transaction table. | ||
209 | -> (forall a. tid -> t a -> t a) | ||
210 | -- ^ Delete transaction _tid_ from the transaction table. | ||
211 | -> (forall a. tid -> t a -> Maybe a) | ||
212 | -- ^ Lookup the value associated with transaction _tid_. | ||
213 | -> (g -> (tid,g)) | ||
214 | -- ^ Generate a new unique _tid_ value and update the generator state _g_. | ||
215 | -> TableMethods (g,t (MVar x)) tid x | ||
216 | transactionTableMethods insert delete lookup generate = TableMethods | ||
217 | { dispatchCancel = \tid (g,t) -> (g, delete tid t) | ||
218 | , dispatchRegister = \v (g,t) -> | ||
219 | let (tid,g') = generate g | ||
220 | t' = insert tid v t | ||
221 | in ( tid, (g',t') ) | ||
222 | , dispatchResponse = \tid x (g,t) -> | ||
223 | case lookup tid t of | ||
224 | Just v -> let t' = delete tid t | ||
225 | in ((g,t'),void $ tryPutMVar v x) | ||
226 | Nothing -> ((g,t), return ()) | ||
227 | } | ||
228 | |||
229 | -- | A set of methods neccessary for dispatching incomming packets. | ||
230 | data DispatchMethods tbl err meth tid addr x = DispatchMethods | ||
231 | { -- | Clasify an inbound packet as a query or response. | ||
232 | classifyInbound :: x -> MessageClass err meth tid | ||
233 | -- | Lookup the handler for a inbound query. | ||
234 | , lookupHandler :: meth -> Maybe (MethodHandler err addr x) | ||
235 | -- | Methods for handling incomming responses. | ||
236 | , tableMethods :: TableMethods tbl tid x | ||
237 | } | ||
238 | |||
239 | -- | These methods indicate what should be done upon various conditions. Write | ||
240 | -- to a log file, make debug prints, or simply ignore them. | ||
241 | -- | ||
242 | -- [ /addr/ ] Address of remote peer. | ||
243 | -- | ||
244 | -- [ /x/ ] Incomming or outgoing packet. | ||
245 | -- | ||
246 | -- [ /meth/ ] Method id of incomming or outgoing request. | ||
247 | -- | ||
248 | -- [ /tid/ ] Transaction id for outgoing packet. | ||
249 | -- | ||
250 | -- [ /err/ ] Error information, typically a 'String'. | ||
251 | data ErrorReporter addr x meth tid err = ErrorReporter | ||
252 | { -- | Incomming: failed to parse packet. | ||
253 | reportParseError :: err -> IO () | ||
254 | -- | Incomming: no handler for request. | ||
255 | , reportMissingHandler :: meth -> addr -> x -> IO () | ||
256 | -- | Incomming: unable to identify request. | ||
257 | , reportUnknown :: addr -> x -> err -> IO () | ||
258 | -- | Outgoing: remote peer is not responding. | ||
259 | , reportTimeout :: meth -> tid -> addr -> IO () | ||
260 | } | ||
261 | |||
262 | -- | Handle a single inbound packet and then invoke the given continuation. | ||
263 | -- The 'forkListener' function is implemeneted by passing this function to | ||
264 | -- 'fix' in a forked thread that loops until 'awaitMessage' returns 'Nothing' | ||
265 | -- or throws an exception. | ||
266 | handleMessage :: | ||
267 | Client err tbl meth tid addr x | ||
268 | -> IO () | ||
269 | -> IO () | ||
270 | handleMessage (Client net d err pending whoami) again = do | ||
271 | awaitMessage net >>= \case | ||
272 | Just (Left e) -> do reportParseError err e | ||
273 | again | ||
274 | Just (Right (plain, addr)) -> do | ||
275 | case classifyInbound d plain of | ||
276 | IsQuery meth -> case lookupHandler d meth of | ||
277 | Nothing -> reportMissingHandler err meth addr plain | ||
278 | Just m -> do | ||
279 | self <- whoami | ||
280 | either (reportParseError err) | ||
281 | (>>= sendMessage net addr) | ||
282 | (dispatchQuery m self plain addr) | ||
283 | IsResponse tid -> do | ||
284 | action <- atomically $ do | ||
285 | ts0 <- readTVar pending | ||
286 | let (ts, action) = dispatchResponse (tableMethods d) tid plain ts0 | ||
287 | writeTVar pending ts | ||
288 | return action | ||
289 | action | ||
290 | IsUnknown e -> reportUnknown err addr plain e | ||
291 | again | ||
292 | Nothing -> return () | ||
293 | |||
294 | -- * UDP Datagrams. | ||
295 | |||
296 | -- | Access the address family of a given 'SockAddr'. This convenient accessor | ||
297 | -- is missing from 'Network.Socket', so I implemented it here. | ||
298 | sockAddrFamily :: SockAddr -> Family | ||
299 | sockAddrFamily (SockAddrInet _ _ ) = AF_INET | ||
300 | sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | ||
301 | sockAddrFamily (SockAddrUnix _ ) = AF_UNIX | ||
302 | sockAddrFamily (SockAddrCan _ ) = AF_CAN | ||
303 | |||
304 | -- | Packets with an empty payload may trigger eof exception. | ||
305 | -- 'udpTransport' uses this function to avoid throwing in that | ||
306 | -- case. | ||
307 | ignoreEOF def e | isEOFError e = pure def | ||
308 | | otherwise = throwIO e | ||
309 | |||
310 | -- | Hardcoded maximum packet size for incomming udp packets received via | ||
311 | -- 'udpTransport'. | ||
312 | udpBufferSize :: Int | ||
313 | udpBufferSize = 65536 | ||
314 | |||
315 | -- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The | ||
316 | -- argument is the listen-address for incomming packets. This is a useful | ||
317 | -- low-level 'Transport' that can be transformed for higher-level protocols | ||
318 | -- using 'layerTransport'. | ||
319 | udpTransport :: SockAddr -> IO (Transport err SockAddr ByteString) | ||
320 | udpTransport bind_address = do | ||
321 | let family = sockAddrFamily bind_address | ||
322 | sock <- socket family Datagram defaultProtocol | ||
323 | when (family == AF_INET6) $ do | ||
324 | setSocketOption sock IPv6Only 0 | ||
325 | bind sock bind_address | ||
326 | return Transport | ||
327 | { awaitMessage = handle (ignoreEOF $ Just $ Right (B.empty, SockAddrInet 0 0)) $ do | ||
328 | r <- B.recvFrom sock udpBufferSize | ||
329 | return $ Just $ Right r | ||
330 | , sendMessage = \addr bs -> void $ B.sendTo sock bs addr | ||
331 | , closeTransport = close sock | ||
332 | } | ||