diff options
-rw-r--r-- | dht-client.cabal | 5 | ||||
-rw-r--r-- | examples/dhtd.hs | 84 | ||||
-rw-r--r-- | src/Network/Tox/Crypto/Handlers.hs | 61 |
3 files changed, 100 insertions, 50 deletions
diff --git a/dht-client.cabal b/dht-client.cabal index 60a42f00..02b5cdbe 100644 --- a/dht-client.cabal +++ b/dht-client.cabal | |||
@@ -4,7 +4,7 @@ license: BSD3 | |||
4 | license-file: LICENSE | 4 | license-file: LICENSE |
5 | author: Joe Crayne | 5 | author: Joe Crayne |
6 | maintainer: Joe Crayne | 6 | maintainer: Joe Crayne |
7 | copyright: (c) 2017 Joe Crayne, (c) 2013, Sam Truzjan | 7 | copyright: (c) 2017 Joe Crayne, (c) 2017 James Crayne, (c) 2013 Sam Truzjan |
8 | category: Network | 8 | category: Network |
9 | build-type: Custom | 9 | build-type: Custom |
10 | cabal-version: >= 1.10 | 10 | cabal-version: >= 1.10 |
@@ -144,6 +144,8 @@ library | |||
144 | , hashable | 144 | , hashable |
145 | , iproute | 145 | , iproute |
146 | , stm >= 2.4.0 | 146 | , stm >= 2.4.0 |
147 | , stm-chans | ||
148 | , concurrent-supply | ||
147 | , base16-bytestring | 149 | , base16-bytestring |
148 | , base32-bytestring | 150 | , base32-bytestring |
149 | , base64-bytestring | 151 | , base64-bytestring |
@@ -261,6 +263,7 @@ executable dhtd | |||
261 | , containers | 263 | , containers |
262 | , stm | 264 | , stm |
263 | , stm-chans | 265 | , stm-chans |
266 | , concurrent-supply | ||
264 | , cereal | 267 | , cereal |
265 | , bencoding | 268 | , bencoding |
266 | , unordered-containers | 269 | , unordered-containers |
diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 47a4cd46..df8cf1c4 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs | |||
@@ -107,6 +107,7 @@ import XMPPServer | |||
107 | import Connection | 107 | import Connection |
108 | import ToxToXMPP | 108 | import ToxToXMPP |
109 | import qualified Connection.Tcp as Tcp (ConnectionEvent(..)) | 109 | import qualified Connection.Tcp as Tcp (ConnectionEvent(..)) |
110 | import Control.Concurrent.Supply | ||
110 | 111 | ||
111 | 112 | ||
112 | showReport :: [(String,String)] -> String | 113 | showReport :: [(String,String)] -> String |
@@ -396,7 +397,6 @@ data Session = Session | |||
396 | , userkeys :: TVar [(SecretKey,PublicKey)] | 397 | , userkeys :: TVar [(SecretKey,PublicKey)] |
397 | , roster :: Tox.ContactInfo | 398 | , roster :: Tox.ContactInfo |
398 | , announceToLan :: IO () | 399 | , announceToLan :: IO () |
399 | , sessions :: TVar [PerSession] | ||
400 | , connectionManager :: Maybe ConnectionManager | 400 | , connectionManager :: Maybe ConnectionManager |
401 | , onionRouter :: OnionRouter | 401 | , onionRouter :: OnionRouter |
402 | , announcer :: Announcer | 402 | , announcer :: Announcer |
@@ -433,7 +433,7 @@ clientSession0 s sock cnum h = do | |||
433 | else throwIO e | 433 | else throwIO e |
434 | 434 | ||
435 | readKeys :: TVar [(SecretKey, PublicKey)] | 435 | readKeys :: TVar [(SecretKey, PublicKey)] |
436 | -> TVar (HashMap.HashMap Tox.NodeId Account) | 436 | -> TVar (HashMap.HashMap Tox.NodeId Account) -- ContactInfo { accounts } |
437 | -> STM [(SecretKey, PublicKey)] | 437 | -> STM [(SecretKey, PublicKey)] |
438 | readKeys userkeys roster = do | 438 | readKeys userkeys roster = do |
439 | uks <- readTVar userkeys | 439 | uks <- readTVar userkeys |
@@ -475,6 +475,7 @@ clientSession s@Session{..} sock cnum h = do | |||
475 | , ["k"] | 475 | , ["k"] |
476 | , ["roster"] | 476 | , ["roster"] |
477 | , ["sessions"] | 477 | , ["sessions"] |
478 | , ["netcrypto"] | ||
478 | , ["onion"] | 479 | , ["onion"] |
479 | , ["g"] | 480 | , ["g"] |
480 | , ["p"] | 481 | , ["p"] |
@@ -679,17 +680,19 @@ clientSession s@Session{..} sock cnum h = do | |||
679 | hPutClientChunk h $ unlines [ dns, "", "Friend Requests" ] | 680 | hPutClientChunk h $ unlines [ dns, "", "Friend Requests" ] |
680 | hPutClient h $ showReport frs | 681 | hPutClient h $ showReport frs |
681 | 682 | ||
682 | ("sessions", s) | "" <- strp s | 683 | ("sessions", s') | "" <- strp s' |
683 | -> cmd0 $ do | 684 | -> cmd0 $ do |
684 | sessions' <- atomically $ readTVar sessions :: IO [PerSession] | 685 | sessions <- concat . Map.elems <$> (atomically $ readTVar (Tox.netCryptoSessionsByKey cryptosessions)) |
685 | let sessionsReport = mapM showPerSession sessions' | 686 | let sessionsReport = mapM showPerSession sessions |
686 | headers = ["Key", "NextMsg", "Dropped","Handled","Unhandled"] | 687 | headers = ["SessionID", "YourKey", "TheirKey", "NextMsg", "Dropped","Handled","Unhandled"] |
687 | showPerSession (PerSession | 688 | showPerSession (Tox.NCrypto |
688 | { perSessionMsgs = msgQ | 689 | { ncSessionId = id |
689 | , perSessionPublicKey = pubKey | 690 | , ncMyPublicKey = yourkey |
690 | , perSessionAddr = sockAddr | 691 | , ncTheirPublicKey = theirkey |
691 | , perSessionNumVar = msgNumVar | 692 | , ncLastNMsgs = msgQ |
692 | , perSessionDropCount = dropCntVar | 693 | , ncSockAddr = sockAddr |
694 | , ncMsgNumVar = msgNumVar | ||
695 | , ncDropCntVar = dropCntVar | ||
693 | }) = do | 696 | }) = do |
694 | num <- atomically (readTVar msgNumVar) | 697 | num <- atomically (readTVar msgNumVar) |
695 | dropped <- atomically (readTVar dropCntVar) | 698 | dropped <- atomically (readTVar dropCntVar) |
@@ -697,13 +700,15 @@ clientSession s@Session{..} sock cnum h = do | |||
697 | let (h,u) = partition (fst . snd) as | 700 | let (h,u) = partition (fst . snd) as |
698 | countHandled = length h | 701 | countHandled = length h |
699 | countUnhandled = length u | 702 | countUnhandled = length u |
700 | return [ show (Tox.key2id pubKey) -- "Key" | 703 | return [ printf "%x" id -- "SessionID" |
704 | , show (Tox.key2id yourkey) -- "YourKey" | ||
705 | , show (Tox.key2id theirkey)-- "TheirKey" | ||
701 | , show num -- "NextMsg" | 706 | , show num -- "NextMsg" |
702 | , show dropped -- "Dropped" | 707 | , show dropped -- "Dropped" |
703 | , show countHandled -- "Handled" | 708 | , show countHandled -- "Handled" |
704 | , show countUnhandled -- "Unhandled" | 709 | , show countUnhandled -- "Unhandled" |
705 | ] | 710 | ] |
706 | if null sessions' | 711 | if null sessions |
707 | then hPutClient h "No sessions." | 712 | then hPutClient h "No sessions." |
708 | else do | 713 | else do |
709 | rows <- sessionsReport | 714 | rows <- sessionsReport |
@@ -724,6 +729,19 @@ clientSession s@Session{..} sock cnum h = do | |||
724 | hPutClientChunk h $ "trampolines: " ++ show (IntMap.size ts) ++ "\n" | 729 | hPutClientChunk h $ "trampolines: " ++ show (IntMap.size ts) ++ "\n" |
725 | hPutClient h $ showColumns $ ["","responses","timeouts"]:r | 730 | hPutClient h $ showColumns $ ["","responses","timeouts"]:r |
726 | 731 | ||
732 | ("netcrypto", s) | ||
733 | | Just DHT{..} <- Map.lookup netname dhts | ||
734 | -> cmd0 $ do | ||
735 | case selectedKey of | ||
736 | Nothing -> hPutClient h "No key is selected, see k command." | ||
737 | Just mypubkey -> do | ||
738 | let nidstr = strp s | ||
739 | goParse = either (hPutClient h . ("Bad netcrypto target: "++)) | ||
740 | goTarget | ||
741 | $ dhtParseId nidstr | ||
742 | goTarget nid = do | ||
743 | hPutClient h "TODO: convert selected public key to private, call netCrypto.." | ||
744 | goParse | ||
727 | ("g", s) | Just DHT{..} <- Map.lookup netname dhts | 745 | ("g", s) | Just DHT{..} <- Map.lookup netname dhts |
728 | -> cmd0 $ do | 746 | -> cmd0 $ do |
729 | -- arguments: method | 747 | -- arguments: method |
@@ -1258,12 +1276,6 @@ announceToxJabberPeer echan laddr saddr pingflag tsrc tsnk | |||
1258 | 1276 | ||
1259 | #endif | 1277 | #endif |
1260 | 1278 | ||
1261 | data PerSession = PerSession { perSessionMsgs :: PacketQueue (Bool{-Handled?-},Tox.CryptoMessage) | ||
1262 | , perSessionPublicKey :: PublicKey | ||
1263 | , perSessionAddr :: SockAddr | ||
1264 | , perSessionNumVar :: TVar Word32 | ||
1265 | , perSessionDropCount :: TVar Word32 | ||
1266 | } | ||
1267 | 1279 | ||
1268 | main :: IO () | 1280 | main :: IO () |
1269 | main = runResourceT $ liftBaseWith $ \resT -> do | 1281 | main = runResourceT $ liftBaseWith $ \resT -> do |
@@ -1372,7 +1384,6 @@ main = runResourceT $ liftBaseWith $ \resT -> do | |||
1372 | 1384 | ||
1373 | crypto <- Tox.newCrypto | 1385 | crypto <- Tox.newCrypto |
1374 | netCryptoSessionsState <- Tox.newSessionsState crypto Tox.defaultUnRecHook Tox.defaultCryptoDataHooks | 1386 | netCryptoSessionsState <- Tox.newSessionsState crypto Tox.defaultUnRecHook Tox.defaultCryptoDataHooks |
1375 | sessions <- atomically (newTVar []) :: IO (TVar [PerSession]) | ||
1376 | (mbtox,quitTox,toxdhts,toxips,(taddrs::[SockAddr])) <- case porttox opts of | 1387 | (mbtox,quitTox,toxdhts,toxips,(taddrs::[SockAddr])) <- case porttox opts of |
1377 | "" -> return (Nothing,return (), Map.empty, return [],[]) | 1388 | "" -> return (Nothing,return (), Map.empty, return [],[]) |
1378 | toxport -> do | 1389 | toxport -> do |
@@ -1602,16 +1613,6 @@ main = runResourceT $ liftBaseWith $ \resT -> do | |||
1602 | -- allsessionsMap <- atomically $ readTVar (netCryptoSessions netCryptoSessionsState) | 1613 | -- allsessionsMap <- atomically $ readTVar (netCryptoSessions netCryptoSessionsState) |
1603 | let sockAddr = Tox.ncSockAddr netcrypto | 1614 | let sockAddr = Tox.ncSockAddr netcrypto |
1604 | pubKey = Tox.ncTheirPublicKey netcrypto | 1615 | pubKey = Tox.ncTheirPublicKey netcrypto |
1605 | msgQ <- atomically (Data.PacketQueue.newOverwrite 10 0 :: STM (PacketQueue (Bool,Tox.CryptoMessage))) | ||
1606 | msgNumVar <- atomically (newTVar 0) | ||
1607 | dropCntVar <- atomically (newTVar 0) | ||
1608 | let perSession = PerSession { perSessionMsgs = msgQ | ||
1609 | , perSessionPublicKey = pubKey | ||
1610 | , perSessionAddr = sockAddr | ||
1611 | , perSessionNumVar = msgNumVar | ||
1612 | , perSessionDropCount = dropCntVar | ||
1613 | } | ||
1614 | atomically $ modifyTVar' sessions (perSession:) | ||
1615 | tmchan <- atomically newTMChan | 1616 | tmchan <- atomically newTMChan |
1616 | let Just pingMachine = Tox.ncPingMachine netcrypto | 1617 | let Just pingMachine = Tox.ncPingMachine netcrypto |
1617 | pingflag = readTVar (pingFlag pingMachine) | 1618 | pingflag = readTVar (pingFlag pingMachine) |
@@ -1624,21 +1625,11 @@ main = runResourceT $ liftBaseWith $ \resT -> do | |||
1624 | announceToxJabberPeer (xmppEventChannel sv) addrTox (Tox.ncSockAddr netcrypto) pingflag xmppSrc xmppSink | 1625 | announceToxJabberPeer (xmppEventChannel sv) addrTox (Tox.ncSockAddr netcrypto) pingflag xmppSrc xmppSink |
1625 | -- TODO: Update toxContactInfo, connected. | 1626 | -- TODO: Update toxContactInfo, connected. |
1626 | #endif | 1627 | #endif |
1627 | let handleIncoming typ session cm | any ($ typ) [Tox.isKillPacket, Tox.isOFFLINE] = atomically $ do | 1628 | atomically $ do |
1628 | closeTMChan tmchan | 1629 | supply <- readTVar (Tox.listenerIDSupply netCryptoSessionsState) |
1629 | Tox.forgetCrypto crypto netCryptoSessionsState netcrypto | 1630 | let (listenerId,supply') = freshId supply |
1630 | return Nothing | 1631 | writeTVar (Tox.listenerIDSupply netCryptoSessionsState) supply' |
1631 | handleIncoming mTyp session cm = do | 1632 | modifyTVar' (Tox.ncListeners netcrypto) (IntMap.insert listenerId (0,tmchan)) |
1632 | atomically $ do | ||
1633 | num <- readTVar msgNumVar | ||
1634 | (wraps,offset) <- enqueue msgQ num (False,cm) | ||
1635 | capacity <- getCapacity msgQ | ||
1636 | let dropped = wraps * capacity + offset | ||
1637 | modifyTVar' msgNumVar (+1) | ||
1638 | writeTVar dropCntVar dropped | ||
1639 | atomically $ writeTMChan tmchan cm -- (Tox.bufferData cd) | ||
1640 | return Nothing | ||
1641 | atomically $ writeTVar (Tox.ncUnrecognizedHook netcrypto) handleIncoming | ||
1642 | return Nothing | 1633 | return Nothing |
1643 | 1634 | ||
1644 | let dhts = Map.union btdhts toxdhts | 1635 | let dhts = Map.union btdhts toxdhts |
@@ -1667,7 +1658,6 @@ main = runResourceT $ liftBaseWith $ \resT -> do | |||
1667 | , userkeys = toxids | 1658 | , userkeys = toxids |
1668 | , roster = rstr | 1659 | , roster = rstr |
1669 | , announceToLan = fromMaybe (return ()) $ Tox.toxAnnounceToLan <$> mbtox | 1660 | , announceToLan = fromMaybe (return ()) $ Tox.toxAnnounceToLan <$> mbtox |
1670 | , sessions = sessions | ||
1671 | , connectionManager = ConnectionManager <$> mconns | 1661 | , connectionManager = ConnectionManager <$> mconns |
1672 | , onionRouter = orouter | 1662 | , onionRouter = orouter |
1673 | , externalAddresses = liftM2 (++) btips toxips | 1663 | , externalAddresses = liftM2 (++) btips toxips |
diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index 0e349196..602b14cc 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs | |||
@@ -10,6 +10,7 @@ import Network.Tox.DHT.Transport (Cookie(..),CookieData(..), CookieRequest(..), | |||
10 | import Network.Tox.DHT.Handlers (Client, cookieRequest, createCookie ) | 10 | import Network.Tox.DHT.Handlers (Client, cookieRequest, createCookie ) |
11 | import Crypto.Tox | 11 | import Crypto.Tox |
12 | import Control.Concurrent.STM | 12 | import Control.Concurrent.STM |
13 | import Control.Concurrent.STM.TMChan | ||
13 | import Network.Address | 14 | import Network.Address |
14 | import qualified Data.Map.Strict as Map | 15 | import qualified Data.Map.Strict as Map |
15 | import Crypto.Hash | 16 | import Crypto.Hash |
@@ -37,6 +38,8 @@ import System.Random -- for ping fuzz | |||
37 | import Control.Concurrent | 38 | import Control.Concurrent |
38 | import GHC.Conc (labelThread) | 39 | import GHC.Conc (labelThread) |
39 | import PingMachine | 40 | import PingMachine |
41 | import qualified Data.IntMap.Strict as IntMap | ||
42 | import Control.Concurrent.Supply | ||
40 | 43 | ||
41 | -- util, todo: move to another module | 44 | -- util, todo: move to another module |
42 | maybeToEither :: Maybe b -> Either String b | 45 | maybeToEither :: Maybe b -> Either String b |
@@ -83,6 +86,20 @@ data SessionView = SessionView | |||
83 | 86 | ||
84 | type SessionID = Word64 | 87 | type SessionID = Word64 |
85 | 88 | ||
89 | -- | Application specific listener type (Word64) | ||
90 | -- | ||
91 | -- This is some kind of information associated with a listening TChan. | ||
92 | -- It may be used to indicate what kind of packets it is interested in. | ||
93 | -- | ||
94 | -- 0 means listen to all messages and is done automatically in 'defaultUnRecHook' | ||
95 | -- any other values are left open to application specific convention. | ||
96 | -- | ||
97 | -- This module does not know what the different values here | ||
98 | -- mean, but code that sets hooks may adhere to a convention | ||
99 | -- defined elsewhere. | ||
100 | -- | ||
101 | type ListenerType = Word64 | ||
102 | |||
86 | data NetCryptoSession = NCrypto | 103 | data NetCryptoSession = NCrypto |
87 | { ncState :: TVar NetCryptoSessionStatus | 104 | { ncState :: TVar NetCryptoSessionStatus |
88 | , ncMyPublicKey :: PublicKey | 105 | , ncMyPublicKey :: PublicKey |
@@ -116,6 +133,18 @@ data NetCryptoSession = NCrypto | |||
116 | CryptoMessage | 133 | CryptoMessage |
117 | (CryptoPacket Encrypted) | 134 | (CryptoPacket Encrypted) |
118 | CryptoData | 135 | CryptoData |
136 | , ncLastNMsgs :: PacketQueue (Bool{-Handled?-},CryptoMessage) | ||
137 | -- ^ cyclic buffer, holds the last N non-handshake crypto messages | ||
138 | -- even if there is no attached user interface. | ||
139 | , ncListeners :: TVar (IntMap.IntMap (ListenerType,TMChan CryptoMessage)) | ||
140 | -- ^ user interfaces may "listen" by inserting themselves into this map | ||
141 | -- with a unique id and a new TChan, and then reading from the TChan | ||
142 | , ncMsgNumVar :: TVar Word32 | ||
143 | -- ^ The number of non-handshake crypto messages recieved in this session | ||
144 | -- TODO: there is already a packet num etc, do we need two? | ||
145 | , ncDropCntVar :: TVar Word32 | ||
146 | -- ^ The number of crypto messages that were overwritten in the ncLastNMsgs | ||
147 | -- before anybody got to see them. | ||
119 | } | 148 | } |
120 | 149 | ||
121 | data NetCryptoSessions = NCSessions | 150 | data NetCryptoSessions = NCSessions |
@@ -131,6 +160,7 @@ data NetCryptoSessions = NCSessions | |||
131 | , nextSessionId :: TVar SessionID | 160 | , nextSessionId :: TVar SessionID |
132 | , announceNewSessionHooks :: TVar [IOHook (Maybe NoSpam) NetCryptoSession] | 161 | , announceNewSessionHooks :: TVar [IOHook (Maybe NoSpam) NetCryptoSession] |
133 | , sessionTransport :: Transport String SockAddr NetCrypto | 162 | , sessionTransport :: Transport String SockAddr NetCrypto |
163 | , listenerIDSupply :: TVar Supply | ||
134 | } | 164 | } |
135 | 165 | ||
136 | type NewSessionHook = IOHook (Maybe NoSpam) NetCryptoSession | 166 | type NewSessionHook = IOHook (Maybe NoSpam) NetCryptoSession |
@@ -173,6 +203,8 @@ newSessionsState crypto unrechook hooks = do | |||
173 | svDownloadDir0 <- atomically $ newTVar (homedir </> "Downloads") | 203 | svDownloadDir0 <- atomically $ newTVar (homedir </> "Downloads") |
174 | nextSessionId0 <- atomically $ newTVar 0 | 204 | nextSessionId0 <- atomically $ newTVar 0 |
175 | announceNewSessionHooks0 <- atomically $ newTVar [] | 205 | announceNewSessionHooks0 <- atomically $ newTVar [] |
206 | lsupply <- newSupply | ||
207 | lsupplyVar <- atomically (newTVar lsupply) | ||
176 | return NCSessions { netCryptoSessions = x | 208 | return NCSessions { netCryptoSessions = x |
177 | , netCryptoSessionsByKey = x2 | 209 | , netCryptoSessionsByKey = x2 |
178 | , transportCrypto = crypto | 210 | , transportCrypto = crypto |
@@ -195,6 +227,7 @@ newSessionsState crypto unrechook hooks = do | |||
195 | , nextSessionId = nextSessionId0 | 227 | , nextSessionId = nextSessionId0 |
196 | , announceNewSessionHooks = announceNewSessionHooks0 | 228 | , announceNewSessionHooks = announceNewSessionHooks0 |
197 | , sessionTransport = error "Need to set sessionTransport field of NetCryptoSessions!" | 229 | , sessionTransport = error "Need to set sessionTransport field of NetCryptoSessions!" |
230 | , listenerIDSupply = lsupplyVar | ||
198 | } | 231 | } |
199 | 232 | ||
200 | data HandshakeParams | 233 | data HandshakeParams |
@@ -338,6 +371,10 @@ freshCryptoSession sessions | |||
338 | writeTVar ncMyPacketNonce0 n24plus1 | 371 | writeTVar ncMyPacketNonce0 n24plus1 |
339 | return (return (f n24, n24, ncOutgoingIdMap0)) | 372 | return (return (f n24, n24, ncOutgoingIdMap0)) |
340 | pktoq <- atomically $ PQ.newOutGoing pktq ncToWire toWireIO 0 (outboundQueueCapacity sessions) 0 | 373 | pktoq <- atomically $ PQ.newOutGoing pktq ncToWire toWireIO 0 (outboundQueueCapacity sessions) 0 |
374 | msgQ <- atomically (PQ.newOverwrite 10 0 :: STM (PacketQueue (Bool,CryptoMessage))) | ||
375 | listeners <- atomically $ newTVar IntMap.empty | ||
376 | msgNum <- atomically $ newTVar 0 | ||
377 | dropNum <- atomically $ newTVar 0 | ||
341 | let netCryptoSession0 = | 378 | let netCryptoSession0 = |
342 | NCrypto { ncState = ncState0 | 379 | NCrypto { ncState = ncState0 |
343 | , ncMyPublicKey = toPublic key | 380 | , ncMyPublicKey = toPublic key |
@@ -362,6 +399,10 @@ freshCryptoSession sessions | |||
362 | , ncDequeueThread = Nothing -- error "you want the NetCrypto-Dequeue thread id, but is it started?" | 399 | , ncDequeueThread = Nothing -- error "you want the NetCrypto-Dequeue thread id, but is it started?" |
363 | , ncPingMachine = Nothing -- error "you want the NetCrypto-PingMachine, but is it started?" | 400 | , ncPingMachine = Nothing -- error "you want the NetCrypto-PingMachine, but is it started?" |
364 | , ncOutgoingQueue = pktoq | 401 | , ncOutgoingQueue = pktoq |
402 | , ncLastNMsgs = msgQ | ||
403 | , ncListeners = listeners | ||
404 | , ncMsgNumVar = msgNum | ||
405 | , ncDropCntVar = dropNum | ||
365 | } | 406 | } |
366 | -- launch dequeue thread | 407 | -- launch dequeue thread |
367 | threadid <- forkIO $ do | 408 | threadid <- forkIO $ do |
@@ -615,9 +656,25 @@ allMsgTypes fDefault = A.listArray (minBound,maxBound) (0:knownMsgs) | |||
615 | defaultCryptoDataHooks :: Map.Map MessageType [NetCryptoHook] | 656 | defaultCryptoDataHooks :: Map.Map MessageType [NetCryptoHook] |
616 | defaultCryptoDataHooks = Map.empty | 657 | defaultCryptoDataHooks = Map.empty |
617 | 658 | ||
618 | -- | discards all unrecognized packets | 659 | -- | updates ncLastNMsgs, and sends message to type-0 listeners |
619 | defaultUnRecHook :: MessageType -> NetCryptoHook | 660 | defaultUnRecHook :: MessageType -> NetCryptoHook |
620 | defaultUnRecHook _ _ _ = return Nothing | 661 | defaultUnRecHook typ session cm | any ($ typ) [isKillPacket, isOFFLINE] = atomically $ do |
662 | tmchans <- map snd . IntMap.elems <$> readTVar (ncListeners session) | ||
663 | forM_ tmchans $ \chan -> closeTMChan chan | ||
664 | return Nothing | ||
665 | |||
666 | defaultUnRecHook typ session cm = do | ||
667 | let msgQ = ncLastNMsgs session | ||
668 | msgNumVar = ncMsgNumVar session | ||
669 | dropCntVar = ncDropCntVar session | ||
670 | atomically $ do | ||
671 | num <- readTVar msgNumVar | ||
672 | (wraps,offset) <- PQ.enqueue msgQ num (False,cm) | ||
673 | capacity <- PQ.getCapacity msgQ | ||
674 | let dropped = wraps * capacity + offset | ||
675 | modifyTVar' msgNumVar (+1) | ||
676 | writeTVar dropCntVar dropped | ||
677 | return Nothing | ||
621 | 678 | ||
622 | -- | use to add a single hook to a specific session. | 679 | -- | use to add a single hook to a specific session. |
623 | addCryptoDataHook1 :: Map.Map MessageType [NetCryptoHook] -> MessageType -> NetCryptoHook -> Map.Map MessageType [NetCryptoHook] | 680 | addCryptoDataHook1 :: Map.Map MessageType [NetCryptoHook] -> MessageType -> NetCryptoHook -> Map.Map MessageType [NetCryptoHook] |