From 570b5f9983292117ed8cd34c88f65a47915edebb Mon Sep 17 00:00:00 2001 From: joe Date: Tue, 21 Nov 2017 13:26:51 -0500 Subject: Moved dequeue-thread responsibility, implemented receiveCrypto. --- src/Network/Tox/Crypto/Handlers.hs | 37 +++++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 12 deletions(-) (limited to 'src/Network/Tox/Crypto') diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index d43535ad..e6669c3e 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs @@ -29,6 +29,7 @@ import qualified Data.Set as Set import qualified Data.Array.Unboxed as A import SensibleDir import System.FilePath +import System.IO import System.IO.Temp import System.Environment import System.Directory @@ -102,7 +103,6 @@ data NetCryptoSession = NCrypto { ncState :: TVar NetCryptoSessionStatu , ncView :: TVar SessionView , ncPacketQueue :: PacketQueue CryptoData , ncBufferStart :: TVar Word32 - , ncDequeueThread :: Maybe ThreadId , ncPingMachine :: Maybe PingMachine , ncOutgoingQueue :: PQ.PacketOutQueue (State,Nonce24,TVar MsgOutMap) CryptoMessage (CryptoPacket Encrypted) CryptoData } @@ -266,6 +266,19 @@ ncToWire getState seqno bufend pktno msg = do pkt = CryptoPacket { pktNonce = nonce24ToWord16 n24, pktData = encrypted } in return (Just (pkt, pktno+1)) +-- | Blocks until a packet is available, or the session is terminated. +receiveCrypto :: NetCryptoSession -> IO (Maybe CryptoMessage) +receiveCrypto nc@NCrypto { ncState, ncPacketQueue } = do + cd <- atomically $ do + st <- readTVar ncState + case st of + Accepted -> Just <$> PQ.dequeue ncPacketQueue + Confirmed -> Just <$> PQ.dequeue ncPacketQueue + _ -> pure Nothing + tid <- myThreadId + forM_ cd $ runCryptoHook nc + return $ bufferData <$> cd + -- | called when we recieve a crypto handshake with valid cookie -- TODO set priority on contact addr to 0 if it is older than ForgetPeriod, -- then increment it regardless. (Keep addr in MinMaxPSQ in Roster.Contact) @@ -340,23 +353,24 @@ freshCryptoSession sessions , ncView = ncView0 , ncPacketQueue = pktq , ncBufferStart = bufstart - , ncDequeueThread = Nothing -- error "you want the NetCrypto-Dequeue thread id, but is it started?" , ncPingMachine = Nothing -- error "you want the NetCrypto-PingMachine, but is it started?" , ncOutgoingQueue = pktoq } - -- launch dequeue thread - threadid <- forkIO $ do - tid <- myThreadId - labelThread tid ("NetCryptoDequeue." ++ show (key2id remotePublicKey)) - fix $ \loop -> do - cd <- atomically $ PQ.dequeue pktq - _ <- runCryptoHook (netCryptoSession0 {ncDequeueThread=Just tid}) cd - loop + + hooks <- atomically $ readTVar (announceNewSessionHooks sessions) + + -- Dequeue thread: + -- + -- Hopefully, somebody will launch a thread to repeatedly call + -- 'receiveCrypto' in order to dequeue messages from ncPacketQueue. + when (null hooks) $ do + hPutStrLn stderr "Warning: Missing new-session handler. Lost session!" + -- launch ping thread fuzz <- randomRIO (0,2000) pingMachine <- forkPingMachine ("NetCrypto." ++ show (key2id remotePublicKey)) (15000 + fuzz) 2000 -- update session with thread ids - let netCryptoSession = netCryptoSession0 {ncDequeueThread=Just threadid, ncPingMachine=Just pingMachine} + let netCryptoSession = netCryptoSession0 {ncPingMachine=Just pingMachine} -- add this session to the lookup maps atomically $ do modifyTVar allsessions (Map.insert addr netCryptoSession) @@ -365,7 +379,6 @@ freshCryptoSession sessions Nothing -> modifyTVar allsessionsByKey (Map.insert remotePublicKey [netCryptoSession]) Just xs -> modifyTVar allsessionsByKey (Map.insert remotePublicKey (netCryptoSession:xs)) -- run announceNewSessionHooks - hooks <- atomically $ readTVar (announceNewSessionHooks sessions) flip fix (hooks,netCryptoSession) $ \loop (hooks,session) -> case hooks of [] -> return () -- cgit v1.2.3