diff options
author | joe <joe@jerkface.net> | 2017-08-04 18:34:18 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-08-04 18:34:18 -0400 |
commit | 1f1bcd70f5c0b7d3c1a135fa8b53a03b507442c4 (patch) | |
tree | d74c8f5e8f6acb025ec939d12ff26e275f72be43 | |
parent | 4198ce253ea9ef9184b325e4bb8d18fcc483b381 (diff) |
Switched awaitMessage to continuation-passing style.
-rw-r--r-- | Mainline.hs | 9 | ||||
-rw-r--r-- | Tox.hs | 9 | ||||
-rw-r--r-- | examples/dhtd.hs | 4 | ||||
-rw-r--r-- | src/Network/QueryResponse.hs | 143 |
4 files changed, 86 insertions, 79 deletions
diff --git a/Mainline.hs b/Mainline.hs index 291a196f..860372dc 100644 --- a/Mainline.hs +++ b/Mainline.hs | |||
@@ -419,11 +419,10 @@ showPacket f addr flow bs = L8.unpack $ L8.unlines es | |||
419 | 419 | ||
420 | -- Add detailed printouts for every packet. | 420 | -- Add detailed printouts for every packet. |
421 | addVerbosity tr = | 421 | addVerbosity tr = |
422 | tr { awaitMessage = do | 422 | tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do |
423 | m <- awaitMessage tr | ||
424 | forM_ m $ mapM_ $ \(msg,addr) -> do | 423 | forM_ m $ mapM_ $ \(msg,addr) -> do |
425 | hPutStrLn stderr (showPacket id addr " --> " msg) | 424 | hPutStrLn stderr (showPacket id addr " --> " msg) |
426 | return m | 425 | kont m |
427 | , sendMessage = \addr msg -> do | 426 | , sendMessage = \addr msg -> do |
428 | hPutStrLn stderr (showPacket id addr " <-- " msg) | 427 | hPutStrLn stderr (showPacket id addr " <-- " msg) |
429 | sendMessage tr addr msg | 428 | sendMessage tr addr msg |
@@ -566,7 +565,7 @@ newClient addr = do | |||
566 | -- recursive since 'updateRouting' does not invoke 'awaitMessage' which | 565 | -- recursive since 'updateRouting' does not invoke 'awaitMessage' which |
567 | -- which was modified by 'onInbound'. However, I'm going to avoid the | 566 | -- which was modified by 'onInbound'. However, I'm going to avoid the |
568 | -- mutual reference just to be safe. | 567 | -- mutual reference just to be safe. |
569 | outgoingClient = client { clientNet = net { awaitMessage = return Nothing } } | 568 | outgoingClient = client { clientNet = net { awaitMessage = ($ Nothing) } } |
570 | 569 | ||
571 | dispatch = DispatchMethods | 570 | dispatch = DispatchMethods |
572 | { classifyInbound = classify -- :: x -> MessageClass err meth tid | 571 | { classifyInbound = classify -- :: x -> MessageClass err meth tid |
@@ -587,7 +586,7 @@ newClient addr = do | |||
587 | gen cnt = (TransactionId $ S.encode cnt, cnt+1) | 586 | gen cnt = (TransactionId $ S.encode cnt, cnt+1) |
588 | 587 | ||
589 | client = Client | 588 | client = Client |
590 | { clientNet = net | 589 | { clientNet = addHandler (handleMessage client) net |
591 | , clientDispatcher = dispatch | 590 | , clientDispatcher = dispatch |
592 | , clientErrorReporter = ignoreErrors -- printErrors stderr | 591 | , clientErrorReporter = ignoreErrors -- printErrors stderr |
593 | , clientPending = map_var | 592 | , clientPending = map_var |
@@ -597,7 +597,7 @@ newClient addr = do | |||
597 | -- recursive since 'updateRouting' does not invoke 'awaitMessage' which | 597 | -- recursive since 'updateRouting' does not invoke 'awaitMessage' which |
598 | -- which was modified by 'onInbound'. However, I'm going to avoid the | 598 | -- which was modified by 'onInbound'. However, I'm going to avoid the |
599 | -- mutual reference just to be safe. | 599 | -- mutual reference just to be safe. |
600 | outgoingClient = client { clientNet = net { awaitMessage = return Nothing } } | 600 | outgoingClient = client { clientNet = net { awaitMessage = ($ Nothing) } } |
601 | 601 | ||
602 | dispatch tbl var = DispatchMethods | 602 | dispatch tbl var = DispatchMethods |
603 | { classifyInbound = classify | 603 | { classifyInbound = classify |
@@ -608,6 +608,7 @@ newClient addr = do | |||
608 | -- handlers :: TVar -> Method -> Maybe Handler | 608 | -- handlers :: TVar -> Method -> Maybe Handler |
609 | handlers var PingType = handler PongType pingH | 609 | handlers var PingType = handler PongType pingH |
610 | handlers var GetNodesType = handler SendNodesType $ getNodesH routing | 610 | handlers var GetNodesType = handler SendNodesType $ getNodesH routing |
611 | {- | ||
611 | handlers var OnionRequest0 = noreply OnionRequest0 | 612 | handlers var OnionRequest0 = noreply OnionRequest0 |
612 | $ onionSend0H (symmetricCipher (return symkey) | 613 | $ onionSend0H (symmetricCipher (return symkey) |
613 | (fst <$> readTVar var) | 614 | (fst <$> readTVar var) |
@@ -616,6 +617,7 @@ newClient addr = do | |||
616 | handlers var OnionResponse1 = noreply OnionResponse1 | 617 | handlers var OnionResponse1 = noreply OnionResponse1 |
617 | $ onionResponse1H (symmetricDecipher (return symkey)) | 618 | $ onionResponse1H (symmetricDecipher (return symkey)) |
618 | udp | 619 | udp |
620 | -} | ||
619 | handlers var _ = Nothing | 621 | handlers var _ = Nothing |
620 | -- TODO DHTRequest public key (onion) | 622 | -- TODO DHTRequest public key (onion) |
621 | -- TODO DHTRequest NAT ping | 623 | -- TODO DHTRequest NAT ping |
@@ -690,12 +692,11 @@ dropEnd8 bs = B.take (B.length bs - 8) bs | |||
690 | 692 | ||
691 | -- Add detailed printouts for every packet. | 693 | -- Add detailed printouts for every packet. |
692 | addVerbosity tr = | 694 | addVerbosity tr = |
693 | tr { awaitMessage = do | 695 | tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do |
694 | m <- awaitMessage tr | ||
695 | forM_ m $ mapM_ $ \(msg,addr) -> do | 696 | forM_ m $ mapM_ $ \(msg,addr) -> do |
696 | hPutStrLn stderr ( (show addr) | 697 | hPutStrLn stderr ( (show addr) |
697 | ++ " --> " ++ show (msgType msg)) | 698 | ++ " --> " ++ show (msgType msg)) |
698 | return m | 699 | kont m |
699 | , sendMessage = \addr msg -> do | 700 | , sendMessage = \addr msg -> do |
700 | hPutStrLn stderr ( (show addr) | 701 | hPutStrLn stderr ( (show addr) |
701 | ++ " <-- " ++ show (msgType msg)) | 702 | ++ " <-- " ++ show (msgType msg)) |
diff --git a/examples/dhtd.hs b/examples/dhtd.hs index f0a48bb2..8e8d47a2 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs | |||
@@ -469,13 +469,13 @@ main = do | |||
469 | (atomically . writeTVar (Mainline.contactInfo swarms)) | 469 | (atomically . writeTVar (Mainline.contactInfo swarms)) |
470 | (peerdb >>= S.decodeLazy) | 470 | (peerdb >>= S.decodeLazy) |
471 | 471 | ||
472 | quitBt <- forkListener bt | 472 | quitBt <- forkListener (clientNet bt) |
473 | 473 | ||
474 | let toxport = succ $ fromMaybe 33445 (fromIntegral <$> sockAddrPort addr) | 474 | let toxport = succ $ fromMaybe 33445 (fromIntegral <$> sockAddrPort addr) |
475 | addrTox <- getBindAddress (show toxport) True | 475 | addrTox <- getBindAddress (show toxport) True |
476 | (tox,toxR) <- Tox.newClient addrTox | 476 | (tox,toxR) <- Tox.newClient addrTox |
477 | 477 | ||
478 | quitTox <- forkListener tox | 478 | quitTox <- forkListener (clientNet tox) |
479 | 479 | ||
480 | mainlineSearches <- atomically $ newTVar Map.empty | 480 | mainlineSearches <- atomically $ newTVar Map.empty |
481 | toxSearches <- atomically $ newTVar Map.empty | 481 | toxSearches <- atomically $ newTVar Map.empty |
diff --git a/src/Network/QueryResponse.hs b/src/Network/QueryResponse.hs index c8a6fa80..190cc73e 100644 --- a/src/Network/QueryResponse.hs +++ b/src/Network/QueryResponse.hs | |||
@@ -7,6 +7,7 @@ | |||
7 | {-# LANGUAGE PartialTypeSignatures #-} | 7 | {-# LANGUAGE PartialTypeSignatures #-} |
8 | {-# LANGUAGE RankNTypes #-} | 8 | {-# LANGUAGE RankNTypes #-} |
9 | {-# LANGUAGE ScopedTypeVariables #-} | 9 | {-# LANGUAGE ScopedTypeVariables #-} |
10 | {-# LANGUAGE TupleSections #-} | ||
10 | module Network.QueryResponse where | 11 | module Network.QueryResponse where |
11 | 12 | ||
12 | #ifdef THREAD_DEBUG | 13 | #ifdef THREAD_DEBUG |
@@ -33,7 +34,55 @@ import System.IO | |||
33 | import System.IO.Error | 34 | import System.IO.Error |
34 | import System.Timeout | 35 | import System.Timeout |
35 | 36 | ||
36 | -- * Using a query\/response 'Client'. | 37 | -- | Three methods are required to implement a datagram based query\/response protocol. |
38 | data Transport err addr x = Transport | ||
39 | { -- | Blocks until an inbound packet is available. Returns 'Nothing' when | ||
40 | -- no more packets are expected due to a shutdown or close event. | ||
41 | -- Otherwise, the packet will be parsed as type /x/ and an origin address | ||
42 | -- /addr/. Parse failure is indicated by the type 'err'. | ||
43 | awaitMessage :: forall a. (Maybe (Either err (x, addr)) -> IO a) -> IO a | ||
44 | -- | Send an /x/ packet to the given destination /addr/. | ||
45 | , sendMessage :: addr -> x -> IO () | ||
46 | -- | Shutdown and clean up any state related to this 'Transport'. | ||
47 | , closeTransport :: IO () | ||
48 | } | ||
49 | |||
50 | -- | This function modifies a 'Transport' to use higher-level addresses and | ||
51 | -- packet representations. It could be used to change UDP 'ByteString's into | ||
52 | -- bencoded syntax trees or to add an encryption layer in which addresses have | ||
53 | -- associated public keys. | ||
54 | layerTransport :: | ||
55 | (x -> addr -> Either err (x', addr')) | ||
56 | -- ^ Function that attempts to transform a low-level address/packet | ||
57 | -- pair into a higher level representation. | ||
58 | -> (x' -> addr' -> (x, addr)) | ||
59 | -- ^ Function to encode a high-level address/packet into a lower level | ||
60 | -- representation. | ||
61 | -> Transport err addr x | ||
62 | -- ^ The low-level transport to be transformed. | ||
63 | -> Transport err addr' x' | ||
64 | layerTransport parse encode tr = | ||
65 | tr { awaitMessage = \kont -> | ||
66 | awaitMessage tr $ \m -> kont $ fmap (>>= uncurry parse) m | ||
67 | , sendMessage = \addr' msg' -> do | ||
68 | let (msg,addr) = encode msg' addr' | ||
69 | sendMessage tr addr msg | ||
70 | } | ||
71 | |||
72 | addHandler :: (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x | ||
73 | addHandler f tr = tr | ||
74 | { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \m -> do | ||
75 | case m of | ||
76 | Just (Right (x, addr)) -> f addr x >>= maybe eat (kont . Just . Right . (, addr) . ($ x)) | ||
77 | Just (Left e ) -> kont $ Just (Left e) | ||
78 | Nothing -> kont $ Nothing | ||
79 | } | ||
80 | |||
81 | -- | Modify a 'Transport' to invoke an action upon every received packet. | ||
82 | onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x | ||
83 | onInbound f tr = addHandler (\addr x -> f addr x >> return (Just id)) tr | ||
84 | |||
85 | -- * Using a query\/response client. | ||
37 | 86 | ||
38 | -- | Fork a thread that handles inbound packets. The returned action may be used | 87 | -- | Fork a thread that handles inbound packets. The returned action may be used |
39 | -- to terminate the thread and clean up any related state. | 88 | -- to terminate the thread and clean up any related state. |
@@ -41,18 +90,18 @@ import System.Timeout | |||
41 | -- Example usage: | 90 | -- Example usage: |
42 | -- | 91 | -- |
43 | -- > -- Start client. | 92 | -- > -- Start client. |
44 | -- > quitServer <- forkListener client | 93 | -- > quitServer <- forkListener (clientNet client) |
45 | -- > -- Send a query q, recieve a response r. | 94 | -- > -- Send a query q, recieve a response r. |
46 | -- > r <- sendQuery client method q | 95 | -- > r <- sendQuery client method q |
47 | -- > -- Quit client. | 96 | -- > -- Quit client. |
48 | -- > quitServer | 97 | -- > quitServer |
49 | forkListener :: Client err meth tid addr x -> IO (IO ()) | 98 | forkListener :: Transport err addr x -> IO (IO ()) |
50 | forkListener client = do | 99 | forkListener client = do |
51 | thread_id <- forkIO $ do | 100 | thread_id <- forkIO $ do |
52 | myThreadId >>= flip labelThread "listener" | 101 | myThreadId >>= flip labelThread "listener" |
53 | fix $ handleMessage client | 102 | fix $ awaitMessage client . const |
54 | return $ do | 103 | return $ do |
55 | closeTransport (clientNet client) | 104 | closeTransport client |
56 | killThread thread_id | 105 | killThread thread_id |
57 | 106 | ||
58 | -- | Send a query to a remote peer. Note that this funciton will always time | 107 | -- | Send a query to a remote peer. Note that this funciton will always time |
@@ -165,53 +214,6 @@ data MethodSerializer tid addr x meth a b = MethodSerializer | |||
165 | } | 214 | } |
166 | 215 | ||
167 | 216 | ||
168 | -- | Three methods are required to implement a datagram based query\/response protocol. | ||
169 | data Transport err addr x = Transport | ||
170 | { -- | Blocks until an inbound packet is available. Returns 'Nothing' when | ||
171 | -- no more packets are expected due to a shutdown or close event. | ||
172 | -- Otherwise, the packet will be parsed as type /x/ and an origin address | ||
173 | -- /addr/. Parse failure is indicated by the type 'err'. | ||
174 | awaitMessage :: IO (Maybe (Either err (x, addr))) | ||
175 | -- | Send an /x/ packet to the given destination /addr/. | ||
176 | , sendMessage :: addr -> x -> IO () | ||
177 | -- | Shutdown and clean up any state related to this 'Transport'. | ||
178 | , closeTransport :: IO () | ||
179 | } | ||
180 | |||
181 | -- | This function modifies a 'Transport' to use higher-level addresses and | ||
182 | -- packet representations. It could be used to change UDP 'ByteString's into | ||
183 | -- bencoded syntax trees or to add an encryption layer in which addresses have | ||
184 | -- associated public keys. | ||
185 | layerTransport :: | ||
186 | (x -> addr -> Either err (x', addr')) | ||
187 | -- ^ Function that attempts to transform a low-level address/packet | ||
188 | -- pair into a higher level representation. | ||
189 | -> (x' -> addr' -> (x, addr)) | ||
190 | -- ^ Function to encode a high-level address/packet into a lower level | ||
191 | -- representation. | ||
192 | -> Transport err addr x | ||
193 | -- ^ The low-level transport to be transformed. | ||
194 | -> Transport err addr' x' | ||
195 | layerTransport parse encode tr = | ||
196 | tr { awaitMessage = do | ||
197 | m <- awaitMessage tr | ||
198 | return $ fmap (>>= uncurry parse) m | ||
199 | , sendMessage = \addr' msg' -> do | ||
200 | let (msg,addr) = encode msg' addr' | ||
201 | sendMessage tr addr msg | ||
202 | } | ||
203 | |||
204 | -- | Modify a 'Transport' to invoke an action upon every received packet. | ||
205 | onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x | ||
206 | onInbound f tr = tr | ||
207 | { awaitMessage = do | ||
208 | m <- awaitMessage tr | ||
209 | case m of | ||
210 | Just (Right (x, addr)) -> f addr x | ||
211 | _ -> return () | ||
212 | return m | ||
213 | } | ||
214 | |||
215 | -- | To dipatch responses to our outbound queries, we require three primitives. | 217 | -- | To dipatch responses to our outbound queries, we require three primitives. |
216 | -- See the 'transactionMethods' function to create these primitives out of a | 218 | -- See the 'transactionMethods' function to create these primitives out of a |
217 | -- lookup table and a generator for transaction ids. | 219 | -- lookup table and a generator for transaction ids. |
@@ -360,21 +362,24 @@ contramapE f (ErrorReporter pe mh unk tim) | |||
360 | -- or throws an exception. | 362 | -- or throws an exception. |
361 | handleMessage :: | 363 | handleMessage :: |
362 | Client err meth tid addr x | 364 | Client err meth tid addr x |
363 | -> IO () | 365 | -> addr |
364 | -> IO () | 366 | -> x |
365 | handleMessage (Client net d err pending whoami responseID) again = do | 367 | -> IO (Maybe (x -> x)) |
366 | awaitMessage net >>= \case | 368 | handleMessage (Client net d err pending whoami responseID) addr plain = do |
367 | Just (Left e) -> do reportParseError err e | 369 | -- Just (Left e) -> do reportParseError err e |
368 | again | 370 | -- return $! Just id |
369 | Just (Right (plain, addr)) -> do | 371 | -- Just (Right (plain, addr)) -> do |
370 | case classifyInbound d plain of | 372 | case classifyInbound d plain of |
371 | IsQuery meth tid -> case lookupHandler d meth of | 373 | IsQuery meth tid -> case lookupHandler d meth of |
372 | Nothing -> reportMissingHandler err meth addr plain | 374 | Nothing -> do reportMissingHandler err meth addr plain |
375 | return $! Just id | ||
373 | Just m -> do | 376 | Just m -> do |
374 | self <- whoami (Just addr) | 377 | self <- whoami (Just addr) |
375 | tid' <- responseID tid | 378 | tid' <- responseID tid |
376 | either (reportParseError err) | 379 | either (\e -> do reportParseError err e |
377 | (>>= mapM_ (sendMessage net addr)) | 380 | return $! Just id) |
381 | (>>= \m -> do mapM_ (sendMessage net addr) m | ||
382 | return $! Nothing) | ||
378 | (dispatchQuery m tid' self plain addr) | 383 | (dispatchQuery m tid' self plain addr) |
379 | IsResponse tid -> do | 384 | IsResponse tid -> do |
380 | action <- atomically $ do | 385 | action <- atomically $ do |
@@ -383,9 +388,10 @@ handleMessage (Client net d err pending whoami responseID) again = do | |||
383 | writeTVar pending ts | 388 | writeTVar pending ts |
384 | return action | 389 | return action |
385 | action | 390 | action |
386 | IsUnknown e -> reportUnknown err addr plain e | 391 | return $! Nothing |
387 | again | 392 | IsUnknown e -> do reportUnknown err addr plain e |
388 | Nothing -> return () | 393 | return $! Just id |
394 | -- Nothing -> return $! id | ||
389 | 395 | ||
390 | -- * UDP Datagrams. | 396 | -- * UDP Datagrams. |
391 | 397 | ||
@@ -420,9 +426,10 @@ udpTransport bind_address = do | |||
420 | setSocketOption sock IPv6Only 0 | 426 | setSocketOption sock IPv6Only 0 |
421 | bind sock bind_address | 427 | bind sock bind_address |
422 | return Transport | 428 | return Transport |
423 | { awaitMessage = handle (ignoreEOF $ Just $ Right (B.empty, SockAddrInet 0 0)) $ do | 429 | { awaitMessage = \kont -> do |
424 | r <- B.recvFrom sock udpBufferSize | 430 | r <- handle (ignoreEOF $ Just $ Right (B.empty, SockAddrInet 0 0)) $ do |
425 | return $ Just $ Right r | 431 | Just . Right <$!> B.recvFrom sock udpBufferSize |
432 | kont $! r | ||
426 | , sendMessage = \addr bs -> void $ B.sendTo sock bs addr | 433 | , sendMessage = \addr bs -> void $ B.sendTo sock bs addr |
427 | -- TODO: sendTo: does not exist (Network is unreachable) | 434 | -- TODO: sendTo: does not exist (Network is unreachable) |
428 | -- Occurs when IPv6 network is not available. | 435 | -- Occurs when IPv6 network is not available. |