diff options
-rw-r--r-- | PingMachine.hs | 48 | ||||
-rw-r--r-- | examples/dhtd.hs | 29 | ||||
-rwxr-xr-x | g | 2 | ||||
-rw-r--r-- | src/Network/Tox/Crypto/Handlers.hs | 115 |
4 files changed, 153 insertions, 41 deletions
diff --git a/PingMachine.hs b/PingMachine.hs index 5cd70f95..4a1cb008 100644 --- a/PingMachine.hs +++ b/PingMachine.hs | |||
@@ -1,6 +1,8 @@ | |||
1 | {-# LANGUAGE CPP #-} | 1 | {-# LANGUAGE CPP #-} |
2 | {-# LANGUAGE TupleSections #-} | ||
2 | module PingMachine where | 3 | module PingMachine where |
3 | 4 | ||
5 | import Control.Applicative | ||
4 | import Control.Monad | 6 | import Control.Monad |
5 | import Data.Function | 7 | import Data.Function |
6 | #ifdef THREAD_DEBUG | 8 | #ifdef THREAD_DEBUG |
@@ -89,6 +91,52 @@ forkPingMachine label idle timeout = do | |||
89 | , pingStarted = started | 91 | , pingStarted = started |
90 | } | 92 | } |
91 | 93 | ||
94 | -- | like 'forkPingMachine' but the timeout and idle parameters can be changed dynamically | ||
95 | -- Unlike 'forkPingMachine', 'forkPingMachineDynamic' always launches a thread | ||
96 | -- regardless of idle value. | ||
97 | forkPingMachineDynamic | ||
98 | :: String | ||
99 | -> TVar PingInterval -- ^ Milliseconds of idle before a ping is considered necessary. | ||
100 | -> TVar TimeOut -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'. | ||
101 | -> IO PingMachine | ||
102 | forkPingMachineDynamic label idleV timeoutV = do | ||
103 | d <- interruptibleDelay | ||
104 | flag <- atomically $ newTVar False | ||
105 | canceled <- atomically $ newTVar False | ||
106 | event <- atomically newEmptyTMVar | ||
107 | started <- atomically $ newEmptyTMVar | ||
108 | void . forkIO $ do | ||
109 | myThreadId >>= flip labelThread ("Ping." ++ label) -- ("ping.watchdog") | ||
110 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
111 | fix $ \loop -> do | ||
112 | atomically $ writeTVar flag False | ||
113 | (idle,timeout) <- atomically $ (,) <$> readTVar idleV <*> readTVar timeoutV | ||
114 | fin <- startDelay d (1000*idle) | ||
115 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
116 | if (not fin) then loop | ||
117 | else do | ||
118 | -- Idle event | ||
119 | atomically $ do | ||
120 | tryTakeTMVar event | ||
121 | putTMVar event PingIdle | ||
122 | writeTVar flag True | ||
123 | fin <- startDelay d (1000*timeout) | ||
124 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
125 | me <- myThreadId | ||
126 | if (not fin) then loop | ||
127 | else do | ||
128 | -- Timeout event | ||
129 | atomically $ do | ||
130 | tryTakeTMVar event | ||
131 | writeTVar flag False | ||
132 | putTMVar event PingTimeOut | ||
133 | return PingMachine | ||
134 | { pingFlag = flag | ||
135 | , pingInterruptible = d | ||
136 | , pingEvent = event | ||
137 | , pingStarted = started | ||
138 | } | ||
139 | |||
92 | -- | Terminate the watchdog thread. Call this upon connection close. | 140 | -- | Terminate the watchdog thread. Call this upon connection close. |
93 | -- | 141 | -- |
94 | -- You should ensure no threads are waiting on 'pingWait' because there is no | 142 | -- You should ensure no threads are waiting on 'pingWait' because there is no |
diff --git a/examples/dhtd.hs b/examples/dhtd.hs index ce6cc8f7..6ef4539f 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs | |||
@@ -702,6 +702,35 @@ clientSession s@Session{..} sock cnum h = do | |||
702 | else do | 702 | else do |
703 | rows <- sessionsReport | 703 | rows <- sessionsReport |
704 | hPutClient h (showColumns (headers:rows)) | 704 | hPutClient h (showColumns (headers:rows)) |
705 | -- session <N> set key val | ||
706 | ("session",s) | (idStr,"set",unstripped) <- twoWords s | ||
707 | , (key,val,unstripped2) <- twoWords unstripped | ||
708 | , let setmap = [("ncRequestInterval", \s x -> writeTVar (Tox.ncRequestInterval s) x) | ||
709 | ,("ncAliveInterval", \s x -> writeTVar (Tox.ncAliveInterval s) x) | ||
710 | ,("ncIdleEvent", \s x -> writeTVar (Tox.ncIdleEvent s) x) | ||
711 | ,("ncTimeOut", \s x -> writeTVar (Tox.ncTimeOut s) x) | ||
712 | ] | ||
713 | , Just stmFunc <- Data.List.lookup key setmap | ||
714 | -> cmd0 $ do | ||
715 | lrSession <- strToSession idStr | ||
716 | case lrSession of | ||
717 | Left s -> hPutClient h s | ||
718 | Right session -> do | ||
719 | case readMaybe val of | ||
720 | Just (x::Int) -> do | ||
721 | atomically (stmFunc session x) | ||
722 | hPutClient h $ "Session " ++ idStr ++ ": " ++ key ++ " = " ++ val | ||
723 | _ -> | ||
724 | hPutClient h $ "Invalid " ++ key ++ " value: " ++ val | ||
725 | |||
726 | -- report error when setting invalid keys | ||
727 | ("session",s) | (idStr,"set",unstripped) <- twoWords s | ||
728 | , (key,val,unstripped2) <- twoWords unstripped | ||
729 | -> cmd0 $ do | ||
730 | lrSession <- strToSession idStr | ||
731 | case lrSession of | ||
732 | Left s -> hPutClient h s | ||
733 | Right session -> hPutClient h $ "What is " ++ key ++ "?" | ||
705 | -- session <N> tail | 734 | -- session <N> tail |
706 | -- show context (latest lossless messages) | 735 | -- show context (latest lossless messages) |
707 | ("session", s) | (idStr,tailcmd,unstripped) <- twoWords s | 736 | ("session", s) | (idStr,tailcmd,unstripped) <- twoWords s |
@@ -2,7 +2,7 @@ | |||
2 | 2 | ||
3 | rootname=$(cat /etc/debian_chroot 2>/dev/null) | 3 | rootname=$(cat /etc/debian_chroot 2>/dev/null) |
4 | echo $PATH | grep '\.stack' >/dev/null && rootname="stack" | 4 | echo $PATH | grep '\.stack' >/dev/null && rootname="stack" |
5 | BUILDB=build/b | 5 | BUILDB=.stack-work/dist/x86_64-linux/Cabal-2.0.1.0/build |
6 | 6 | ||
7 | warn="-freverse-errors -fwarn-unused-imports -Wmissing-signatures -fdefer-typed-holes" | 7 | warn="-freverse-errors -fwarn-unused-imports -Wmissing-signatures -fdefer-typed-holes" |
8 | exts="-XOverloadedStrings -XRecordWildCards" | 8 | exts="-XOverloadedStrings -XRecordWildCards" |
diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index c43542b1..2211e0f2 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs | |||
@@ -311,6 +311,7 @@ data NetCryptoSession = NCrypto | |||
311 | -- where as the prior fields will be used in any implementation -- | 311 | -- where as the prior fields will be used in any implementation -- |
312 | , ncHooks :: TVar (Map.Map MessageType [NetCryptoHook]) | 312 | , ncHooks :: TVar (Map.Map MessageType [NetCryptoHook]) |
313 | , ncUnrecognizedHook :: TVar (MessageType -> NetCryptoHook) | 313 | , ncUnrecognizedHook :: TVar (MessageType -> NetCryptoHook) |
314 | , ncIdleEventHooks :: TVar [(Int,NetCryptoSession -> IO ())] | ||
314 | , ncIncomingTypeArray :: TVar MsgTypeArray | 315 | , ncIncomingTypeArray :: TVar MsgTypeArray |
315 | -- ^ This array maps 255 Id bytes to MessageType | 316 | -- ^ This array maps 255 Id bytes to MessageType |
316 | -- It should contain all messages this session understands. | 317 | -- It should contain all messages this session understands. |
@@ -342,11 +343,19 @@ data NetCryptoSession = NCrypto | |||
342 | -- ^ a buffer in which incoming packets may be stored out of order | 343 | -- ^ a buffer in which incoming packets may be stored out of order |
343 | -- but from which they may be extracted in sequence, | 344 | -- but from which they may be extracted in sequence, |
344 | -- helps ensure lossless packets are processed in order | 345 | -- helps ensure lossless packets are processed in order |
345 | , ncRequestInterval :: TVar Int | 346 | , ncStoredRequests :: CyclicBuffer CryptoData |
346 | -- ^ How long (in microseconds) to wait between packet requests | ||
347 | -- , ncStoredRequests :: CyclicBuffer CryptoData | ||
348 | -- ^ Store the last 5 packet requests, try handling in any order | 347 | -- ^ Store the last 5 packet requests, try handling in any order |
349 | -- if the connection seems like it is locked (TODO) | 348 | -- if the connection seems like it is locked (TODO) |
349 | , ncRequestInterval :: TVar Int | ||
350 | -- ^ How long (in miliseconds) to wait between packet requests | ||
351 | , ncAliveInterval :: TVar Int | ||
352 | -- ^ How long before the next ALIVE packet ("PING") | ||
353 | -- is to be sent regardless of activity | ||
354 | , ncTimeOut :: TVar Int | ||
355 | -- ^ How many miliseconds of inactivity before this session is abandoned | ||
356 | , ncIdleEvent :: TVar Int | ||
357 | -- ^ How many miliseconds of inactivity before emergency measures are taken | ||
358 | -- Emergency measures = (rehandle the packet requests stored in ncStoredRequests) | ||
350 | , ncRequestThread :: TVar (Maybe ThreadId) | 359 | , ncRequestThread :: TVar (Maybe ThreadId) |
351 | -- ^ thread which sends packet requests | 360 | -- ^ thread which sends packet requests |
352 | , ncDequeueThread :: TVar (Maybe ThreadId) | 361 | , ncDequeueThread :: TVar (Maybe ThreadId) |
@@ -358,6 +367,7 @@ data NetCryptoSession = NCrypto | |||
358 | -- ^ thread which triggers ping events | 367 | -- ^ thread which triggers ping events |
359 | , ncPingThread :: TVar (Maybe ThreadId) | 368 | , ncPingThread :: TVar (Maybe ThreadId) |
360 | -- ^ thread which actually queues outgoing pings | 369 | -- ^ thread which actually queues outgoing pings |
370 | , ncIdleEventThread :: TVar (Maybe ThreadId) | ||
361 | , ncOutgoingQueue :: TVar | 371 | , ncOutgoingQueue :: TVar |
362 | (UponHandshake | 372 | (UponHandshake |
363 | (PQ.PacketOutQueue | 373 | (PQ.PacketOutQueue |
@@ -387,6 +397,7 @@ data NetCryptoSessions = NCSessions | |||
387 | , transportCrypto :: TransportCrypto | 397 | , transportCrypto :: TransportCrypto |
388 | , defaultHooks :: Map.Map MessageType [NetCryptoHook] | 398 | , defaultHooks :: Map.Map MessageType [NetCryptoHook] |
389 | , defaultUnrecognizedHook :: MessageType -> NetCryptoHook | 399 | , defaultUnrecognizedHook :: MessageType -> NetCryptoHook |
400 | , defaultIdleEventHooks :: [(Int,NetCryptoSession -> IO ())] | ||
390 | , sessionView :: SessionView | 401 | , sessionView :: SessionView |
391 | , msgTypeArray :: MsgTypeArray | 402 | , msgTypeArray :: MsgTypeArray |
392 | , inboundQueueCapacity :: Word32 | 403 | , inboundQueueCapacity :: Word32 |
@@ -453,6 +464,7 @@ newSessionsState crypto unrechook hooks = do | |||
453 | , transportCrypto = crypto | 464 | , transportCrypto = crypto |
454 | , defaultHooks = hooks | 465 | , defaultHooks = hooks |
455 | , defaultUnrecognizedHook = unrechook | 466 | , defaultUnrecognizedHook = unrechook |
467 | , defaultIdleEventHooks = [(0,handleRequestsOutOfOrder)] | ||
456 | , sessionView = SessionView | 468 | , sessionView = SessionView |
457 | { svNick = nick | 469 | { svNick = nick |
458 | , svStatus = status | 470 | , svStatus = status |
@@ -507,11 +519,11 @@ ncToWire getState seqno bufend pktno msg = do | |||
507 | GrpMsg KnownLossless _ -> Lossless | 519 | GrpMsg KnownLossless _ -> Lossless |
508 | (state,n24,msgOutMapVar) <- getState | 520 | (state,n24,msgOutMapVar) <- getState |
509 | -- msgOutMap <- readTVar msgOutMapVar | 521 | -- msgOutMap <- readTVar msgOutMapVar |
510 | result1 <- trace ("lookupInRangeMap typ64=" ++ show typ64) | 522 | result1 <- dtrace XNetCrypto ("lookupInRangeMap typ64=" ++ show typ64) |
511 | $ lookupInRangeMap typ64 msgOutMapVar | 523 | $ lookupInRangeMap typ64 msgOutMapVar |
512 | case result1 of -- msgOutMapLookup typ64 msgOutMap of | 524 | case result1 of -- msgOutMapLookup typ64 msgOutMap of |
513 | Nothing -> trace "lookupInRangeMap gave Nothing!" $ return Nothing | 525 | Nothing -> dtrace XNetCrypto "lookupInRangeMap gave Nothing!" $ return Nothing |
514 | Just outid -> trace ("encrypting packet with Nonce: " ++ show n24) $ do | 526 | Just outid -> dtrace XNetCrypto ("encrypting packet with Nonce: " ++ show n24) $ do |
515 | let setMessageId (OneByte _) mid = OneByte (toEnum8 mid) | 527 | let setMessageId (OneByte _) mid = OneByte (toEnum8 mid) |
516 | setMessageId (TwoByte _ x) mid = TwoByte (toEnum8 mid) x | 528 | setMessageId (TwoByte _ x) mid = TwoByte (toEnum8 mid) x |
517 | setMessageId (UpToN _ x) mid = UpToN (toEnum8 mid) x | 529 | setMessageId (UpToN _ x) mid = UpToN (toEnum8 mid) x |
@@ -527,7 +539,7 @@ ncToWire getState seqno bufend pktno msg = do | |||
527 | plain = encodePlain cd | 539 | plain = encodePlain cd |
528 | encrypted = encrypt state plain | 540 | encrypted = encrypt state plain |
529 | pkt = CryptoPacket { pktNonce = let r = nonce24ToWord16 n24 | 541 | pkt = CryptoPacket { pktNonce = let r = nonce24ToWord16 n24 |
530 | in trace (printf "converting n24 to word16: 0x%x" r) r | 542 | in dtrace XNetCrypto (printf "converting n24 to word16: 0x%x" r) r |
531 | , pktData = encrypted } | 543 | , pktData = encrypted } |
532 | in return (Just (pkt, pktno)) | 544 | in return (Just (pkt, pktno)) |
533 | Lossless -> let cd = | 545 | Lossless -> let cd = |
@@ -562,7 +574,7 @@ freshCryptoSession sessions | |||
562 | let crypto = transportCrypto sessions | 574 | let crypto = transportCrypto sessions |
563 | allsessions = netCryptoSessions sessions | 575 | allsessions = netCryptoSessions sessions |
564 | allsessionsByKey = netCryptoSessionsByKey sessions | 576 | allsessionsByKey = netCryptoSessionsByKey sessions |
565 | dmsg msg = trace msg (return ()) | 577 | dmsg msg = dtrace XNetCrypto msg (return ()) |
566 | sessionId <- do | 578 | sessionId <- do |
567 | x <- readTVar (nextSessionId sessions) | 579 | x <- readTVar (nextSessionId sessions) |
568 | modifyTVar (nextSessionId sessions) (+1) | 580 | modifyTVar (nextSessionId sessions) (+1) |
@@ -582,6 +594,7 @@ freshCryptoSession sessions | |||
582 | cookie0 <- newTVar (HaveCookie otherCookie) | 594 | cookie0 <- newTVar (HaveCookie otherCookie) |
583 | ncHooks0 <- newTVar (defaultHooks sessions) | 595 | ncHooks0 <- newTVar (defaultHooks sessions) |
584 | ncUnrecognizedHook0 <- newTVar (defaultUnrecognizedHook sessions) | 596 | ncUnrecognizedHook0 <- newTVar (defaultUnrecognizedHook sessions) |
597 | ncIdleEventHooks0 <- newTVar (defaultIdleEventHooks sessions) | ||
585 | ncIncomingTypeArray0 <- newTVar (msgTypeArray sessions) | 598 | ncIncomingTypeArray0 <- newTVar (msgTypeArray sessions) |
586 | let idMap = foldl (\mp (x,y) -> W64.insert x y mp) W64.empty (zip [0..255] [0..255]) | 599 | let idMap = foldl (\mp (x,y) -> W64.insert x y mp) W64.empty (zip [0..255] [0..255]) |
587 | (ncOutgoingIdMap0,lossyEscapeIdMap,losslessEscapeIdMap) <- do | 600 | (ncOutgoingIdMap0,lossyEscapeIdMap,losslessEscapeIdMap) <- do |
@@ -605,7 +618,7 @@ freshCryptoSession sessions | |||
605 | Just theirSessionKey -> createNetCryptoOutQueue sessions newsession theirSessionKey pktq ncMyPacketNonce0 ncOutgoingIdMap0 | 618 | Just theirSessionKey -> createNetCryptoOutQueue sessions newsession theirSessionKey pktq ncMyPacketNonce0 ncOutgoingIdMap0 |
606 | mbpktoqVar <- newTVar mbpktoq | 619 | mbpktoqVar <- newTVar mbpktoq |
607 | lastNQ <- CB.new 10 0 :: STM (CyclicBuffer (Bool,(ViewSnapshot,InOrOut CryptoMessage))) | 620 | lastNQ <- CB.new 10 0 :: STM (CyclicBuffer (Bool,(ViewSnapshot,InOrOut CryptoMessage))) |
608 | -- ncStoredRequests0 <- CB.new 5 0 :: STM (CyclicBuffer CryptoData) | 621 | ncStoredRequests0 <- CB.new 5 0 :: STM (CyclicBuffer CryptoData) |
609 | listeners <- newTVar IntMap.empty | 622 | listeners <- newTVar IntMap.empty |
610 | msgNum <- newTVar 0 | 623 | msgNum <- newTVar 0 |
611 | dropNum <- newTVar 0 | 624 | dropNum <- newTVar 0 |
@@ -613,12 +626,19 @@ freshCryptoSession sessions | |||
613 | dmsg $ "freshCryptoSession: Session ncTheirBaseNonce=" ++ show theirbasenonce | 626 | dmsg $ "freshCryptoSession: Session ncTheirBaseNonce=" ++ show theirbasenonce |
614 | dmsg $ "freshCryptoSession: My Session Public =" ++ show (key2id $ toPublic newsession) | 627 | dmsg $ "freshCryptoSession: My Session Public =" ++ show (key2id $ toPublic newsession) |
615 | ncTheirSessionPublic0 <- newTVar (frmMaybe mbtheirSessionKey) | 628 | ncTheirSessionPublic0 <- newTVar (frmMaybe mbtheirSessionKey) |
616 | ncRequestInterval0 <- newTVar 2000000 -- (TODO: shrink this) long interval while debugging slows trace flood | ||
617 | ncRequestThread0 <- newTVar Nothing | 629 | ncRequestThread0 <- newTVar Nothing |
618 | ncDequeueThread0 <- newTVar Nothing | 630 | ncDequeueThread0 <- newTVar Nothing |
619 | ncDequeueOutGoingThread0 <- newTVar Nothing | 631 | ncDequeueOutGoingThread0 <- newTVar Nothing |
620 | ncPingMachine0 <- newTVar Nothing | 632 | ncPingMachine0 <- newTVar Nothing |
621 | ncPingThread0 <- newTVar Nothing | 633 | ncPingThread0 <- newTVar Nothing |
634 | ncIdleEventThread0 <- newTVar Nothing | ||
635 | ncRequestInterval0 <- newTVar 2000 -- (TODO: shrink this) long interval while debugging slows trace flood | ||
636 | ncAliveInterval0 <- newTVar 8000 -- 8 seconds | ||
637 | -- ping Machine parameters | ||
638 | fuzz <- return 0 -- randomRIO (0,2000) -- Fuzz to prevent simultaneous ping/pong exchanges. | ||
639 | -- Disabled because tox has no pong event. | ||
640 | ncTimeOut0 <- newTVar 32000 -- 32 seconds | ||
641 | ncIdleEvent0 <- newTVar (5000 + fuzz) -- 5 seconds | ||
622 | let netCryptoSession0 = | 642 | let netCryptoSession0 = |
623 | NCrypto { ncState = ncState0 | 643 | NCrypto { ncState = ncState0 |
624 | , ncMyPublicKey = toPublic key | 644 | , ncMyPublicKey = toPublic key |
@@ -634,6 +654,7 @@ freshCryptoSession sessions | |||
634 | , ncSockAddr = HaveDHTKey addr | 654 | , ncSockAddr = HaveDHTKey addr |
635 | , ncHooks = ncHooks0 | 655 | , ncHooks = ncHooks0 |
636 | , ncUnrecognizedHook = ncUnrecognizedHook0 | 656 | , ncUnrecognizedHook = ncUnrecognizedHook0 |
657 | , ncIdleEventHooks = ncIdleEventHooks0 | ||
637 | , ncAllSessions = sessions | 658 | , ncAllSessions = sessions |
638 | , ncIncomingTypeArray = ncIncomingTypeArray0 | 659 | , ncIncomingTypeArray = ncIncomingTypeArray0 |
639 | , ncOutgoingIdMap = ncOutgoingIdMap0 | 660 | , ncOutgoingIdMap = ncOutgoingIdMap0 |
@@ -641,13 +662,17 @@ freshCryptoSession sessions | |||
641 | , ncOutgoingIdMapEscapedLossless = losslessEscapeIdMap | 662 | , ncOutgoingIdMapEscapedLossless = losslessEscapeIdMap |
642 | , ncView = ncView0 | 663 | , ncView = ncView0 |
643 | , ncPacketQueue = pktq | 664 | , ncPacketQueue = pktq |
644 | -- , ncStoredRequests = ncStoredRequests0 (TODO) | 665 | , ncStoredRequests = ncStoredRequests0 |
645 | , ncRequestInterval = ncRequestInterval0 | 666 | , ncRequestInterval = ncRequestInterval0 |
667 | , ncAliveInterval = ncAliveInterval0 | ||
668 | , ncTimeOut = ncTimeOut0 | ||
669 | , ncIdleEvent = ncIdleEvent0 | ||
646 | , ncRequestThread = ncRequestThread0 | 670 | , ncRequestThread = ncRequestThread0 |
647 | , ncDequeueThread = ncDequeueThread0 | 671 | , ncDequeueThread = ncDequeueThread0 |
648 | , ncDequeueOutGoingThread = ncDequeueOutGoingThread0 | 672 | , ncDequeueOutGoingThread = ncDequeueOutGoingThread0 |
649 | , ncPingMachine = ncPingMachine0 | 673 | , ncPingMachine = ncPingMachine0 |
650 | , ncPingThread = ncPingThread0 | 674 | , ncPingThread = ncPingThread0 |
675 | , ncIdleEventThread = ncIdleEventThread0 | ||
651 | , ncOutgoingQueue = mbpktoqVar | 676 | , ncOutgoingQueue = mbpktoqVar |
652 | , ncLastNMsgs = lastNQ | 677 | , ncLastNMsgs = lastNQ |
653 | , ncListeners = listeners | 678 | , ncListeners = listeners |
@@ -674,7 +699,7 @@ createNetCryptoOutQueue sessions newsession theirSessionKey pktq ncMyPacketNonce | |||
674 | atomically $ do | 699 | atomically $ do |
675 | n24 <- readTVar ncMyPacketNonce0 | 700 | n24 <- readTVar ncMyPacketNonce0 |
676 | let n24plus1 = incrementNonce24 n24 | 701 | let n24plus1 = incrementNonce24 n24 |
677 | trace ("ncMyPacketNonce+1=" ++ show n24plus1 | 702 | dtrace XNetCrypto ("ncMyPacketNonce+1=" ++ show n24plus1 |
678 | ++ "\n toWireIO: theirSessionKey = " ++ show (key2id theirSessionKey) | 703 | ++ "\n toWireIO: theirSessionKey = " ++ show (key2id theirSessionKey) |
679 | ++ "\n toWireIO: my public session key = " ++ show (key2id (toPublic newsession)) | 704 | ++ "\n toWireIO: my public session key = " ++ show (key2id (toPublic newsession)) |
680 | ) $ writeTVar ncMyPacketNonce0 n24plus1 | 705 | ) $ writeTVar ncMyPacketNonce0 n24plus1 |
@@ -762,7 +787,7 @@ runUponHandshake netCryptoSession0 addr pktoq = do | |||
762 | atomically $ writeTVar (ncDequeueThread netCryptoSession0) (Just tid) | 787 | atomically $ writeTVar (ncDequeueThread netCryptoSession0) (Just tid) |
763 | labelThread tid ("NetCryptoRequest." ++ show (key2id remotePublicKey) ++ sidStr) | 788 | labelThread tid ("NetCryptoRequest." ++ show (key2id remotePublicKey) ++ sidStr) |
764 | fix $ \loop -> do | 789 | fix $ \loop -> do |
765 | atomically (readTVar (ncRequestInterval netCryptoSession0)) >>= threadDelay | 790 | atomically (readTVar (ncRequestInterval netCryptoSession0)) >>= threadDelay . (* 1000) |
766 | nums <- atomically $ PQ.getMissing pktq | 791 | nums <- atomically $ PQ.getMissing pktq |
767 | dput XNetCrypto $ "(Request Thread) Missing Packets detected:" ++ show nums | 792 | dput XNetCrypto $ "(Request Thread) Missing Packets detected:" ++ show nums |
768 | getOutGoingParam <- PQ.readyOutGoing pktoq | 793 | getOutGoingParam <- PQ.readyOutGoing pktoq |
@@ -786,30 +811,35 @@ runUponHandshake netCryptoSession0 addr pktoq = do | |||
786 | loop | 811 | loop |
787 | dput XNetCrypto $ "runUponHandshake: " ++ show threadidOutgoing ++ " = NetCryptoDequeueOutgoing." ++ show (key2id remotePublicKey) ++ sidStr | 812 | dput XNetCrypto $ "runUponHandshake: " ++ show threadidOutgoing ++ " = NetCryptoDequeueOutgoing." ++ show (key2id remotePublicKey) ++ sidStr |
788 | -- launch ping Machine thread | 813 | -- launch ping Machine thread |
789 | fuzz <- return 0 -- randomRIO (0,2000) -- Fuzz to prevent simultaneous ping/pong exchanges. | 814 | pingMachine <- forkPingMachineDynamic ("NetCrypto." ++ show (key2id remotePublicKey) ++ sidStr) (ncIdleEvent netCryptoSession0) (ncTimeOut netCryptoSession0) |
790 | -- Disabled because tox has no pong event. | ||
791 | pingMachine <- forkPingMachine ("NetCrypto." ++ show (key2id remotePublicKey) ++ sidStr) (8000 + fuzz) 4000 | ||
792 | atomically $ writeTVar (ncPingMachine netCryptoSession0) (Just pingMachine) | 815 | atomically $ writeTVar (ncPingMachine netCryptoSession0) (Just pingMachine) |
793 | -- launch ping thread | 816 | -- launch ping thread |
794 | pingThreadId <- forkIO $ do | 817 | pingThreadId <- forkIO $ do |
795 | tid <- myThreadId | 818 | tid <- myThreadId |
796 | atomically $ writeTVar (ncPingThread netCryptoSession0) (Just tid) | 819 | atomically $ writeTVar (ncPingThread netCryptoSession0) (Just tid) |
797 | labelThread tid ("NetCryptoPingSender." ++ show (key2id remotePublicKey) ++ sidStr) | 820 | labelThread tid ("NetCryptoPingSender." ++ show (key2id remotePublicKey) ++ sidStr) |
821 | fix $ \loop -> do | ||
822 | atomically (readTVar (ncAliveInterval netCryptoSession0)) >>= threadDelay . (* 1000) | ||
823 | dput XNetCrypto $ "pingThread (session: " ++ show (ncSessionId netCryptoSession0) ++ ") Sending Alive(PING) Packet" | ||
824 | lr <- sendPing crypto netCryptoSession0 | ||
825 | case lr of | ||
826 | Left s -> dput XNetCrypto $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") " ++ s | ||
827 | Right _ -> return () | ||
828 | loop | ||
829 | atomically $ writeTVar (ncPingThread netCryptoSession0) (Just pingThreadId) | ||
830 | -- launch IdleEvent thread | ||
831 | idleThreadId <- forkIO $ do | ||
832 | tid <- myThreadId | ||
833 | atomically $ writeTVar (ncPingThread netCryptoSession0) (Just tid) | ||
834 | labelThread tid ("NetCryptoIdleEvent." ++ show (key2id remotePublicKey) ++ sidStr) | ||
798 | event <- atomically $ pingWait pingMachine | 835 | event <- atomically $ pingWait pingMachine |
799 | case event of | 836 | case event of |
800 | PingIdle -> do | 837 | PingIdle -> do |
801 | dput XNetCrypto $ "pingThread (session: " ++ show (ncSessionId netCryptoSession0) ++ ") PingIdle" | 838 | hooks <- atomically (readTVar (ncIdleEventHooks netCryptoSession0)) |
802 | -- Normally, we would not bump the PingMachine until we receive | 839 | mapM_ (($ netCryptoSession0) . snd) hooks |
803 | -- an inbound packet. We are doing this here because tox has | 840 | PingTimeOut -> destroySession netCryptoSession0 |
804 | -- no pong response packet and so we need to mark the | 841 | atomically $ writeTVar (ncIdleEventThread netCryptoSession0) (Just idleThreadId) |
805 | -- connection non-idle here. Doing this prevents a PingTimeOut | 842 | |
806 | -- from ever occurring. (TODO: handle timed-out sessions somehow.) | ||
807 | pingBump pingMachine | ||
808 | lr <- sendPing crypto netCryptoSession0 | ||
809 | case lr of | ||
810 | Left s -> dput XNetCrypto $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") " ++ s | ||
811 | Right _ -> return () | ||
812 | PingTimeOut -> dput XNetCrypto $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") PingTimeOut TODO" | ||
813 | -- update session with thread ids | 843 | -- update session with thread ids |
814 | let netCryptoSession = netCryptoSession0 | 844 | let netCryptoSession = netCryptoSession0 |
815 | -- add this session to the lookup maps | 845 | -- add this session to the lookup maps |
@@ -852,6 +882,8 @@ destroySession session = do | |||
852 | stopMachine (ncPingMachine session) | 882 | stopMachine (ncPingMachine session) |
853 | stopThread (ncDequeueThread session) | 883 | stopThread (ncDequeueThread session) |
854 | stopThread (ncDequeueOutGoingThread session) | 884 | stopThread (ncDequeueOutGoingThread session) |
885 | stopThread (ncRequestThread session) | ||
886 | stopThread (ncIdleEventThread session) | ||
855 | 887 | ||
856 | -- | Called when we get a handshake, but there's already a session entry. | 888 | -- | Called when we get a handshake, but there's already a session entry. |
857 | -- | 889 | -- |
@@ -948,7 +980,7 @@ handshakeH sessions addrRaw hshake@(Handshake (Cookie n24 ecookie) nonce24 encry | |||
948 | -- IO action to get a new session key in case we need it in transaction to come | 980 | -- IO action to get a new session key in case we need it in transaction to come |
949 | newsession <- generateSecretKey | 981 | newsession <- generateSecretKey |
950 | -- Do a lookup, so we can handle the update case differently | 982 | -- Do a lookup, so we can handle the update case differently |
951 | let dmsg msg = trace msg (return ()) | 983 | let dmsg msg = dtrace XNetCrypto msg (return ()) |
952 | timestamp <- getPOSIXTime | 984 | timestamp <- getPOSIXTime |
953 | (myhandshake,launchThreads) | 985 | (myhandshake,launchThreads) |
954 | <- atomically $ do | 986 | <- atomically $ do |
@@ -987,7 +1019,7 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do | |||
987 | return Nothing -- drop packet, we have no session | 1019 | return Nothing -- drop packet, we have no session |
988 | Just session@(NCrypto { ncIncomingTypeArray, ncState, ncPacketQueue, ncHooks, | 1020 | Just session@(NCrypto { ncIncomingTypeArray, ncState, ncPacketQueue, ncHooks, |
989 | ncSessionSecret, ncTheirSessionPublic, ncTheirBaseNonce, | 1021 | ncSessionSecret, ncTheirSessionPublic, ncTheirBaseNonce, |
990 | ncPingMachine, ncSessionId}) -> do | 1022 | ncPingMachine, ncSessionId, ncStoredRequests}) -> do |
991 | -- Unrecognized packets, try them thrice so as to give | 1023 | -- Unrecognized packets, try them thrice so as to give |
992 | -- handshakes some time to come in | 1024 | -- handshakes some time to come in |
993 | -- TODO: Remove this loop, as it is probably unnecessary. | 1025 | -- TODO: Remove this loop, as it is probably unnecessary. |
@@ -1048,7 +1080,7 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do | |||
1048 | atomically $ do | 1080 | atomically $ do |
1049 | HaveHandshake y <- readTVar ncTheirBaseNonce | 1081 | HaveHandshake y <- readTVar ncTheirBaseNonce |
1050 | let x = addtoNonce24 y (fromIntegral dATA_NUM_THRESHOLD) | 1082 | let x = addtoNonce24 y (fromIntegral dATA_NUM_THRESHOLD) |
1051 | trace ("nonce y(" ++ show y ++ ") + " ++ show (fromIntegral dATA_NUM_THRESHOLD) | 1083 | dtrace XNetCrypto ("nonce y(" ++ show y ++ ") + " ++ show (fromIntegral dATA_NUM_THRESHOLD) |
1052 | ++ " = " ++ show x) (return ()) | 1084 | ++ " = " ++ show x) (return ()) |
1053 | writeTVar ncTheirBaseNonce (HaveHandshake y) | 1085 | writeTVar ncTheirBaseNonce (HaveHandshake y) |
1054 | -- then set session confirmed, | 1086 | -- then set session confirmed, |
@@ -1056,13 +1088,8 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do | |||
1056 | -- bump ping machine | 1088 | -- bump ping machine |
1057 | ncPingMachine0 <- atomically $ readTVar ncPingMachine | 1089 | ncPingMachine0 <- atomically $ readTVar ncPingMachine |
1058 | case ncPingMachine0 of | 1090 | case ncPingMachine0 of |
1059 | Just pingMachine -> -- Normally, we would bump the PingMachine to mark the connection | 1091 | -- the ping machine is used to detect inactivity and respond accordingly |
1060 | -- as non-idle so that we don't need to send a ping message. | 1092 | Just pingMachine -> pingBump pingMachine |
1061 | -- Because tox has no pong message, we need to send a ping every | ||
1062 | -- eight seconds regardless, so we will let the PingIdle event be | ||
1063 | -- signaled even when we receive packets. | ||
1064 | -- pingBump pingMachine | ||
1065 | return () | ||
1066 | Nothing -> return () | 1093 | Nothing -> return () |
1067 | msgTypes <- atomically $ readTVar ncIncomingTypeArray | 1094 | msgTypes <- atomically $ readTVar ncIncomingTypeArray |
1068 | let msgTyp = cd ^. messageType | 1095 | let msgTyp = cd ^. messageType |
@@ -1078,12 +1105,15 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do | |||
1078 | else do dput XNetCrypto $ "enqueue ncPacketQueue Lossless " ++ show cm | 1105 | else do dput XNetCrypto $ "enqueue ncPacketQueue Lossless " ++ show cm |
1079 | when (msgID cm == PING) $ | 1106 | when (msgID cm == PING) $ |
1080 | dput XNetCrypto $ "NetCrypto Recieved PING (session " ++ show ncSessionId ++")" | 1107 | dput XNetCrypto $ "NetCrypto Recieved PING (session " ++ show ncSessionId ++")" |
1108 | when (msgID cm == PacketRequest) . atomically $ do | ||
1109 | num <- CB.getNextSequenceNum ncStoredRequests | ||
1110 | CB.enqueue ncStoredRequests num cd | ||
1081 | atomically $ PQ.enqueue ncPacketQueue bufferEnd cd | 1111 | atomically $ PQ.enqueue ncPacketQueue bufferEnd cd |
1082 | return Nothing | 1112 | return Nothing |
1083 | where | 1113 | where |
1084 | last2Bytes :: Nonce24 -> Word16 | 1114 | last2Bytes :: Nonce24 -> Word16 |
1085 | last2Bytes (Nonce24 bs) = case S.decode (B.drop 22 bs) of | 1115 | last2Bytes (Nonce24 bs) = case S.decode (B.drop 22 bs) of |
1086 | Right n -> n -- trace ("byteSwap16 " ++ printf "0x%x" n ++ " = " ++ printf "0x%x" (byteSwap16 n)) $ byteSwap16 n | 1116 | Right n -> n -- dtrace XNetCrypto ("byteSwap16 " ++ printf "0x%x" n ++ " = " ++ printf "0x%x" (byteSwap16 n)) $ byteSwap16 n |
1087 | _ -> error "unreachable-last2Bytes" | 1117 | _ -> error "unreachable-last2Bytes" |
1088 | dATA_NUM_THRESHOLD = 21845 -- = 65535 / 3 | 1118 | dATA_NUM_THRESHOLD = 21845 -- = 65535 / 3 |
1089 | 1119 | ||
@@ -1337,6 +1367,11 @@ defaultCryptoDataHooks | |||
1337 | , (Msg KillPacket, [defaultKillHook]) | 1367 | , (Msg KillPacket, [defaultKillHook]) |
1338 | ] | 1368 | ] |
1339 | 1369 | ||
1370 | handleRequestsOutOfOrder :: NetCryptoSession -> IO () | ||
1371 | handleRequestsOutOfOrder session = do | ||
1372 | cds <- atomically $ CB.cyclicBufferViewList (ncStoredRequests session) | ||
1373 | mapM_ (handlePacketRequest session) (map snd cds) | ||
1374 | |||
1340 | handlePacketRequest :: NetCryptoSession -> CryptoData -> IO () | 1375 | handlePacketRequest :: NetCryptoSession -> CryptoData -> IO () |
1341 | handlePacketRequest session (CryptoData { bufferStart=num | 1376 | handlePacketRequest session (CryptoData { bufferStart=num |
1342 | , bufferData=cm@(msgID -> PacketRequest) | 1377 | , bufferData=cm@(msgID -> PacketRequest) |