summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2020-01-16 21:50:34 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-17 00:25:22 -0500
commit6f72701a1f67132649236513959791d8ff4a884f (patch)
treeb0ad7792014c164afce00bb9ffb45d86318cc8be
parentf669ed747e7c4eb37c4a6821e0eaaadefbab9d3b (diff)
Improved netcrypto session cleanup.
-rw-r--r--dht/ToxManager.hs4
-rw-r--r--dht/src/Data/Tox/DHT/Multi.hs10
-rw-r--r--dht/src/Network/Lossless.hs27
-rw-r--r--dht/src/Network/Tox/AggregateSession.hs27
-rw-r--r--dht/src/Network/Tox/Session.hs6
-rw-r--r--server/src/Network/StreamServer.hs5
6 files changed, 51 insertions, 28 deletions
diff --git a/dht/ToxManager.hs b/dht/ToxManager.hs
index b0990430..25a4f0f2 100644
--- a/dht/ToxManager.hs
+++ b/dht/ToxManager.hs
@@ -709,6 +709,8 @@ forkAccountWatcher :: TVar (Map.Map Uniq24 AggregateSession)
709 -> TCP.RelayClient 709 -> TCP.RelayClient
710 -> Account JabberClients -> Tox JabberClients -> PresenceState Pending -> Announcer -> IO ThreadId 710 -> Account JabberClients -> Tox JabberClients -> PresenceState Pending -> Announcer -> IO ThreadId
711forkAccountWatcher ssvar tcp acc tox st announcer = forkIO $ do 711forkAccountWatcher ssvar tcp acc tox st announcer = forkIO $ do
712 let me = key2id $ toPublic $ userSecret acc
713 dput XMan $ "forkAccountWatcher(" ++ show me ++") started"
712 myThreadId >>= flip labelThread ("online:" 714 myThreadId >>= flip labelThread ("online:"
713 ++ show (key2id $ toPublic $ userSecret acc)) 715 ++ show (key2id $ toPublic $ userSecret acc))
714 (chan,cs) <- atomically $ do 716 (chan,cs) <- atomically $ do
@@ -738,10 +740,10 @@ forkAccountWatcher ssvar tcp acc tox st announcer = forkIO $ do
738 740
739 -- Stop tasks associated with each contact for this account. 741 -- Stop tasks associated with each contact for this account.
740 cs <- atomically $ readTVar (contacts acc) 742 cs <- atomically $ readTVar (contacts acc)
741 let me = key2id $ toPublic $ userSecret acc
742 forM_ (HashMap.toList cs) $ \(them,c) -> do 743 forM_ (HashMap.toList cs) $ \(them,c) -> do
743 stopConnecting tx (id2key them) "disabled account" 744 stopConnecting tx (id2key them) "disabled account"
744 closeSessions me them ssvar 745 closeSessions me them ssvar
746 dput XMan $ "forkAccountWatcher(" ++ show me ++") stopped"
745 747
746 748
747toxAnnounceInterval :: POSIXTime 749toxAnnounceInterval :: POSIXTime
diff --git a/dht/src/Data/Tox/DHT/Multi.hs b/dht/src/Data/Tox/DHT/Multi.hs
index d31ae4b8..878b47e6 100644
--- a/dht/src/Data/Tox/DHT/Multi.hs
+++ b/dht/src/Data/Tox/DHT/Multi.hs
@@ -13,7 +13,7 @@ import qualified Network.Tox.NodeId as UDP
13import qualified Network.Tox.TCP.NodeId as TCP 13import qualified Network.Tox.TCP.NodeId as TCP
14import Data.Tox.Onion (OnionDestination,RouteId) 14import Data.Tox.Onion (OnionDestination,RouteId)
15import Data.Tox.Relay hiding (NodeInfo) 15import Data.Tox.Relay hiding (NodeInfo)
16import Network.Address (either4or6) 16import Network.Address as SockAddr (canonize)
17import Network.Tox.TCP as TCP (ViaRelay(..), tcpConnectionRequest_) 17import Network.Tox.TCP as TCP (ViaRelay(..), tcpConnectionRequest_)
18import Network.QueryResponse as QR (Tagged(..), Client) 18import Network.QueryResponse as QR (Tagged(..), Client)
19 19
@@ -88,7 +88,7 @@ untagOnion (OnionTCP :=> Identity o) = o
88 88
89-- Canonical in case of 6-mapped-4 addresses. 89-- Canonical in case of 6-mapped-4 addresses.
90canonize :: DSum S Identity -> DSum S Identity 90canonize :: DSum S Identity -> DSum S Identity
91canonize (SessionUDP :=> Identity saddr) = SessionUDP ==> either id id (either4or6 saddr) 91canonize (SessionUDP :=> Identity saddr) = SessionUDP ==> SockAddr.canonize saddr
92canonize taddr = taddr 92canonize taddr = taddr
93 93
94type NodeInfo = DSum T Identity 94type NodeInfo = DSum T Identity
@@ -144,3 +144,9 @@ tcpConnectionRequest :: QR.Client err PacketNumber tid TCP.NodeInfo (Bool, Relay
144tcpConnectionRequest client pubkey ni = do 144tcpConnectionRequest client pubkey ni = do
145 mcon <- tcpConnectionRequest_ client pubkey ni 145 mcon <- tcpConnectionRequest_ client pubkey ni
146 return $ fmap (\conid -> TCP ==> ViaRelay (Just conid) (UDP.key2id pubkey) ni) mcon 146 return $ fmap (\conid -> TCP ==> ViaRelay (Just conid) (UDP.key2id pubkey) ni) mcon
147
148showSessionAddr :: SessionAddress -> String
149showSessionAddr (SessionUDP :=> Identity udp) =
150 show (SockAddr.canonize udp)
151showSessionAddr (SessionTCP :=> Identity (ViaRelay mcon _ tcp)) =
152 "TCP:" ++ maybe "" (\(ConId con) -> "(" ++ show con ++ ")") mcon ++ show tcp
diff --git a/dht/src/Network/Lossless.hs b/dht/src/Network/Lossless.hs
index 5a313aed..079f4d07 100644
--- a/dht/src/Network/Lossless.hs
+++ b/dht/src/Network/Lossless.hs
@@ -9,6 +9,7 @@
9{-# LANGUAGE TupleSections #-} 9{-# LANGUAGE TupleSections #-}
10module Network.Lossless where 10module Network.Lossless where
11 11
12import Control.Concurrent.STM
12import Control.Concurrent.STM.TChan 13import Control.Concurrent.STM.TChan
13import Control.Monad 14import Control.Monad
14import Control.Monad.STM 15import Control.Monad.STM
@@ -42,7 +43,8 @@ data OutgoingInfo y = OutgoingInfo
42 43
43-- | Obtain a reliable transport form an unreliable one. 44-- | Obtain a reliable transport form an unreliable one.
44lossless :: Show addr => 45lossless :: Show addr =>
45 (x -> addr -> IO (PacketInboundEvent (x',addr'))) -- ^ Used to classify newly arrived packets. 46 String -- ^ Label for debugging.
47 -> (x -> addr -> IO (PacketInboundEvent (x',addr'))) -- ^ Used to classify newly arrived packets.
46 -> (SequenceInfo -> x' -> addr' -> IO (OutgoingInfo y)) -- ^ Used to encode and classify outbound packets. 48 -> (SequenceInfo -> x' -> addr' -> IO (OutgoingInfo y)) -- ^ Used to encode and classify outbound packets.
47 -> addr -- ^ The remote address for this session. 49 -> addr -- ^ The remote address for this session.
48 -> TransportA String addr x y -- ^ An unreliable lossy transport. 50 -> TransportA String addr x y -- ^ An unreliable lossy transport.
@@ -51,7 +53,7 @@ lossless :: Show addr =>
51 , [Word32] -> IO () -- ^ Use this to request lost packets be re-sent. 53 , [Word32] -> IO () -- ^ Use this to request lost packets be re-sent.
52 , IO ([Word32],Word32) -- ^ Use this to discover missing packets to request. 54 , IO ([Word32],Word32) -- ^ Use this to discover missing packets to request.
53 ) 55 )
54lossless isLossless encode saddr udp = do 56lossless lbl isLossless encode saddr udp = do
55 pb <- atomically newPacketBuffer 57 pb <- atomically newPacketBuffer
56 oob <- atomically newTChan -- Out-of-band channel, these packets (or 58 oob <- atomically newTChan -- Out-of-band channel, these packets (or
57 -- errors) bypass the packet buffer to be 59 -- errors) bypass the packet buffer to be
@@ -80,11 +82,14 @@ lossless isLossless encode saddr udp = do
80 report <- pbReport "enqueued" pb 82 report <- pbReport "enqueued" pb
81 writeTChan oob (ParseError report) 83 writeTChan oob (ParseError report)
82 loop 84 loop
83 labelThread rloop ("lossless."++show saddr) 85 labelThread rloop ("lossless."++lbl)
86 term <- newTVarIO retry -- In case awaitMessage is called multiple times beyond termination,
87 -- we will use this STM action stop it from waiting on the oob TChan.
88 -- XXX: This shouldn't be neccessary and might be costly.
84 let tr = Transport 89 let tr = Transport
85 { awaitMessage = \kont -> 90 { awaitMessage = \kont ->
86 orElse 91 orElse
87 (do x <- readTChan oob 92 (do x <- readTChan oob `orElse` join (readTVar term)
88 return $ kont $! x) 93 return $ kont $! x)
89 (do x <- PB.awaitReadyPacket pb 94 (do x <- PB.awaitReadyPacket pb
90 report <- pbReport "dequeued" pb 95 report <- pbReport "dequeued" pb
@@ -100,13 +105,13 @@ lossless isLossless encode saddr udp = do
100 (isfull,nn) <- 105 (isfull,nn) <-
101 if islossy 106 if islossy
102 then do 107 then do
103 dput XNetCrypto $ shows saddr $ " <-- Lossy packet " ++ show seqno 108 dput XNetCrypto $ mappend lbl $ " <-- Lossy packet " ++ show seqno
104 return (False,(0,0)) -- avoid updating seqno on lossy packets. 109 return (False,(0,0)) -- avoid updating seqno on lossy packets.
105 else do 110 else do
106 dput XNetCrypto $ shows saddr $ " <-- Lossless packet " ++ show seqno 111 dput XNetCrypto $ mappend lbl $ " <-- Lossless packet " ++ show seqno
107 atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) 112 atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x)
108 when isfull $ do 113 when isfull $ do
109 dput XNetCrypto $ shows saddr $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno) 114 dput XNetCrypto $ mappend lbl $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno)
110 atomically $ do 115 atomically $ do
111 (isfull,_) <- PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) 116 (isfull,_) <- PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x)
112 when isfull retry 117 when isfull retry
@@ -114,14 +119,16 @@ lossless isLossless encode saddr udp = do
114 maybe sendit (catchIOError sendit) oops 119 maybe sendit (catchIOError sendit) oops
115 , setActive = \case 120 , setActive = \case
116 False -> do 121 False -> do
117 atomically $ writeTChan oob Terminated -- quit rloop thread 122 atomically $ do
123 writeTChan oob Terminated -- quit rloop thread
124 writeTVar term (return Terminated)
118 setActive udp False 125 setActive udp False
119 True -> return () 126 True -> return ()
120 } 127 }
121 resend ns = do 128 resend ns = do
122 xs <- atomically $ retrieveForResend pb ns 129 xs <- atomically $ retrieveForResend pb ns
123 dput XNetCrypto $ shows saddr $ " <-- Resending " ++ show (length xs) ++ " packets." 130 dput XNetCrypto $ mappend lbl $ " <-- Resending " ++ show (length xs) ++ " packets."
124 forM_ xs $ \x -> do 131 forM_ xs $ \x -> do
125 dput XNetCrypto $ shows saddr $ " <-- Resending packet." 132 dput XNetCrypto $ mappend lbl $ " <-- Resending packet."
126 sendMessage udp saddr . snd $ x 133 sendMessage udp saddr . snd $ x
127 return (tr, resend, atomically $ PB.packetNumbersToRequest pb) 134 return (tr, resend, atomically $ PB.packetNumbersToRequest pb)
diff --git a/dht/src/Network/Tox/AggregateSession.hs b/dht/src/Network/Tox/AggregateSession.hs
index 9a784291..d1f42e91 100644
--- a/dht/src/Network/Tox/AggregateSession.hs
+++ b/dht/src/Network/Tox/AggregateSession.hs
@@ -24,6 +24,7 @@ module Network.Tox.AggregateSession
24 24
25import Control.Concurrent.STM 25import Control.Concurrent.STM
26import Control.Concurrent.STM.TMChan 26import Control.Concurrent.STM.TMChan
27import Control.Exception
27import Control.Monad 28import Control.Monad
28import Data.Dependent.Sum 29import Data.Dependent.Sum
29import Data.Function 30import Data.Function
@@ -114,7 +115,7 @@ data KeepAliveEvents = DoTimeout -- ^ A session timed-out, close it.
114 | DoRequestMissing -- ^ Detect and request lost packets. 115 | DoRequestMissing -- ^ Detect and request lost packets.
115 deriving Enum 116 deriving Enum
116 117
117-- | This call loops until the provided sesison is closed or times out. It 118-- | This call loops until the provided session is closed or times out. It
118-- monitors the provided (non-empty) priority queue for scheduled tasks (see 119-- monitors the provided (non-empty) priority queue for scheduled tasks (see
119-- 'KeepAliveEvents') to perform for the connection. 120-- 'KeepAliveEvents') to perform for the connection.
120keepAlive :: Session -> TVar (PSQ POSIXTime) -> IO () 121keepAlive :: Session -> TVar (PSQ POSIXTime) -> IO ()
@@ -142,23 +143,23 @@ keepAlive s q = do
142 now <- getPOSIXTime 143 now <- getPOSIXTime
143 atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) now 144 atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) now
144 145
145 re tm again e io = do 146 re tm e io = do
146 io 147 io
147 atomically $ modifyTVar' q $ PSQ.insert (fromEnum e) tm 148 atomically $ modifyTVar' q $ PSQ.insert (fromEnum e) tm
148 again
149 149
150 doEvent again now e = case e of 150 doEvent again now e = case e of
151 DoTimeout -> do dput XNetCrypto $ "TIMEOUT: " ++ show (sSessionID s) 151 DoTimeout -> do dput XNetCrypto $ "TIMEOUT: " ++ show (sSessionID s)
152 sClose s 152 sClose s
153 DoAlive -> re (now + 10) again e doAlive 153 DoAlive -> re (now + 10) e doAlive >> again
154 DoRequestMissing -> re (now + 5) again e doRequestMissing -- tox-core does this at 1 second intervals 154 DoRequestMissing -> re (now + 5{- toxcore uses 1sec -}) e doRequestMissing >> again
155 155
156 fix $ \again -> do 156 fix $ \again -> do
157 157
158 now <- getPOSIXTime 158 now <- getPOSIXTime
159 join $ atomically $ do 159 join $ atomically $ do
160 PSQ.findMin <$> readTVar q >>= \case 160 PSQ.findMin <$> readTVar q >>= \case
161 Nothing -> error "keepAlive: unexpected empty PSQ." 161 Nothing -> return $ do dput XUnexpected "keepAlive: unexpected empty PSQ."
162 sClose s
162 Just ( k :-> tm ) -> 163 Just ( k :-> tm ) ->
163 return $ if now < tm then threadDelay (toMicroseconds $ tm - now) >> again 164 return $ if now < tm then threadDelay (toMicroseconds $ tm - now) >> again
164 else doEvent again now (toEnum k) 165 else doEvent again now (toEnum k)
@@ -202,7 +203,7 @@ forkSession c s setStatus = forkIO $ do
202 atomically $ do setStatus Established 203 atomically $ do setStatus Established
203 sendPacket online 204 sendPacket online
204 bump 205 bump
205 beacon <- forkIO $ keepAlive s q 206 beacon <- forkIO $ keepAlive s q `finally` sClose s
206 awaitPacket $ \awaitNext x -> do 207 awaitPacket $ \awaitNext x -> do
207 bump 208 bump
208 case msgID x of 209 case msgID x of
@@ -223,7 +224,7 @@ forkSession c s setStatus = forkIO $ do
223-- one active session). 224-- one active session).
224addSession :: AggregateSession -> Session -> IO AddResult 225addSession :: AggregateSession -> Session -> IO AddResult
225addSession c s = do 226addSession c s = do
226 (result,mcon,replaced) <- atomically $ do 227 (result,mcon,rejected) <- atomically $ do
227 let them = sTheirUserKey s 228 let them = sTheirUserKey s
228 me = toPublic $ sOurKey s 229 me = toPublic $ sOurKey s
229 compat <- checkCompatible me them c 230 compat <- checkCompatible me them c
@@ -232,7 +233,7 @@ addSession c s = do
232 Just True -> AddedSession 233 Just True -> AddedSession
233 Just False -> RejectedSession 234 Just False -> RejectedSession
234 case result of 235 case result of
235 RejectedSession -> return (result,Nothing,Nothing) 236 RejectedSession -> return (result,Nothing,Just s)
236 _ -> do 237 _ -> do
237 statvar <- newTVar Dormant 238 statvar <- newTVar Dormant
238 imap <- readTVar (contactSession c) 239 imap <- readTVar (contactSession c)
@@ -240,9 +241,9 @@ addSession c s = do
240 s0 = IntMap.lookup (sSessionID s) imap 241 s0 = IntMap.lookup (sSessionID s) imap
241 imap' = IntMap.insert (sSessionID s) con imap 242 imap' = IntMap.insert (sSessionID s) con imap
242 writeTVar (contactSession c) imap' 243 writeTVar (contactSession c) imap'
243 return (result,Just con,s0) 244 return (result,Just con,singleSession <$> s0)
244 245
245 mapM_ (sClose . singleSession) replaced 246 mapM_ sClose rejected
246 forM_ (mcon :: Maybe SingleCon) $ \con -> 247 forM_ (mcon :: Maybe SingleCon) $ \con ->
247 forkSession c s $ \progress -> do 248 forkSession c s $ \progress -> do
248 status0 <- aggregateStatus c 249 status0 <- aggregateStatus c
@@ -313,7 +314,9 @@ closeAll :: AggregateSession -> IO ()
313closeAll c = join $ atomically $ do 314closeAll c = join $ atomically $ do
314 imap <- readTVar (contactSession c) 315 imap <- readTVar (contactSession c)
315 closeTMChan (contactChannel c) 316 closeTMChan (contactChannel c)
316 return $ forM_ (IntMap.keys imap) $ \sid -> delSession c sid 317 return $ forM_ (IntMap.toList imap) $ \(sid,SingleCon s _) -> do
318 sClose s
319 delSession c sid
317 320
318-- | Query the current status of the aggregate, there are three possible 321-- | Query the current status of the aggregate, there are three possible
319-- values: 322-- values:
diff --git a/dht/src/Network/Tox/Session.hs b/dht/src/Network/Tox/Session.hs
index ff86e502..9bd12c69 100644
--- a/dht/src/Network/Tox/Session.hs
+++ b/dht/src/Network/Tox/Session.hs
@@ -27,7 +27,7 @@ import Network.Lossless
27import Network.QueryResponse 27import Network.QueryResponse
28import Network.SessionTransports 28import Network.SessionTransports
29import Network.Tox.Crypto.Transport 29import Network.Tox.Crypto.Transport
30import Network.Tox.DHT.Transport (Cookie (..), key2id, longTermKey) 30import Network.Tox.DHT.Transport (Cookie (..), key2id, longTermKey, CookieData (..))
31import Network.Tox.Handshake 31import Network.Tox.Handshake
32 32
33-- | Alias for 'SecretKey' to document that it is used as the temporary Tox 33-- | Alias for 'SecretKey' to document that it is used as the temporary Tox
@@ -128,6 +128,7 @@ plainHandshakeH sp saddr0 skey handshake = do
128 -- TODO: this is always returning sent = Nothing 128 -- TODO: this is always returning sent = Nothing
129 dput XNetCrypto $ " <-- (cached) handshake baseNonce " ++ show (fmap (baseNonce . snd . snd) sent) 129 dput XNetCrypto $ " <-- (cached) handshake baseNonce " ++ show (fmap (baseNonce . snd . snd) sent)
130 forM_ sent $ \(saddr, (hd_skey,hd_sent)) -> do 130 forM_ sent $ \(saddr, (hd_skey,hd_sent)) -> do
131 let Cookie _ (Identity CookieData{ longTermKey = them }) = handshakeCookie handshake
131 sk <- SessionKeys (spCrypto sp) 132 sk <- SessionKeys (spCrypto sp)
132 hd_skey 133 hd_skey
133 (sessionKey hd) 134 (sessionKey hd)
@@ -137,7 +138,8 @@ plainHandshakeH sp saddr0 skey handshake = do
137 dput XNetCrypto $ prelude ++ "plainHandshakeH: session " ++ maybe "Nothing" (const "Just") m 138 dput XNetCrypto $ prelude ++ "plainHandshakeH: session " ++ maybe "Nothing" (const "Just") m
138 forM_ m $ \(sid, t) -> do 139 forM_ m $ \(sid, t) -> do
139 (t2,resend,getMissing) 140 (t2,resend,getMissing)
140 <- lossless (\cp a -> return $ fmap (,a) $ checkLossless $ runIdentity $ pktData cp) 141 <- lossless (take 8 (showKey256 them) ++ "." ++ Multi.showSessionAddr saddr)
142 (\cp a -> return $ fmap (,a) $ checkLossless $ runIdentity $ pktData cp)
141 (\seqno p@(Pkt m :=> _) _ -> do 143 (\seqno p@(Pkt m :=> _) _ -> do
142 y <- encryptPacket sk $ bookKeeping seqno p 144 y <- encryptPacket sk $ bookKeeping seqno p
143 return OutgoingInfo 145 return OutgoingInfo
diff --git a/server/src/Network/StreamServer.hs b/server/src/Network/StreamServer.hs
index 8ebdf678..eda5212f 100644
--- a/server/src/Network/StreamServer.hs
+++ b/server/src/Network/StreamServer.hs
@@ -161,8 +161,11 @@ acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do
161 let conkey = n + 1 161 let conkey = n + 1
162 laddr <- Socket.getSocketName con 162 laddr <- Socket.getSocketName con
163 h <- socketToHandle con ReadWriteMode 163 h <- socketToHandle con ReadWriteMode
164 let tlbl = case canonize laddr of
165 SockAddrUnix {} -> show laddr ++ "," ++ show n
166 _ -> show raddr
164 forkIO $ do 167 forkIO $ do
165 myThreadId >>= flip labelThread ("stream.session." ++ show (canonize raddr)) 168 myThreadId >>= flip labelThread ("stream.session." ++ tlbl)
166 serverSession cfg (restrictHandleSocket h con, (Local laddr, Remote raddr)) conkey h 169 serverSession cfg (restrictHandleSocket h con, (Local laddr, Remote raddr)) conkey h
167 acceptLoop cfg sock (n + 1) 170 acceptLoop cfg sock (n + 1)
168 171