diff options
author | James Crayne <jim.crayne@gmail.com> | 2017-11-14 05:17:02 +0000 |
---|---|---|
committer | James Crayne <jim.crayne@gmail.com> | 2017-11-19 23:40:14 +0000 |
commit | 43219aa4da88fbf3187230af0b5ad5b2d17f177a (patch) | |
tree | 915d7e6d77006431c40a8ad050bec0dd85a20515 /src/Network/Tox/Crypto/Handlers.hs | |
parent | 01db8c87be13d4f1cbb2b60ecfa534301078df9f (diff) |
netcrypto dequeue thread
Diffstat (limited to 'src/Network/Tox/Crypto/Handlers.hs')
-rw-r--r-- | src/Network/Tox/Crypto/Handlers.hs | 34 |
1 files changed, 29 insertions, 5 deletions
diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index 787c69c2..b92d4805 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs | |||
@@ -30,6 +30,8 @@ import System.FilePath | |||
30 | import System.IO.Temp | 30 | import System.IO.Temp |
31 | import System.Environment | 31 | import System.Environment |
32 | import System.Directory | 32 | import System.Directory |
33 | import Control.Concurrent | ||
34 | import GHC.Conc (labelThread) | ||
33 | 35 | ||
34 | -- util, todo: move to another module | 36 | -- util, todo: move to another module |
35 | maybeToEither :: Maybe b -> Either String b | 37 | maybeToEither :: Maybe b -> Either String b |
@@ -82,8 +84,9 @@ data NetCryptoSession = NCrypto { ncState :: TVar NetCryptoSessionStatus | |||
82 | -- needs to possibly start another, as is | 84 | -- needs to possibly start another, as is |
83 | -- the case in group chats | 85 | -- the case in group chats |
84 | , ncView :: TVar SessionView | 86 | , ncView :: TVar SessionView |
85 | , ncPacketQueue :: PacketQueue CryptoMessage | 87 | , ncPacketQueue :: PacketQueue CryptoData |
86 | , ncBufferStart :: TVar Word32 | 88 | , ncBufferStart :: TVar Word32 |
89 | , ncDequeueThread :: ThreadId | ||
87 | } | 90 | } |
88 | 91 | ||
89 | data NetCryptoSessions = NCSessions { netCryptoSessions :: TVar (Map.Map SockAddr NetCryptoSession) | 92 | data NetCryptoSessions = NCSessions { netCryptoSessions :: TVar (Map.Map SockAddr NetCryptoSession) |
@@ -107,7 +110,7 @@ newSessionsState crypto unrechook hooks = do | |||
107 | grps <- atomically $ newTVar Map.empty | 110 | grps <- atomically $ newTVar Map.empty |
108 | pname <- getProgName | 111 | pname <- getProgName |
109 | cachedir <- sensibleCacheDirCreateIfMissing pname | 112 | cachedir <- sensibleCacheDirCreateIfMissing pname |
110 | tmpdir <- (</> pname) <$> getCanonicalTemporaryDirectory | 113 | tmpdir <- (</> pname) <$> (getTemporaryDirectory >>= canonicalizePath) -- getCanonicalTemporaryDirectory |
111 | configdir <- sensibleVarLib pname | 114 | configdir <- sensibleVarLib pname |
112 | homedir <- getHomeDirectory | 115 | homedir <- getHomeDirectory |
113 | svDownloadDir0 <- atomically $ newTVar (homedir </> "Downloads") | 116 | svDownloadDir0 <- atomically $ newTVar (homedir </> "Downloads") |
@@ -203,7 +206,7 @@ freshCryptoSession sessions | |||
203 | ncView0 <- atomically $ newTVar (sessionView sessions) | 206 | ncView0 <- atomically $ newTVar (sessionView sessions) |
204 | pktq <- atomically $ PQ.new (inboundQueueCapacity sessions) 0 | 207 | pktq <- atomically $ PQ.new (inboundQueueCapacity sessions) 0 |
205 | bufstart <- atomically $ newTVar 0 | 208 | bufstart <- atomically $ newTVar 0 |
206 | let netCryptoSession = | 209 | let netCryptoSession0 = |
207 | NCrypto { ncState = ncState0 | 210 | NCrypto { ncState = ncState0 |
208 | , ncTheirBaseNonce= ncTheirBaseNonce0 | 211 | , ncTheirBaseNonce= ncTheirBaseNonce0 |
209 | , ncMyPacketNonce = ncMyPacketNonce0 | 212 | , ncMyPacketNonce = ncMyPacketNonce0 |
@@ -220,7 +223,16 @@ freshCryptoSession sessions | |||
220 | , ncView = ncView0 | 223 | , ncView = ncView0 |
221 | , ncPacketQueue = pktq | 224 | , ncPacketQueue = pktq |
222 | , ncBufferStart = bufstart | 225 | , ncBufferStart = bufstart |
226 | , ncDequeueThread = error "you want the NetCrypto-Dequeue thread id, but is it started?" | ||
223 | } | 227 | } |
228 | threadid <- forkIO $ do | ||
229 | tid <- myThreadId | ||
230 | labelThread tid ("NetCryptoDequeue." ++ show (key2id remotePublicKey)) | ||
231 | fix $ \loop -> do | ||
232 | cd <- atomically $ PQ.dequeue pktq | ||
233 | _ <- runCryptoHook (netCryptoSession0 {ncDequeueThread=tid}) cd | ||
234 | loop | ||
235 | let netCryptoSession = netCryptoSession0 {ncDequeueThread=threadid} | ||
224 | atomically $ modifyTVar allsessions (Map.insert addr netCryptoSession) | 236 | atomically $ modifyTVar allsessions (Map.insert addr netCryptoSession) |
225 | 237 | ||
226 | -- | Called when we get a handshake, but there's already a session entry. | 238 | -- | Called when we get a handshake, but there's already a session entry. |
@@ -304,7 +316,7 @@ cryptoNetHandler sessions addr (NetCrypto (CryptoPacket nonce16 encrypted)) = do | |||
304 | -- Handle Encrypted Message | 316 | -- Handle Encrypted Message |
305 | case Map.lookup addr sessionsmap of | 317 | case Map.lookup addr sessionsmap of |
306 | Nothing -> return Nothing -- drop packet, we have no session | 318 | Nothing -> return Nothing -- drop packet, we have no session |
307 | Just session@(NCrypto {ncState, ncHooks,ncSessionSecret,ncTheirSessionPublic,ncTheirBaseNonce}) -> do | 319 | Just session@(NCrypto {ncMessageTypes, ncState, ncPacketQueue, ncHooks,ncSessionSecret,ncTheirSessionPublic,ncTheirBaseNonce}) -> do |
308 | theirBaseNonce <- atomically $ readTVar ncTheirBaseNonce | 320 | theirBaseNonce <- atomically $ readTVar ncTheirBaseNonce |
309 | -- Try to decrypt message | 321 | -- Try to decrypt message |
310 | let diff :: Word16 | 322 | let diff :: Word16 |
@@ -333,7 +345,18 @@ cryptoNetHandler sessions addr (NetCrypto (CryptoPacket nonce16 encrypted)) = do | |||
333 | writeTVar ncTheirBaseNonce y | 345 | writeTVar ncTheirBaseNonce y |
334 | -- then set session confirmed, | 346 | -- then set session confirmed, |
335 | atomically $ writeTVar ncState Confirmed | 347 | atomically $ writeTVar ncState Confirmed |
336 | runCryptoHook session cd | 348 | msgTypes <- atomically $ readTVar ncMessageTypes |
349 | let msgTyp = cd ^. messageType | ||
350 | msgTypMapped16 = msgTypes A.! fromEnum8 (msgID cm) | ||
351 | msgTypMapped = fromWord16 $ msgTypMapped16 | ||
352 | isLossy (GrpMsg KnownLossy _) = True | ||
353 | isLossy (Msg mid) | lossyness mid == Lossy = True | ||
354 | isLossy _ = False | ||
355 | if isLossy msgTypMapped | ||
356 | then do atomically $ PQ.observeOutOfBand ncPacketQueue bufferEnd | ||
357 | runCryptoHook session cd | ||
358 | else do atomically $ PQ.enqueue ncPacketQueue bufferEnd cd | ||
359 | return Nothing | ||
337 | where | 360 | where |
338 | last2Bytes :: Nonce24 -> Word | 361 | last2Bytes :: Nonce24 -> Word |
339 | last2Bytes (Nonce24 bs) = case S.decode (B.drop 22 bs) of | 362 | last2Bytes (Nonce24 bs) = case S.decode (B.drop 22 bs) of |
@@ -341,6 +364,7 @@ cryptoNetHandler sessions addr (NetCrypto (CryptoPacket nonce16 encrypted)) = do | |||
341 | _ -> error "unreachable-last2Bytes" | 364 | _ -> error "unreachable-last2Bytes" |
342 | dATA_NUM_THRESHOLD = 21845 -- = 65535 / 3 | 365 | dATA_NUM_THRESHOLD = 21845 -- = 65535 / 3 |
343 | 366 | ||
367 | -- | TODO: make this accept CrytpoMessage instead | ||
344 | runCryptoHook :: NetCryptoSession -> CryptoData -> IO (Maybe (x -> x)) | 368 | runCryptoHook :: NetCryptoSession -> CryptoData -> IO (Maybe (x -> x)) |
345 | runCryptoHook session@(NCrypto {ncState, ncHooks,ncSessionSecret,ncTheirSessionPublic,ncTheirBaseNonce,ncMessageTypes}) | 369 | runCryptoHook session@(NCrypto {ncState, ncHooks,ncSessionSecret,ncTheirSessionPublic,ncTheirBaseNonce,ncMessageTypes}) |
346 | cd@(CryptoData {bufferStart, bufferEnd, bufferData=cm}) = do | 370 | cd@(CryptoData {bufferStart, bufferEnd, bufferData=cm}) = do |