diff options
author | Joe Crayne <joe@jerkface.net> | 2018-12-02 16:36:03 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2018-12-16 14:08:26 -0500 |
commit | 93e51f29cc6c25455a5f91dbc1c3e678922523fd (patch) | |
tree | d9911cdc48cf4de510fcfc5693def6cb9d1b6e81 | |
parent | e64a0efd2ca29257c189343b6dc75f6bee29d66d (diff) |
TCP cache: Use (Down POSIXTime) for MinMax priority.
-rw-r--r-- | src/Network/QueryResponse/TCP.hs | 42 | ||||
-rw-r--r-- | src/Network/Tox/TCP.hs | 10 |
2 files changed, 22 insertions, 30 deletions
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs index 7efe6966..c0bdcd3c 100644 --- a/src/Network/QueryResponse/TCP.hs +++ b/src/Network/QueryResponse/TCP.hs | |||
@@ -14,6 +14,7 @@ import Control.Monad | |||
14 | import Data.ByteString (ByteString,hPut) | 14 | import Data.ByteString (ByteString,hPut) |
15 | import Data.Function | 15 | import Data.Function |
16 | import Data.Hashable | 16 | import Data.Hashable |
17 | import Data.Ord | ||
17 | import Data.Time.Clock.POSIX | 18 | import Data.Time.Clock.POSIX |
18 | import Data.Word | 19 | import Data.Word |
19 | import Network.BSD | 20 | import Network.BSD |
@@ -26,7 +27,7 @@ import qualified Data.MinMaxPSQ as MM | |||
26 | import Network.QueryResponse | 27 | import Network.QueryResponse |
27 | 28 | ||
28 | data TCPSession st = TCPSession | 29 | data TCPSession st = TCPSession |
29 | { tcpHandle :: MVar Handle | 30 | { tcpHandle :: Handle |
30 | , tcpState :: st | 31 | , tcpState :: st |
31 | , tcpThread :: ThreadId | 32 | , tcpThread :: ThreadId |
32 | } | 33 | } |
@@ -41,14 +42,14 @@ instance Hashable TCPAddress where | |||
41 | _ -> 0 | 42 | _ -> 0 |
42 | 43 | ||
43 | data TCPCache st = TCPCache | 44 | data TCPCache st = TCPCache |
44 | { lru :: TVar (MM.MinMaxPSQ' TCPAddress POSIXTime (TCPSession st)) | 45 | { lru :: TVar (MM.MinMaxPSQ' TCPAddress (Down POSIXTime) (TCPSession st)) |
45 | , tcpMax :: Int | 46 | , tcpMax :: Int |
46 | } | 47 | } |
47 | 48 | ||
48 | data SessionProtocol x y = SessionProtocol | 49 | data SessionProtocol x y = SessionProtocol |
49 | { streamGoodbye :: Handle -> IO () -- ^ "Goodbye" protocol upon termination. | 50 | { streamGoodbye :: IO () -- ^ "Goodbye" protocol upon termination. |
50 | , streamDecode :: Handle -> IO (Maybe x) -- ^ Parse inbound messages. | 51 | , streamDecode :: IO (Maybe x) -- ^ Parse inbound messages. |
51 | , streamEncode :: y -> IO ByteString -- ^ Serialize outbound messages. | 52 | , streamEncode :: y -> IO () -- ^ Serialize outbound messages. |
52 | } | 53 | } |
53 | 54 | ||
54 | data StreamHandshake addr x y = StreamHandshake | 55 | data StreamHandshake addr x y = StreamHandshake |
@@ -71,9 +72,8 @@ acquireConnection mvar tcpcache stream addr = do | |||
71 | h <- socketToHandle sock ReadWriteMode | 72 | h <- socketToHandle sock ReadWriteMode |
72 | st <- streamHello stream addr h | 73 | st <- streamHello stream addr h |
73 | t <- getPOSIXTime | 74 | t <- getPOSIXTime |
74 | mh <- newMVar h | ||
75 | rthread <- forkIO $ fix $ \loop -> do | 75 | rthread <- forkIO $ fix $ \loop -> do |
76 | x <- streamDecode st h | 76 | x <- streamDecode st |
77 | putMVar mvar $ fmap (\u -> Right (u, addr)) x | 77 | putMVar mvar $ fmap (\u -> Right (u, addr)) x |
78 | case x of | 78 | case x of |
79 | Just _ -> loop | 79 | Just _ -> loop |
@@ -84,41 +84,35 @@ acquireConnection mvar tcpcache stream addr = do | |||
84 | let showAddr a = show (streamAddr stream a) | 84 | let showAddr a = show (streamAddr stream a) |
85 | labelThread rthread ("tcp:"++showAddr addr) | 85 | labelThread rthread ("tcp:"++showAddr addr) |
86 | let v = TCPSession | 86 | let v = TCPSession |
87 | { tcpHandle = mh | 87 | { tcpHandle = h |
88 | , tcpState = st | 88 | , tcpState = st |
89 | , tcpThread = rthread | 89 | , tcpThread = rthread |
90 | } | 90 | } |
91 | let (retires,cache') = MM.takeView (tcpMax tcpcache) | 91 | let (retires,cache') = MM.takeView (tcpMax tcpcache) |
92 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v t cache | 92 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) cache |
93 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do | 93 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do |
94 | myThreadId >>= flip labelThread ("tcp-close:"++show k) | 94 | myThreadId >>= flip labelThread ("tcp-close:"++show k) |
95 | killThread (tcpThread r) | 95 | killThread (tcpThread r) |
96 | h <- takeMVar (tcpHandle r) | 96 | streamGoodbye st |
97 | streamGoodbye st h | 97 | hClose (tcpHandle r) |
98 | hClose h | ||
99 | atomically $ writeTVar (lru tcpcache) cache' | 98 | atomically $ writeTVar (lru tcpcache) cache' |
100 | 99 | ||
101 | return $ Just $ \y -> do | 100 | return $ Just $ streamEncode st |
102 | bs <- streamEncode st y | ||
103 | withMVar mh (`hPut` bs) | ||
104 | Just (tm,v) -> do | 101 | Just (tm,v) -> do |
105 | t <- getPOSIXTime | 102 | t <- getPOSIXTime |
106 | let TCPSession { tcpHandle = mh, tcpState = st } = v | 103 | let TCPSession { tcpHandle = h, tcpState = st } = v |
107 | cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v t cache | 104 | cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) cache |
108 | atomically $ writeTVar (lru tcpcache) cache' | 105 | atomically $ writeTVar (lru tcpcache) cache' |
109 | return $ Just $ \y -> do | 106 | return $ Just $ streamEncode st |
110 | bs <- streamEncode st y | ||
111 | withMVar mh (`hPut` bs) | ||
112 | 107 | ||
113 | closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () | 108 | closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () |
114 | closeAll tcpcache stream = do | 109 | closeAll tcpcache stream = do |
115 | cache <- atomically $ readTVar (lru tcpcache) | 110 | cache <- atomically $ swapTVar (lru tcpcache) MM.empty |
116 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do | 111 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do |
117 | let st = tcpState r | 112 | let st = tcpState r |
118 | killThread (tcpThread r) | 113 | killThread (tcpThread r) |
119 | h <- takeMVar $ tcpHandle r | 114 | streamGoodbye st |
120 | streamGoodbye st h | 115 | hClose (tcpHandle r) |
121 | hClose h | ||
122 | 116 | ||
123 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. | 117 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. |
124 | -> StreamHandshake addr x y | 118 | -> StreamHandshake addr x y |
diff --git a/src/Network/Tox/TCP.hs b/src/Network/Tox/TCP.hs index 0780f121..28bcd244 100644 --- a/src/Network/Tox/TCP.hs +++ b/src/Network/Tox/TCP.hs | |||
@@ -8,10 +8,9 @@ import Control.Concurrent | |||
8 | import Control.Concurrent.STM | 8 | import Control.Concurrent.STM |
9 | import Data.Functor.Identity | 9 | import Data.Functor.Identity |
10 | import Data.Serialize | 10 | import Data.Serialize |
11 | import System.IO (Handle) | ||
12 | 11 | ||
13 | import Crypto.Tox | 12 | import Crypto.Tox |
14 | import Data.ByteString (hPut,hGet,ByteString) | 13 | import Data.ByteString (hPut,hGet) |
15 | import Data.Tox.Relay | 14 | import Data.Tox.Relay |
16 | import Network.Address (setPort,PortNumber,SockAddr) | 15 | import Network.Address (setPort,PortNumber,SockAddr) |
17 | import Network.QueryResponse | 16 | import Network.QueryResponse |
@@ -60,8 +59,8 @@ tcpStream crypto = StreamHandshake | |||
60 | nread <- newMVar (sessionBaseNonce $ runIdentity $ welcomeData welcome) | 59 | nread <- newMVar (sessionBaseNonce $ runIdentity $ welcomeData welcome) |
61 | let them = sessionPublicKey $ runIdentity $ welcomeData welcome | 60 | let them = sessionPublicKey $ runIdentity $ welcomeData welcome |
62 | return SessionProtocol | 61 | return SessionProtocol |
63 | { streamGoodbye = \h -> return () -- No goodbye packet? Seems rude. | 62 | { streamGoodbye = return () -- No goodbye packet? Seems rude. |
64 | , streamDecode = \h -> do | 63 | , streamDecode = do |
65 | decode <$> hGet h 2 >>= \case | 64 | decode <$> hGet h 2 >>= \case |
66 | Left _ -> return Nothing | 65 | Left _ -> return Nothing |
67 | Right len -> do | 66 | Right len -> do |
@@ -74,9 +73,8 @@ tcpStream crypto = StreamHandshake | |||
74 | return $ either (const Nothing) Just r | 73 | return $ either (const Nothing) Just r |
75 | , streamEncode = \y -> do | 74 | , streamEncode = \y -> do |
76 | n24 <- takeMVar nsend | 75 | n24 <- takeMVar nsend |
77 | let bs = encode $ encrypt (noncef' n24) $ encodePlain y | 76 | hPut h $ encode $ encrypt (noncef' n24) $ encodePlain y |
78 | putMVar nsend (incrementNonce24 n24) | 77 | putMVar nsend (incrementNonce24 n24) |
79 | return bs -- XXX: Should we wait until this bytestring is sent before putting the nonce back in the MVar? | ||
80 | } | 78 | } |
81 | , streamAddr = nodeAddr | 79 | , streamAddr = nodeAddr |
82 | } | 80 | } |