summaryrefslogtreecommitdiff
path: root/dht/src/Network/QueryResponse/TCP.hs
diff options
context:
space:
mode:
Diffstat (limited to 'dht/src/Network/QueryResponse/TCP.hs')
-rw-r--r--dht/src/Network/QueryResponse/TCP.hs15
1 files changed, 11 insertions, 4 deletions
diff --git a/dht/src/Network/QueryResponse/TCP.hs b/dht/src/Network/QueryResponse/TCP.hs
index 45ff73a6..defdf6a2 100644
--- a/dht/src/Network/QueryResponse/TCP.hs
+++ b/dht/src/Network/QueryResponse/TCP.hs
@@ -55,6 +55,7 @@ data TCPCache st = TCPCache
55 , tcpMax :: Int 55 , tcpMax :: Int
56 } 56 }
57 57
58-- This is a suitable /st/ parameter to 'TCPCache'
58data SessionProtocol x y = SessionProtocol 59data SessionProtocol x y = SessionProtocol
59 { streamGoodbye :: IO () -- ^ "Goodbye" protocol upon termination. 60 { streamGoodbye :: IO () -- ^ "Goodbye" protocol upon termination.
60 , streamDecode :: IO (Maybe x) -- ^ Parse inbound messages. 61 , streamDecode :: IO (Maybe x) -- ^ Parse inbound messages.
@@ -74,6 +75,9 @@ showStat :: IsString p => TCPSession st -> p
74showStat r = case r of PendingTCPSession -> "pending." 75showStat r = case r of PendingTCPSession -> "pending."
75 TCPSession {} -> "established." 76 TCPSession {} -> "established."
76 77
78tcp_timeout :: Int
79tcp_timeout = 10000000
80
77acquireConnection :: MVar (Maybe (Either a (x, addr))) 81acquireConnection :: MVar (Maybe (Either a (x, addr)))
78 -> TCPCache (SessionProtocol x y) 82 -> TCPCache (SessionProtocol x y)
79 -> StreamHandshake addr x y 83 -> StreamHandshake addr x y
@@ -96,7 +100,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
96 case entry of 100 case entry of
97 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do 101 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do
98 proto <- getProtocolNumber "tcp" 102 proto <- getProtocolNumber "tcp"
99 mh <- catchIOError (do h <- timeout 10000000 $ do 103 mh <- catchIOError (do h <- timeout tcp_timeout $ do
100 sock <- socket (socketFamily $ streamAddr stream addr) Stream proto 104 sock <- socket (socketFamily $ streamAddr stream addr) Stream proto
101 connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) 105 connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock)
102 h <- socketToHandle sock ReadWriteMode 106 h <- socketToHandle sock ReadWriteMode
@@ -104,6 +108,9 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
104 return h 108 return h
105 return h) 109 return h)
106 $ \e -> return Nothing 110 $ \e -> return Nothing
111 when (isNothing mh)
112 $ atomically $ modifyTVar' (lru tcpcache)
113 $ MM.delete (TCPAddress $ streamAddr stream addr)
107 ret <- fmap join $ forM mh $ \h -> do 114 ret <- fmap join $ forM mh $ \h -> do
108 mst <- catchIOError (Just <$> streamHello stream addr h) 115 mst <- catchIOError (Just <$> streamHello stream addr h)
109 (\e -> return Nothing) 116 (\e -> return Nothing)
@@ -122,7 +129,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
122 dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x 129 dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x
123 case x of 130 case x of
124 Just u -> do 131 Just u -> do
125 m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) 132 m <- timeout tcp_timeout $ putMVar mvar $ Just $ Right (u, addr)
126 when (isNothing m) $ do 133 when (isNothing m) $ do
127 dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet." 134 dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet."
128 tryTakeMVar mvar 135 tryTakeMVar mvar
@@ -136,7 +143,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
136 now <- getPOSIXTime 143 now <- getPOSIXTime
137 forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do 144 forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do
138 dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r] 145 dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r]
139 mreport <- timeout 10000000 $ threadReport False -- XXX: Paranoid timeout 146 mreport <- timeout tcp_timeout $ threadReport False -- XXX: Paranoid timeout
140 case mreport of 147 case mreport of
141 Just treport -> dput XTCP treport 148 Just treport -> dput XTCP treport
142 Nothing -> dput XTCP "TCP ERROR: threadReport timed out." 149 Nothing -> dput XTCP "TCP ERROR: threadReport timed out."
@@ -168,7 +175,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
168 return ret 175 return ret
169 Just (tm, PendingTCPSession) 176 Just (tm, PendingTCPSession)
170 | not bDoCon -> return Nothing 177 | not bDoCon -> return Nothing
171 | otherwise -> fmap join $ timeout 10000000 $ atomically $ do 178 | otherwise -> fmap join $ timeout tcp_timeout $ atomically $ do
172 c <- readTVar (lru tcpcache) 179 c <- readTVar (lru tcpcache)
173 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c 180 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
174 case v of 181 case v of