diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Network/Tox/Crypto/Handlers.hs | 61 |
1 files changed, 59 insertions, 2 deletions
diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index 0e349196..602b14cc 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs | |||
@@ -10,6 +10,7 @@ import Network.Tox.DHT.Transport (Cookie(..),CookieData(..), CookieRequest(..), | |||
10 | import Network.Tox.DHT.Handlers (Client, cookieRequest, createCookie ) | 10 | import Network.Tox.DHT.Handlers (Client, cookieRequest, createCookie ) |
11 | import Crypto.Tox | 11 | import Crypto.Tox |
12 | import Control.Concurrent.STM | 12 | import Control.Concurrent.STM |
13 | import Control.Concurrent.STM.TMChan | ||
13 | import Network.Address | 14 | import Network.Address |
14 | import qualified Data.Map.Strict as Map | 15 | import qualified Data.Map.Strict as Map |
15 | import Crypto.Hash | 16 | import Crypto.Hash |
@@ -37,6 +38,8 @@ import System.Random -- for ping fuzz | |||
37 | import Control.Concurrent | 38 | import Control.Concurrent |
38 | import GHC.Conc (labelThread) | 39 | import GHC.Conc (labelThread) |
39 | import PingMachine | 40 | import PingMachine |
41 | import qualified Data.IntMap.Strict as IntMap | ||
42 | import Control.Concurrent.Supply | ||
40 | 43 | ||
41 | -- util, todo: move to another module | 44 | -- util, todo: move to another module |
42 | maybeToEither :: Maybe b -> Either String b | 45 | maybeToEither :: Maybe b -> Either String b |
@@ -83,6 +86,20 @@ data SessionView = SessionView | |||
83 | 86 | ||
84 | type SessionID = Word64 | 87 | type SessionID = Word64 |
85 | 88 | ||
89 | -- | Application specific listener type (Word64) | ||
90 | -- | ||
91 | -- This is some kind of information associated with a listening TChan. | ||
92 | -- It may be used to indicate what kind of packets it is interested in. | ||
93 | -- | ||
94 | -- 0 means listen to all messages and is done automatically in 'defaultUnRecHook' | ||
95 | -- any other values are left open to application specific convention. | ||
96 | -- | ||
97 | -- This module does not know what the different values here | ||
98 | -- mean, but code that sets hooks may adhere to a convention | ||
99 | -- defined elsewhere. | ||
100 | -- | ||
101 | type ListenerType = Word64 | ||
102 | |||
86 | data NetCryptoSession = NCrypto | 103 | data NetCryptoSession = NCrypto |
87 | { ncState :: TVar NetCryptoSessionStatus | 104 | { ncState :: TVar NetCryptoSessionStatus |
88 | , ncMyPublicKey :: PublicKey | 105 | , ncMyPublicKey :: PublicKey |
@@ -116,6 +133,18 @@ data NetCryptoSession = NCrypto | |||
116 | CryptoMessage | 133 | CryptoMessage |
117 | (CryptoPacket Encrypted) | 134 | (CryptoPacket Encrypted) |
118 | CryptoData | 135 | CryptoData |
136 | , ncLastNMsgs :: PacketQueue (Bool{-Handled?-},CryptoMessage) | ||
137 | -- ^ cyclic buffer, holds the last N non-handshake crypto messages | ||
138 | -- even if there is no attached user interface. | ||
139 | , ncListeners :: TVar (IntMap.IntMap (ListenerType,TMChan CryptoMessage)) | ||
140 | -- ^ user interfaces may "listen" by inserting themselves into this map | ||
141 | -- with a unique id and a new TChan, and then reading from the TChan | ||
142 | , ncMsgNumVar :: TVar Word32 | ||
143 | -- ^ The number of non-handshake crypto messages recieved in this session | ||
144 | -- TODO: there is already a packet num etc, do we need two? | ||
145 | , ncDropCntVar :: TVar Word32 | ||
146 | -- ^ The number of crypto messages that were overwritten in the ncLastNMsgs | ||
147 | -- before anybody got to see them. | ||
119 | } | 148 | } |
120 | 149 | ||
121 | data NetCryptoSessions = NCSessions | 150 | data NetCryptoSessions = NCSessions |
@@ -131,6 +160,7 @@ data NetCryptoSessions = NCSessions | |||
131 | , nextSessionId :: TVar SessionID | 160 | , nextSessionId :: TVar SessionID |
132 | , announceNewSessionHooks :: TVar [IOHook (Maybe NoSpam) NetCryptoSession] | 161 | , announceNewSessionHooks :: TVar [IOHook (Maybe NoSpam) NetCryptoSession] |
133 | , sessionTransport :: Transport String SockAddr NetCrypto | 162 | , sessionTransport :: Transport String SockAddr NetCrypto |
163 | , listenerIDSupply :: TVar Supply | ||
134 | } | 164 | } |
135 | 165 | ||
136 | type NewSessionHook = IOHook (Maybe NoSpam) NetCryptoSession | 166 | type NewSessionHook = IOHook (Maybe NoSpam) NetCryptoSession |
@@ -173,6 +203,8 @@ newSessionsState crypto unrechook hooks = do | |||
173 | svDownloadDir0 <- atomically $ newTVar (homedir </> "Downloads") | 203 | svDownloadDir0 <- atomically $ newTVar (homedir </> "Downloads") |
174 | nextSessionId0 <- atomically $ newTVar 0 | 204 | nextSessionId0 <- atomically $ newTVar 0 |
175 | announceNewSessionHooks0 <- atomically $ newTVar [] | 205 | announceNewSessionHooks0 <- atomically $ newTVar [] |
206 | lsupply <- newSupply | ||
207 | lsupplyVar <- atomically (newTVar lsupply) | ||
176 | return NCSessions { netCryptoSessions = x | 208 | return NCSessions { netCryptoSessions = x |
177 | , netCryptoSessionsByKey = x2 | 209 | , netCryptoSessionsByKey = x2 |
178 | , transportCrypto = crypto | 210 | , transportCrypto = crypto |
@@ -195,6 +227,7 @@ newSessionsState crypto unrechook hooks = do | |||
195 | , nextSessionId = nextSessionId0 | 227 | , nextSessionId = nextSessionId0 |
196 | , announceNewSessionHooks = announceNewSessionHooks0 | 228 | , announceNewSessionHooks = announceNewSessionHooks0 |
197 | , sessionTransport = error "Need to set sessionTransport field of NetCryptoSessions!" | 229 | , sessionTransport = error "Need to set sessionTransport field of NetCryptoSessions!" |
230 | , listenerIDSupply = lsupplyVar | ||
198 | } | 231 | } |
199 | 232 | ||
200 | data HandshakeParams | 233 | data HandshakeParams |
@@ -338,6 +371,10 @@ freshCryptoSession sessions | |||
338 | writeTVar ncMyPacketNonce0 n24plus1 | 371 | writeTVar ncMyPacketNonce0 n24plus1 |
339 | return (return (f n24, n24, ncOutgoingIdMap0)) | 372 | return (return (f n24, n24, ncOutgoingIdMap0)) |
340 | pktoq <- atomically $ PQ.newOutGoing pktq ncToWire toWireIO 0 (outboundQueueCapacity sessions) 0 | 373 | pktoq <- atomically $ PQ.newOutGoing pktq ncToWire toWireIO 0 (outboundQueueCapacity sessions) 0 |
374 | msgQ <- atomically (PQ.newOverwrite 10 0 :: STM (PacketQueue (Bool,CryptoMessage))) | ||
375 | listeners <- atomically $ newTVar IntMap.empty | ||
376 | msgNum <- atomically $ newTVar 0 | ||
377 | dropNum <- atomically $ newTVar 0 | ||
341 | let netCryptoSession0 = | 378 | let netCryptoSession0 = |
342 | NCrypto { ncState = ncState0 | 379 | NCrypto { ncState = ncState0 |
343 | , ncMyPublicKey = toPublic key | 380 | , ncMyPublicKey = toPublic key |
@@ -362,6 +399,10 @@ freshCryptoSession sessions | |||
362 | , ncDequeueThread = Nothing -- error "you want the NetCrypto-Dequeue thread id, but is it started?" | 399 | , ncDequeueThread = Nothing -- error "you want the NetCrypto-Dequeue thread id, but is it started?" |
363 | , ncPingMachine = Nothing -- error "you want the NetCrypto-PingMachine, but is it started?" | 400 | , ncPingMachine = Nothing -- error "you want the NetCrypto-PingMachine, but is it started?" |
364 | , ncOutgoingQueue = pktoq | 401 | , ncOutgoingQueue = pktoq |
402 | , ncLastNMsgs = msgQ | ||
403 | , ncListeners = listeners | ||
404 | , ncMsgNumVar = msgNum | ||
405 | , ncDropCntVar = dropNum | ||
365 | } | 406 | } |
366 | -- launch dequeue thread | 407 | -- launch dequeue thread |
367 | threadid <- forkIO $ do | 408 | threadid <- forkIO $ do |
@@ -615,9 +656,25 @@ allMsgTypes fDefault = A.listArray (minBound,maxBound) (0:knownMsgs) | |||
615 | defaultCryptoDataHooks :: Map.Map MessageType [NetCryptoHook] | 656 | defaultCryptoDataHooks :: Map.Map MessageType [NetCryptoHook] |
616 | defaultCryptoDataHooks = Map.empty | 657 | defaultCryptoDataHooks = Map.empty |
617 | 658 | ||
618 | -- | discards all unrecognized packets | 659 | -- | updates ncLastNMsgs, and sends message to type-0 listeners |
619 | defaultUnRecHook :: MessageType -> NetCryptoHook | 660 | defaultUnRecHook :: MessageType -> NetCryptoHook |
620 | defaultUnRecHook _ _ _ = return Nothing | 661 | defaultUnRecHook typ session cm | any ($ typ) [isKillPacket, isOFFLINE] = atomically $ do |
662 | tmchans <- map snd . IntMap.elems <$> readTVar (ncListeners session) | ||
663 | forM_ tmchans $ \chan -> closeTMChan chan | ||
664 | return Nothing | ||
665 | |||
666 | defaultUnRecHook typ session cm = do | ||
667 | let msgQ = ncLastNMsgs session | ||
668 | msgNumVar = ncMsgNumVar session | ||
669 | dropCntVar = ncDropCntVar session | ||
670 | atomically $ do | ||
671 | num <- readTVar msgNumVar | ||
672 | (wraps,offset) <- PQ.enqueue msgQ num (False,cm) | ||
673 | capacity <- PQ.getCapacity msgQ | ||
674 | let dropped = wraps * capacity + offset | ||
675 | modifyTVar' msgNumVar (+1) | ||
676 | writeTVar dropCntVar dropped | ||
677 | return Nothing | ||
621 | 678 | ||
622 | -- | use to add a single hook to a specific session. | 679 | -- | use to add a single hook to a specific session. |
623 | addCryptoDataHook1 :: Map.Map MessageType [NetCryptoHook] -> MessageType -> NetCryptoHook -> Map.Map MessageType [NetCryptoHook] | 680 | addCryptoDataHook1 :: Map.Map MessageType [NetCryptoHook] -> MessageType -> NetCryptoHook -> Map.Map MessageType [NetCryptoHook] |