summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Data/PacketQueue.hs23
-rw-r--r--src/Network/Tox/Crypto/Handlers.hs63
2 files changed, 83 insertions, 3 deletions
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs
index cb308bce..f9d9f28f 100644
--- a/src/Data/PacketQueue.hs
+++ b/src/Data/PacketQueue.hs
@@ -17,6 +17,7 @@ module Data.PacketQueue
17 , packetQueueViewList 17 , packetQueueViewList
18 , newOutGoing 18 , newOutGoing
19 , readyOutGoing 19 , readyOutGoing
20 , peekPacket
20 , tryAppendQueueOutgoing 21 , tryAppendQueueOutgoing
21 , dequeueOutgoing 22 , dequeueOutgoing
22 , getHighestHandledPacketPlus1 23 , getHighestHandledPacketPlus1
@@ -187,6 +188,27 @@ data OutGoingResult = OGSuccess | OGFull | OGEncodeFail
187readyOutGoing :: PacketOutQueue extra msg wire fromwire -> IO (STM extra) 188readyOutGoing :: PacketOutQueue extra msg wire fromwire -> IO (STM extra)
188readyOutGoing (PacketOutQueue {pktoToWireIO }) = pktoToWireIO 189readyOutGoing (PacketOutQueue {pktoToWireIO }) = pktoToWireIO
189 190
191peekPacket :: STM extra -> PacketOutQueue extra msg wire fromwire -> msg -> STM (Maybe (wire,Word32))
192peekPacket getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPacketNo, pktoToWireIO, pktoToWire }) msg
193 = do
194 be <- readTVar (buffend pktoOutPQ)
195 let i = be `mod` (qsize pktoOutPQ)
196 let arrayEmpty :: MArray a e m => a Word32 e -> m Bool
197 arrayEmpty ar = do (lowB,highB) <- getBounds ar
198 let result= lowB > highB
199 return $ trace ("arrayEmpty result=" ++ show result
200 ++ " lowB=" ++ show lowB
201 ++ " highB = " ++ show highB
202 ++ " i = " ++ show i) result
203 mbPkt <- do emp <- arrayEmpty (pktq pktoOutPQ)
204 if emp then trace "(peekPacket empty)" $ return Nothing
205 else trace "(peekPacket nonempty)" $ do
206 result <- readArray (pktq pktoOutPQ) i
207 return $ trace ("readArray (isJust result)==" ++ show (isJust result)) result
208 pktno <- readTVar pktoPacketNo
209 nextno <- readTVar (seqno pktoInPQ)
210 pktoToWire getExtra nextno be pktno msg
211
190-- | Convert a message to packet format and append it to the front of a queue 212-- | Convert a message to packet format and append it to the front of a queue
191-- used for outgoing messages. (Note that ‘front‛ usually means the higher 213-- used for outgoing messages. (Note that ‘front‛ usually means the higher
192-- index in this implementation.) 214-- index in this implementation.)
@@ -210,6 +232,7 @@ tryAppendQueueOutgoing getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPac
210 pktno <- readTVar pktoPacketNo 232 pktno <- readTVar pktoPacketNo
211 nextno <- readTVar (seqno pktoInPQ) 233 nextno <- readTVar (seqno pktoInPQ)
212 mbWire <- pktoToWire getExtra nextno be pktno msg 234 mbWire <- pktoToWire getExtra nextno be pktno msg
235 -- TODO all the above lines ^^ can be replaced with call to peekPacket
213 case trace "(tryAppendQueueOutgoing mbWire)" mbWire of 236 case trace "(tryAppendQueueOutgoing mbWire)" mbWire of
214 Just (pkt,pktno') 237 Just (pkt,pktno')
215 -> trace "(tryAppendQueueOutgoing A)" 238 -> trace "(tryAppendQueueOutgoing A)"
diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs
index c5c17e4e..d174b10c 100644
--- a/src/Network/Tox/Crypto/Handlers.hs
+++ b/src/Network/Tox/Crypto/Handlers.hs
@@ -345,6 +345,7 @@ data NetCryptoSession = NCrypto
345 -- ^ when the thread which dequeues from ncPacketQueue 345 -- ^ when the thread which dequeues from ncPacketQueue
346 -- is started, its ThreadId is stored here 346 -- is started, its ThreadId is stored here
347 , ncPingMachine :: Maybe PingMachine 347 , ncPingMachine :: Maybe PingMachine
348 , ncPingThread :: Maybe ThreadId
348 -- ^ when the ping thread is started, store it here 349 -- ^ when the ping thread is started, store it here
349 , ncOutgoingQueue :: TVar 350 , ncOutgoingQueue :: TVar
350 (UponHandshake 351 (UponHandshake
@@ -622,6 +623,7 @@ freshCryptoSession sessions
622 , ncPacketQueue = pktq 623 , ncPacketQueue = pktq
623 , ncDequeueThread = Nothing -- error "you want the NetCrypto-Dequeue thread id, but is it started?" 624 , ncDequeueThread = Nothing -- error "you want the NetCrypto-Dequeue thread id, but is it started?"
624 , ncPingMachine = Nothing -- error "you want the NetCrypto-PingMachine, but is it started?" 625 , ncPingMachine = Nothing -- error "you want the NetCrypto-PingMachine, but is it started?"
626 , ncPingThread = Nothing -- error "you want the NetCrypto-PingSender, but is it started?"
625 , ncOutgoingQueue = mbpktoqVar 627 , ncOutgoingQueue = mbpktoqVar
626 , ncLastNMsgs = lastNQ 628 , ncLastNMsgs = lastNQ
627 , ncListeners = listeners 629 , ncListeners = listeners
@@ -754,7 +756,9 @@ runUponHandshake netCryptoSession0 addr pktoq = do
754 Right _ -> return () 756 Right _ -> return ()
755 PingTimeOut -> dput XNetCrypto $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") PingTimeOut TODO" 757 PingTimeOut -> dput XNetCrypto $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") PingTimeOut TODO"
756 -- update session with thread ids 758 -- update session with thread ids
757 let netCryptoSession = netCryptoSession0 {ncDequeueThread=Just threadid, ncPingMachine=Just pingMachine} 759 let netCryptoSession = netCryptoSession0 { ncDequeueThread=Just threadid
760 , ncPingMachine=Just pingMachine
761 , ncPingThread=Just pingThreadId}
758 -- add this session to the lookup maps 762 -- add this session to the lookup maps
759 -- atomically $ addSessionToMapIfNotThere sessions addr netCryptoSession 763 -- atomically $ addSessionToMapIfNotThere sessions addr netCryptoSession
760 -- run announceNewSessionHooks 764 -- run announceNewSessionHooks
@@ -770,6 +774,23 @@ runUponHandshake netCryptoSession0 addr pktoq = do
770 Just f -> loop (hs, f session) 774 Just f -> loop (hs, f session)
771 Nothing -> return () 775 Nothing -> return ()
772 776
777destroySession :: NetCryptoSession -> IO ()
778destroySession session = do
779 let allsessions = ncAllSessions session
780 sid = ncSessionId session
781 stopThread :: Maybe ThreadId -> IO ()
782 stopThread x = maybe (return ()) killThread x
783 stopMachine :: Maybe PingMachine -> IO ()
784 stopMachine x = maybe (return ()) pingCancel x
785 atomically $ do
786 modifyTVar (netCryptoSessionsByKey allsessions)
787 $ Map.map (filter ((/=sid) . ncSessionId))
788 modifyTVar (netCryptoSessions allsessions)
789 $ Map.filterWithKey (\k v -> ncSessionId v /= sid)
790 stopThread (ncPingThread session)
791 stopMachine (ncPingMachine session)
792 stopThread (ncDequeueThread session)
793
773-- | Called when we get a handshake, but there's already a session entry. 794-- | Called when we get a handshake, but there's already a session entry.
774-- 795--
775-- 1) duplicate packet ... ignore 796-- 1) duplicate packet ... ignore
@@ -1130,8 +1151,37 @@ sendOffline crypto session = do
1130sendKill :: TransportCrypto -> NetCryptoSession -> IO (Either String ()) 1151sendKill :: TransportCrypto -> NetCryptoSession -> IO (Either String ())
1131sendKill crypto session = do 1152sendKill crypto session = do
1132 let cm=OneByte KillPacket 1153 let cm=OneByte KillPacket
1133 addMsgToLastN False (cm ^. messageType) session (Out cm) 1154 mbOutQ <- atomically $ readTVar (ncOutgoingQueue session)
1134 sendCrypto crypto session (return ()) cm 1155 case mbOutQ of
1156 NeedHandshake -> do
1157 let errmsg="NetCrypto NOT SENDING Kill packet (sessionid: " ++ show (ncSessionId session) ++ ") since no handshake yet"
1158 dput XNetCrypto errmsg
1159 dput XNetCrypto $ "Destroying session anyway"
1160 destroySession session
1161 return (Left errmsg)
1162 HaveHandshake outq -> do
1163 dput XNetCrypto $ "NetCrypto sending Kill packet (sessionid: " ++ show (ncSessionId session) ++ ")"
1164 getOutGoingParam <- PQ.readyOutGoing outq
1165 mbPkt <- atomically $ PQ.peekPacket getOutGoingParam outq cm
1166 case mbPkt of
1167 Nothing -> do
1168 let errmsg = "Error sending kill packet! (sessionid: " ++ show (ncSessionId session) ++ ")"
1169 dput XNetCrypto errmsg
1170 dput XNetCrypto $ "Destroying session anyway"
1171 Right <$> destroySession session
1172 return (Left errmsg)
1173 Just (pkt,seqno) -> do
1174 case (ncSockAddr session) of
1175 NeedDHTKey -> do
1176 let errmsg= "NetCrypto NOT SENDING Kill packet (sessionid: " ++ show (ncSessionId session) ++ ") since no DHTkey(sockaddr) yet"
1177 dput XNetCrypto errmsg
1178 dput XNetCrypto $ "Destroying session anyway"
1179 Right <$> destroySession session
1180 return (Left errmsg)
1181 HaveDHTKey saddr -> do
1182 sendSessionPacket (ncAllSessions session) saddr pkt
1183 dput XNetCrypto $ "sent kill packet (sessionid: " ++ show (ncSessionId session) ++ ")... now destroying session..."
1184 Right <$> destroySession session
1135 1185
1136setNick :: TransportCrypto -> NetCryptoSession -> ByteString -> IO (Either String ()) 1186setNick :: TransportCrypto -> NetCryptoSession -> ByteString -> IO (Either String ())
1137setNick crypto session nick = do 1187setNick crypto session nick = do
@@ -1207,8 +1257,15 @@ defaultCryptoDataHooks
1207 , (Msg TYPING,[defaultTypingHook]) 1257 , (Msg TYPING,[defaultTypingHook])
1208 , (Msg NICKNAME, [defaultNicknameHook]) 1258 , (Msg NICKNAME, [defaultNicknameHook])
1209 , (Msg STATUSMESSAGE, [defaultStatusMsgHook]) 1259 , (Msg STATUSMESSAGE, [defaultStatusMsgHook])
1260 , (Msg KillPacket, [defaultKillHook])
1210 ] 1261 ]
1211 1262
1263defaultKillHook :: NetCryptoSession -> CryptoMessage -> IO (Maybe (CryptoMessage -> CryptoMessage))
1264defaultKillHook session cm@(OneByte {msgID=KillPacket}) = do
1265 dput XNetCrypto $ "Recieved kill packet (sessionid: " ++ show (ncSessionId session) ++ ") destroying session"
1266 destroySession session
1267 return Nothing
1268
1212defaultUserStatusHook :: NetCryptoSession -> CryptoMessage -> IO (Maybe (CryptoMessage -> CryptoMessage)) 1269defaultUserStatusHook :: NetCryptoSession -> CryptoMessage -> IO (Maybe (CryptoMessage -> CryptoMessage))
1213defaultUserStatusHook session cm@(TwoByte {msgID=USERSTATUS, msgByte=statusByte}) = do 1270defaultUserStatusHook session cm@(TwoByte {msgID=USERSTATUS, msgByte=statusByte}) = do
1214 let status = toEnum8 statusByte 1271 let status = toEnum8 statusByte