summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2018-12-02 16:36:03 -0500
committerJoe Crayne <joe@jerkface.net>2018-12-16 14:08:26 -0500
commit93e51f29cc6c25455a5f91dbc1c3e678922523fd (patch)
treed9911cdc48cf4de510fcfc5693def6cb9d1b6e81
parente64a0efd2ca29257c189343b6dc75f6bee29d66d (diff)
TCP cache: Use (Down POSIXTime) for MinMax priority.
-rw-r--r--src/Network/QueryResponse/TCP.hs42
-rw-r--r--src/Network/Tox/TCP.hs10
2 files changed, 22 insertions, 30 deletions
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs
index 7efe6966..c0bdcd3c 100644
--- a/src/Network/QueryResponse/TCP.hs
+++ b/src/Network/QueryResponse/TCP.hs
@@ -14,6 +14,7 @@ import Control.Monad
14import Data.ByteString (ByteString,hPut) 14import Data.ByteString (ByteString,hPut)
15import Data.Function 15import Data.Function
16import Data.Hashable 16import Data.Hashable
17import Data.Ord
17import Data.Time.Clock.POSIX 18import Data.Time.Clock.POSIX
18import Data.Word 19import Data.Word
19import Network.BSD 20import Network.BSD
@@ -26,7 +27,7 @@ import qualified Data.MinMaxPSQ as MM
26import Network.QueryResponse 27import Network.QueryResponse
27 28
28data TCPSession st = TCPSession 29data TCPSession st = TCPSession
29 { tcpHandle :: MVar Handle 30 { tcpHandle :: Handle
30 , tcpState :: st 31 , tcpState :: st
31 , tcpThread :: ThreadId 32 , tcpThread :: ThreadId
32 } 33 }
@@ -41,14 +42,14 @@ instance Hashable TCPAddress where
41 _ -> 0 42 _ -> 0
42 43
43data TCPCache st = TCPCache 44data TCPCache st = TCPCache
44 { lru :: TVar (MM.MinMaxPSQ' TCPAddress POSIXTime (TCPSession st)) 45 { lru :: TVar (MM.MinMaxPSQ' TCPAddress (Down POSIXTime) (TCPSession st))
45 , tcpMax :: Int 46 , tcpMax :: Int
46 } 47 }
47 48
48data SessionProtocol x y = SessionProtocol 49data SessionProtocol x y = SessionProtocol
49 { streamGoodbye :: Handle -> IO () -- ^ "Goodbye" protocol upon termination. 50 { streamGoodbye :: IO () -- ^ "Goodbye" protocol upon termination.
50 , streamDecode :: Handle -> IO (Maybe x) -- ^ Parse inbound messages. 51 , streamDecode :: IO (Maybe x) -- ^ Parse inbound messages.
51 , streamEncode :: y -> IO ByteString -- ^ Serialize outbound messages. 52 , streamEncode :: y -> IO () -- ^ Serialize outbound messages.
52 } 53 }
53 54
54data StreamHandshake addr x y = StreamHandshake 55data StreamHandshake addr x y = StreamHandshake
@@ -71,9 +72,8 @@ acquireConnection mvar tcpcache stream addr = do
71 h <- socketToHandle sock ReadWriteMode 72 h <- socketToHandle sock ReadWriteMode
72 st <- streamHello stream addr h 73 st <- streamHello stream addr h
73 t <- getPOSIXTime 74 t <- getPOSIXTime
74 mh <- newMVar h
75 rthread <- forkIO $ fix $ \loop -> do 75 rthread <- forkIO $ fix $ \loop -> do
76 x <- streamDecode st h 76 x <- streamDecode st
77 putMVar mvar $ fmap (\u -> Right (u, addr)) x 77 putMVar mvar $ fmap (\u -> Right (u, addr)) x
78 case x of 78 case x of
79 Just _ -> loop 79 Just _ -> loop
@@ -84,41 +84,35 @@ acquireConnection mvar tcpcache stream addr = do
84 let showAddr a = show (streamAddr stream a) 84 let showAddr a = show (streamAddr stream a)
85 labelThread rthread ("tcp:"++showAddr addr) 85 labelThread rthread ("tcp:"++showAddr addr)
86 let v = TCPSession 86 let v = TCPSession
87 { tcpHandle = mh 87 { tcpHandle = h
88 , tcpState = st 88 , tcpState = st
89 , tcpThread = rthread 89 , tcpThread = rthread
90 } 90 }
91 let (retires,cache') = MM.takeView (tcpMax tcpcache) 91 let (retires,cache') = MM.takeView (tcpMax tcpcache)
92 $ MM.insert' (TCPAddress $ streamAddr stream addr) v t cache 92 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) cache
93 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do 93 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do
94 myThreadId >>= flip labelThread ("tcp-close:"++show k) 94 myThreadId >>= flip labelThread ("tcp-close:"++show k)
95 killThread (tcpThread r) 95 killThread (tcpThread r)
96 h <- takeMVar (tcpHandle r) 96 streamGoodbye st
97 streamGoodbye st h 97 hClose (tcpHandle r)
98 hClose h
99 atomically $ writeTVar (lru tcpcache) cache' 98 atomically $ writeTVar (lru tcpcache) cache'
100 99
101 return $ Just $ \y -> do 100 return $ Just $ streamEncode st
102 bs <- streamEncode st y
103 withMVar mh (`hPut` bs)
104 Just (tm,v) -> do 101 Just (tm,v) -> do
105 t <- getPOSIXTime 102 t <- getPOSIXTime
106 let TCPSession { tcpHandle = mh, tcpState = st } = v 103 let TCPSession { tcpHandle = h, tcpState = st } = v
107 cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v t cache 104 cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) cache
108 atomically $ writeTVar (lru tcpcache) cache' 105 atomically $ writeTVar (lru tcpcache) cache'
109 return $ Just $ \y -> do 106 return $ Just $ streamEncode st
110 bs <- streamEncode st y
111 withMVar mh (`hPut` bs)
112 107
113closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () 108closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO ()
114closeAll tcpcache stream = do 109closeAll tcpcache stream = do
115 cache <- atomically $ readTVar (lru tcpcache) 110 cache <- atomically $ swapTVar (lru tcpcache) MM.empty
116 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do 111 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do
117 let st = tcpState r 112 let st = tcpState r
118 killThread (tcpThread r) 113 killThread (tcpThread r)
119 h <- takeMVar $ tcpHandle r 114 streamGoodbye st
120 streamGoodbye st h 115 hClose (tcpHandle r)
121 hClose h
122 116
123tcpTransport :: Int -- ^ maximum number of TCP links to maintain. 117tcpTransport :: Int -- ^ maximum number of TCP links to maintain.
124 -> StreamHandshake addr x y 118 -> StreamHandshake addr x y
diff --git a/src/Network/Tox/TCP.hs b/src/Network/Tox/TCP.hs
index 0780f121..28bcd244 100644
--- a/src/Network/Tox/TCP.hs
+++ b/src/Network/Tox/TCP.hs
@@ -8,10 +8,9 @@ import Control.Concurrent
8import Control.Concurrent.STM 8import Control.Concurrent.STM
9import Data.Functor.Identity 9import Data.Functor.Identity
10import Data.Serialize 10import Data.Serialize
11import System.IO (Handle)
12 11
13import Crypto.Tox 12import Crypto.Tox
14import Data.ByteString (hPut,hGet,ByteString) 13import Data.ByteString (hPut,hGet)
15import Data.Tox.Relay 14import Data.Tox.Relay
16import Network.Address (setPort,PortNumber,SockAddr) 15import Network.Address (setPort,PortNumber,SockAddr)
17import Network.QueryResponse 16import Network.QueryResponse
@@ -60,8 +59,8 @@ tcpStream crypto = StreamHandshake
60 nread <- newMVar (sessionBaseNonce $ runIdentity $ welcomeData welcome) 59 nread <- newMVar (sessionBaseNonce $ runIdentity $ welcomeData welcome)
61 let them = sessionPublicKey $ runIdentity $ welcomeData welcome 60 let them = sessionPublicKey $ runIdentity $ welcomeData welcome
62 return SessionProtocol 61 return SessionProtocol
63 { streamGoodbye = \h -> return () -- No goodbye packet? Seems rude. 62 { streamGoodbye = return () -- No goodbye packet? Seems rude.
64 , streamDecode = \h -> do 63 , streamDecode = do
65 decode <$> hGet h 2 >>= \case 64 decode <$> hGet h 2 >>= \case
66 Left _ -> return Nothing 65 Left _ -> return Nothing
67 Right len -> do 66 Right len -> do
@@ -74,9 +73,8 @@ tcpStream crypto = StreamHandshake
74 return $ either (const Nothing) Just r 73 return $ either (const Nothing) Just r
75 , streamEncode = \y -> do 74 , streamEncode = \y -> do
76 n24 <- takeMVar nsend 75 n24 <- takeMVar nsend
77 let bs = encode $ encrypt (noncef' n24) $ encodePlain y 76 hPut h $ encode $ encrypt (noncef' n24) $ encodePlain y
78 putMVar nsend (incrementNonce24 n24) 77 putMVar nsend (incrementNonce24 n24)
79 return bs -- XXX: Should we wait until this bytestring is sent before putting the nonce back in the MVar?
80 } 78 }
81 , streamAddr = nodeAddr 79 , streamAddr = nodeAddr
82 } 80 }