diff options
Diffstat (limited to 'src/Network/QueryResponse.hs')
-rw-r--r-- | src/Network/QueryResponse.hs | 143 |
1 files changed, 75 insertions, 68 deletions
diff --git a/src/Network/QueryResponse.hs b/src/Network/QueryResponse.hs index c8a6fa80..190cc73e 100644 --- a/src/Network/QueryResponse.hs +++ b/src/Network/QueryResponse.hs | |||
@@ -7,6 +7,7 @@ | |||
7 | {-# LANGUAGE PartialTypeSignatures #-} | 7 | {-# LANGUAGE PartialTypeSignatures #-} |
8 | {-# LANGUAGE RankNTypes #-} | 8 | {-# LANGUAGE RankNTypes #-} |
9 | {-# LANGUAGE ScopedTypeVariables #-} | 9 | {-# LANGUAGE ScopedTypeVariables #-} |
10 | {-# LANGUAGE TupleSections #-} | ||
10 | module Network.QueryResponse where | 11 | module Network.QueryResponse where |
11 | 12 | ||
12 | #ifdef THREAD_DEBUG | 13 | #ifdef THREAD_DEBUG |
@@ -33,7 +34,55 @@ import System.IO | |||
33 | import System.IO.Error | 34 | import System.IO.Error |
34 | import System.Timeout | 35 | import System.Timeout |
35 | 36 | ||
36 | -- * Using a query\/response 'Client'. | 37 | -- | Three methods are required to implement a datagram based query\/response protocol. |
38 | data Transport err addr x = Transport | ||
39 | { -- | Blocks until an inbound packet is available. Returns 'Nothing' when | ||
40 | -- no more packets are expected due to a shutdown or close event. | ||
41 | -- Otherwise, the packet will be parsed as type /x/ and an origin address | ||
42 | -- /addr/. Parse failure is indicated by the type 'err'. | ||
43 | awaitMessage :: forall a. (Maybe (Either err (x, addr)) -> IO a) -> IO a | ||
44 | -- | Send an /x/ packet to the given destination /addr/. | ||
45 | , sendMessage :: addr -> x -> IO () | ||
46 | -- | Shutdown and clean up any state related to this 'Transport'. | ||
47 | , closeTransport :: IO () | ||
48 | } | ||
49 | |||
50 | -- | This function modifies a 'Transport' to use higher-level addresses and | ||
51 | -- packet representations. It could be used to change UDP 'ByteString's into | ||
52 | -- bencoded syntax trees or to add an encryption layer in which addresses have | ||
53 | -- associated public keys. | ||
54 | layerTransport :: | ||
55 | (x -> addr -> Either err (x', addr')) | ||
56 | -- ^ Function that attempts to transform a low-level address/packet | ||
57 | -- pair into a higher level representation. | ||
58 | -> (x' -> addr' -> (x, addr)) | ||
59 | -- ^ Function to encode a high-level address/packet into a lower level | ||
60 | -- representation. | ||
61 | -> Transport err addr x | ||
62 | -- ^ The low-level transport to be transformed. | ||
63 | -> Transport err addr' x' | ||
64 | layerTransport parse encode tr = | ||
65 | tr { awaitMessage = \kont -> | ||
66 | awaitMessage tr $ \m -> kont $ fmap (>>= uncurry parse) m | ||
67 | , sendMessage = \addr' msg' -> do | ||
68 | let (msg,addr) = encode msg' addr' | ||
69 | sendMessage tr addr msg | ||
70 | } | ||
71 | |||
72 | addHandler :: (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x | ||
73 | addHandler f tr = tr | ||
74 | { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \m -> do | ||
75 | case m of | ||
76 | Just (Right (x, addr)) -> f addr x >>= maybe eat (kont . Just . Right . (, addr) . ($ x)) | ||
77 | Just (Left e ) -> kont $ Just (Left e) | ||
78 | Nothing -> kont $ Nothing | ||
79 | } | ||
80 | |||
81 | -- | Modify a 'Transport' to invoke an action upon every received packet. | ||
82 | onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x | ||
83 | onInbound f tr = addHandler (\addr x -> f addr x >> return (Just id)) tr | ||
84 | |||
85 | -- * Using a query\/response client. | ||
37 | 86 | ||
38 | -- | Fork a thread that handles inbound packets. The returned action may be used | 87 | -- | Fork a thread that handles inbound packets. The returned action may be used |
39 | -- to terminate the thread and clean up any related state. | 88 | -- to terminate the thread and clean up any related state. |
@@ -41,18 +90,18 @@ import System.Timeout | |||
41 | -- Example usage: | 90 | -- Example usage: |
42 | -- | 91 | -- |
43 | -- > -- Start client. | 92 | -- > -- Start client. |
44 | -- > quitServer <- forkListener client | 93 | -- > quitServer <- forkListener (clientNet client) |
45 | -- > -- Send a query q, recieve a response r. | 94 | -- > -- Send a query q, recieve a response r. |
46 | -- > r <- sendQuery client method q | 95 | -- > r <- sendQuery client method q |
47 | -- > -- Quit client. | 96 | -- > -- Quit client. |
48 | -- > quitServer | 97 | -- > quitServer |
49 | forkListener :: Client err meth tid addr x -> IO (IO ()) | 98 | forkListener :: Transport err addr x -> IO (IO ()) |
50 | forkListener client = do | 99 | forkListener client = do |
51 | thread_id <- forkIO $ do | 100 | thread_id <- forkIO $ do |
52 | myThreadId >>= flip labelThread "listener" | 101 | myThreadId >>= flip labelThread "listener" |
53 | fix $ handleMessage client | 102 | fix $ awaitMessage client . const |
54 | return $ do | 103 | return $ do |
55 | closeTransport (clientNet client) | 104 | closeTransport client |
56 | killThread thread_id | 105 | killThread thread_id |
57 | 106 | ||
58 | -- | Send a query to a remote peer. Note that this funciton will always time | 107 | -- | Send a query to a remote peer. Note that this funciton will always time |
@@ -165,53 +214,6 @@ data MethodSerializer tid addr x meth a b = MethodSerializer | |||
165 | } | 214 | } |
166 | 215 | ||
167 | 216 | ||
168 | -- | Three methods are required to implement a datagram based query\/response protocol. | ||
169 | data Transport err addr x = Transport | ||
170 | { -- | Blocks until an inbound packet is available. Returns 'Nothing' when | ||
171 | -- no more packets are expected due to a shutdown or close event. | ||
172 | -- Otherwise, the packet will be parsed as type /x/ and an origin address | ||
173 | -- /addr/. Parse failure is indicated by the type 'err'. | ||
174 | awaitMessage :: IO (Maybe (Either err (x, addr))) | ||
175 | -- | Send an /x/ packet to the given destination /addr/. | ||
176 | , sendMessage :: addr -> x -> IO () | ||
177 | -- | Shutdown and clean up any state related to this 'Transport'. | ||
178 | , closeTransport :: IO () | ||
179 | } | ||
180 | |||
181 | -- | This function modifies a 'Transport' to use higher-level addresses and | ||
182 | -- packet representations. It could be used to change UDP 'ByteString's into | ||
183 | -- bencoded syntax trees or to add an encryption layer in which addresses have | ||
184 | -- associated public keys. | ||
185 | layerTransport :: | ||
186 | (x -> addr -> Either err (x', addr')) | ||
187 | -- ^ Function that attempts to transform a low-level address/packet | ||
188 | -- pair into a higher level representation. | ||
189 | -> (x' -> addr' -> (x, addr)) | ||
190 | -- ^ Function to encode a high-level address/packet into a lower level | ||
191 | -- representation. | ||
192 | -> Transport err addr x | ||
193 | -- ^ The low-level transport to be transformed. | ||
194 | -> Transport err addr' x' | ||
195 | layerTransport parse encode tr = | ||
196 | tr { awaitMessage = do | ||
197 | m <- awaitMessage tr | ||
198 | return $ fmap (>>= uncurry parse) m | ||
199 | , sendMessage = \addr' msg' -> do | ||
200 | let (msg,addr) = encode msg' addr' | ||
201 | sendMessage tr addr msg | ||
202 | } | ||
203 | |||
204 | -- | Modify a 'Transport' to invoke an action upon every received packet. | ||
205 | onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x | ||
206 | onInbound f tr = tr | ||
207 | { awaitMessage = do | ||
208 | m <- awaitMessage tr | ||
209 | case m of | ||
210 | Just (Right (x, addr)) -> f addr x | ||
211 | _ -> return () | ||
212 | return m | ||
213 | } | ||
214 | |||
215 | -- | To dipatch responses to our outbound queries, we require three primitives. | 217 | -- | To dipatch responses to our outbound queries, we require three primitives. |
216 | -- See the 'transactionMethods' function to create these primitives out of a | 218 | -- See the 'transactionMethods' function to create these primitives out of a |
217 | -- lookup table and a generator for transaction ids. | 219 | -- lookup table and a generator for transaction ids. |
@@ -360,21 +362,24 @@ contramapE f (ErrorReporter pe mh unk tim) | |||
360 | -- or throws an exception. | 362 | -- or throws an exception. |
361 | handleMessage :: | 363 | handleMessage :: |
362 | Client err meth tid addr x | 364 | Client err meth tid addr x |
363 | -> IO () | 365 | -> addr |
364 | -> IO () | 366 | -> x |
365 | handleMessage (Client net d err pending whoami responseID) again = do | 367 | -> IO (Maybe (x -> x)) |
366 | awaitMessage net >>= \case | 368 | handleMessage (Client net d err pending whoami responseID) addr plain = do |
367 | Just (Left e) -> do reportParseError err e | 369 | -- Just (Left e) -> do reportParseError err e |
368 | again | 370 | -- return $! Just id |
369 | Just (Right (plain, addr)) -> do | 371 | -- Just (Right (plain, addr)) -> do |
370 | case classifyInbound d plain of | 372 | case classifyInbound d plain of |
371 | IsQuery meth tid -> case lookupHandler d meth of | 373 | IsQuery meth tid -> case lookupHandler d meth of |
372 | Nothing -> reportMissingHandler err meth addr plain | 374 | Nothing -> do reportMissingHandler err meth addr plain |
375 | return $! Just id | ||
373 | Just m -> do | 376 | Just m -> do |
374 | self <- whoami (Just addr) | 377 | self <- whoami (Just addr) |
375 | tid' <- responseID tid | 378 | tid' <- responseID tid |
376 | either (reportParseError err) | 379 | either (\e -> do reportParseError err e |
377 | (>>= mapM_ (sendMessage net addr)) | 380 | return $! Just id) |
381 | (>>= \m -> do mapM_ (sendMessage net addr) m | ||
382 | return $! Nothing) | ||
378 | (dispatchQuery m tid' self plain addr) | 383 | (dispatchQuery m tid' self plain addr) |
379 | IsResponse tid -> do | 384 | IsResponse tid -> do |
380 | action <- atomically $ do | 385 | action <- atomically $ do |
@@ -383,9 +388,10 @@ handleMessage (Client net d err pending whoami responseID) again = do | |||
383 | writeTVar pending ts | 388 | writeTVar pending ts |
384 | return action | 389 | return action |
385 | action | 390 | action |
386 | IsUnknown e -> reportUnknown err addr plain e | 391 | return $! Nothing |
387 | again | 392 | IsUnknown e -> do reportUnknown err addr plain e |
388 | Nothing -> return () | 393 | return $! Just id |
394 | -- Nothing -> return $! id | ||
389 | 395 | ||
390 | -- * UDP Datagrams. | 396 | -- * UDP Datagrams. |
391 | 397 | ||
@@ -420,9 +426,10 @@ udpTransport bind_address = do | |||
420 | setSocketOption sock IPv6Only 0 | 426 | setSocketOption sock IPv6Only 0 |
421 | bind sock bind_address | 427 | bind sock bind_address |
422 | return Transport | 428 | return Transport |
423 | { awaitMessage = handle (ignoreEOF $ Just $ Right (B.empty, SockAddrInet 0 0)) $ do | 429 | { awaitMessage = \kont -> do |
424 | r <- B.recvFrom sock udpBufferSize | 430 | r <- handle (ignoreEOF $ Just $ Right (B.empty, SockAddrInet 0 0)) $ do |
425 | return $ Just $ Right r | 431 | Just . Right <$!> B.recvFrom sock udpBufferSize |
432 | kont $! r | ||
426 | , sendMessage = \addr bs -> void $ B.sendTo sock bs addr | 433 | , sendMessage = \addr bs -> void $ B.sendTo sock bs addr |
427 | -- TODO: sendTo: does not exist (Network is unreachable) | 434 | -- TODO: sendTo: does not exist (Network is unreachable) |
428 | -- Occurs when IPv6 network is not available. | 435 | -- Occurs when IPv6 network is not available. |