diff options
-rw-r--r-- | examples/dhtd.hs | 17 | ||||
-rw-r--r-- | src/Data/PacketQueue.hs | 18 |
2 files changed, 31 insertions, 4 deletions
diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 6e2647d1..1aa36b77 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs | |||
@@ -94,6 +94,7 @@ import Data.Typeable | |||
94 | import Network.Tox.ContactInfo as Tox | 94 | import Network.Tox.ContactInfo as Tox |
95 | import OnionRouter | 95 | import OnionRouter |
96 | import PingMachine | 96 | import PingMachine |
97 | import Data.PacketQueue | ||
97 | 98 | ||
98 | -- Presence imports. | 99 | -- Presence imports. |
99 | import ConsoleWriter | 100 | import ConsoleWriter |
@@ -998,8 +999,20 @@ newXmmpSource session = do | |||
998 | newXmmpSource session | 999 | newXmmpSource session |
999 | 1000 | ||
1000 | newXmmpSink :: Tox.NetCryptoSession -> C.Sink (Flush Tox.CryptoMessage) IO () | 1001 | newXmmpSink :: Tox.NetCryptoSession -> C.Sink (Flush Tox.CryptoMessage) IO () |
1001 | newXmmpSink sessions = C.awaitForever $ \flush_cyptomessage -> do | 1002 | newXmmpSink session@(Tox.NCrypto { ncOutgoingQueue, ncPacketQueue }) = C.awaitForever $ \flush_cyptomessage -> do |
1002 | liftIO $ (_todo sessions {- send the fucking message -}) flush_cyptomessage | 1003 | let sendit :: Tox.NetCryptoSession -> Flush Tox.CryptoMessage -> IO () |
1004 | sendit session (Chunk msg) = do | ||
1005 | extra <- readyOutGoing ncOutgoingQueue | ||
1006 | r <- atomically $ do | ||
1007 | rTry <- tryAppendQueueOutgoing extra ncOutgoingQueue msg | ||
1008 | case rTry of | ||
1009 | OGFull -> retry | ||
1010 | OGSuccess -> return OGSuccess | ||
1011 | OGEncodeFail -> return OGEncodeFail | ||
1012 | when (r == OGEncodeFail) $ | ||
1013 | hPutStrLn stderr ("FAILURE to Encode Outgoing: " ++ show msg) | ||
1014 | sendit session Flush = return () | ||
1015 | liftIO $ sendit session flush_cyptomessage | ||
1003 | 1016 | ||
1004 | -- | TODO | 1017 | -- | TODO |
1005 | -- | 1018 | -- |
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs index cde26fb7..b7737656 100644 --- a/src/Data/PacketQueue.hs +++ b/src/Data/PacketQueue.hs | |||
@@ -12,8 +12,10 @@ module Data.PacketQueue | |||
12 | , observeOutOfBand | 12 | , observeOutOfBand |
13 | , PacketOutQueue | 13 | , PacketOutQueue |
14 | , newOutGoing | 14 | , newOutGoing |
15 | , readyOutGoing | ||
15 | , tryAppendQueueOutgoing | 16 | , tryAppendQueueOutgoing |
16 | , dequeueOutgoing | 17 | , dequeueOutgoing |
18 | , getHighestHandledPacketPlus1 | ||
17 | , mapOutGoing | 19 | , mapOutGoing |
18 | , OutGoingResult(..) | 20 | , OutGoingResult(..) |
19 | ) where | 21 | ) where |
@@ -158,8 +160,18 @@ tryAppendQueueOutgoing getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPac | |||
158 | writeTVar pktoPacketNo $! pktno' | 160 | writeTVar pktoPacketNo $! pktno' |
159 | writeArray (pktq pktoOutPQ) i (Just (pktno,pkt)) | 161 | writeArray (pktq pktoOutPQ) i (Just (pktno,pkt)) |
160 | return OGSuccess | 162 | return OGSuccess |
161 | -- queue is full, block until its not | 163 | -- queue is full |
162 | _ -> return OGFull | 164 | Just (n,_) -> do |
165 | nn <- getHighestHandledPacketPlus1 q | ||
166 | if (n < nn) | ||
167 | -- but we can overwrite an old packet | ||
168 | then do | ||
169 | modifyTVar' (buffend pktoOutPQ) (+1) | ||
170 | writeTVar pktoPacketNo $! pktno' | ||
171 | writeArray (pktq pktoOutPQ) i (Just (pktno,pkt)) | ||
172 | return OGSuccess | ||
173 | -- uh oh this packet is still needed... | ||
174 | else return OGFull | ||
163 | -- don't know how to send this message | 175 | -- don't know how to send this message |
164 | Nothing -> return OGEncodeFail | 176 | Nothing -> return OGEncodeFail |
165 | 177 | ||
@@ -172,3 +184,5 @@ dequeueOutgoing (PacketOutQueue {pktoOutPQ=PacketQueue { pktq, seqno, qsize }}) | |||
172 | modifyTVar' seqno succ | 184 | modifyTVar' seqno succ |
173 | return x | 185 | return x |
174 | 186 | ||
187 | getHighestHandledPacketPlus1 :: PacketOutQueue extra msg wire fromwire -> STM Word32 | ||
188 | getHighestHandledPacketPlus1 (PacketOutQueue { pktoInPQ }) = readTVar (buffend pktoInPQ) | ||