diff options
-rw-r--r-- | src/Data/PacketQueue.hs | 23 | ||||
-rw-r--r-- | src/Network/Tox/Crypto/Handlers.hs | 63 |
2 files changed, 83 insertions, 3 deletions
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs index cb308bce..f9d9f28f 100644 --- a/src/Data/PacketQueue.hs +++ b/src/Data/PacketQueue.hs | |||
@@ -17,6 +17,7 @@ module Data.PacketQueue | |||
17 | , packetQueueViewList | 17 | , packetQueueViewList |
18 | , newOutGoing | 18 | , newOutGoing |
19 | , readyOutGoing | 19 | , readyOutGoing |
20 | , peekPacket | ||
20 | , tryAppendQueueOutgoing | 21 | , tryAppendQueueOutgoing |
21 | , dequeueOutgoing | 22 | , dequeueOutgoing |
22 | , getHighestHandledPacketPlus1 | 23 | , getHighestHandledPacketPlus1 |
@@ -187,6 +188,27 @@ data OutGoingResult = OGSuccess | OGFull | OGEncodeFail | |||
187 | readyOutGoing :: PacketOutQueue extra msg wire fromwire -> IO (STM extra) | 188 | readyOutGoing :: PacketOutQueue extra msg wire fromwire -> IO (STM extra) |
188 | readyOutGoing (PacketOutQueue {pktoToWireIO }) = pktoToWireIO | 189 | readyOutGoing (PacketOutQueue {pktoToWireIO }) = pktoToWireIO |
189 | 190 | ||
191 | peekPacket :: STM extra -> PacketOutQueue extra msg wire fromwire -> msg -> STM (Maybe (wire,Word32)) | ||
192 | peekPacket getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPacketNo, pktoToWireIO, pktoToWire }) msg | ||
193 | = do | ||
194 | be <- readTVar (buffend pktoOutPQ) | ||
195 | let i = be `mod` (qsize pktoOutPQ) | ||
196 | let arrayEmpty :: MArray a e m => a Word32 e -> m Bool | ||
197 | arrayEmpty ar = do (lowB,highB) <- getBounds ar | ||
198 | let result= lowB > highB | ||
199 | return $ trace ("arrayEmpty result=" ++ show result | ||
200 | ++ " lowB=" ++ show lowB | ||
201 | ++ " highB = " ++ show highB | ||
202 | ++ " i = " ++ show i) result | ||
203 | mbPkt <- do emp <- arrayEmpty (pktq pktoOutPQ) | ||
204 | if emp then trace "(peekPacket empty)" $ return Nothing | ||
205 | else trace "(peekPacket nonempty)" $ do | ||
206 | result <- readArray (pktq pktoOutPQ) i | ||
207 | return $ trace ("readArray (isJust result)==" ++ show (isJust result)) result | ||
208 | pktno <- readTVar pktoPacketNo | ||
209 | nextno <- readTVar (seqno pktoInPQ) | ||
210 | pktoToWire getExtra nextno be pktno msg | ||
211 | |||
190 | -- | Convert a message to packet format and append it to the front of a queue | 212 | -- | Convert a message to packet format and append it to the front of a queue |
191 | -- used for outgoing messages. (Note that ‘front‛ usually means the higher | 213 | -- used for outgoing messages. (Note that ‘front‛ usually means the higher |
192 | -- index in this implementation.) | 214 | -- index in this implementation.) |
@@ -210,6 +232,7 @@ tryAppendQueueOutgoing getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPac | |||
210 | pktno <- readTVar pktoPacketNo | 232 | pktno <- readTVar pktoPacketNo |
211 | nextno <- readTVar (seqno pktoInPQ) | 233 | nextno <- readTVar (seqno pktoInPQ) |
212 | mbWire <- pktoToWire getExtra nextno be pktno msg | 234 | mbWire <- pktoToWire getExtra nextno be pktno msg |
235 | -- TODO all the above lines ^^ can be replaced with call to peekPacket | ||
213 | case trace "(tryAppendQueueOutgoing mbWire)" mbWire of | 236 | case trace "(tryAppendQueueOutgoing mbWire)" mbWire of |
214 | Just (pkt,pktno') | 237 | Just (pkt,pktno') |
215 | -> trace "(tryAppendQueueOutgoing A)" | 238 | -> trace "(tryAppendQueueOutgoing A)" |
diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index c5c17e4e..d174b10c 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs | |||
@@ -345,6 +345,7 @@ data NetCryptoSession = NCrypto | |||
345 | -- ^ when the thread which dequeues from ncPacketQueue | 345 | -- ^ when the thread which dequeues from ncPacketQueue |
346 | -- is started, its ThreadId is stored here | 346 | -- is started, its ThreadId is stored here |
347 | , ncPingMachine :: Maybe PingMachine | 347 | , ncPingMachine :: Maybe PingMachine |
348 | , ncPingThread :: Maybe ThreadId | ||
348 | -- ^ when the ping thread is started, store it here | 349 | -- ^ when the ping thread is started, store it here |
349 | , ncOutgoingQueue :: TVar | 350 | , ncOutgoingQueue :: TVar |
350 | (UponHandshake | 351 | (UponHandshake |
@@ -622,6 +623,7 @@ freshCryptoSession sessions | |||
622 | , ncPacketQueue = pktq | 623 | , ncPacketQueue = pktq |
623 | , ncDequeueThread = Nothing -- error "you want the NetCrypto-Dequeue thread id, but is it started?" | 624 | , ncDequeueThread = Nothing -- error "you want the NetCrypto-Dequeue thread id, but is it started?" |
624 | , ncPingMachine = Nothing -- error "you want the NetCrypto-PingMachine, but is it started?" | 625 | , ncPingMachine = Nothing -- error "you want the NetCrypto-PingMachine, but is it started?" |
626 | , ncPingThread = Nothing -- error "you want the NetCrypto-PingSender, but is it started?" | ||
625 | , ncOutgoingQueue = mbpktoqVar | 627 | , ncOutgoingQueue = mbpktoqVar |
626 | , ncLastNMsgs = lastNQ | 628 | , ncLastNMsgs = lastNQ |
627 | , ncListeners = listeners | 629 | , ncListeners = listeners |
@@ -754,7 +756,9 @@ runUponHandshake netCryptoSession0 addr pktoq = do | |||
754 | Right _ -> return () | 756 | Right _ -> return () |
755 | PingTimeOut -> dput XNetCrypto $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") PingTimeOut TODO" | 757 | PingTimeOut -> dput XNetCrypto $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") PingTimeOut TODO" |
756 | -- update session with thread ids | 758 | -- update session with thread ids |
757 | let netCryptoSession = netCryptoSession0 {ncDequeueThread=Just threadid, ncPingMachine=Just pingMachine} | 759 | let netCryptoSession = netCryptoSession0 { ncDequeueThread=Just threadid |
760 | , ncPingMachine=Just pingMachine | ||
761 | , ncPingThread=Just pingThreadId} | ||
758 | -- add this session to the lookup maps | 762 | -- add this session to the lookup maps |
759 | -- atomically $ addSessionToMapIfNotThere sessions addr netCryptoSession | 763 | -- atomically $ addSessionToMapIfNotThere sessions addr netCryptoSession |
760 | -- run announceNewSessionHooks | 764 | -- run announceNewSessionHooks |
@@ -770,6 +774,23 @@ runUponHandshake netCryptoSession0 addr pktoq = do | |||
770 | Just f -> loop (hs, f session) | 774 | Just f -> loop (hs, f session) |
771 | Nothing -> return () | 775 | Nothing -> return () |
772 | 776 | ||
777 | destroySession :: NetCryptoSession -> IO () | ||
778 | destroySession session = do | ||
779 | let allsessions = ncAllSessions session | ||
780 | sid = ncSessionId session | ||
781 | stopThread :: Maybe ThreadId -> IO () | ||
782 | stopThread x = maybe (return ()) killThread x | ||
783 | stopMachine :: Maybe PingMachine -> IO () | ||
784 | stopMachine x = maybe (return ()) pingCancel x | ||
785 | atomically $ do | ||
786 | modifyTVar (netCryptoSessionsByKey allsessions) | ||
787 | $ Map.map (filter ((/=sid) . ncSessionId)) | ||
788 | modifyTVar (netCryptoSessions allsessions) | ||
789 | $ Map.filterWithKey (\k v -> ncSessionId v /= sid) | ||
790 | stopThread (ncPingThread session) | ||
791 | stopMachine (ncPingMachine session) | ||
792 | stopThread (ncDequeueThread session) | ||
793 | |||
773 | -- | Called when we get a handshake, but there's already a session entry. | 794 | -- | Called when we get a handshake, but there's already a session entry. |
774 | -- | 795 | -- |
775 | -- 1) duplicate packet ... ignore | 796 | -- 1) duplicate packet ... ignore |
@@ -1130,8 +1151,37 @@ sendOffline crypto session = do | |||
1130 | sendKill :: TransportCrypto -> NetCryptoSession -> IO (Either String ()) | 1151 | sendKill :: TransportCrypto -> NetCryptoSession -> IO (Either String ()) |
1131 | sendKill crypto session = do | 1152 | sendKill crypto session = do |
1132 | let cm=OneByte KillPacket | 1153 | let cm=OneByte KillPacket |
1133 | addMsgToLastN False (cm ^. messageType) session (Out cm) | 1154 | mbOutQ <- atomically $ readTVar (ncOutgoingQueue session) |
1134 | sendCrypto crypto session (return ()) cm | 1155 | case mbOutQ of |
1156 | NeedHandshake -> do | ||
1157 | let errmsg="NetCrypto NOT SENDING Kill packet (sessionid: " ++ show (ncSessionId session) ++ ") since no handshake yet" | ||
1158 | dput XNetCrypto errmsg | ||
1159 | dput XNetCrypto $ "Destroying session anyway" | ||
1160 | destroySession session | ||
1161 | return (Left errmsg) | ||
1162 | HaveHandshake outq -> do | ||
1163 | dput XNetCrypto $ "NetCrypto sending Kill packet (sessionid: " ++ show (ncSessionId session) ++ ")" | ||
1164 | getOutGoingParam <- PQ.readyOutGoing outq | ||
1165 | mbPkt <- atomically $ PQ.peekPacket getOutGoingParam outq cm | ||
1166 | case mbPkt of | ||
1167 | Nothing -> do | ||
1168 | let errmsg = "Error sending kill packet! (sessionid: " ++ show (ncSessionId session) ++ ")" | ||
1169 | dput XNetCrypto errmsg | ||
1170 | dput XNetCrypto $ "Destroying session anyway" | ||
1171 | Right <$> destroySession session | ||
1172 | return (Left errmsg) | ||
1173 | Just (pkt,seqno) -> do | ||
1174 | case (ncSockAddr session) of | ||
1175 | NeedDHTKey -> do | ||
1176 | let errmsg= "NetCrypto NOT SENDING Kill packet (sessionid: " ++ show (ncSessionId session) ++ ") since no DHTkey(sockaddr) yet" | ||
1177 | dput XNetCrypto errmsg | ||
1178 | dput XNetCrypto $ "Destroying session anyway" | ||
1179 | Right <$> destroySession session | ||
1180 | return (Left errmsg) | ||
1181 | HaveDHTKey saddr -> do | ||
1182 | sendSessionPacket (ncAllSessions session) saddr pkt | ||
1183 | dput XNetCrypto $ "sent kill packet (sessionid: " ++ show (ncSessionId session) ++ ")... now destroying session..." | ||
1184 | Right <$> destroySession session | ||
1135 | 1185 | ||
1136 | setNick :: TransportCrypto -> NetCryptoSession -> ByteString -> IO (Either String ()) | 1186 | setNick :: TransportCrypto -> NetCryptoSession -> ByteString -> IO (Either String ()) |
1137 | setNick crypto session nick = do | 1187 | setNick crypto session nick = do |
@@ -1207,8 +1257,15 @@ defaultCryptoDataHooks | |||
1207 | , (Msg TYPING,[defaultTypingHook]) | 1257 | , (Msg TYPING,[defaultTypingHook]) |
1208 | , (Msg NICKNAME, [defaultNicknameHook]) | 1258 | , (Msg NICKNAME, [defaultNicknameHook]) |
1209 | , (Msg STATUSMESSAGE, [defaultStatusMsgHook]) | 1259 | , (Msg STATUSMESSAGE, [defaultStatusMsgHook]) |
1260 | , (Msg KillPacket, [defaultKillHook]) | ||
1210 | ] | 1261 | ] |
1211 | 1262 | ||
1263 | defaultKillHook :: NetCryptoSession -> CryptoMessage -> IO (Maybe (CryptoMessage -> CryptoMessage)) | ||
1264 | defaultKillHook session cm@(OneByte {msgID=KillPacket}) = do | ||
1265 | dput XNetCrypto $ "Recieved kill packet (sessionid: " ++ show (ncSessionId session) ++ ") destroying session" | ||
1266 | destroySession session | ||
1267 | return Nothing | ||
1268 | |||
1212 | defaultUserStatusHook :: NetCryptoSession -> CryptoMessage -> IO (Maybe (CryptoMessage -> CryptoMessage)) | 1269 | defaultUserStatusHook :: NetCryptoSession -> CryptoMessage -> IO (Maybe (CryptoMessage -> CryptoMessage)) |
1213 | defaultUserStatusHook session cm@(TwoByte {msgID=USERSTATUS, msgByte=statusByte}) = do | 1270 | defaultUserStatusHook session cm@(TwoByte {msgID=USERSTATUS, msgByte=statusByte}) = do |
1214 | let status = toEnum8 statusByte | 1271 | let status = toEnum8 statusByte |