summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/dhtd.hs17
-rw-r--r--src/Data/PacketQueue.hs18
2 files changed, 31 insertions, 4 deletions
diff --git a/examples/dhtd.hs b/examples/dhtd.hs
index 6e2647d1..1aa36b77 100644
--- a/examples/dhtd.hs
+++ b/examples/dhtd.hs
@@ -94,6 +94,7 @@ import Data.Typeable
94import Network.Tox.ContactInfo as Tox 94import Network.Tox.ContactInfo as Tox
95import OnionRouter 95import OnionRouter
96import PingMachine 96import PingMachine
97import Data.PacketQueue
97 98
98-- Presence imports. 99-- Presence imports.
99import ConsoleWriter 100import ConsoleWriter
@@ -998,8 +999,20 @@ newXmmpSource session = do
998 newXmmpSource session 999 newXmmpSource session
999 1000
1000newXmmpSink :: Tox.NetCryptoSession -> C.Sink (Flush Tox.CryptoMessage) IO () 1001newXmmpSink :: Tox.NetCryptoSession -> C.Sink (Flush Tox.CryptoMessage) IO ()
1001newXmmpSink sessions = C.awaitForever $ \flush_cyptomessage -> do 1002newXmmpSink session@(Tox.NCrypto { ncOutgoingQueue, ncPacketQueue }) = C.awaitForever $ \flush_cyptomessage -> do
1002 liftIO $ (_todo sessions {- send the fucking message -}) flush_cyptomessage 1003 let sendit :: Tox.NetCryptoSession -> Flush Tox.CryptoMessage -> IO ()
1004 sendit session (Chunk msg) = do
1005 extra <- readyOutGoing ncOutgoingQueue
1006 r <- atomically $ do
1007 rTry <- tryAppendQueueOutgoing extra ncOutgoingQueue msg
1008 case rTry of
1009 OGFull -> retry
1010 OGSuccess -> return OGSuccess
1011 OGEncodeFail -> return OGEncodeFail
1012 when (r == OGEncodeFail) $
1013 hPutStrLn stderr ("FAILURE to Encode Outgoing: " ++ show msg)
1014 sendit session Flush = return ()
1015 liftIO $ sendit session flush_cyptomessage
1003 1016
1004-- | TODO 1017-- | TODO
1005-- 1018--
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs
index cde26fb7..b7737656 100644
--- a/src/Data/PacketQueue.hs
+++ b/src/Data/PacketQueue.hs
@@ -12,8 +12,10 @@ module Data.PacketQueue
12 , observeOutOfBand 12 , observeOutOfBand
13 , PacketOutQueue 13 , PacketOutQueue
14 , newOutGoing 14 , newOutGoing
15 , readyOutGoing
15 , tryAppendQueueOutgoing 16 , tryAppendQueueOutgoing
16 , dequeueOutgoing 17 , dequeueOutgoing
18 , getHighestHandledPacketPlus1
17 , mapOutGoing 19 , mapOutGoing
18 , OutGoingResult(..) 20 , OutGoingResult(..)
19 ) where 21 ) where
@@ -158,8 +160,18 @@ tryAppendQueueOutgoing getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPac
158 writeTVar pktoPacketNo $! pktno' 160 writeTVar pktoPacketNo $! pktno'
159 writeArray (pktq pktoOutPQ) i (Just (pktno,pkt)) 161 writeArray (pktq pktoOutPQ) i (Just (pktno,pkt))
160 return OGSuccess 162 return OGSuccess
161 -- queue is full, block until its not 163 -- queue is full
162 _ -> return OGFull 164 Just (n,_) -> do
165 nn <- getHighestHandledPacketPlus1 q
166 if (n < nn)
167 -- but we can overwrite an old packet
168 then do
169 modifyTVar' (buffend pktoOutPQ) (+1)
170 writeTVar pktoPacketNo $! pktno'
171 writeArray (pktq pktoOutPQ) i (Just (pktno,pkt))
172 return OGSuccess
173 -- uh oh this packet is still needed...
174 else return OGFull
163 -- don't know how to send this message 175 -- don't know how to send this message
164 Nothing -> return OGEncodeFail 176 Nothing -> return OGEncodeFail
165 177
@@ -172,3 +184,5 @@ dequeueOutgoing (PacketOutQueue {pktoOutPQ=PacketQueue { pktq, seqno, qsize }})
172 modifyTVar' seqno succ 184 modifyTVar' seqno succ
173 return x 185 return x
174 186
187getHighestHandledPacketPlus1 :: PacketOutQueue extra msg wire fromwire -> STM Word32
188getHighestHandledPacketPlus1 (PacketOutQueue { pktoInPQ }) = readTVar (buffend pktoInPQ)