diff options
Diffstat (limited to 'dht/src')
-rw-r--r-- | dht/src/Network/QueryResponse/TCP.hs | 15 |
1 files changed, 11 insertions, 4 deletions
diff --git a/dht/src/Network/QueryResponse/TCP.hs b/dht/src/Network/QueryResponse/TCP.hs index 45ff73a6..defdf6a2 100644 --- a/dht/src/Network/QueryResponse/TCP.hs +++ b/dht/src/Network/QueryResponse/TCP.hs | |||
@@ -55,6 +55,7 @@ data TCPCache st = TCPCache | |||
55 | , tcpMax :: Int | 55 | , tcpMax :: Int |
56 | } | 56 | } |
57 | 57 | ||
58 | -- This is a suitable /st/ parameter to 'TCPCache' | ||
58 | data SessionProtocol x y = SessionProtocol | 59 | data SessionProtocol x y = SessionProtocol |
59 | { streamGoodbye :: IO () -- ^ "Goodbye" protocol upon termination. | 60 | { streamGoodbye :: IO () -- ^ "Goodbye" protocol upon termination. |
60 | , streamDecode :: IO (Maybe x) -- ^ Parse inbound messages. | 61 | , streamDecode :: IO (Maybe x) -- ^ Parse inbound messages. |
@@ -74,6 +75,9 @@ showStat :: IsString p => TCPSession st -> p | |||
74 | showStat r = case r of PendingTCPSession -> "pending." | 75 | showStat r = case r of PendingTCPSession -> "pending." |
75 | TCPSession {} -> "established." | 76 | TCPSession {} -> "established." |
76 | 77 | ||
78 | tcp_timeout :: Int | ||
79 | tcp_timeout = 10000000 | ||
80 | |||
77 | acquireConnection :: MVar (Maybe (Either a (x, addr))) | 81 | acquireConnection :: MVar (Maybe (Either a (x, addr))) |
78 | -> TCPCache (SessionProtocol x y) | 82 | -> TCPCache (SessionProtocol x y) |
79 | -> StreamHandshake addr x y | 83 | -> StreamHandshake addr x y |
@@ -96,7 +100,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
96 | case entry of | 100 | case entry of |
97 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do | 101 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do |
98 | proto <- getProtocolNumber "tcp" | 102 | proto <- getProtocolNumber "tcp" |
99 | mh <- catchIOError (do h <- timeout 10000000 $ do | 103 | mh <- catchIOError (do h <- timeout tcp_timeout $ do |
100 | sock <- socket (socketFamily $ streamAddr stream addr) Stream proto | 104 | sock <- socket (socketFamily $ streamAddr stream addr) Stream proto |
101 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) | 105 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) |
102 | h <- socketToHandle sock ReadWriteMode | 106 | h <- socketToHandle sock ReadWriteMode |
@@ -104,6 +108,9 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
104 | return h | 108 | return h |
105 | return h) | 109 | return h) |
106 | $ \e -> return Nothing | 110 | $ \e -> return Nothing |
111 | when (isNothing mh) | ||
112 | $ atomically $ modifyTVar' (lru tcpcache) | ||
113 | $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
107 | ret <- fmap join $ forM mh $ \h -> do | 114 | ret <- fmap join $ forM mh $ \h -> do |
108 | mst <- catchIOError (Just <$> streamHello stream addr h) | 115 | mst <- catchIOError (Just <$> streamHello stream addr h) |
109 | (\e -> return Nothing) | 116 | (\e -> return Nothing) |
@@ -122,7 +129,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
122 | dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x | 129 | dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x |
123 | case x of | 130 | case x of |
124 | Just u -> do | 131 | Just u -> do |
125 | m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) | 132 | m <- timeout tcp_timeout $ putMVar mvar $ Just $ Right (u, addr) |
126 | when (isNothing m) $ do | 133 | when (isNothing m) $ do |
127 | dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet." | 134 | dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet." |
128 | tryTakeMVar mvar | 135 | tryTakeMVar mvar |
@@ -136,7 +143,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
136 | now <- getPOSIXTime | 143 | now <- getPOSIXTime |
137 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do | 144 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do |
138 | dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r] | 145 | dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r] |
139 | mreport <- timeout 10000000 $ threadReport False -- XXX: Paranoid timeout | 146 | mreport <- timeout tcp_timeout $ threadReport False -- XXX: Paranoid timeout |
140 | case mreport of | 147 | case mreport of |
141 | Just treport -> dput XTCP treport | 148 | Just treport -> dput XTCP treport |
142 | Nothing -> dput XTCP "TCP ERROR: threadReport timed out." | 149 | Nothing -> dput XTCP "TCP ERROR: threadReport timed out." |
@@ -168,7 +175,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
168 | return ret | 175 | return ret |
169 | Just (tm, PendingTCPSession) | 176 | Just (tm, PendingTCPSession) |
170 | | not bDoCon -> return Nothing | 177 | | not bDoCon -> return Nothing |
171 | | otherwise -> fmap join $ timeout 10000000 $ atomically $ do | 178 | | otherwise -> fmap join $ timeout tcp_timeout $ atomically $ do |
172 | c <- readTVar (lru tcpcache) | 179 | c <- readTVar (lru tcpcache) |
173 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | 180 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c |
174 | case v of | 181 | case v of |