diff options
Diffstat (limited to 'dht/src')
-rw-r--r-- | dht/src/Data/Tox/Onion.hs | 15 | ||||
-rw-r--r-- | dht/src/Network/BitTorrent/MainlineDHT.hs | 7 | ||||
-rw-r--r-- | dht/src/Network/Lossless.hs | 33 | ||||
-rw-r--r-- | dht/src/Network/QueryResponse.hs | 65 | ||||
-rw-r--r-- | dht/src/Network/QueryResponse/TCP.hs | 18 | ||||
-rw-r--r-- | dht/src/Network/SessionTransports.hs | 31 | ||||
-rw-r--r-- | dht/src/Network/Tox.hs | 4 | ||||
-rw-r--r-- | dht/src/Network/Tox/AggregateSession.hs | 12 | ||||
-rw-r--r-- | dht/src/Network/Tox/DHT/Transport.hs | 8 |
9 files changed, 112 insertions, 81 deletions
diff --git a/dht/src/Data/Tox/Onion.hs b/dht/src/Data/Tox/Onion.hs index d3c8086d..e0d7c744 100644 --- a/dht/src/Data/Tox/Onion.hs +++ b/dht/src/Data/Tox/Onion.hs | |||
@@ -60,7 +60,7 @@ import Data.Bits (shiftR,shiftL) | |||
60 | import qualified Rank2 | 60 | import qualified Rank2 |
61 | import Util (sameAddress) | 61 | import Util (sameAddress) |
62 | 62 | ||
63 | type HandleLo a = Maybe (Either String (ByteString, SockAddr)) -> IO a | 63 | type HandleLo a = Arrival String SockAddr ByteString -> IO a |
64 | 64 | ||
65 | type UDPTransport = Transport String SockAddr ByteString | 65 | type UDPTransport = Transport String SockAddr ByteString |
66 | 66 | ||
@@ -264,11 +264,12 @@ forwardOnions crypto baddr udp sendTCP = udp { awaitMessage = forwardAwait crypt | |||
264 | 264 | ||
265 | forwardAwait :: TransportCrypto | 265 | forwardAwait :: TransportCrypto |
266 | -> UDPTransport | 266 | -> UDPTransport |
267 | -> (Int -> OnionMessage Encrypted -> IO ()) {- ^ TCP relay send -} -> HandleLo a -> IO a | 267 | -> (Int -> OnionMessage Encrypted -> IO ()) {- ^ TCP relay send -} -> HandleLo a -> STM (IO a) |
268 | forwardAwait crypto udp sendTCP kont = do | 268 | forwardAwait crypto udp sendTCP kont = do |
269 | fix $ \another -> do | 269 | fix $ \another0 -> do |
270 | let another = join $ atomically another0 | ||
270 | awaitMessage udp $ \case | 271 | awaitMessage udp $ \case |
271 | m@(Just (Right (bs,saddr))) -> case B.head bs of | 272 | m@(Arrival saddr bs) -> case B.head bs of |
272 | 0x80 -> forward kont bs $ handleOnionRequest (Proxy :: Proxy N0) crypto (Addressed saddr) udp another | 273 | 0x80 -> forward kont bs $ handleOnionRequest (Proxy :: Proxy N0) crypto (Addressed saddr) udp another |
273 | 0x81 -> forward kont bs $ handleOnionRequest (Proxy :: Proxy N1) crypto (Addressed saddr) udp another | 274 | 0x81 -> forward kont bs $ handleOnionRequest (Proxy :: Proxy N1) crypto (Addressed saddr) udp another |
274 | 0x82 -> forward kont bs $ handleOnionRequest (Proxy :: Proxy N2) crypto (Addressed saddr) udp another | 275 | 0x82 -> forward kont bs $ handleOnionRequest (Proxy :: Proxy N2) crypto (Addressed saddr) udp another |
@@ -278,9 +279,9 @@ forwardAwait crypto udp sendTCP kont = do | |||
278 | _ -> kont m | 279 | _ -> kont m |
279 | m -> kont m | 280 | m -> kont m |
280 | 281 | ||
281 | forward :: forall c b b1. (Serialize b, Show b) => | 282 | forward :: (Serialize b, Show b) => |
282 | (Maybe (Either String b1) -> c) -> ByteString -> (b -> c) -> c | 283 | HandleLo a -> ByteString -> (b -> IO a) -> IO a |
283 | forward kont bs f = either (kont . Just . Left) f $ decode $ B.tail bs | 284 | forward kont bs f = either (kont . ParseError) f $ decode $ B.tail bs |
284 | 285 | ||
285 | class SumToThree a b | 286 | class SumToThree a b |
286 | 287 | ||
diff --git a/dht/src/Network/BitTorrent/MainlineDHT.hs b/dht/src/Network/BitTorrent/MainlineDHT.hs index ed97ee31..0269268f 100644 --- a/dht/src/Network/BitTorrent/MainlineDHT.hs +++ b/dht/src/Network/BitTorrent/MainlineDHT.hs | |||
@@ -428,8 +428,9 @@ showPacket f addr flow bs = L8.unpack $ L8.unlines es | |||
428 | addVerbosity :: Transport err SockAddr ByteString -> Transport err SockAddr ByteString | 428 | addVerbosity :: Transport err SockAddr ByteString -> Transport err SockAddr ByteString |
429 | addVerbosity tr = | 429 | addVerbosity tr = |
430 | tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do | 430 | tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do |
431 | forM_ m $ mapM_ $ \(msg,addr) -> do | 431 | case m of |
432 | dput XBitTorrent (showPacket id addr " --> " msg) | 432 | Arrival addr msg -> dput XBitTorrent (showPacket id addr " --> " msg) |
433 | _ -> return () | ||
433 | kont m | 434 | kont m |
434 | , sendMessage = \addr msg -> do | 435 | , sendMessage = \addr msg -> do |
435 | dput XBitTorrent (showPacket id addr " <-- " msg) | 436 | dput XBitTorrent (showPacket id addr " <-- " msg) |
@@ -598,7 +599,7 @@ newClient swarms addr = do | |||
598 | -- recursive since 'updateRouting' does not invoke 'awaitMessage' which | 599 | -- recursive since 'updateRouting' does not invoke 'awaitMessage' which |
599 | -- which was modified by 'onInbound'. However, I'm going to avoid the | 600 | -- which was modified by 'onInbound'. However, I'm going to avoid the |
600 | -- mutual reference just to be safe. | 601 | -- mutual reference just to be safe. |
601 | outgoingClient = client { clientNet = net { awaitMessage = ($ Nothing) } } | 602 | outgoingClient = client { clientNet = net { awaitMessage = pure . ($ Terminated) } } |
602 | 603 | ||
603 | dispatch = DispatchMethods | 604 | dispatch = DispatchMethods |
604 | { classifyInbound = classify -- :: x -> MessageClass err meth tid addr x | 605 | { classifyInbound = classify -- :: x -> MessageClass err meth tid addr x |
diff --git a/dht/src/Network/Lossless.hs b/dht/src/Network/Lossless.hs index 861792ab..5a313aed 100644 --- a/dht/src/Network/Lossless.hs +++ b/dht/src/Network/Lossless.hs | |||
@@ -59,37 +59,38 @@ lossless isLossless encode saddr udp = do | |||
59 | rloop <- forkIO $ do | 59 | rloop <- forkIO $ do |
60 | -- This thread enqueues inbound packets or writes them to the oob | 60 | -- This thread enqueues inbound packets or writes them to the oob |
61 | -- channel. | 61 | -- channel. |
62 | myThreadId >>= flip labelThread ("lossless."++show saddr) | 62 | fix $ \loop -> join $ atomically $ awaitMessage udp $ \m -> do |
63 | fix $ \loop -> do | 63 | m' <- case m of Terminated -> return Nothing |
64 | awaitMessage udp $ \m -> do | 64 | ParseError e -> return $ Just (Left e) |
65 | m' <- mapM (mapM $ uncurry isLossless) m | 65 | Arrival a x -> Just . Right <$> isLossless x a |
66 | case m' of | 66 | case m' of |
67 | Nothing -> do | 67 | Nothing -> do |
68 | atomically $ writeTChan oob Nothing | 68 | atomically $ writeTChan oob Terminated |
69 | -- Quit thread here. | 69 | -- Quit thread here. |
70 | Just (Left e) -> do | 70 | Just (Left e) -> do |
71 | atomically $ writeTChan oob (Just $ Left e) | 71 | atomically $ writeTChan oob (ParseError e) |
72 | loop | 72 | loop |
73 | Just (Right event) -> do | 73 | Just (Right event) -> do |
74 | atomically $ do | 74 | atomically $ do |
75 | -- x' <- isLossless xaddr x | 75 | -- x' <- isLossless xaddr x |
76 | PB.grokInboundPacket pb event | 76 | PB.grokInboundPacket pb event |
77 | case event of | 77 | case event of |
78 | PacketReceivedLossy {} -> writeTChan oob (Just $ Right $ peReceivedPayload event) | 78 | PacketReceivedLossy {} -> writeTChan oob (uncurry (flip Arrival) $ peReceivedPayload event) |
79 | _ -> do | 79 | _ -> do |
80 | report <- pbReport "enqueued" pb | 80 | report <- pbReport "enqueued" pb |
81 | writeTChan oob (Just $ Left report) | 81 | writeTChan oob (ParseError report) |
82 | loop | 82 | loop |
83 | labelThread rloop ("lossless."++show saddr) | ||
83 | let tr = Transport | 84 | let tr = Transport |
84 | { awaitMessage = \kont -> do | 85 | { awaitMessage = \kont -> |
85 | join $ atomically $ orElse | 86 | orElse |
86 | (do x <- readTChan oob | 87 | (do x <- readTChan oob |
87 | return $ kont $! x) | 88 | return $ kont $! x) |
88 | (do x <- PB.awaitReadyPacket pb | 89 | (do x <- PB.awaitReadyPacket pb |
89 | report <- pbReport "dequeued" pb | 90 | report <- pbReport "dequeued" pb |
90 | return $ do | 91 | return $ do |
91 | atomically $ writeTChan oob (Just $ Left report) | 92 | atomically $ writeTChan oob (ParseError report) |
92 | kont $! Just (Right x)) | 93 | kont $! uncurry (flip Arrival) x) |
93 | , sendMessage = \a' x' -> do | 94 | , sendMessage = \a' x' -> do |
94 | seqno <- atomically $ do | 95 | seqno <- atomically $ do |
95 | seqno <- PB.nextToSendSequenceNumber pb | 96 | seqno <- PB.nextToSendSequenceNumber pb |
@@ -111,9 +112,11 @@ lossless isLossless encode saddr udp = do | |||
111 | when isfull retry | 112 | when isfull retry |
112 | let sendit = sendMessage udp saddr x | 113 | let sendit = sendMessage udp saddr x |
113 | maybe sendit (catchIOError sendit) oops | 114 | maybe sendit (catchIOError sendit) oops |
114 | , closeTransport = do | 115 | , setActive = \case |
115 | atomically $ writeTChan oob Nothing -- quit rloop thread | 116 | False -> do |
116 | closeTransport udp | 117 | atomically $ writeTChan oob Terminated -- quit rloop thread |
118 | setActive udp False | ||
119 | True -> return () | ||
117 | } | 120 | } |
118 | resend ns = do | 121 | resend ns = do |
119 | xs <- atomically $ retrieveForResend pb ns | 122 | xs <- atomically $ retrieveForResend pb ns |
diff --git a/dht/src/Network/QueryResponse.hs b/dht/src/Network/QueryResponse.hs index f62fbefe..ba686929 100644 --- a/dht/src/Network/QueryResponse.hs +++ b/dht/src/Network/QueryResponse.hs | |||
@@ -54,15 +54,18 @@ data Arrival err addr x | |||
54 | data TransportA err addr x y = Transport | 54 | data TransportA err addr x y = Transport |
55 | { -- | Blocks until an inbound packet is available. Then calls the provided | 55 | { -- | Blocks until an inbound packet is available. Then calls the provided |
56 | -- continuation with the packet and origin addres or an error condition. | 56 | -- continuation with the packet and origin addres or an error condition. |
57 | awaitMessage :: forall a. (Arrival err addr x -> IO a) -> IO a | 57 | awaitMessage :: forall a. (Arrival err addr x -> IO a) -> STM (IO a) |
58 | -- | Send an /y/ packet to the given destination /addr/. | 58 | -- | Send an /y/ packet to the given destination /addr/. |
59 | , sendMessage :: addr -> y -> IO () | 59 | , sendMessage :: addr -> y -> IO () |
60 | -- | Shutdown and clean up any state related to this 'Transport'. | 60 | -- | Shutdown and clean up any state related to this 'Transport'. |
61 | , closeTransport :: IO () | 61 | , setActive :: Bool -> IO () |
62 | } | 62 | } |
63 | 63 | ||
64 | type Transport err addr x = TransportA err addr x x | 64 | type Transport err addr x = TransportA err addr x x |
65 | 65 | ||
66 | closeTransport :: TransportA err addr x y -> IO () | ||
67 | closeTransport tr = setActive tr False | ||
68 | |||
66 | -- | This function modifies a 'Transport' to use higher-level addresses and | 69 | -- | This function modifies a 'Transport' to use higher-level addresses and |
67 | -- packet representations. It could be used to change UDP 'ByteString's into | 70 | -- packet representations. It could be used to change UDP 'ByteString's into |
68 | -- bencoded syntax trees or to add an encryption layer in which addresses have | 71 | -- bencoded syntax trees or to add an encryption layer in which addresses have |
@@ -110,7 +113,7 @@ layerTransport parse encode tr = | |||
110 | (\x' addr' -> return $ encode x' addr') | 113 | (\x' addr' -> return $ encode x' addr') |
111 | tr | 114 | tr |
112 | 115 | ||
113 | -- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' | 116 | -- | Paritions a 'Transport' into two higher-level transports. Note: A 'TChan' |
114 | -- is used to share the same underlying socket, so be sure to fork a thread for | 117 | -- is used to share the same underlying socket, so be sure to fork a thread for |
115 | -- both returned 'Transport's to avoid hanging. | 118 | -- both returned 'Transport's to avoid hanging. |
116 | partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) | 119 | partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) |
@@ -118,28 +121,28 @@ partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) | |||
118 | -> Transport err a b | 121 | -> Transport err a b |
119 | -> IO (Transport err xaddr x, Transport err a b) | 122 | -> IO (Transport err xaddr x, Transport err a b) |
120 | partitionTransportM parse encodex tr = do | 123 | partitionTransportM parse encodex tr = do |
121 | mvar <- newEmptyMVar | 124 | tchan <- atomically newTChan |
122 | let xtr = tr { awaitMessage = \kont -> fix $ \again -> do | 125 | let xtr = tr { awaitMessage = \kont -> fix $ \again -> do |
123 | awaitMessage tr $ \m -> case m of | 126 | awaitMessage tr $ \m -> case m of |
124 | Arrival adr msg -> parse (msg,adr) >>= \case | 127 | Arrival adr msg -> parse (msg,adr) >>= \case |
125 | Left (x,xaddr) -> kont $ Arrival xaddr x | 128 | Left (x,xaddr) -> kont $ Arrival xaddr x |
126 | Right y -> putMVar mvar (Just y) >> again | 129 | Right y -> atomically (writeTChan tchan (Just y)) >> join (atomically again) |
127 | ParseError e -> kont $ ParseError e | 130 | ParseError e -> kont $ ParseError e |
128 | Terminated -> putMVar mvar Nothing >> kont Terminated | 131 | Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated |
129 | , sendMessage = \addr' msg' -> do | 132 | , sendMessage = \addr' msg' -> do |
130 | msg_addr <- encodex (msg',addr') | 133 | msg_addr <- encodex (msg',addr') |
131 | mapM_ (uncurry . flip $ sendMessage tr) msg_addr | 134 | mapM_ (uncurry . flip $ sendMessage tr) msg_addr |
132 | } | 135 | } |
133 | ytr = Transport | 136 | ytr = Transport |
134 | { awaitMessage = \kont -> takeMVar mvar >>= kont . \case | 137 | { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case |
135 | Nothing -> Terminated | 138 | Nothing -> Terminated |
136 | Just (y,yaddr) -> Arrival yaddr y | 139 | Just (y,yaddr) -> Arrival yaddr y |
137 | , sendMessage = sendMessage tr | 140 | , sendMessage = sendMessage tr |
138 | , closeTransport = return () | 141 | , setActive = const $ return () |
139 | } | 142 | } |
140 | return (xtr, ytr) | 143 | return (xtr, ytr) |
141 | 144 | ||
142 | -- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' | 145 | -- | Paritions a 'Transport' into two higher-level transports. Note: An 'TChan' |
143 | -- is used to share the same underlying socket, so be sure to fork a thread for | 146 | -- is used to share the same underlying socket, so be sure to fork a thread for |
144 | -- both returned 'Transport's to avoid hanging. | 147 | -- both returned 'Transport's to avoid hanging. |
145 | partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) | 148 | partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) |
@@ -157,14 +160,14 @@ partitionAndForkTransport :: | |||
157 | -> Transport err a b | 160 | -> Transport err a b |
158 | -> IO (Transport err xaddr x, Transport err a b) | 161 | -> IO (Transport err xaddr x, Transport err a b) |
159 | partitionAndForkTransport forkedSend parse encodex tr = do | 162 | partitionAndForkTransport forkedSend parse encodex tr = do |
160 | mvar <- newEmptyMVar | 163 | tchan <- atomically newTChan |
161 | let xtr = tr { awaitMessage = \kont -> fix $ \again -> do | 164 | let xtr = tr { awaitMessage = \kont -> fix $ \again -> do |
162 | awaitMessage tr $ \case | 165 | awaitMessage tr $ \case |
163 | Arrival a b -> parse (b,a) >>= \case | 166 | Arrival a b -> parse (b,a) >>= \case |
164 | Left (x,xaddr) -> kont $ Arrival xaddr x | 167 | Left (x,xaddr) -> kont $ Arrival xaddr x |
165 | Right (b,a) -> putMVar mvar (Arrival a b) >> again | 168 | Right (b,a) -> atomically (writeTChan tchan (Arrival a b)) >> join (atomically again) |
166 | ParseError e -> kont $ ParseError e | 169 | ParseError e -> kont $ ParseError e |
167 | Terminated -> putMVar mvar Terminated >> kont Terminated | 170 | Terminated -> atomically (writeTChan tchan Terminated) >> kont Terminated |
168 | , sendMessage = \addr' msg' -> do | 171 | , sendMessage = \addr' msg' -> do |
169 | msg_addr <- encodex (msg',addr') | 172 | msg_addr <- encodex (msg',addr') |
170 | case msg_addr of | 173 | case msg_addr of |
@@ -173,9 +176,9 @@ partitionAndForkTransport forkedSend parse encodex tr = do | |||
173 | Nothing -> return () | 176 | Nothing -> return () |
174 | } | 177 | } |
175 | ytr = Transport | 178 | ytr = Transport |
176 | { awaitMessage = \kont -> takeMVar mvar >>= kont | 179 | { awaitMessage = \kont -> readTChan tchan >>= pure . kont |
177 | , sendMessage = sendMessage tr | 180 | , sendMessage = sendMessage tr |
178 | , closeTransport = return () | 181 | , setActive = \_ -> return () |
179 | } | 182 | } |
180 | return (xtr, ytr) | 183 | return (xtr, ytr) |
181 | 184 | ||
@@ -186,7 +189,7 @@ partitionAndForkTransport forkedSend parse encodex tr = do | |||
186 | addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x | 189 | addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x |
187 | addHandler onParseError f tr = tr | 190 | addHandler onParseError f tr = tr |
188 | { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case | 191 | { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case |
189 | Arrival addr x -> f addr x >>= maybe eat (kont . Arrival addr . ($ x)) | 192 | Arrival addr x -> f addr x >>= maybe (join $ atomically eat) (kont . Arrival addr . ($ x)) |
190 | ParseError e -> onParseError e >> kont (ParseError e) | 193 | ParseError e -> onParseError e >> kont (ParseError e) |
191 | Terminated -> kont Terminated | 194 | Terminated -> kont Terminated |
192 | } | 195 | } |
@@ -210,14 +213,15 @@ onInbound f tr = addHandler (const $ return ()) (\addr x -> f addr x >> return ( | |||
210 | -- > quitServer | 213 | -- > quitServer |
211 | forkListener :: String -> Transport err addr x -> IO (IO ()) | 214 | forkListener :: String -> Transport err addr x -> IO (IO ()) |
212 | forkListener name client = do | 215 | forkListener name client = do |
216 | setActive client True | ||
213 | thread_id <- forkIO $ do | 217 | thread_id <- forkIO $ do |
214 | myThreadId >>= flip labelThread ("listener."++name) | 218 | myThreadId >>= flip labelThread ("listener."++name) |
215 | fix $ \loop -> awaitMessage client $ \case | 219 | fix $ \loop -> join $ atomically $ awaitMessage client $ \case |
216 | Terminated -> return () | 220 | Terminated -> return () |
217 | _ -> loop | 221 | _ -> loop |
218 | dput XMisc $ "Listener died: " ++ name | 222 | dput XMisc $ "Listener died: " ++ name |
219 | return $ do | 223 | return $ do |
220 | closeTransport client | 224 | setActive client False |
221 | -- killThread thread_id | 225 | -- killThread thread_id |
222 | 226 | ||
223 | -- * Implementing a query\/response 'Client'. | 227 | -- * Implementing a query\/response 'Client'. |
@@ -606,11 +610,11 @@ udpTransport' bind_address = do | |||
606 | setSocketOption sock Broadcast 1 | 610 | setSocketOption sock Broadcast 1 |
607 | bind sock bind_address | 611 | bind sock bind_address |
608 | isClosed <- newEmptyMVar | 612 | isClosed <- newEmptyMVar |
613 | udpTChan <- atomically newTChan | ||
609 | let tr = Transport { | 614 | let tr = Transport { |
610 | awaitMessage = \kont -> do | 615 | awaitMessage = \kont -> do |
611 | r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do | 616 | r <- readTChan udpTChan |
612 | uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize | 617 | return $ kont $! r |
613 | kont $! r | ||
614 | , sendMessage = case family of | 618 | , sendMessage = case family of |
615 | AF_INET6 -> \case | 619 | AF_INET6 -> \case |
616 | (SockAddrInet port addr) -> \bs -> | 620 | (SockAddrInet port addr) -> \bs -> |
@@ -626,7 +630,8 @@ udpTransport' bind_address = do | |||
626 | addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr) | 630 | addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr) |
627 | addr4 -> \bs -> saferSendTo sock bs addr4 | 631 | addr4 -> \bs -> saferSendTo sock bs addr4 |
628 | _ -> \addr bs -> saferSendTo sock bs addr | 632 | _ -> \addr bs -> saferSendTo sock bs addr |
629 | , closeTransport = do | 633 | , setActive = \case |
634 | False -> do | ||
630 | dput XMisc $ "closeTransport for udpTransport' called. " ++ show bind_address | 635 | dput XMisc $ "closeTransport for udpTransport' called. " ++ show bind_address |
631 | tryPutMVar isClosed () -- signal awaitMessage that the transport is closed. | 636 | tryPutMVar isClosed () -- signal awaitMessage that the transport is closed. |
632 | #if MIN_VERSION_network (3,1,0) | 637 | #if MIN_VERSION_network (3,1,0) |
@@ -639,6 +644,14 @@ udpTransport' bind_address = do | |||
639 | let sorryGHCButIAmNotFuckingClosingTheSocketYet fd = return () | 644 | let sorryGHCButIAmNotFuckingClosingTheSocketYet fd = return () |
640 | -- This call is necessary to interrupt the blocking recvFrom call in awaitMessage. | 645 | -- This call is necessary to interrupt the blocking recvFrom call in awaitMessage. |
641 | closeFdWith sorryGHCButIAmNotFuckingClosingTheSocketYet (fromIntegral fd) | 646 | closeFdWith sorryGHCButIAmNotFuckingClosingTheSocketYet (fromIntegral fd) |
647 | True -> do | ||
648 | udpThread <- forkIO $ fix $ \again -> do | ||
649 | r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do | ||
650 | uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize | ||
651 | atomically $ writeTChan udpTChan r | ||
652 | case r of Terminated -> return () | ||
653 | _ -> again | ||
654 | labelThread udpThread ("udp.io."++show bind_address) | ||
642 | } | 655 | } |
643 | return (tr, sock) | 656 | return (tr, sock) |
644 | 657 | ||
@@ -652,13 +665,15 @@ udpTransport bind_address = fst <$> udpTransport' bind_address | |||
652 | chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x | 665 | chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x |
653 | chanTransport chanFromAddr self achan aclosed = Transport | 666 | chanTransport chanFromAddr self achan aclosed = Transport |
654 | { awaitMessage = \kont -> do | 667 | { awaitMessage = \kont -> do |
655 | x <- atomically $ (uncurry (flip Arrival) <$> readTChan achan) | 668 | x <- (uncurry (flip Arrival) <$> readTChan achan) |
656 | `orElse` | 669 | `orElse` |
657 | (readTVar aclosed >>= check >> return Terminated) | 670 | (readTVar aclosed >>= check >> return Terminated) |
658 | kont x | 671 | return $ kont x |
659 | , sendMessage = \them bs -> do | 672 | , sendMessage = \them bs -> do |
660 | atomically $ writeTChan (chanFromAddr them) (bs,self) | 673 | atomically $ writeTChan (chanFromAddr them) (bs,self) |
661 | , closeTransport = atomically $ writeTVar aclosed True | 674 | , setActive = \case |
675 | False -> atomically $ writeTVar aclosed True | ||
676 | True -> return () | ||
662 | } | 677 | } |
663 | 678 | ||
664 | -- | Returns a pair of transports linked together to simulate two computers talking to each other. | 679 | -- | Returns a pair of transports linked together to simulate two computers talking to each other. |
diff --git a/dht/src/Network/QueryResponse/TCP.hs b/dht/src/Network/QueryResponse/TCP.hs index 67c19512..0028a5b6 100644 --- a/dht/src/Network/QueryResponse/TCP.hs +++ b/dht/src/Network/QueryResponse/TCP.hs | |||
@@ -1,5 +1,6 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
1 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | 2 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} |
2 | {-# LANGUAGE CPP #-} | 3 | {-# LANGUAGE LambdaCase #-} |
3 | module Network.QueryResponse.TCP where | 4 | module Network.QueryResponse.TCP where |
4 | 5 | ||
5 | #ifdef THREAD_DEBUG | 6 | #ifdef THREAD_DEBUG |
@@ -11,6 +12,7 @@ import GHC.Conc (labelThread) | |||
11 | 12 | ||
12 | import Control.Arrow | 13 | import Control.Arrow |
13 | import Control.Concurrent.STM | 14 | import Control.Concurrent.STM |
15 | import Control.Concurrent.STM.TMVar | ||
14 | import Control.Monad | 16 | import Control.Monad |
15 | import Data.ByteString (ByteString,hPut) | 17 | import Data.ByteString (ByteString,hPut) |
16 | import Data.Function | 18 | import Data.Function |
@@ -78,7 +80,7 @@ showStat r = case r of PendingTCPSession -> "pending." | |||
78 | tcp_timeout :: Int | 80 | tcp_timeout :: Int |
79 | tcp_timeout = 10000000 | 81 | tcp_timeout = 10000000 |
80 | 82 | ||
81 | acquireConnection :: MVar (Maybe (Either a (x, addr))) | 83 | acquireConnection :: TMVar (Arrival a addr x) |
82 | -> TCPCache (SessionProtocol x y) | 84 | -> TCPCache (SessionProtocol x y) |
83 | -> StreamHandshake addr x y | 85 | -> StreamHandshake addr x y |
84 | -> addr | 86 | -> addr |
@@ -131,10 +133,10 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
131 | dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x | 133 | dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x |
132 | case x of | 134 | case x of |
133 | Just u -> do | 135 | Just u -> do |
134 | m <- timeout tcp_timeout $ putMVar mvar $ Just $ Right (u, addr) | 136 | m <- timeout tcp_timeout $ atomically (putTMVar mvar $ Arrival addr u) |
135 | when (isNothing m) $ do | 137 | when (isNothing m) $ do |
136 | dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet." | 138 | dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet." |
137 | tryTakeMVar mvar | 139 | atomically $ tryTakeTMVar mvar |
138 | return () | 140 | return () |
139 | loop | 141 | loop |
140 | Nothing -> do | 142 | Nothing -> do |
@@ -206,14 +208,16 @@ tcpTransport :: Int -- ^ maximum number of TCP links to maintain. | |||
206 | -> StreamHandshake addr x y | 208 | -> StreamHandshake addr x y |
207 | -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y)) | 209 | -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y)) |
208 | tcpTransport maxcon stream = do | 210 | tcpTransport maxcon stream = do |
209 | msgvar <- newEmptyMVar | 211 | msgvar <- atomically newEmptyTMVar |
210 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) | 212 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) |
211 | return $ (,) tcpcache Transport | 213 | return $ (,) tcpcache Transport |
212 | { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) | 214 | { awaitMessage = \f -> takeTMVar msgvar >>= \x -> return $ do |
215 | f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Terminated) | ||
213 | , sendMessage = \addr (bDoCon,y) -> do | 216 | , sendMessage = \addr (bDoCon,y) -> do |
214 | void . forkLabeled "tcp-send" $ do | 217 | void . forkLabeled "tcp-send" $ do |
215 | msock <- acquireConnection msgvar tcpcache stream addr bDoCon | 218 | msock <- acquireConnection msgvar tcpcache stream addr bDoCon |
216 | mapM_ ($ y) msock | 219 | mapM_ ($ y) msock |
217 | `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e | 220 | `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e |
218 | , closeTransport = closeAll tcpcache stream >> putMVar msgvar Nothing | 221 | , setActive = \case False -> closeAll tcpcache stream >> atomically (putTMVar msgvar Terminated) |
222 | True -> return () | ||
219 | } | 223 | } |
diff --git a/dht/src/Network/SessionTransports.hs b/dht/src/Network/SessionTransports.hs index e9daf6c1..b36fbcfd 100644 --- a/dht/src/Network/SessionTransports.hs +++ b/dht/src/Network/SessionTransports.hs | |||
@@ -1,3 +1,4 @@ | |||
1 | {-# LANGUAGE LambdaCase #-} | ||
1 | {-# LANGUAGE NamedFieldPuns #-} | 2 | {-# LANGUAGE NamedFieldPuns #-} |
2 | module Network.SessionTransports | 3 | module Network.SessionTransports |
3 | ( Sessions | 4 | ( Sessions |
@@ -8,6 +9,7 @@ module Network.SessionTransports | |||
8 | 9 | ||
9 | import Control.Concurrent | 10 | import Control.Concurrent |
10 | import Control.Concurrent.STM | 11 | import Control.Concurrent.STM |
12 | import Control.Concurrent.STM.TMVar | ||
11 | import Control.Monad | 13 | import Control.Monad |
12 | import qualified Data.IntMap.Strict as IntMap | 14 | import qualified Data.IntMap.Strict as IntMap |
13 | ;import Data.IntMap.Strict (IntMap) | 15 | ;import Data.IntMap.Strict (IntMap) |
@@ -51,14 +53,14 @@ newSession :: Sessions raw | |||
51 | -> SockAddr | 53 | -> SockAddr |
52 | -> IO (Maybe (Int,TransportA err addr x y)) | 54 | -> IO (Maybe (Int,TransportA err addr x y)) |
53 | newSession Sessions{sessionsByAddr,sessionsById,sessionIds,sessionsSendRaw} unwrap wrap addr0 = do | 55 | newSession Sessions{sessionsByAddr,sessionsById,sessionIds,sessionsSendRaw} unwrap wrap addr0 = do |
54 | mvar <- newEmptyMVar | 56 | mvar <- atomically newEmptyTMVar |
55 | let saddr = -- Canonical in case of 6-mapped-4 addresses. | 57 | let saddr = -- Canonical in case of 6-mapped-4 addresses. |
56 | either id id $ either4or6 addr0 | 58 | either id id $ either4or6 addr0 |
57 | handlePacket x = do | 59 | handlePacket x = do |
58 | m <- wrap saddr x | 60 | m <- wrap saddr x |
59 | case m of | 61 | case m of |
60 | Nothing -> return False | 62 | Nothing -> return False |
61 | Just x' -> do putMVar mvar $! Just $! x' | 63 | Just x' -> do atomically $ putTMVar mvar $! Just $! x' |
62 | return True | 64 | return True |
63 | msid <- atomically $ do | 65 | msid <- atomically $ do |
64 | msid <- S.nearestOutsider 0 <$> readTVar sessionIds | 66 | msid <- S.nearestOutsider 0 <$> readTVar sessionIds |
@@ -69,22 +71,25 @@ newSession Sessions{sessionsByAddr,sessionsById,sessionIds,sessionsSendRaw} unwr | |||
69 | $ IntMap.singleton sid handlePacket | 71 | $ IntMap.singleton sid handlePacket |
70 | return sid | 72 | return sid |
71 | forM msid $ \sid -> do | 73 | forM msid $ \sid -> do |
72 | let tr = Transport | 74 | let tr = Transport |
73 | { awaitMessage = \kont -> do | 75 | { awaitMessage = \kont -> do |
74 | x <- takeMVar mvar | 76 | x <- takeTMVar mvar |
75 | kont $! Right <$> x | 77 | return $ kont $! maybe Terminated (uncurry $ flip Arrival) x |
76 | , sendMessage = \addr x -> do | 78 | , sendMessage = \addr x -> do |
77 | x' <- unwrap addr x | 79 | x' <- unwrap addr x |
78 | sessionsSendRaw saddr x' | 80 | sessionsSendRaw saddr x' |
79 | , closeTransport = do | 81 | , setActive = \case |
80 | tryTakeMVar mvar | 82 | False -> do |
81 | putMVar mvar Nothing | 83 | atomically $ do |
82 | atomically $ do | 84 | tryTakeTMVar mvar |
83 | modifyTVar' sessionIds $ S.delete sid | 85 | putTMVar mvar Nothing |
84 | modifyTVar' sessionsById $ IntMap.delete sid | 86 | atomically $ do |
85 | modifyTVar' sessionsByAddr $ Map.alter (rmSession sid) saddr | 87 | modifyTVar' sessionIds $ S.delete sid |
88 | modifyTVar' sessionsById $ IntMap.delete sid | ||
89 | modifyTVar' sessionsByAddr $ Map.alter (rmSession sid) saddr | ||
90 | True -> return () | ||
86 | } | 91 | } |
87 | return (sid,tr) | 92 | return (sid,tr) |
88 | 93 | ||
89 | sessionHandler :: Sessions x -> (SockAddr -> x -> IO (Maybe (x -> x))) | 94 | sessionHandler :: Sessions x -> (SockAddr -> x -> IO (Maybe (x -> x))) |
90 | sessionHandler Sessions{sessionsByAddr} = \addr0 x -> do | 95 | sessionHandler Sessions{sessionsByAddr} = \addr0 x -> do |
diff --git a/dht/src/Network/Tox.hs b/dht/src/Network/Tox.hs index 61a1d117..4b66cfc8 100644 --- a/dht/src/Network/Tox.hs +++ b/dht/src/Network/Tox.hs | |||
@@ -222,10 +222,12 @@ isLocalHost _ = False | |||
222 | addVerbosity :: Transport err SockAddr ByteString -> Transport err SockAddr ByteString | 222 | addVerbosity :: Transport err SockAddr ByteString -> Transport err SockAddr ByteString |
223 | addVerbosity tr = | 223 | addVerbosity tr = |
224 | tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do | 224 | tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do |
225 | forM_ m $ mapM_ $ \(msg,addr) -> do | 225 | case m of |
226 | Arrival addr msg -> do | ||
226 | when (not (B.null msg || elem (B.head msg) [0,1,2,4,0x81,0x82,0x8c,0x8d])) $ do | 227 | when (not (B.null msg || elem (B.head msg) [0,1,2,4,0x81,0x82,0x8c,0x8d])) $ do |
227 | mapM_ (\x -> dput XMisc ( (show addr) ++ " --> " ++ x)) | 228 | mapM_ (\x -> dput XMisc ( (show addr) ++ " --> " ++ x)) |
228 | $ xxd 0 msg | 229 | $ xxd 0 msg |
230 | _ -> return () | ||
229 | kont m | 231 | kont m |
230 | , sendMessage = \addr msg -> do | 232 | , sendMessage = \addr msg -> do |
231 | when (not (B.null msg || elem (B.head msg) [0,1,2,4,0x81,0x8c,0x8d])) $ do | 233 | when (not (B.null msg || elem (B.head msg) [0,1,2,4,0x81,0x8c,0x8d])) $ do |
diff --git a/dht/src/Network/Tox/AggregateSession.hs b/dht/src/Network/Tox/AggregateSession.hs index 8c728660..999c7399 100644 --- a/dht/src/Network/Tox/AggregateSession.hs +++ b/dht/src/Network/Tox/AggregateSession.hs | |||
@@ -188,21 +188,21 @@ forkSession c s setStatus = forkIO $ do | |||
188 | now <- getPOSIXTime | 188 | now <- getPOSIXTime |
189 | atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) (now + 15) | 189 | atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) (now + 15) |
190 | 190 | ||
191 | onPacket body loop Nothing = return () | 191 | onPacket body loop Terminated = return () |
192 | onPacket body loop (Just (Left e)) = inPrint e >> loop | 192 | onPacket body loop (ParseError e) = inPrint e >> loop |
193 | onPacket body loop (Just (Right x)) = body loop x | 193 | onPacket body loop (Arrival _ x) = body loop x |
194 | 194 | ||
195 | awaitPacket body = fix $ awaitMessage (sTransport s) . onPacket body | 195 | awaitPacket body = fix $ join . atomically . awaitMessage (sTransport s) . onPacket body |
196 | 196 | ||
197 | atomically $ setStatus $ InProgress AwaitingSessionPacket | 197 | atomically $ setStatus $ InProgress AwaitingSessionPacket |
198 | awaitPacket $ \_ (online,()) -> do | 198 | awaitPacket $ \_ online -> do |
199 | when (msgID online /= M ONLINE) $ do | 199 | when (msgID online /= M ONLINE) $ do |
200 | inPrint $ "Unexpected initial packet: " ++ show (msgID online) | 200 | inPrint $ "Unexpected initial packet: " ++ show (msgID online) |
201 | atomically $ do setStatus Established | 201 | atomically $ do setStatus Established |
202 | sendPacket online | 202 | sendPacket online |
203 | bump | 203 | bump |
204 | beacon <- forkIO $ keepAlive s q | 204 | beacon <- forkIO $ keepAlive s q |
205 | awaitPacket $ \awaitNext (x,()) -> do | 205 | awaitPacket $ \awaitNext x -> do |
206 | bump | 206 | bump |
207 | case msgID x of | 207 | case msgID x of |
208 | M ALIVE -> return () | 208 | M ALIVE -> return () |
diff --git a/dht/src/Network/Tox/DHT/Transport.hs b/dht/src/Network/Tox/DHT/Transport.hs index 7475b3b1..7414343d 100644 --- a/dht/src/Network/Tox/DHT/Transport.hs +++ b/dht/src/Network/Tox/DHT/Transport.hs | |||
@@ -59,7 +59,7 @@ import GHC.Generics | |||
59 | import Network.Socket | 59 | import Network.Socket |
60 | 60 | ||
61 | type DHTTransport = Transport String NodeInfo (DHTMessage Encrypted8) | 61 | type DHTTransport = Transport String NodeInfo (DHTMessage Encrypted8) |
62 | type HandleHi a = Maybe (Either String (DHTMessage Encrypted8, NodeInfo)) -> IO a | 62 | type HandleHi a = Arrival String NodeInfo (DHTMessage Encrypted8) -> IO a |
63 | 63 | ||
64 | 64 | ||
65 | data DHTMessage (f :: * -> *) | 65 | data DHTMessage (f :: * -> *) |
@@ -399,13 +399,13 @@ instance Serialize CookieRequest where | |||
399 | forwardDHTRequests :: TransportCrypto -> (PublicKey -> IO (Maybe NodeInfo)) -> DHTTransport -> DHTTransport | 399 | forwardDHTRequests :: TransportCrypto -> (PublicKey -> IO (Maybe NodeInfo)) -> DHTTransport -> DHTTransport |
400 | forwardDHTRequests crypto closeLookup dht = dht { awaitMessage = await' } | 400 | forwardDHTRequests crypto closeLookup dht = dht { awaitMessage = await' } |
401 | where | 401 | where |
402 | await' :: HandleHi a -> IO a | 402 | await' :: HandleHi a -> STM (IO a) |
403 | await' pass = awaitMessage dht $ \case | 403 | await' pass = awaitMessage dht $ \case |
404 | Just (Right (m@(DHTDHTRequest target payload),src)) | target /= transportPublic crypto | 404 | Arrival src m@(DHTDHTRequest target payload) | target /= transportPublic crypto |
405 | -> do mni <- closeLookup target | 405 | -> do mni <- closeLookup target |
406 | -- Forward the message if the target is in our close list. | 406 | -- Forward the message if the target is in our close list. |
407 | forM_ mni $ \ni -> sendMessage dht ni m | 407 | forM_ mni $ \ni -> sendMessage dht ni m |
408 | await' pass | 408 | join $ atomically (await' pass) |
409 | m -> pass m | 409 | m -> pass m |
410 | 410 | ||
411 | encrypt :: TransportCrypto -> DHTMessage ((,) Nonce8) -> NodeInfo -> IO (DHTMessage Encrypted8, NodeInfo) | 411 | encrypt :: TransportCrypto -> DHTMessage ((,) Nonce8) -> NodeInfo -> IO (DHTMessage Encrypted8, NodeInfo) |