diff options
-rw-r--r-- | examples/dhtd.hs | 12 | ||||
-rw-r--r-- | src/Network/Tox/Crypto/Handlers.hs | 37 |
2 files changed, 31 insertions, 18 deletions
diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 8fd1402d..6e2647d1 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs | |||
@@ -990,12 +990,12 @@ noArgPing f [] x = f x | |||
990 | noArgPing _ _ _ = return Nothing | 990 | noArgPing _ _ _ = return Nothing |
991 | 991 | ||
992 | newXmmpSource :: Tox.NetCryptoSession -> C.Source IO Tox.CryptoMessage | 992 | newXmmpSource :: Tox.NetCryptoSession -> C.Source IO Tox.CryptoMessage |
993 | newXmmpSource sessions = do | 993 | newXmmpSource session = do |
994 | v <- liftIO $ _todo sessions {- receive a fucking message -} | 994 | v <- liftIO $ Tox.receiveCrypto session |
995 | case v of | 995 | case v of |
996 | Nothing -> return () -- Nothing indicates EOF. | 996 | Nothing -> return () -- Nothing indicates EOF. |
997 | Just cryptomessage -> do C.yield cryptomessage | 997 | Just cryptomessage -> do C.yield cryptomessage |
998 | newXmmpSource sessions | 998 | newXmmpSource session |
999 | 999 | ||
1000 | newXmmpSink :: Tox.NetCryptoSession -> C.Sink (Flush Tox.CryptoMessage) IO () | 1000 | newXmmpSink :: Tox.NetCryptoSession -> C.Sink (Flush Tox.CryptoMessage) IO () |
1001 | newXmmpSink sessions = C.awaitForever $ \flush_cyptomessage -> do | 1001 | newXmmpSink sessions = C.awaitForever $ \flush_cyptomessage -> do |
diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index d43535ad..e6669c3e 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs | |||
@@ -29,6 +29,7 @@ import qualified Data.Set as Set | |||
29 | import qualified Data.Array.Unboxed as A | 29 | import qualified Data.Array.Unboxed as A |
30 | import SensibleDir | 30 | import SensibleDir |
31 | import System.FilePath | 31 | import System.FilePath |
32 | import System.IO | ||
32 | import System.IO.Temp | 33 | import System.IO.Temp |
33 | import System.Environment | 34 | import System.Environment |
34 | import System.Directory | 35 | import System.Directory |
@@ -102,7 +103,6 @@ data NetCryptoSession = NCrypto { ncState :: TVar NetCryptoSessionStatu | |||
102 | , ncView :: TVar SessionView | 103 | , ncView :: TVar SessionView |
103 | , ncPacketQueue :: PacketQueue CryptoData | 104 | , ncPacketQueue :: PacketQueue CryptoData |
104 | , ncBufferStart :: TVar Word32 | 105 | , ncBufferStart :: TVar Word32 |
105 | , ncDequeueThread :: Maybe ThreadId | ||
106 | , ncPingMachine :: Maybe PingMachine | 106 | , ncPingMachine :: Maybe PingMachine |
107 | , ncOutgoingQueue :: PQ.PacketOutQueue (State,Nonce24,TVar MsgOutMap) CryptoMessage (CryptoPacket Encrypted) CryptoData | 107 | , ncOutgoingQueue :: PQ.PacketOutQueue (State,Nonce24,TVar MsgOutMap) CryptoMessage (CryptoPacket Encrypted) CryptoData |
108 | } | 108 | } |
@@ -266,6 +266,19 @@ ncToWire getState seqno bufend pktno msg = do | |||
266 | pkt = CryptoPacket { pktNonce = nonce24ToWord16 n24, pktData = encrypted } | 266 | pkt = CryptoPacket { pktNonce = nonce24ToWord16 n24, pktData = encrypted } |
267 | in return (Just (pkt, pktno+1)) | 267 | in return (Just (pkt, pktno+1)) |
268 | 268 | ||
269 | -- | Blocks until a packet is available, or the session is terminated. | ||
270 | receiveCrypto :: NetCryptoSession -> IO (Maybe CryptoMessage) | ||
271 | receiveCrypto nc@NCrypto { ncState, ncPacketQueue } = do | ||
272 | cd <- atomically $ do | ||
273 | st <- readTVar ncState | ||
274 | case st of | ||
275 | Accepted -> Just <$> PQ.dequeue ncPacketQueue | ||
276 | Confirmed -> Just <$> PQ.dequeue ncPacketQueue | ||
277 | _ -> pure Nothing | ||
278 | tid <- myThreadId | ||
279 | forM_ cd $ runCryptoHook nc | ||
280 | return $ bufferData <$> cd | ||
281 | |||
269 | -- | called when we recieve a crypto handshake with valid cookie | 282 | -- | called when we recieve a crypto handshake with valid cookie |
270 | -- TODO set priority on contact addr to 0 if it is older than ForgetPeriod, | 283 | -- TODO set priority on contact addr to 0 if it is older than ForgetPeriod, |
271 | -- then increment it regardless. (Keep addr in MinMaxPSQ in Roster.Contact) | 284 | -- then increment it regardless. (Keep addr in MinMaxPSQ in Roster.Contact) |
@@ -340,23 +353,24 @@ freshCryptoSession sessions | |||
340 | , ncView = ncView0 | 353 | , ncView = ncView0 |
341 | , ncPacketQueue = pktq | 354 | , ncPacketQueue = pktq |
342 | , ncBufferStart = bufstart | 355 | , ncBufferStart = bufstart |
343 | , ncDequeueThread = Nothing -- error "you want the NetCrypto-Dequeue thread id, but is it started?" | ||
344 | , ncPingMachine = Nothing -- error "you want the NetCrypto-PingMachine, but is it started?" | 356 | , ncPingMachine = Nothing -- error "you want the NetCrypto-PingMachine, but is it started?" |
345 | , ncOutgoingQueue = pktoq | 357 | , ncOutgoingQueue = pktoq |
346 | } | 358 | } |
347 | -- launch dequeue thread | 359 | |
348 | threadid <- forkIO $ do | 360 | hooks <- atomically $ readTVar (announceNewSessionHooks sessions) |
349 | tid <- myThreadId | 361 | |
350 | labelThread tid ("NetCryptoDequeue." ++ show (key2id remotePublicKey)) | 362 | -- Dequeue thread: |
351 | fix $ \loop -> do | 363 | -- |
352 | cd <- atomically $ PQ.dequeue pktq | 364 | -- Hopefully, somebody will launch a thread to repeatedly call |
353 | _ <- runCryptoHook (netCryptoSession0 {ncDequeueThread=Just tid}) cd | 365 | -- 'receiveCrypto' in order to dequeue messages from ncPacketQueue. |
354 | loop | 366 | when (null hooks) $ do |
367 | hPutStrLn stderr "Warning: Missing new-session handler. Lost session!" | ||
368 | |||
355 | -- launch ping thread | 369 | -- launch ping thread |
356 | fuzz <- randomRIO (0,2000) | 370 | fuzz <- randomRIO (0,2000) |
357 | pingMachine <- forkPingMachine ("NetCrypto." ++ show (key2id remotePublicKey)) (15000 + fuzz) 2000 | 371 | pingMachine <- forkPingMachine ("NetCrypto." ++ show (key2id remotePublicKey)) (15000 + fuzz) 2000 |
358 | -- update session with thread ids | 372 | -- update session with thread ids |
359 | let netCryptoSession = netCryptoSession0 {ncDequeueThread=Just threadid, ncPingMachine=Just pingMachine} | 373 | let netCryptoSession = netCryptoSession0 {ncPingMachine=Just pingMachine} |
360 | -- add this session to the lookup maps | 374 | -- add this session to the lookup maps |
361 | atomically $ do | 375 | atomically $ do |
362 | modifyTVar allsessions (Map.insert addr netCryptoSession) | 376 | modifyTVar allsessions (Map.insert addr netCryptoSession) |
@@ -365,7 +379,6 @@ freshCryptoSession sessions | |||
365 | Nothing -> modifyTVar allsessionsByKey (Map.insert remotePublicKey [netCryptoSession]) | 379 | Nothing -> modifyTVar allsessionsByKey (Map.insert remotePublicKey [netCryptoSession]) |
366 | Just xs -> modifyTVar allsessionsByKey (Map.insert remotePublicKey (netCryptoSession:xs)) | 380 | Just xs -> modifyTVar allsessionsByKey (Map.insert remotePublicKey (netCryptoSession:xs)) |
367 | -- run announceNewSessionHooks | 381 | -- run announceNewSessionHooks |
368 | hooks <- atomically $ readTVar (announceNewSessionHooks sessions) | ||
369 | flip fix (hooks,netCryptoSession) $ \loop (hooks,session) -> | 382 | flip fix (hooks,netCryptoSession) $ \loop (hooks,session) -> |
370 | case hooks of | 383 | case hooks of |
371 | [] -> return () | 384 | [] -> return () |