summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2019-12-14 01:03:07 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-01 23:26:05 -0500
commitb5a3c7b92e7effcd234037241b00f9f29773d870 (patch)
tree4047e11c9102585001dd3be95855038a6816a5c2
parent97043e1069e172a0f389610610892ca060f395dd (diff)
STM-based awaitMessage.
-rw-r--r--dht/src/Data/Tox/Onion.hs15
-rw-r--r--dht/src/Network/BitTorrent/MainlineDHT.hs7
-rw-r--r--dht/src/Network/Lossless.hs33
-rw-r--r--dht/src/Network/QueryResponse.hs65
-rw-r--r--dht/src/Network/QueryResponse/TCP.hs18
-rw-r--r--dht/src/Network/SessionTransports.hs31
-rw-r--r--dht/src/Network/Tox.hs4
-rw-r--r--dht/src/Network/Tox/AggregateSession.hs12
-rw-r--r--dht/src/Network/Tox/DHT/Transport.hs8
9 files changed, 112 insertions, 81 deletions
diff --git a/dht/src/Data/Tox/Onion.hs b/dht/src/Data/Tox/Onion.hs
index d3c8086d..e0d7c744 100644
--- a/dht/src/Data/Tox/Onion.hs
+++ b/dht/src/Data/Tox/Onion.hs
@@ -60,7 +60,7 @@ import Data.Bits (shiftR,shiftL)
60import qualified Rank2 60import qualified Rank2
61import Util (sameAddress) 61import Util (sameAddress)
62 62
63type HandleLo a = Maybe (Either String (ByteString, SockAddr)) -> IO a 63type HandleLo a = Arrival String SockAddr ByteString -> IO a
64 64
65type UDPTransport = Transport String SockAddr ByteString 65type UDPTransport = Transport String SockAddr ByteString
66 66
@@ -264,11 +264,12 @@ forwardOnions crypto baddr udp sendTCP = udp { awaitMessage = forwardAwait crypt
264 264
265forwardAwait :: TransportCrypto 265forwardAwait :: TransportCrypto
266 -> UDPTransport 266 -> UDPTransport
267 -> (Int -> OnionMessage Encrypted -> IO ()) {- ^ TCP relay send -} -> HandleLo a -> IO a 267 -> (Int -> OnionMessage Encrypted -> IO ()) {- ^ TCP relay send -} -> HandleLo a -> STM (IO a)
268forwardAwait crypto udp sendTCP kont = do 268forwardAwait crypto udp sendTCP kont = do
269 fix $ \another -> do 269 fix $ \another0 -> do
270 let another = join $ atomically another0
270 awaitMessage udp $ \case 271 awaitMessage udp $ \case
271 m@(Just (Right (bs,saddr))) -> case B.head bs of 272 m@(Arrival saddr bs) -> case B.head bs of
272 0x80 -> forward kont bs $ handleOnionRequest (Proxy :: Proxy N0) crypto (Addressed saddr) udp another 273 0x80 -> forward kont bs $ handleOnionRequest (Proxy :: Proxy N0) crypto (Addressed saddr) udp another
273 0x81 -> forward kont bs $ handleOnionRequest (Proxy :: Proxy N1) crypto (Addressed saddr) udp another 274 0x81 -> forward kont bs $ handleOnionRequest (Proxy :: Proxy N1) crypto (Addressed saddr) udp another
274 0x82 -> forward kont bs $ handleOnionRequest (Proxy :: Proxy N2) crypto (Addressed saddr) udp another 275 0x82 -> forward kont bs $ handleOnionRequest (Proxy :: Proxy N2) crypto (Addressed saddr) udp another
@@ -278,9 +279,9 @@ forwardAwait crypto udp sendTCP kont = do
278 _ -> kont m 279 _ -> kont m
279 m -> kont m 280 m -> kont m
280 281
281forward :: forall c b b1. (Serialize b, Show b) => 282forward :: (Serialize b, Show b) =>
282 (Maybe (Either String b1) -> c) -> ByteString -> (b -> c) -> c 283 HandleLo a -> ByteString -> (b -> IO a) -> IO a
283forward kont bs f = either (kont . Just . Left) f $ decode $ B.tail bs 284forward kont bs f = either (kont . ParseError) f $ decode $ B.tail bs
284 285
285class SumToThree a b 286class SumToThree a b
286 287
diff --git a/dht/src/Network/BitTorrent/MainlineDHT.hs b/dht/src/Network/BitTorrent/MainlineDHT.hs
index ed97ee31..0269268f 100644
--- a/dht/src/Network/BitTorrent/MainlineDHT.hs
+++ b/dht/src/Network/BitTorrent/MainlineDHT.hs
@@ -428,8 +428,9 @@ showPacket f addr flow bs = L8.unpack $ L8.unlines es
428addVerbosity :: Transport err SockAddr ByteString -> Transport err SockAddr ByteString 428addVerbosity :: Transport err SockAddr ByteString -> Transport err SockAddr ByteString
429addVerbosity tr = 429addVerbosity tr =
430 tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do 430 tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do
431 forM_ m $ mapM_ $ \(msg,addr) -> do 431 case m of
432 dput XBitTorrent (showPacket id addr " --> " msg) 432 Arrival addr msg -> dput XBitTorrent (showPacket id addr " --> " msg)
433 _ -> return ()
433 kont m 434 kont m
434 , sendMessage = \addr msg -> do 435 , sendMessage = \addr msg -> do
435 dput XBitTorrent (showPacket id addr " <-- " msg) 436 dput XBitTorrent (showPacket id addr " <-- " msg)
@@ -598,7 +599,7 @@ newClient swarms addr = do
598 -- recursive since 'updateRouting' does not invoke 'awaitMessage' which 599 -- recursive since 'updateRouting' does not invoke 'awaitMessage' which
599 -- which was modified by 'onInbound'. However, I'm going to avoid the 600 -- which was modified by 'onInbound'. However, I'm going to avoid the
600 -- mutual reference just to be safe. 601 -- mutual reference just to be safe.
601 outgoingClient = client { clientNet = net { awaitMessage = ($ Nothing) } } 602 outgoingClient = client { clientNet = net { awaitMessage = pure . ($ Terminated) } }
602 603
603 dispatch = DispatchMethods 604 dispatch = DispatchMethods
604 { classifyInbound = classify -- :: x -> MessageClass err meth tid addr x 605 { classifyInbound = classify -- :: x -> MessageClass err meth tid addr x
diff --git a/dht/src/Network/Lossless.hs b/dht/src/Network/Lossless.hs
index 861792ab..5a313aed 100644
--- a/dht/src/Network/Lossless.hs
+++ b/dht/src/Network/Lossless.hs
@@ -59,37 +59,38 @@ lossless isLossless encode saddr udp = do
59 rloop <- forkIO $ do 59 rloop <- forkIO $ do
60 -- This thread enqueues inbound packets or writes them to the oob 60 -- This thread enqueues inbound packets or writes them to the oob
61 -- channel. 61 -- channel.
62 myThreadId >>= flip labelThread ("lossless."++show saddr) 62 fix $ \loop -> join $ atomically $ awaitMessage udp $ \m -> do
63 fix $ \loop -> do 63 m' <- case m of Terminated -> return Nothing
64 awaitMessage udp $ \m -> do 64 ParseError e -> return $ Just (Left e)
65 m' <- mapM (mapM $ uncurry isLossless) m 65 Arrival a x -> Just . Right <$> isLossless x a
66 case m' of 66 case m' of
67 Nothing -> do 67 Nothing -> do
68 atomically $ writeTChan oob Nothing 68 atomically $ writeTChan oob Terminated
69 -- Quit thread here. 69 -- Quit thread here.
70 Just (Left e) -> do 70 Just (Left e) -> do
71 atomically $ writeTChan oob (Just $ Left e) 71 atomically $ writeTChan oob (ParseError e)
72 loop 72 loop
73 Just (Right event) -> do 73 Just (Right event) -> do
74 atomically $ do 74 atomically $ do
75 -- x' <- isLossless xaddr x 75 -- x' <- isLossless xaddr x
76 PB.grokInboundPacket pb event 76 PB.grokInboundPacket pb event
77 case event of 77 case event of
78 PacketReceivedLossy {} -> writeTChan oob (Just $ Right $ peReceivedPayload event) 78 PacketReceivedLossy {} -> writeTChan oob (uncurry (flip Arrival) $ peReceivedPayload event)
79 _ -> do 79 _ -> do
80 report <- pbReport "enqueued" pb 80 report <- pbReport "enqueued" pb
81 writeTChan oob (Just $ Left report) 81 writeTChan oob (ParseError report)
82 loop 82 loop
83 labelThread rloop ("lossless."++show saddr)
83 let tr = Transport 84 let tr = Transport
84 { awaitMessage = \kont -> do 85 { awaitMessage = \kont ->
85 join $ atomically $ orElse 86 orElse
86 (do x <- readTChan oob 87 (do x <- readTChan oob
87 return $ kont $! x) 88 return $ kont $! x)
88 (do x <- PB.awaitReadyPacket pb 89 (do x <- PB.awaitReadyPacket pb
89 report <- pbReport "dequeued" pb 90 report <- pbReport "dequeued" pb
90 return $ do 91 return $ do
91 atomically $ writeTChan oob (Just $ Left report) 92 atomically $ writeTChan oob (ParseError report)
92 kont $! Just (Right x)) 93 kont $! uncurry (flip Arrival) x)
93 , sendMessage = \a' x' -> do 94 , sendMessage = \a' x' -> do
94 seqno <- atomically $ do 95 seqno <- atomically $ do
95 seqno <- PB.nextToSendSequenceNumber pb 96 seqno <- PB.nextToSendSequenceNumber pb
@@ -111,9 +112,11 @@ lossless isLossless encode saddr udp = do
111 when isfull retry 112 when isfull retry
112 let sendit = sendMessage udp saddr x 113 let sendit = sendMessage udp saddr x
113 maybe sendit (catchIOError sendit) oops 114 maybe sendit (catchIOError sendit) oops
114 , closeTransport = do 115 , setActive = \case
115 atomically $ writeTChan oob Nothing -- quit rloop thread 116 False -> do
116 closeTransport udp 117 atomically $ writeTChan oob Terminated -- quit rloop thread
118 setActive udp False
119 True -> return ()
117 } 120 }
118 resend ns = do 121 resend ns = do
119 xs <- atomically $ retrieveForResend pb ns 122 xs <- atomically $ retrieveForResend pb ns
diff --git a/dht/src/Network/QueryResponse.hs b/dht/src/Network/QueryResponse.hs
index f62fbefe..ba686929 100644
--- a/dht/src/Network/QueryResponse.hs
+++ b/dht/src/Network/QueryResponse.hs
@@ -54,15 +54,18 @@ data Arrival err addr x
54data TransportA err addr x y = Transport 54data TransportA err addr x y = Transport
55 { -- | Blocks until an inbound packet is available. Then calls the provided 55 { -- | Blocks until an inbound packet is available. Then calls the provided
56 -- continuation with the packet and origin addres or an error condition. 56 -- continuation with the packet and origin addres or an error condition.
57 awaitMessage :: forall a. (Arrival err addr x -> IO a) -> IO a 57 awaitMessage :: forall a. (Arrival err addr x -> IO a) -> STM (IO a)
58 -- | Send an /y/ packet to the given destination /addr/. 58 -- | Send an /y/ packet to the given destination /addr/.
59 , sendMessage :: addr -> y -> IO () 59 , sendMessage :: addr -> y -> IO ()
60 -- | Shutdown and clean up any state related to this 'Transport'. 60 -- | Shutdown and clean up any state related to this 'Transport'.
61 , closeTransport :: IO () 61 , setActive :: Bool -> IO ()
62 } 62 }
63 63
64type Transport err addr x = TransportA err addr x x 64type Transport err addr x = TransportA err addr x x
65 65
66closeTransport :: TransportA err addr x y -> IO ()
67closeTransport tr = setActive tr False
68
66-- | This function modifies a 'Transport' to use higher-level addresses and 69-- | This function modifies a 'Transport' to use higher-level addresses and
67-- packet representations. It could be used to change UDP 'ByteString's into 70-- packet representations. It could be used to change UDP 'ByteString's into
68-- bencoded syntax trees or to add an encryption layer in which addresses have 71-- bencoded syntax trees or to add an encryption layer in which addresses have
@@ -110,7 +113,7 @@ layerTransport parse encode tr =
110 (\x' addr' -> return $ encode x' addr') 113 (\x' addr' -> return $ encode x' addr')
111 tr 114 tr
112 115
113-- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' 116-- | Paritions a 'Transport' into two higher-level transports. Note: A 'TChan'
114-- is used to share the same underlying socket, so be sure to fork a thread for 117-- is used to share the same underlying socket, so be sure to fork a thread for
115-- both returned 'Transport's to avoid hanging. 118-- both returned 'Transport's to avoid hanging.
116partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) 119partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a)))
@@ -118,28 +121,28 @@ partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a)))
118 -> Transport err a b 121 -> Transport err a b
119 -> IO (Transport err xaddr x, Transport err a b) 122 -> IO (Transport err xaddr x, Transport err a b)
120partitionTransportM parse encodex tr = do 123partitionTransportM parse encodex tr = do
121 mvar <- newEmptyMVar 124 tchan <- atomically newTChan
122 let xtr = tr { awaitMessage = \kont -> fix $ \again -> do 125 let xtr = tr { awaitMessage = \kont -> fix $ \again -> do
123 awaitMessage tr $ \m -> case m of 126 awaitMessage tr $ \m -> case m of
124 Arrival adr msg -> parse (msg,adr) >>= \case 127 Arrival adr msg -> parse (msg,adr) >>= \case
125 Left (x,xaddr) -> kont $ Arrival xaddr x 128 Left (x,xaddr) -> kont $ Arrival xaddr x
126 Right y -> putMVar mvar (Just y) >> again 129 Right y -> atomically (writeTChan tchan (Just y)) >> join (atomically again)
127 ParseError e -> kont $ ParseError e 130 ParseError e -> kont $ ParseError e
128 Terminated -> putMVar mvar Nothing >> kont Terminated 131 Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated
129 , sendMessage = \addr' msg' -> do 132 , sendMessage = \addr' msg' -> do
130 msg_addr <- encodex (msg',addr') 133 msg_addr <- encodex (msg',addr')
131 mapM_ (uncurry . flip $ sendMessage tr) msg_addr 134 mapM_ (uncurry . flip $ sendMessage tr) msg_addr
132 } 135 }
133 ytr = Transport 136 ytr = Transport
134 { awaitMessage = \kont -> takeMVar mvar >>= kont . \case 137 { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case
135 Nothing -> Terminated 138 Nothing -> Terminated
136 Just (y,yaddr) -> Arrival yaddr y 139 Just (y,yaddr) -> Arrival yaddr y
137 , sendMessage = sendMessage tr 140 , sendMessage = sendMessage tr
138 , closeTransport = return () 141 , setActive = const $ return ()
139 } 142 }
140 return (xtr, ytr) 143 return (xtr, ytr)
141 144
142-- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' 145-- | Paritions a 'Transport' into two higher-level transports. Note: An 'TChan'
143-- is used to share the same underlying socket, so be sure to fork a thread for 146-- is used to share the same underlying socket, so be sure to fork a thread for
144-- both returned 'Transport's to avoid hanging. 147-- both returned 'Transport's to avoid hanging.
145partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) 148partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a))
@@ -157,14 +160,14 @@ partitionAndForkTransport ::
157 -> Transport err a b 160 -> Transport err a b
158 -> IO (Transport err xaddr x, Transport err a b) 161 -> IO (Transport err xaddr x, Transport err a b)
159partitionAndForkTransport forkedSend parse encodex tr = do 162partitionAndForkTransport forkedSend parse encodex tr = do
160 mvar <- newEmptyMVar 163 tchan <- atomically newTChan
161 let xtr = tr { awaitMessage = \kont -> fix $ \again -> do 164 let xtr = tr { awaitMessage = \kont -> fix $ \again -> do
162 awaitMessage tr $ \case 165 awaitMessage tr $ \case
163 Arrival a b -> parse (b,a) >>= \case 166 Arrival a b -> parse (b,a) >>= \case
164 Left (x,xaddr) -> kont $ Arrival xaddr x 167 Left (x,xaddr) -> kont $ Arrival xaddr x
165 Right (b,a) -> putMVar mvar (Arrival a b) >> again 168 Right (b,a) -> atomically (writeTChan tchan (Arrival a b)) >> join (atomically again)
166 ParseError e -> kont $ ParseError e 169 ParseError e -> kont $ ParseError e
167 Terminated -> putMVar mvar Terminated >> kont Terminated 170 Terminated -> atomically (writeTChan tchan Terminated) >> kont Terminated
168 , sendMessage = \addr' msg' -> do 171 , sendMessage = \addr' msg' -> do
169 msg_addr <- encodex (msg',addr') 172 msg_addr <- encodex (msg',addr')
170 case msg_addr of 173 case msg_addr of
@@ -173,9 +176,9 @@ partitionAndForkTransport forkedSend parse encodex tr = do
173 Nothing -> return () 176 Nothing -> return ()
174 } 177 }
175 ytr = Transport 178 ytr = Transport
176 { awaitMessage = \kont -> takeMVar mvar >>= kont 179 { awaitMessage = \kont -> readTChan tchan >>= pure . kont
177 , sendMessage = sendMessage tr 180 , sendMessage = sendMessage tr
178 , closeTransport = return () 181 , setActive = \_ -> return ()
179 } 182 }
180 return (xtr, ytr) 183 return (xtr, ytr)
181 184
@@ -186,7 +189,7 @@ partitionAndForkTransport forkedSend parse encodex tr = do
186addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x 189addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x
187addHandler onParseError f tr = tr 190addHandler onParseError f tr = tr
188 { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case 191 { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case
189 Arrival addr x -> f addr x >>= maybe eat (kont . Arrival addr . ($ x)) 192 Arrival addr x -> f addr x >>= maybe (join $ atomically eat) (kont . Arrival addr . ($ x))
190 ParseError e -> onParseError e >> kont (ParseError e) 193 ParseError e -> onParseError e >> kont (ParseError e)
191 Terminated -> kont Terminated 194 Terminated -> kont Terminated
192 } 195 }
@@ -210,14 +213,15 @@ onInbound f tr = addHandler (const $ return ()) (\addr x -> f addr x >> return (
210-- > quitServer 213-- > quitServer
211forkListener :: String -> Transport err addr x -> IO (IO ()) 214forkListener :: String -> Transport err addr x -> IO (IO ())
212forkListener name client = do 215forkListener name client = do
216 setActive client True
213 thread_id <- forkIO $ do 217 thread_id <- forkIO $ do
214 myThreadId >>= flip labelThread ("listener."++name) 218 myThreadId >>= flip labelThread ("listener."++name)
215 fix $ \loop -> awaitMessage client $ \case 219 fix $ \loop -> join $ atomically $ awaitMessage client $ \case
216 Terminated -> return () 220 Terminated -> return ()
217 _ -> loop 221 _ -> loop
218 dput XMisc $ "Listener died: " ++ name 222 dput XMisc $ "Listener died: " ++ name
219 return $ do 223 return $ do
220 closeTransport client 224 setActive client False
221 -- killThread thread_id 225 -- killThread thread_id
222 226
223-- * Implementing a query\/response 'Client'. 227-- * Implementing a query\/response 'Client'.
@@ -606,11 +610,11 @@ udpTransport' bind_address = do
606 setSocketOption sock Broadcast 1 610 setSocketOption sock Broadcast 1
607 bind sock bind_address 611 bind sock bind_address
608 isClosed <- newEmptyMVar 612 isClosed <- newEmptyMVar
613 udpTChan <- atomically newTChan
609 let tr = Transport { 614 let tr = Transport {
610 awaitMessage = \kont -> do 615 awaitMessage = \kont -> do
611 r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do 616 r <- readTChan udpTChan
612 uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize 617 return $ kont $! r
613 kont $! r
614 , sendMessage = case family of 618 , sendMessage = case family of
615 AF_INET6 -> \case 619 AF_INET6 -> \case
616 (SockAddrInet port addr) -> \bs -> 620 (SockAddrInet port addr) -> \bs ->
@@ -626,7 +630,8 @@ udpTransport' bind_address = do
626 addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr) 630 addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr)
627 addr4 -> \bs -> saferSendTo sock bs addr4 631 addr4 -> \bs -> saferSendTo sock bs addr4
628 _ -> \addr bs -> saferSendTo sock bs addr 632 _ -> \addr bs -> saferSendTo sock bs addr
629 , closeTransport = do 633 , setActive = \case
634 False -> do
630 dput XMisc $ "closeTransport for udpTransport' called. " ++ show bind_address 635 dput XMisc $ "closeTransport for udpTransport' called. " ++ show bind_address
631 tryPutMVar isClosed () -- signal awaitMessage that the transport is closed. 636 tryPutMVar isClosed () -- signal awaitMessage that the transport is closed.
632#if MIN_VERSION_network (3,1,0) 637#if MIN_VERSION_network (3,1,0)
@@ -639,6 +644,14 @@ udpTransport' bind_address = do
639 let sorryGHCButIAmNotFuckingClosingTheSocketYet fd = return () 644 let sorryGHCButIAmNotFuckingClosingTheSocketYet fd = return ()
640 -- This call is necessary to interrupt the blocking recvFrom call in awaitMessage. 645 -- This call is necessary to interrupt the blocking recvFrom call in awaitMessage.
641 closeFdWith sorryGHCButIAmNotFuckingClosingTheSocketYet (fromIntegral fd) 646 closeFdWith sorryGHCButIAmNotFuckingClosingTheSocketYet (fromIntegral fd)
647 True -> do
648 udpThread <- forkIO $ fix $ \again -> do
649 r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do
650 uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize
651 atomically $ writeTChan udpTChan r
652 case r of Terminated -> return ()
653 _ -> again
654 labelThread udpThread ("udp.io."++show bind_address)
642 } 655 }
643 return (tr, sock) 656 return (tr, sock)
644 657
@@ -652,13 +665,15 @@ udpTransport bind_address = fst <$> udpTransport' bind_address
652chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x 665chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x
653chanTransport chanFromAddr self achan aclosed = Transport 666chanTransport chanFromAddr self achan aclosed = Transport
654 { awaitMessage = \kont -> do 667 { awaitMessage = \kont -> do
655 x <- atomically $ (uncurry (flip Arrival) <$> readTChan achan) 668 x <- (uncurry (flip Arrival) <$> readTChan achan)
656 `orElse` 669 `orElse`
657 (readTVar aclosed >>= check >> return Terminated) 670 (readTVar aclosed >>= check >> return Terminated)
658 kont x 671 return $ kont x
659 , sendMessage = \them bs -> do 672 , sendMessage = \them bs -> do
660 atomically $ writeTChan (chanFromAddr them) (bs,self) 673 atomically $ writeTChan (chanFromAddr them) (bs,self)
661 , closeTransport = atomically $ writeTVar aclosed True 674 , setActive = \case
675 False -> atomically $ writeTVar aclosed True
676 True -> return ()
662 } 677 }
663 678
664-- | Returns a pair of transports linked together to simulate two computers talking to each other. 679-- | Returns a pair of transports linked together to simulate two computers talking to each other.
diff --git a/dht/src/Network/QueryResponse/TCP.hs b/dht/src/Network/QueryResponse/TCP.hs
index 67c19512..0028a5b6 100644
--- a/dht/src/Network/QueryResponse/TCP.hs
+++ b/dht/src/Network/QueryResponse/TCP.hs
@@ -1,5 +1,6 @@
1{-# LANGUAGE CPP #-}
1{-# LANGUAGE GeneralizedNewtypeDeriving #-} 2{-# LANGUAGE GeneralizedNewtypeDeriving #-}
2{-# LANGUAGE CPP #-} 3{-# LANGUAGE LambdaCase #-}
3module Network.QueryResponse.TCP where 4module Network.QueryResponse.TCP where
4 5
5#ifdef THREAD_DEBUG 6#ifdef THREAD_DEBUG
@@ -11,6 +12,7 @@ import GHC.Conc (labelThread)
11 12
12import Control.Arrow 13import Control.Arrow
13import Control.Concurrent.STM 14import Control.Concurrent.STM
15import Control.Concurrent.STM.TMVar
14import Control.Monad 16import Control.Monad
15import Data.ByteString (ByteString,hPut) 17import Data.ByteString (ByteString,hPut)
16import Data.Function 18import Data.Function
@@ -78,7 +80,7 @@ showStat r = case r of PendingTCPSession -> "pending."
78tcp_timeout :: Int 80tcp_timeout :: Int
79tcp_timeout = 10000000 81tcp_timeout = 10000000
80 82
81acquireConnection :: MVar (Maybe (Either a (x, addr))) 83acquireConnection :: TMVar (Arrival a addr x)
82 -> TCPCache (SessionProtocol x y) 84 -> TCPCache (SessionProtocol x y)
83 -> StreamHandshake addr x y 85 -> StreamHandshake addr x y
84 -> addr 86 -> addr
@@ -131,10 +133,10 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
131 dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x 133 dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x
132 case x of 134 case x of
133 Just u -> do 135 Just u -> do
134 m <- timeout tcp_timeout $ putMVar mvar $ Just $ Right (u, addr) 136 m <- timeout tcp_timeout $ atomically (putTMVar mvar $ Arrival addr u)
135 when (isNothing m) $ do 137 when (isNothing m) $ do
136 dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet." 138 dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet."
137 tryTakeMVar mvar 139 atomically $ tryTakeTMVar mvar
138 return () 140 return ()
139 loop 141 loop
140 Nothing -> do 142 Nothing -> do
@@ -206,14 +208,16 @@ tcpTransport :: Int -- ^ maximum number of TCP links to maintain.
206 -> StreamHandshake addr x y 208 -> StreamHandshake addr x y
207 -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y)) 209 -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y))
208tcpTransport maxcon stream = do 210tcpTransport maxcon stream = do
209 msgvar <- newEmptyMVar 211 msgvar <- atomically newEmptyTMVar
210 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) 212 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty)
211 return $ (,) tcpcache Transport 213 return $ (,) tcpcache Transport
212 { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) 214 { awaitMessage = \f -> takeTMVar msgvar >>= \x -> return $ do
215 f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Terminated)
213 , sendMessage = \addr (bDoCon,y) -> do 216 , sendMessage = \addr (bDoCon,y) -> do
214 void . forkLabeled "tcp-send" $ do 217 void . forkLabeled "tcp-send" $ do
215 msock <- acquireConnection msgvar tcpcache stream addr bDoCon 218 msock <- acquireConnection msgvar tcpcache stream addr bDoCon
216 mapM_ ($ y) msock 219 mapM_ ($ y) msock
217 `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e 220 `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e
218 , closeTransport = closeAll tcpcache stream >> putMVar msgvar Nothing 221 , setActive = \case False -> closeAll tcpcache stream >> atomically (putTMVar msgvar Terminated)
222 True -> return ()
219 } 223 }
diff --git a/dht/src/Network/SessionTransports.hs b/dht/src/Network/SessionTransports.hs
index e9daf6c1..b36fbcfd 100644
--- a/dht/src/Network/SessionTransports.hs
+++ b/dht/src/Network/SessionTransports.hs
@@ -1,3 +1,4 @@
1{-# LANGUAGE LambdaCase #-}
1{-# LANGUAGE NamedFieldPuns #-} 2{-# LANGUAGE NamedFieldPuns #-}
2module Network.SessionTransports 3module Network.SessionTransports
3 ( Sessions 4 ( Sessions
@@ -8,6 +9,7 @@ module Network.SessionTransports
8 9
9import Control.Concurrent 10import Control.Concurrent
10import Control.Concurrent.STM 11import Control.Concurrent.STM
12import Control.Concurrent.STM.TMVar
11import Control.Monad 13import Control.Monad
12import qualified Data.IntMap.Strict as IntMap 14import qualified Data.IntMap.Strict as IntMap
13 ;import Data.IntMap.Strict (IntMap) 15 ;import Data.IntMap.Strict (IntMap)
@@ -51,14 +53,14 @@ newSession :: Sessions raw
51 -> SockAddr 53 -> SockAddr
52 -> IO (Maybe (Int,TransportA err addr x y)) 54 -> IO (Maybe (Int,TransportA err addr x y))
53newSession Sessions{sessionsByAddr,sessionsById,sessionIds,sessionsSendRaw} unwrap wrap addr0 = do 55newSession Sessions{sessionsByAddr,sessionsById,sessionIds,sessionsSendRaw} unwrap wrap addr0 = do
54 mvar <- newEmptyMVar 56 mvar <- atomically newEmptyTMVar
55 let saddr = -- Canonical in case of 6-mapped-4 addresses. 57 let saddr = -- Canonical in case of 6-mapped-4 addresses.
56 either id id $ either4or6 addr0 58 either id id $ either4or6 addr0
57 handlePacket x = do 59 handlePacket x = do
58 m <- wrap saddr x 60 m <- wrap saddr x
59 case m of 61 case m of
60 Nothing -> return False 62 Nothing -> return False
61 Just x' -> do putMVar mvar $! Just $! x' 63 Just x' -> do atomically $ putTMVar mvar $! Just $! x'
62 return True 64 return True
63 msid <- atomically $ do 65 msid <- atomically $ do
64 msid <- S.nearestOutsider 0 <$> readTVar sessionIds 66 msid <- S.nearestOutsider 0 <$> readTVar sessionIds
@@ -69,22 +71,25 @@ newSession Sessions{sessionsByAddr,sessionsById,sessionIds,sessionsSendRaw} unwr
69 $ IntMap.singleton sid handlePacket 71 $ IntMap.singleton sid handlePacket
70 return sid 72 return sid
71 forM msid $ \sid -> do 73 forM msid $ \sid -> do
72 let tr = Transport 74 let tr = Transport
73 { awaitMessage = \kont -> do 75 { awaitMessage = \kont -> do
74 x <- takeMVar mvar 76 x <- takeTMVar mvar
75 kont $! Right <$> x 77 return $ kont $! maybe Terminated (uncurry $ flip Arrival) x
76 , sendMessage = \addr x -> do 78 , sendMessage = \addr x -> do
77 x' <- unwrap addr x 79 x' <- unwrap addr x
78 sessionsSendRaw saddr x' 80 sessionsSendRaw saddr x'
79 , closeTransport = do 81 , setActive = \case
80 tryTakeMVar mvar 82 False -> do
81 putMVar mvar Nothing 83 atomically $ do
82 atomically $ do 84 tryTakeTMVar mvar
83 modifyTVar' sessionIds $ S.delete sid 85 putTMVar mvar Nothing
84 modifyTVar' sessionsById $ IntMap.delete sid 86 atomically $ do
85 modifyTVar' sessionsByAddr $ Map.alter (rmSession sid) saddr 87 modifyTVar' sessionIds $ S.delete sid
88 modifyTVar' sessionsById $ IntMap.delete sid
89 modifyTVar' sessionsByAddr $ Map.alter (rmSession sid) saddr
90 True -> return ()
86 } 91 }
87 return (sid,tr) 92 return (sid,tr)
88 93
89sessionHandler :: Sessions x -> (SockAddr -> x -> IO (Maybe (x -> x))) 94sessionHandler :: Sessions x -> (SockAddr -> x -> IO (Maybe (x -> x)))
90sessionHandler Sessions{sessionsByAddr} = \addr0 x -> do 95sessionHandler Sessions{sessionsByAddr} = \addr0 x -> do
diff --git a/dht/src/Network/Tox.hs b/dht/src/Network/Tox.hs
index 61a1d117..4b66cfc8 100644
--- a/dht/src/Network/Tox.hs
+++ b/dht/src/Network/Tox.hs
@@ -222,10 +222,12 @@ isLocalHost _ = False
222addVerbosity :: Transport err SockAddr ByteString -> Transport err SockAddr ByteString 222addVerbosity :: Transport err SockAddr ByteString -> Transport err SockAddr ByteString
223addVerbosity tr = 223addVerbosity tr =
224 tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do 224 tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do
225 forM_ m $ mapM_ $ \(msg,addr) -> do 225 case m of
226 Arrival addr msg -> do
226 when (not (B.null msg || elem (B.head msg) [0,1,2,4,0x81,0x82,0x8c,0x8d])) $ do 227 when (not (B.null msg || elem (B.head msg) [0,1,2,4,0x81,0x82,0x8c,0x8d])) $ do
227 mapM_ (\x -> dput XMisc ( (show addr) ++ " --> " ++ x)) 228 mapM_ (\x -> dput XMisc ( (show addr) ++ " --> " ++ x))
228 $ xxd 0 msg 229 $ xxd 0 msg
230 _ -> return ()
229 kont m 231 kont m
230 , sendMessage = \addr msg -> do 232 , sendMessage = \addr msg -> do
231 when (not (B.null msg || elem (B.head msg) [0,1,2,4,0x81,0x8c,0x8d])) $ do 233 when (not (B.null msg || elem (B.head msg) [0,1,2,4,0x81,0x8c,0x8d])) $ do
diff --git a/dht/src/Network/Tox/AggregateSession.hs b/dht/src/Network/Tox/AggregateSession.hs
index 8c728660..999c7399 100644
--- a/dht/src/Network/Tox/AggregateSession.hs
+++ b/dht/src/Network/Tox/AggregateSession.hs
@@ -188,21 +188,21 @@ forkSession c s setStatus = forkIO $ do
188 now <- getPOSIXTime 188 now <- getPOSIXTime
189 atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) (now + 15) 189 atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) (now + 15)
190 190
191 onPacket body loop Nothing = return () 191 onPacket body loop Terminated = return ()
192 onPacket body loop (Just (Left e)) = inPrint e >> loop 192 onPacket body loop (ParseError e) = inPrint e >> loop
193 onPacket body loop (Just (Right x)) = body loop x 193 onPacket body loop (Arrival _ x) = body loop x
194 194
195 awaitPacket body = fix $ awaitMessage (sTransport s) . onPacket body 195 awaitPacket body = fix $ join . atomically . awaitMessage (sTransport s) . onPacket body
196 196
197 atomically $ setStatus $ InProgress AwaitingSessionPacket 197 atomically $ setStatus $ InProgress AwaitingSessionPacket
198 awaitPacket $ \_ (online,()) -> do 198 awaitPacket $ \_ online -> do
199 when (msgID online /= M ONLINE) $ do 199 when (msgID online /= M ONLINE) $ do
200 inPrint $ "Unexpected initial packet: " ++ show (msgID online) 200 inPrint $ "Unexpected initial packet: " ++ show (msgID online)
201 atomically $ do setStatus Established 201 atomically $ do setStatus Established
202 sendPacket online 202 sendPacket online
203 bump 203 bump
204 beacon <- forkIO $ keepAlive s q 204 beacon <- forkIO $ keepAlive s q
205 awaitPacket $ \awaitNext (x,()) -> do 205 awaitPacket $ \awaitNext x -> do
206 bump 206 bump
207 case msgID x of 207 case msgID x of
208 M ALIVE -> return () 208 M ALIVE -> return ()
diff --git a/dht/src/Network/Tox/DHT/Transport.hs b/dht/src/Network/Tox/DHT/Transport.hs
index 7475b3b1..7414343d 100644
--- a/dht/src/Network/Tox/DHT/Transport.hs
+++ b/dht/src/Network/Tox/DHT/Transport.hs
@@ -59,7 +59,7 @@ import GHC.Generics
59import Network.Socket 59import Network.Socket
60 60
61type DHTTransport = Transport String NodeInfo (DHTMessage Encrypted8) 61type DHTTransport = Transport String NodeInfo (DHTMessage Encrypted8)
62type HandleHi a = Maybe (Either String (DHTMessage Encrypted8, NodeInfo)) -> IO a 62type HandleHi a = Arrival String NodeInfo (DHTMessage Encrypted8) -> IO a
63 63
64 64
65data DHTMessage (f :: * -> *) 65data DHTMessage (f :: * -> *)
@@ -399,13 +399,13 @@ instance Serialize CookieRequest where
399forwardDHTRequests :: TransportCrypto -> (PublicKey -> IO (Maybe NodeInfo)) -> DHTTransport -> DHTTransport 399forwardDHTRequests :: TransportCrypto -> (PublicKey -> IO (Maybe NodeInfo)) -> DHTTransport -> DHTTransport
400forwardDHTRequests crypto closeLookup dht = dht { awaitMessage = await' } 400forwardDHTRequests crypto closeLookup dht = dht { awaitMessage = await' }
401 where 401 where
402 await' :: HandleHi a -> IO a 402 await' :: HandleHi a -> STM (IO a)
403 await' pass = awaitMessage dht $ \case 403 await' pass = awaitMessage dht $ \case
404 Just (Right (m@(DHTDHTRequest target payload),src)) | target /= transportPublic crypto 404 Arrival src m@(DHTDHTRequest target payload) | target /= transportPublic crypto
405 -> do mni <- closeLookup target 405 -> do mni <- closeLookup target
406 -- Forward the message if the target is in our close list. 406 -- Forward the message if the target is in our close list.
407 forM_ mni $ \ni -> sendMessage dht ni m 407 forM_ mni $ \ni -> sendMessage dht ni m
408 await' pass 408 join $ atomically (await' pass)
409 m -> pass m 409 m -> pass m
410 410
411encrypt :: TransportCrypto -> DHTMessage ((,) Nonce8) -> NodeInfo -> IO (DHTMessage Encrypted8, NodeInfo) 411encrypt :: TransportCrypto -> DHTMessage ((,) Nonce8) -> NodeInfo -> IO (DHTMessage Encrypted8, NodeInfo)