From 27cb2cbe0338c19fd4f8a22b4453086288dae5c4 Mon Sep 17 00:00:00 2001 From: James Crayne Date: Wed, 22 Nov 2017 21:25:25 +0000 Subject: dequeue thread & handle session close --- dht-client.cabal | 1 + examples/dhtd.hs | 30 +++++++++++++++++++++++------- src/Network/Tox/Crypto/Handlers.hs | 24 ++++++++++++------------ src/Network/Tox/Crypto/Transport.hs | 9 +++++++++ 4 files changed, 45 insertions(+), 19 deletions(-) diff --git a/dht-client.cabal b/dht-client.cabal index a9e1f847..023f837e 100644 --- a/dht-client.cabal +++ b/dht-client.cabal @@ -249,6 +249,7 @@ executable dhtd , unix , containers , stm + , stm-chans , cereal , bencoding , unordered-containers diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 89b747be..088e0c67 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs @@ -21,6 +21,7 @@ module Main where import Control.Arrow import Control.Applicative import Control.Concurrent.STM +import Control.Concurrent.STM.TMChan import Control.Exception import Control.Monad import Control.Monad.IO.Class (liftIO) @@ -88,7 +89,7 @@ import qualified Network.Tox.DHT.Transport as Tox import qualified Network.Tox.DHT.Handlers as Tox import qualified Network.Tox.Onion.Transport as Tox import qualified Network.Tox.Onion.Handlers as Tox -import qualified Network.Tox.Crypto.Transport as Tox (CryptoMessage) +import qualified Network.Tox.Crypto.Transport as Tox (CryptoMessage(..),CryptoData(..), isOFFLINE, isKillPacket) import qualified Network.Tox.Crypto.Handlers as Tox import Data.Typeable import Network.Tox.ContactInfo as Tox @@ -1000,13 +1001,14 @@ noArgPing :: (x -> IO (Maybe r)) -> [String] -> x -> IO (Maybe r) noArgPing f [] x = f x noArgPing _ _ _ = return Nothing -newXmmpSource :: Tox.NetCryptoSession -> C.Source IO Tox.CryptoMessage -newXmmpSource session = do - v <- liftIO $ Tox.receiveCrypto session +-- todo: session parameter obsolete? +newXmmpSource :: (IO (Maybe Tox.CryptoMessage)) -> Tox.NetCryptoSession -> C.Source IO Tox.CryptoMessage +newXmmpSource receiveCrypto session = do + v <- liftIO receiveCrypto case v of Nothing -> return () -- Nothing indicates EOF. Just cryptomessage -> do C.yield cryptomessage - newXmmpSource session + newXmmpSource receiveCrypto session newXmmpSink :: Tox.NetCryptoSession -> C.Sink (Flush Tox.CryptoMessage) IO () newXmmpSink session@(Tox.NCrypto { ncOutgoingQueue, ncPacketQueue }) = C.awaitForever $ \flush_cyptomessage -> do @@ -1046,6 +1048,7 @@ toxman tox = ToxManager _ -> return () -- Remove contact. } +#ifdef XMPP announceToxXMPPClients :: TChan ((ConnectionKey,SockAddr), Tcp.ConnectionEvent XML.Event) -> SockAddr @@ -1064,7 +1067,7 @@ announceToxXMPPClients echan laddr saddr pingflag tsrc tsnk xsrc = tsrc =$= toxToXmpp xsnk = flushPassThrough xmppToTox =$= tsnk - +#endif main :: IO () main = runResourceT $ liftBaseWith $ \resT -> do @@ -1379,11 +1382,24 @@ main = runResourceT $ liftBaseWith $ \resT -> do forM_ (take 1 taddrs) $ \addrTox -> do atomically $ Tox.addNewSessionHook netCryptoSessionsState $ \mbNoSpam netcrypto -> do + tmchan <- atomically newTMChan let Just pingMachine = Tox.ncPingMachine netcrypto pingflag = readTVar (pingFlag pingMachine) - xmppSrc = newXmmpSource netcrypto + receiveCrypto = atomically $ readTMChan tmchan + handleIncoming typ session cd | any ($ typ) [Tox.isKillPacket, Tox.isOFFLINE] = atomically $ do + closeTMChan tmchan + Tox.forgetCrypto crypto netCryptoSessionsState netcrypto + return Nothing + handleIncoming mTyp session cd = do + atomically $ writeTMChan tmchan (Tox.bufferData cd) + return Nothing +#ifdef XMPP + xmppSrc = newXmmpSource receiveCrypto netcrypto xmppSink = newXmmpSink netcrypto announceToxXMPPClients (xmppEventChannel sv) addrTox (Tox.ncSockAddr netcrypto) pingflag xmppSrc xmppSink +#endif + atomically $ writeTVar (Tox.ncUnrecognizedHook netcrypto) handleIncoming + return Nothing let dhts = Map.union btdhts toxdhts diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index 4f53888b..50dd8c67 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs @@ -28,7 +28,6 @@ import qualified Data.Set as Set import qualified Data.Array.Unboxed as A import SensibleDir import System.FilePath -import System.IO import System.IO.Temp import System.Environment import System.Directory @@ -108,6 +107,7 @@ data NetCryptoSession = NCrypto , ncView :: TVar SessionView , ncPacketQueue :: PacketQueue CryptoData , ncBufferStart :: TVar Word32 + , ncDequeueThread :: Maybe ThreadId , ncPingMachine :: Maybe PingMachine , ncOutgoingQueue :: PQ.PacketOutQueue (State,Nonce24,TVar MsgOutMap) CryptoMessage @@ -364,24 +364,23 @@ freshCryptoSession sessions , ncView = ncView0 , ncPacketQueue = pktq , ncBufferStart = bufstart + , ncDequeueThread = Nothing -- error "you want the NetCrypto-Dequeue thread id, but is it started?" , ncPingMachine = Nothing -- error "you want the NetCrypto-PingMachine, but is it started?" , ncOutgoingQueue = pktoq } - - hooks <- atomically $ readTVar (announceNewSessionHooks sessions) - - -- Dequeue thread: - -- - -- Hopefully, somebody will launch a thread to repeatedly call - -- 'receiveCrypto' in order to dequeue messages from ncPacketQueue. - when (null hooks) $ do - hPutStrLn stderr "Warning: Missing new-session handler. Lost session!" - + -- launch dequeue thread + threadid <- forkIO $ do + tid <- myThreadId + labelThread tid ("NetCryptoDequeue." ++ show (key2id remotePublicKey)) + fix $ \loop -> do + cd <- atomically $ PQ.dequeue pktq + _ <- runCryptoHook (netCryptoSession0 {ncDequeueThread=Just tid}) cd + loop -- launch ping thread fuzz <- randomRIO (0,2000) pingMachine <- forkPingMachine ("NetCrypto." ++ show (key2id remotePublicKey)) (15000 + fuzz) 2000 -- update session with thread ids - let netCryptoSession = netCryptoSession0 {ncPingMachine=Just pingMachine} + let netCryptoSession = netCryptoSession0 {ncDequeueThread=Just threadid, ncPingMachine=Just pingMachine} -- add this session to the lookup maps atomically $ do modifyTVar allsessions (Map.insert addr netCryptoSession) @@ -390,6 +389,7 @@ freshCryptoSession sessions Nothing -> modifyTVar allsessionsByKey (Map.insert remotePublicKey [netCryptoSession]) Just xs -> modifyTVar allsessionsByKey (Map.insert remotePublicKey (netCryptoSession:xs)) -- run announceNewSessionHooks + hooks <- atomically $ readTVar (announceNewSessionHooks sessions) flip fix (hooks,netCryptoSession) $ \loop (hooks,session) -> case hooks of [] -> return () diff --git a/src/Network/Tox/Crypto/Transport.hs b/src/Network/Tox/Crypto/Transport.hs index 3133ee9b..70405a3e 100644 --- a/src/Network/Tox/Crypto/Transport.hs +++ b/src/Network/Tox/Crypto/Transport.hs @@ -21,6 +21,7 @@ module Network.Tox.Crypto.Transport , TypingStatus(..) , GroupChatId(..) , MessageType(..) + , isKillPacket, isOFFLINE , KnownLossyness(..) , AsWord16(..) , AsWord64(..) @@ -694,6 +695,14 @@ lossyness (fromEnum -> x) | x >= 192, x < 255 = Lossy lossyness (fromEnum -> 255) = Lossless lossyness _ = UnknownLossyness +isKillPacket :: MessageType -> Bool +isKillPacket (Msg KillPacket) = True +isKillPacket _ = False + +isOFFLINE :: MessageType -> Bool +isOFFLINE (Msg OFFLINE) = True +isOFFLINE _ = False + -- TODO: Flesh this out. data MessageID -- First byte indicates data = Padding -- ^ 0 padding (skipped until we hit a non zero (data id) byte) -- cgit v1.2.3