summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-08-04 18:34:18 -0400
committerjoe <joe@jerkface.net>2017-08-04 18:34:18 -0400
commit1f1bcd70f5c0b7d3c1a135fa8b53a03b507442c4 (patch)
treed74c8f5e8f6acb025ec939d12ff26e275f72be43
parent4198ce253ea9ef9184b325e4bb8d18fcc483b381 (diff)
Switched awaitMessage to continuation-passing style.
-rw-r--r--Mainline.hs9
-rw-r--r--Tox.hs9
-rw-r--r--examples/dhtd.hs4
-rw-r--r--src/Network/QueryResponse.hs143
4 files changed, 86 insertions, 79 deletions
diff --git a/Mainline.hs b/Mainline.hs
index 291a196f..860372dc 100644
--- a/Mainline.hs
+++ b/Mainline.hs
@@ -419,11 +419,10 @@ showPacket f addr flow bs = L8.unpack $ L8.unlines es
419 419
420-- Add detailed printouts for every packet. 420-- Add detailed printouts for every packet.
421addVerbosity tr = 421addVerbosity tr =
422 tr { awaitMessage = do 422 tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do
423 m <- awaitMessage tr
424 forM_ m $ mapM_ $ \(msg,addr) -> do 423 forM_ m $ mapM_ $ \(msg,addr) -> do
425 hPutStrLn stderr (showPacket id addr " --> " msg) 424 hPutStrLn stderr (showPacket id addr " --> " msg)
426 return m 425 kont m
427 , sendMessage = \addr msg -> do 426 , sendMessage = \addr msg -> do
428 hPutStrLn stderr (showPacket id addr " <-- " msg) 427 hPutStrLn stderr (showPacket id addr " <-- " msg)
429 sendMessage tr addr msg 428 sendMessage tr addr msg
@@ -566,7 +565,7 @@ newClient addr = do
566 -- recursive since 'updateRouting' does not invoke 'awaitMessage' which 565 -- recursive since 'updateRouting' does not invoke 'awaitMessage' which
567 -- which was modified by 'onInbound'. However, I'm going to avoid the 566 -- which was modified by 'onInbound'. However, I'm going to avoid the
568 -- mutual reference just to be safe. 567 -- mutual reference just to be safe.
569 outgoingClient = client { clientNet = net { awaitMessage = return Nothing } } 568 outgoingClient = client { clientNet = net { awaitMessage = ($ Nothing) } }
570 569
571 dispatch = DispatchMethods 570 dispatch = DispatchMethods
572 { classifyInbound = classify -- :: x -> MessageClass err meth tid 571 { classifyInbound = classify -- :: x -> MessageClass err meth tid
@@ -587,7 +586,7 @@ newClient addr = do
587 gen cnt = (TransactionId $ S.encode cnt, cnt+1) 586 gen cnt = (TransactionId $ S.encode cnt, cnt+1)
588 587
589 client = Client 588 client = Client
590 { clientNet = net 589 { clientNet = addHandler (handleMessage client) net
591 , clientDispatcher = dispatch 590 , clientDispatcher = dispatch
592 , clientErrorReporter = ignoreErrors -- printErrors stderr 591 , clientErrorReporter = ignoreErrors -- printErrors stderr
593 , clientPending = map_var 592 , clientPending = map_var
diff --git a/Tox.hs b/Tox.hs
index 253c83e7..bd5ebbc2 100644
--- a/Tox.hs
+++ b/Tox.hs
@@ -597,7 +597,7 @@ newClient addr = do
597 -- recursive since 'updateRouting' does not invoke 'awaitMessage' which 597 -- recursive since 'updateRouting' does not invoke 'awaitMessage' which
598 -- which was modified by 'onInbound'. However, I'm going to avoid the 598 -- which was modified by 'onInbound'. However, I'm going to avoid the
599 -- mutual reference just to be safe. 599 -- mutual reference just to be safe.
600 outgoingClient = client { clientNet = net { awaitMessage = return Nothing } } 600 outgoingClient = client { clientNet = net { awaitMessage = ($ Nothing) } }
601 601
602 dispatch tbl var = DispatchMethods 602 dispatch tbl var = DispatchMethods
603 { classifyInbound = classify 603 { classifyInbound = classify
@@ -608,6 +608,7 @@ newClient addr = do
608 -- handlers :: TVar -> Method -> Maybe Handler 608 -- handlers :: TVar -> Method -> Maybe Handler
609 handlers var PingType = handler PongType pingH 609 handlers var PingType = handler PongType pingH
610 handlers var GetNodesType = handler SendNodesType $ getNodesH routing 610 handlers var GetNodesType = handler SendNodesType $ getNodesH routing
611 {-
611 handlers var OnionRequest0 = noreply OnionRequest0 612 handlers var OnionRequest0 = noreply OnionRequest0
612 $ onionSend0H (symmetricCipher (return symkey) 613 $ onionSend0H (symmetricCipher (return symkey)
613 (fst <$> readTVar var) 614 (fst <$> readTVar var)
@@ -616,6 +617,7 @@ newClient addr = do
616 handlers var OnionResponse1 = noreply OnionResponse1 617 handlers var OnionResponse1 = noreply OnionResponse1
617 $ onionResponse1H (symmetricDecipher (return symkey)) 618 $ onionResponse1H (symmetricDecipher (return symkey))
618 udp 619 udp
620 -}
619 handlers var _ = Nothing 621 handlers var _ = Nothing
620 -- TODO DHTRequest public key (onion) 622 -- TODO DHTRequest public key (onion)
621 -- TODO DHTRequest NAT ping 623 -- TODO DHTRequest NAT ping
@@ -690,12 +692,11 @@ dropEnd8 bs = B.take (B.length bs - 8) bs
690 692
691-- Add detailed printouts for every packet. 693-- Add detailed printouts for every packet.
692addVerbosity tr = 694addVerbosity tr =
693 tr { awaitMessage = do 695 tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do
694 m <- awaitMessage tr
695 forM_ m $ mapM_ $ \(msg,addr) -> do 696 forM_ m $ mapM_ $ \(msg,addr) -> do
696 hPutStrLn stderr ( (show addr) 697 hPutStrLn stderr ( (show addr)
697 ++ " --> " ++ show (msgType msg)) 698 ++ " --> " ++ show (msgType msg))
698 return m 699 kont m
699 , sendMessage = \addr msg -> do 700 , sendMessage = \addr msg -> do
700 hPutStrLn stderr ( (show addr) 701 hPutStrLn stderr ( (show addr)
701 ++ " <-- " ++ show (msgType msg)) 702 ++ " <-- " ++ show (msgType msg))
diff --git a/examples/dhtd.hs b/examples/dhtd.hs
index f0a48bb2..8e8d47a2 100644
--- a/examples/dhtd.hs
+++ b/examples/dhtd.hs
@@ -469,13 +469,13 @@ main = do
469 (atomically . writeTVar (Mainline.contactInfo swarms)) 469 (atomically . writeTVar (Mainline.contactInfo swarms))
470 (peerdb >>= S.decodeLazy) 470 (peerdb >>= S.decodeLazy)
471 471
472 quitBt <- forkListener bt 472 quitBt <- forkListener (clientNet bt)
473 473
474 let toxport = succ $ fromMaybe 33445 (fromIntegral <$> sockAddrPort addr) 474 let toxport = succ $ fromMaybe 33445 (fromIntegral <$> sockAddrPort addr)
475 addrTox <- getBindAddress (show toxport) True 475 addrTox <- getBindAddress (show toxport) True
476 (tox,toxR) <- Tox.newClient addrTox 476 (tox,toxR) <- Tox.newClient addrTox
477 477
478 quitTox <- forkListener tox 478 quitTox <- forkListener (clientNet tox)
479 479
480 mainlineSearches <- atomically $ newTVar Map.empty 480 mainlineSearches <- atomically $ newTVar Map.empty
481 toxSearches <- atomically $ newTVar Map.empty 481 toxSearches <- atomically $ newTVar Map.empty
diff --git a/src/Network/QueryResponse.hs b/src/Network/QueryResponse.hs
index c8a6fa80..190cc73e 100644
--- a/src/Network/QueryResponse.hs
+++ b/src/Network/QueryResponse.hs
@@ -7,6 +7,7 @@
7{-# LANGUAGE PartialTypeSignatures #-} 7{-# LANGUAGE PartialTypeSignatures #-}
8{-# LANGUAGE RankNTypes #-} 8{-# LANGUAGE RankNTypes #-}
9{-# LANGUAGE ScopedTypeVariables #-} 9{-# LANGUAGE ScopedTypeVariables #-}
10{-# LANGUAGE TupleSections #-}
10module Network.QueryResponse where 11module Network.QueryResponse where
11 12
12#ifdef THREAD_DEBUG 13#ifdef THREAD_DEBUG
@@ -33,7 +34,55 @@ import System.IO
33import System.IO.Error 34import System.IO.Error
34import System.Timeout 35import System.Timeout
35 36
36-- * Using a query\/response 'Client'. 37-- | Three methods are required to implement a datagram based query\/response protocol.
38data Transport err addr x = Transport
39 { -- | Blocks until an inbound packet is available. Returns 'Nothing' when
40 -- no more packets are expected due to a shutdown or close event.
41 -- Otherwise, the packet will be parsed as type /x/ and an origin address
42 -- /addr/. Parse failure is indicated by the type 'err'.
43 awaitMessage :: forall a. (Maybe (Either err (x, addr)) -> IO a) -> IO a
44 -- | Send an /x/ packet to the given destination /addr/.
45 , sendMessage :: addr -> x -> IO ()
46 -- | Shutdown and clean up any state related to this 'Transport'.
47 , closeTransport :: IO ()
48 }
49
50-- | This function modifies a 'Transport' to use higher-level addresses and
51-- packet representations. It could be used to change UDP 'ByteString's into
52-- bencoded syntax trees or to add an encryption layer in which addresses have
53-- associated public keys.
54layerTransport ::
55 (x -> addr -> Either err (x', addr'))
56 -- ^ Function that attempts to transform a low-level address/packet
57 -- pair into a higher level representation.
58 -> (x' -> addr' -> (x, addr))
59 -- ^ Function to encode a high-level address/packet into a lower level
60 -- representation.
61 -> Transport err addr x
62 -- ^ The low-level transport to be transformed.
63 -> Transport err addr' x'
64layerTransport parse encode tr =
65 tr { awaitMessage = \kont ->
66 awaitMessage tr $ \m -> kont $ fmap (>>= uncurry parse) m
67 , sendMessage = \addr' msg' -> do
68 let (msg,addr) = encode msg' addr'
69 sendMessage tr addr msg
70 }
71
72addHandler :: (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x
73addHandler f tr = tr
74 { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \m -> do
75 case m of
76 Just (Right (x, addr)) -> f addr x >>= maybe eat (kont . Just . Right . (, addr) . ($ x))
77 Just (Left e ) -> kont $ Just (Left e)
78 Nothing -> kont $ Nothing
79 }
80
81-- | Modify a 'Transport' to invoke an action upon every received packet.
82onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x
83onInbound f tr = addHandler (\addr x -> f addr x >> return (Just id)) tr
84
85-- * Using a query\/response client.
37 86
38-- | Fork a thread that handles inbound packets. The returned action may be used 87-- | Fork a thread that handles inbound packets. The returned action may be used
39-- to terminate the thread and clean up any related state. 88-- to terminate the thread and clean up any related state.
@@ -41,18 +90,18 @@ import System.Timeout
41-- Example usage: 90-- Example usage:
42-- 91--
43-- > -- Start client. 92-- > -- Start client.
44-- > quitServer <- forkListener client 93-- > quitServer <- forkListener (clientNet client)
45-- > -- Send a query q, recieve a response r. 94-- > -- Send a query q, recieve a response r.
46-- > r <- sendQuery client method q 95-- > r <- sendQuery client method q
47-- > -- Quit client. 96-- > -- Quit client.
48-- > quitServer 97-- > quitServer
49forkListener :: Client err meth tid addr x -> IO (IO ()) 98forkListener :: Transport err addr x -> IO (IO ())
50forkListener client = do 99forkListener client = do
51 thread_id <- forkIO $ do 100 thread_id <- forkIO $ do
52 myThreadId >>= flip labelThread "listener" 101 myThreadId >>= flip labelThread "listener"
53 fix $ handleMessage client 102 fix $ awaitMessage client . const
54 return $ do 103 return $ do
55 closeTransport (clientNet client) 104 closeTransport client
56 killThread thread_id 105 killThread thread_id
57 106
58-- | Send a query to a remote peer. Note that this funciton will always time 107-- | Send a query to a remote peer. Note that this funciton will always time
@@ -165,53 +214,6 @@ data MethodSerializer tid addr x meth a b = MethodSerializer
165 } 214 }
166 215
167 216
168-- | Three methods are required to implement a datagram based query\/response protocol.
169data Transport err addr x = Transport
170 { -- | Blocks until an inbound packet is available. Returns 'Nothing' when
171 -- no more packets are expected due to a shutdown or close event.
172 -- Otherwise, the packet will be parsed as type /x/ and an origin address
173 -- /addr/. Parse failure is indicated by the type 'err'.
174 awaitMessage :: IO (Maybe (Either err (x, addr)))
175 -- | Send an /x/ packet to the given destination /addr/.
176 , sendMessage :: addr -> x -> IO ()
177 -- | Shutdown and clean up any state related to this 'Transport'.
178 , closeTransport :: IO ()
179 }
180
181-- | This function modifies a 'Transport' to use higher-level addresses and
182-- packet representations. It could be used to change UDP 'ByteString's into
183-- bencoded syntax trees or to add an encryption layer in which addresses have
184-- associated public keys.
185layerTransport ::
186 (x -> addr -> Either err (x', addr'))
187 -- ^ Function that attempts to transform a low-level address/packet
188 -- pair into a higher level representation.
189 -> (x' -> addr' -> (x, addr))
190 -- ^ Function to encode a high-level address/packet into a lower level
191 -- representation.
192 -> Transport err addr x
193 -- ^ The low-level transport to be transformed.
194 -> Transport err addr' x'
195layerTransport parse encode tr =
196 tr { awaitMessage = do
197 m <- awaitMessage tr
198 return $ fmap (>>= uncurry parse) m
199 , sendMessage = \addr' msg' -> do
200 let (msg,addr) = encode msg' addr'
201 sendMessage tr addr msg
202 }
203
204-- | Modify a 'Transport' to invoke an action upon every received packet.
205onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x
206onInbound f tr = tr
207 { awaitMessage = do
208 m <- awaitMessage tr
209 case m of
210 Just (Right (x, addr)) -> f addr x
211 _ -> return ()
212 return m
213 }
214
215-- | To dipatch responses to our outbound queries, we require three primitives. 217-- | To dipatch responses to our outbound queries, we require three primitives.
216-- See the 'transactionMethods' function to create these primitives out of a 218-- See the 'transactionMethods' function to create these primitives out of a
217-- lookup table and a generator for transaction ids. 219-- lookup table and a generator for transaction ids.
@@ -360,21 +362,24 @@ contramapE f (ErrorReporter pe mh unk tim)
360-- or throws an exception. 362-- or throws an exception.
361handleMessage :: 363handleMessage ::
362 Client err meth tid addr x 364 Client err meth tid addr x
363 -> IO () 365 -> addr
364 -> IO () 366 -> x
365handleMessage (Client net d err pending whoami responseID) again = do 367 -> IO (Maybe (x -> x))
366 awaitMessage net >>= \case 368handleMessage (Client net d err pending whoami responseID) addr plain = do
367 Just (Left e) -> do reportParseError err e 369 -- Just (Left e) -> do reportParseError err e
368 again 370 -- return $! Just id
369 Just (Right (plain, addr)) -> do 371 -- Just (Right (plain, addr)) -> do
370 case classifyInbound d plain of 372 case classifyInbound d plain of
371 IsQuery meth tid -> case lookupHandler d meth of 373 IsQuery meth tid -> case lookupHandler d meth of
372 Nothing -> reportMissingHandler err meth addr plain 374 Nothing -> do reportMissingHandler err meth addr plain
375 return $! Just id
373 Just m -> do 376 Just m -> do
374 self <- whoami (Just addr) 377 self <- whoami (Just addr)
375 tid' <- responseID tid 378 tid' <- responseID tid
376 either (reportParseError err) 379 either (\e -> do reportParseError err e
377 (>>= mapM_ (sendMessage net addr)) 380 return $! Just id)
381 (>>= \m -> do mapM_ (sendMessage net addr) m
382 return $! Nothing)
378 (dispatchQuery m tid' self plain addr) 383 (dispatchQuery m tid' self plain addr)
379 IsResponse tid -> do 384 IsResponse tid -> do
380 action <- atomically $ do 385 action <- atomically $ do
@@ -383,9 +388,10 @@ handleMessage (Client net d err pending whoami responseID) again = do
383 writeTVar pending ts 388 writeTVar pending ts
384 return action 389 return action
385 action 390 action
386 IsUnknown e -> reportUnknown err addr plain e 391 return $! Nothing
387 again 392 IsUnknown e -> do reportUnknown err addr plain e
388 Nothing -> return () 393 return $! Just id
394 -- Nothing -> return $! id
389 395
390-- * UDP Datagrams. 396-- * UDP Datagrams.
391 397
@@ -420,9 +426,10 @@ udpTransport bind_address = do
420 setSocketOption sock IPv6Only 0 426 setSocketOption sock IPv6Only 0
421 bind sock bind_address 427 bind sock bind_address
422 return Transport 428 return Transport
423 { awaitMessage = handle (ignoreEOF $ Just $ Right (B.empty, SockAddrInet 0 0)) $ do 429 { awaitMessage = \kont -> do
424 r <- B.recvFrom sock udpBufferSize 430 r <- handle (ignoreEOF $ Just $ Right (B.empty, SockAddrInet 0 0)) $ do
425 return $ Just $ Right r 431 Just . Right <$!> B.recvFrom sock udpBufferSize
432 kont $! r
426 , sendMessage = \addr bs -> void $ B.sendTo sock bs addr 433 , sendMessage = \addr bs -> void $ B.sendTo sock bs addr
427 -- TODO: sendTo: does not exist (Network is unreachable) 434 -- TODO: sendTo: does not exist (Network is unreachable)
428 -- Occurs when IPv6 network is not available. 435 -- Occurs when IPv6 network is not available.