diff options
Diffstat (limited to 'dht')
-rw-r--r-- | dht/Connection.hs | 135 | ||||
-rw-r--r-- | dht/Connection/Tcp.hs | 824 | ||||
-rw-r--r-- | dht/Presence/ControlMaybe.hs | 64 | ||||
-rw-r--r-- | dht/Presence/DNSCache.hs | 285 | ||||
-rw-r--r-- | dht/Presence/GetHostByAddr.hs | 77 | ||||
-rw-r--r-- | dht/Presence/SockAddr.hs | 14 | ||||
-rw-r--r-- | dht/dht-client.cabal | 20 | ||||
-rw-r--r-- | dht/src/Control/Concurrent/Delay.hs | 49 | ||||
-rw-r--r-- | dht/src/Control/Concurrent/PingMachine.hs | 161 | ||||
-rw-r--r-- | dht/src/Control/Concurrent/ThreadUtil.hs | 31 | ||||
-rw-r--r-- | dht/src/Data/TableMethods.hs | 105 | ||||
-rw-r--r-- | dht/src/DebugUtil.hs | 42 | ||||
-rw-r--r-- | dht/src/Network/QueryResponse.hs | 716 | ||||
-rw-r--r-- | dht/src/Network/QueryResponse/TCP.hs | 223 | ||||
-rw-r--r-- | dht/src/Network/SocketLike.hs | 98 | ||||
-rw-r--r-- | dht/src/Network/StreamServer.hs | 167 | ||||
-rw-r--r-- | dht/stack.yaml | 1 |
17 files changed, 5 insertions, 3007 deletions
diff --git a/dht/Connection.hs b/dht/Connection.hs deleted file mode 100644 index ea86f4bb..00000000 --- a/dht/Connection.hs +++ /dev/null | |||
@@ -1,135 +0,0 @@ | |||
1 | {-# LANGUAGE DeriveFunctor #-} | ||
2 | {-# LANGUAGE LambdaCase #-} | ||
3 | module Connection where | ||
4 | |||
5 | import Control.Applicative | ||
6 | import Control.Arrow | ||
7 | import Control.Concurrent.STM | ||
8 | import Data.Bits | ||
9 | import Data.Word | ||
10 | import qualified Data.Map as Map | ||
11 | ;import Data.Map (Map) | ||
12 | import Network.Socket (SockAddr(..)) | ||
13 | |||
14 | import Control.Concurrent.PingMachine | ||
15 | |||
16 | -- | This type indicates the current status of a connection. The type | ||
17 | -- parameter indicates protocol-specific status information. To present | ||
18 | -- information as a user-comprehensible string, use 'showStatus'. | ||
19 | data Status status | ||
20 | = Dormant | ||
21 | | InProgress status | ||
22 | | Established | ||
23 | deriving (Show,Eq,Ord,Functor) | ||
24 | |||
25 | -- | A policy indicates a desired connection status. | ||
26 | data Policy | ||
27 | = RefusingToConnect -- ^ We desire no connection. | ||
28 | | OpenToConnect -- ^ We will cooperate if a remote side initiates. | ||
29 | | TryingToConnect -- ^ We desire to be connected. | ||
30 | deriving (Eq,Ord,Show) | ||
31 | |||
32 | -- | Information obtained via the 'connectionStatus' interface to | ||
33 | -- 'Manager'. | ||
34 | data Connection status = Connection | ||
35 | { connStatus :: Status status | ||
36 | , connPolicy :: Policy | ||
37 | } | ||
38 | deriving Functor | ||
39 | |||
40 | -- | A 'PeerAddress' identifies an active session. For inactive sessions, multiple | ||
41 | -- values may be feasible. | ||
42 | |||
43 | -- We use a 'SockAddr' as it is convenient for TCP and UDP connections. But if | ||
44 | -- that is not your use case, see 'uniqueAsKey'. | ||
45 | newtype PeerAddress = PeerAddress { peerAddress :: SockAddr } | ||
46 | deriving (Eq,Ord,Show) | ||
47 | |||
48 | -- | A 24-byte word. | ||
49 | data Uniq24 = Uniq24 !Word64 !Word64 !Word64 | ||
50 | deriving (Eq,Ord,Show) | ||
51 | |||
52 | -- | Coerce a 'Uniq24' to a useable 'PeerAddress'. Note that this stores the | ||
53 | -- special value 0 into the port number of the underlying 'SockAddr' and thus | ||
54 | -- should be compatible for mixing together with TCP/UDP peers. | ||
55 | uniqueAsKey :: Uniq24 -> PeerAddress | ||
56 | uniqueAsKey (Uniq24 x y z) = PeerAddress $ SockAddrInet6 (fromIntegral 0) a bcde f | ||
57 | where | ||
58 | a = fromIntegral (x `shiftR` 32) | ||
59 | b = fromIntegral x | ||
60 | c = fromIntegral (y `shiftR` 32) | ||
61 | d = fromIntegral y | ||
62 | e = fromIntegral (z `shiftR` 32) | ||
63 | f = fromIntegral z | ||
64 | bcde = (b,c,d,e) | ||
65 | |||
66 | -- | Inverse of 'uniqueAsKey' | ||
67 | keyAsUnique :: PeerAddress -> Maybe Uniq24 | ||
68 | keyAsUnique (PeerAddress (SockAddrInet6 0 a bcde f)) = Just $ Uniq24 x y z | ||
69 | where | ||
70 | (b,c,d,e) = bcde | ||
71 | x = (fromIntegral a `shiftL` 32) .|. fromIntegral b | ||
72 | y = (fromIntegral c `shiftL` 32) .|. fromIntegral d | ||
73 | z = (fromIntegral e `shiftL` 32) .|. fromIntegral f | ||
74 | keyAsUniq _ = Nothing | ||
75 | |||
76 | |||
77 | -- | This is an interface to make or query status information about connections | ||
78 | -- of a specific kind. | ||
79 | -- | ||
80 | -- Type parameters: | ||
81 | -- | ||
82 | -- /k/ names a connection. It should implement Ord, and can be parsed and | ||
83 | -- displayed using 'stringToKey' and 'showKey'. | ||
84 | -- | ||
85 | -- /status/ indicates the progress of a connection. It is intended as a | ||
86 | -- parameter to the 'InProgress' constructor of 'Status'. | ||
87 | -- | ||
88 | data Manager status k = Manager | ||
89 | { -- | Connect or disconnect a connection. | ||
90 | setPolicy :: k -> Policy -> IO () | ||
91 | -- | Lookup a connection status. | ||
92 | , status :: k -> STM (Connection status) | ||
93 | -- | Obtain a list of all known connections. | ||
94 | , connections :: STM [k] | ||
95 | -- | Parse a connection key out of a string. Inverse of 'showKey'. | ||
96 | , stringToKey :: String -> Maybe k | ||
97 | -- | Convert a progress value to a string. | ||
98 | , showProgress :: status -> String | ||
99 | -- | Show a connection key as a string. | ||
100 | , showKey :: k -> String | ||
101 | -- | Obtain an address from a human-friendly name. For TCP/UDP | ||
102 | -- connections, this might be a forward-resolving DNS query. | ||
103 | , resolvePeer :: k -> IO [PeerAddress] | ||
104 | -- | This is the reverse of 'resolvePeer'. For TCP/UDP connections, this | ||
105 | -- might be a reverse-resolve DNS query. | ||
106 | , reverseAddress :: PeerAddress -> IO [k] | ||
107 | } | ||
108 | |||
109 | -- | Present status information (visible in a UI) for a connection. | ||
110 | showStatus :: Manager status k -> Status status -> String | ||
111 | showStatus mgr Dormant = "dormant" | ||
112 | showStatus mgr Established = "established" | ||
113 | showStatus mgr (InProgress s) = "in progress ("++showProgress mgr s++")" | ||
114 | |||
115 | |||
116 | -- | Combine two different species of 'Manager' into a single interface using | ||
117 | -- 'Either' to combine key and status types. | ||
118 | addManagers :: (Ord kA, Ord kB) => | ||
119 | Manager statusA kA | ||
120 | -> Manager statusB kB | ||
121 | -> Manager (Either statusA statusB) (Either kA kB) | ||
122 | addManagers mgrA mgrB = Manager | ||
123 | { setPolicy = either (setPolicy mgrA) (setPolicy mgrB) | ||
124 | , status = \case | ||
125 | Left k -> fmap Left <$> status mgrA k | ||
126 | Right k -> fmap Right <$> status mgrB k | ||
127 | , connections = do | ||
128 | as <- connections mgrA | ||
129 | bs <- connections mgrB | ||
130 | return $ map Left as ++ map Right bs | ||
131 | , stringToKey = \str -> Left <$> stringToKey mgrA str | ||
132 | <|> Right <$> stringToKey mgrB str | ||
133 | , showProgress = either (showProgress mgrA) (showProgress mgrB) | ||
134 | , showKey = either (showKey mgrA) (showKey mgrB) | ||
135 | } | ||
diff --git a/dht/Connection/Tcp.hs b/dht/Connection/Tcp.hs deleted file mode 100644 index 4d50d47f..00000000 --- a/dht/Connection/Tcp.hs +++ /dev/null | |||
@@ -1,824 +0,0 @@ | |||
1 | {-# OPTIONS_HADDOCK prune #-} | ||
2 | {-# LANGUAGE CPP #-} | ||
3 | {-# LANGUAGE DoAndIfThenElse #-} | ||
4 | {-# LANGUAGE FlexibleInstances #-} | ||
5 | {-# LANGUAGE OverloadedStrings #-} | ||
6 | {-# LANGUAGE RankNTypes #-} | ||
7 | {-# LANGUAGE StandaloneDeriving #-} | ||
8 | {-# LANGUAGE TupleSections #-} | ||
9 | {-# LANGUAGE LambdaCase #-} | ||
10 | ----------------------------------------------------------------------------- | ||
11 | -- | | ||
12 | -- Module : Connection.Tcp | ||
13 | -- | ||
14 | -- Maintainer : joe@jerkface.net | ||
15 | -- Stability : experimental | ||
16 | -- | ||
17 | -- A TCP client/server library. | ||
18 | -- | ||
19 | -- TODO: | ||
20 | -- | ||
21 | -- * interface tweaks | ||
22 | -- | ||
23 | module Connection.Tcp | ||
24 | ( module Connection.Tcp | ||
25 | , module Control.Concurrent.PingMachine ) where | ||
26 | |||
27 | import Data.ByteString (ByteString,hGetNonBlocking) | ||
28 | import qualified Data.ByteString.Char8 as S -- ( hPutStrLn, hPutStr, pack) | ||
29 | import Data.Conduit ( ConduitT, Void, Flush ) | ||
30 | #if MIN_VERSION_containers(0,5,0) | ||
31 | import qualified Data.Map.Strict as Map | ||
32 | import Data.Map.Strict (Map) | ||
33 | #else | ||
34 | import qualified Data.Map as Map | ||
35 | import Data.Map (Map) | ||
36 | #endif | ||
37 | import Data.Monoid ( (<>) ) | ||
38 | import Control.Concurrent.ThreadUtil | ||
39 | |||
40 | import Control.Arrow | ||
41 | import Control.Concurrent.STM | ||
42 | -- import Control.Concurrent.STM.TMVar | ||
43 | -- import Control.Concurrent.STM.TChan | ||
44 | -- import Control.Concurrent.STM.Delay | ||
45 | import Control.Exception ({-evaluate,-}handle,SomeException(..),ErrorCall(..),onException) | ||
46 | import Control.Monad | ||
47 | import Control.Monad.Fix | ||
48 | -- import Control.Monad.STM | ||
49 | -- import Control.Monad.Trans.Resource | ||
50 | import Control.Monad.IO.Class (MonadIO (liftIO)) | ||
51 | import Data.Maybe | ||
52 | import System.IO.Error (isDoesNotExistError) | ||
53 | import System.IO | ||
54 | ( IOMode(..) | ||
55 | , hSetBuffering | ||
56 | , BufferMode(..) | ||
57 | , hWaitForInput | ||
58 | , hClose | ||
59 | , hIsEOF | ||
60 | , Handle | ||
61 | ) | ||
62 | import Network.Socket as Socket | ||
63 | import Network.BSD | ||
64 | ( getProtocolNumber | ||
65 | ) | ||
66 | import Debug.Trace | ||
67 | import Data.Time.Clock (getCurrentTime,diffUTCTime) | ||
68 | -- import SockAddr () | ||
69 | -- import System.Locale (defaultTimeLocale) | ||
70 | |||
71 | import qualified Data.Text as Text | ||
72 | ;import Data.Text (Text) | ||
73 | import DNSCache | ||
74 | import Control.Concurrent.Delay | ||
75 | import Control.Concurrent.PingMachine | ||
76 | import Network.StreamServer | ||
77 | import Network.SocketLike hiding (sClose) | ||
78 | import qualified Connection as G | ||
79 | ;import Connection (Manager (..), PeerAddress (..), Policy (..)) | ||
80 | import Network.Address (localhost4) | ||
81 | import DPut | ||
82 | import DebugTag | ||
83 | |||
84 | |||
85 | type Microseconds = Int | ||
86 | |||
87 | -- | This object is passed with the 'Listen' and 'Connect' | ||
88 | -- instructions in order to control the behavior of the | ||
89 | -- connections that are established. It is parameterized | ||
90 | -- by a user-suplied type @conkey@ that is used as a lookup | ||
91 | -- key for connections. | ||
92 | data ConnectionParameters conkey u = | ||
93 | ConnectionParameters | ||
94 | { pingInterval :: PingInterval | ||
95 | -- ^ The miliseconds of idle to allow before a 'RequiresPing' | ||
96 | -- event is signaled. | ||
97 | , timeout :: TimeOut | ||
98 | -- ^ The miliseconds of idle after 'RequiresPing' is signaled | ||
99 | -- that are necessary for the connection to be considered | ||
100 | -- lost and signalling 'EOF'. | ||
101 | , makeConnKey :: (RestrictedSocket,(Local SockAddr, Remote SockAddr)) -> IO (conkey,u) | ||
102 | -- ^ This action creates a lookup key for a new connection. If 'duplex' | ||
103 | -- is 'True' and the result is already assocatied with an established | ||
104 | -- connection, then an 'EOF' will be forced before the the new | ||
105 | -- connection becomes active. | ||
106 | -- | ||
107 | , duplex :: Bool | ||
108 | -- ^ If True, then the connection will be treated as a normal | ||
109 | -- two-way socket. Otherwise, a readable socket is established | ||
110 | -- with 'Listen' and a writable socket is established with | ||
111 | -- 'Connect' and they are associated when 'makeConnKey' yields | ||
112 | -- same value for each. | ||
113 | } | ||
114 | |||
115 | -- | Use this function to select appropriate default values for | ||
116 | -- 'ConnectionParameters' other than 'makeConnKey'. | ||
117 | -- | ||
118 | -- Current defaults: | ||
119 | -- | ||
120 | -- * 'pingInterval' = 28000 | ||
121 | -- | ||
122 | -- * 'timeout' = 2000 | ||
123 | -- | ||
124 | -- * 'duplex' = True | ||
125 | -- | ||
126 | connectionDefaults | ||
127 | :: ((RestrictedSocket,(Local SockAddr,Remote SockAddr)) -> IO (conkey,u)) -> ConnectionParameters conkey u | ||
128 | connectionDefaults f = ConnectionParameters | ||
129 | { pingInterval = 28000 | ||
130 | , timeout = 2000 | ||
131 | , makeConnKey = f | ||
132 | , duplex = True | ||
133 | } | ||
134 | |||
135 | -- | Instructions for a 'Server' object | ||
136 | -- | ||
137 | -- To issue a command, put it into the 'serverCommand' TMVar. | ||
138 | data ServerInstruction conkey u | ||
139 | = Quit | ||
140 | -- ^ kill the server. This command is automatically issued when | ||
141 | -- the server is released. | ||
142 | | Listen SockAddr (ConnectionParameters conkey u) | ||
143 | -- ^ listen for incoming connections on the given bind address. | ||
144 | | Connect SockAddr (ConnectionParameters conkey u) | ||
145 | -- ^ connect to addresses | ||
146 | | ConnectWithEndlessRetry SockAddr | ||
147 | (ConnectionParameters conkey u) | ||
148 | Miliseconds | ||
149 | -- ^ keep retrying the connection | ||
150 | | Ignore SockAddr | ||
151 | -- ^ stop listening on specified bind address | ||
152 | | Send conkey ByteString | ||
153 | -- ^ send bytes to an established connection | ||
154 | |||
155 | #ifdef TEST | ||
156 | deriving instance Show conkey => Show (ServerInstruction conkey u) | ||
157 | instance Show (a -> b) where show _ = "<function>" | ||
158 | deriving instance Show conkey => Show (ConnectionParameters conkey u) | ||
159 | #endif | ||
160 | |||
161 | -- | This type specifies which which half of a half-duplex | ||
162 | -- connection is of interest. | ||
163 | data InOrOut = In | Out | ||
164 | deriving (Enum,Eq,Ord,Show,Read) | ||
165 | |||
166 | -- | These events may be read from 'serverEvent' TChannel. | ||
167 | -- | ||
168 | data ConnectionEvent b | ||
169 | = Connection (STM Bool) (ConduitT () b IO ()) (ConduitT (Flush b) Void IO ()) | ||
170 | -- ^ A new connection was established | ||
171 | | ConnectFailure SockAddr | ||
172 | -- ^ A 'Connect' command failed. | ||
173 | | HalfConnection InOrOut | ||
174 | -- ^ Half of a half-duplex connection is avaliable. | ||
175 | | EOF | ||
176 | -- ^ A connection was terminated | ||
177 | | RequiresPing | ||
178 | -- ^ 'pingInterval' miliseconds of idle was experienced | ||
179 | |||
180 | #ifdef TEST | ||
181 | instance Show (IO a) where show _ = "<IO action>" | ||
182 | instance Show (STM a) where show _ = "<STM action>" | ||
183 | instance Eq (ByteString -> IO Bool) where (==) _ _ = True | ||
184 | instance Eq (IO (Maybe ByteString)) where (==) _ _ = True | ||
185 | instance Eq (STM Bool) where (==) _ _ = True | ||
186 | deriving instance Show b => Show (ConnectionEvent b) | ||
187 | deriving instance Eq b => Eq (ConnectionEvent b) | ||
188 | #endif | ||
189 | |||
190 | -- | This is the per-connection state. | ||
191 | data ConnectionRecord u | ||
192 | = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler | ||
193 | , cstate :: ConnectionState -- ^ used to send/receive data to the connection | ||
194 | , cdata :: u -- ^ user data, stored in the connection map for convenience | ||
195 | } | ||
196 | |||
197 | -- | This object accepts commands and signals events and maintains | ||
198 | -- the list of currently listening ports and established connections. | ||
199 | data Server a u releaseKey b | ||
200 | = Server { serverCommand :: TMVar (ServerInstruction a u) | ||
201 | , serverEvent :: TChan ((a,u), ConnectionEvent b) | ||
202 | , serverReleaseKey :: releaseKey | ||
203 | , conmap :: TVar (Map a (ConnectionRecord u)) | ||
204 | , listenmap :: TVar (Map SockAddr ServerHandle) | ||
205 | , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay)) | ||
206 | } | ||
207 | |||
208 | control :: Server a u releaseKey b -> ServerInstruction a u -> IO () | ||
209 | control sv = atomically . putTMVar (serverCommand sv) | ||
210 | |||
211 | type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b) | ||
212 | |||
213 | noCleanUp :: MonadIO m => Allocate () m | ||
214 | noCleanUp io _ = ( (,) () ) `liftM` liftIO io | ||
215 | |||
216 | -- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' | ||
217 | -- to ensure proper cleanup. For example, | ||
218 | -- | ||
219 | -- > import Connection.Tcp | ||
220 | -- > import Control.Monad.Trans.Resource (runResourceT) | ||
221 | -- > import Control.Monad.IO.Class (liftIO) | ||
222 | -- > import Control.Monad.STM (atomically) | ||
223 | -- > import Control.Concurrent.STM.TMVar (putTMVar) | ||
224 | -- > import Control.Concurrent.STM.TChan (readTChan) | ||
225 | -- > | ||
226 | -- > main = runResourceT $ do | ||
227 | -- > sv <- server allocate | ||
228 | -- > let params = connectionDefaults (return . snd) | ||
229 | -- > liftIO . atomically $ putTMVar (serverCommand sv) (Listen 2942 params) | ||
230 | -- > let loop = do | ||
231 | -- > (_,event) <- atomically $ readTChan (serverEvent sv) | ||
232 | -- > case event of | ||
233 | -- > Connection getPingFlag readData writeData -> do | ||
234 | -- > forkIO $ do | ||
235 | -- > fix $ \readLoop -> do | ||
236 | -- > readData >>= mapM $ \bytes -> | ||
237 | -- > putStrLn $ "got: " ++ show bytes | ||
238 | -- > readLoop | ||
239 | -- > case event of EOF -> return () | ||
240 | -- > _ -> loop | ||
241 | -- > liftIO loop | ||
242 | -- | ||
243 | -- Using 'Control.Monad.Trans.Resource.ResourceT' is optional. Pass 'noCleanUp' | ||
244 | -- to do without automatic cleanup and be sure to remember to write 'Quit' to | ||
245 | -- the 'serverCommand' variable. | ||
246 | server :: | ||
247 | -- forall (m :: * -> *) a u conkey releaseKey. | ||
248 | (Show conkey, MonadIO m, Ord conkey) => | ||
249 | Allocate releaseKey m | ||
250 | -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) ) | ||
251 | -> m (Server conkey u releaseKey x) | ||
252 | server allocate sessionConduits = do | ||
253 | (key,cmds) <- allocate (atomically newEmptyTMVar) | ||
254 | (atomically . flip putTMVar Quit) | ||
255 | server <- liftIO . atomically $ do | ||
256 | tchan <- newTChan | ||
257 | conmap <- newTVar Map.empty | ||
258 | listenmap<- newTVar Map.empty | ||
259 | retrymap <- newTVar Map.empty | ||
260 | return Server { serverCommand = cmds | ||
261 | , serverEvent = tchan | ||
262 | , serverReleaseKey = key | ||
263 | , conmap = conmap | ||
264 | , listenmap = listenmap | ||
265 | , retrymap = retrymap | ||
266 | } | ||
267 | liftIO $ do | ||
268 | forkLabeled "server" $ fix $ \loop -> do | ||
269 | instr <- atomically $ takeTMVar cmds | ||
270 | -- warn $ "instr = " <> bshow instr | ||
271 | let again = do doit server instr | ||
272 | -- warn $ "finished " <> bshow instr | ||
273 | loop | ||
274 | case instr of Quit -> closeAll server | ||
275 | _ -> again | ||
276 | return server | ||
277 | where | ||
278 | closeAll server = do | ||
279 | listening <- atomically . readTVar $ listenmap server | ||
280 | mapM_ quitListening (Map.elems listening) | ||
281 | let stopRetry (v,d) = do atomically $ writeTVar v False | ||
282 | interruptDelay d | ||
283 | retriers <- atomically $ do | ||
284 | rmap <- readTVar $ retrymap server | ||
285 | writeTVar (retrymap server) Map.empty | ||
286 | return rmap | ||
287 | mapM_ stopRetry (Map.elems retriers) | ||
288 | cons <- atomically . readTVar $ conmap server | ||
289 | atomically $ mapM_ (connClose . cstate) (Map.elems cons) | ||
290 | atomically $ mapM_ (connWait . cstate) (Map.elems cons) | ||
291 | atomically $ writeTVar (conmap server) Map.empty | ||
292 | |||
293 | |||
294 | doit server (Listen port params) = do | ||
295 | |||
296 | listening <- Map.member port | ||
297 | `fmap` atomically (readTVar $ listenmap server) | ||
298 | when (not listening) $ do | ||
299 | |||
300 | dput XMisc $ "Started listening on "++show port | ||
301 | |||
302 | sserv <- flip streamServer [port] ServerConfig | ||
303 | { serverWarn = dput XMisc | ||
304 | , serverSession = \sock _ h -> do | ||
305 | (conkey,u) <- makeConnKey params sock | ||
306 | _ <- newConnection server sessionConduits params conkey u h In | ||
307 | return () | ||
308 | } | ||
309 | |||
310 | atomically $ listenmap server `modifyTVar'` Map.insert port sserv | ||
311 | |||
312 | doit server (Ignore port) = do | ||
313 | dput XMisc $ "Stopping listen on "++show port | ||
314 | mb <- atomically $ do | ||
315 | map <- readTVar $ listenmap server | ||
316 | modifyTVar' (listenmap server) $ Map.delete port | ||
317 | return $ Map.lookup port map | ||
318 | maybe (return ()) quitListening $ mb | ||
319 | |||
320 | doit server (Send con bs) = do -- . void . forkIO $ do | ||
321 | map <- atomically $ readTVar (conmap server) | ||
322 | let post False = (trace ("cant send: "++show bs) $ return ()) | ||
323 | post True = return () | ||
324 | maybe (post False) | ||
325 | (post <=< flip connWrite bs . cstate) | ||
326 | $ Map.lookup con map | ||
327 | |||
328 | doit server (Connect addr params) = join $ atomically $ do | ||
329 | Map.lookup addr <$> readTVar (retrymap server) | ||
330 | >>= return . \case | ||
331 | Nothing -> forkit | ||
332 | Just (v,d) -> do b <- atomically $ readTVar v | ||
333 | interruptDelay d | ||
334 | when (not b) forkit | ||
335 | where | ||
336 | forkit = void . forkLabeled ( "Connect." ++ show addr ) $ do | ||
337 | proto <- getProtocolNumber "tcp" | ||
338 | sock <- socket (socketFamily addr) Stream proto | ||
339 | handle (\e -> do -- let t = ioeGetErrorType e | ||
340 | when (isDoesNotExistError e) $ return () -- warn "GOTCHA" | ||
341 | -- warn $ "connect-error: " <> bshow e | ||
342 | (conkey,u) <- makeConnKey params (restrictSocket sock,(Local localhost4, Remote addr)) -- XXX: ? | ||
343 | Socket.close sock | ||
344 | atomically | ||
345 | $ writeTChan (serverEvent server) | ||
346 | $ ((conkey,u),ConnectFailure addr)) | ||
347 | $ do | ||
348 | connect sock addr | ||
349 | laddr <- Socket.getSocketName sock | ||
350 | (conkey,u) <- makeConnKey params (restrictSocket sock, (Local laddr, Remote addr)) | ||
351 | h <- socketToHandle sock ReadWriteMode | ||
352 | newConnection server sessionConduits params conkey u h Out | ||
353 | return () | ||
354 | |||
355 | doit server (ConnectWithEndlessRetry addr params interval) = do | ||
356 | proto <- getProtocolNumber "tcp" | ||
357 | void . forkLabeled ("ConnectWithEndlessRetry." ++ show addr) $ do | ||
358 | timer <- interruptibleDelay | ||
359 | (retryVar,action) <- atomically $ do | ||
360 | map <- readTVar (retrymap server) | ||
361 | action <- case Map.lookup addr map of | ||
362 | Nothing -> return $ return () | ||
363 | Just (v,d) -> do writeTVar v False | ||
364 | return $ interruptDelay d | ||
365 | v <- newTVar True | ||
366 | writeTVar (retrymap server) $! Map.insert addr (v,timer) map | ||
367 | return (v,action :: IO ()) | ||
368 | action | ||
369 | fix $ \retryLoop -> do | ||
370 | utc <- getCurrentTime | ||
371 | shouldRetry <- do | ||
372 | handle (\(SomeException e) -> do | ||
373 | -- Exceptions thrown by 'socket' need to be handled specially | ||
374 | -- since we don't have enough information to broadcast a ConnectFailure | ||
375 | -- on serverEvent. | ||
376 | warn $ "Failed to create socket: " <> bshow e | ||
377 | atomically $ readTVar retryVar) $ do | ||
378 | sock <- socket (socketFamily addr) Stream proto | ||
379 | handle (\(SomeException e) -> do | ||
380 | -- Any thing else goes wrong and we broadcast ConnectFailure. | ||
381 | do (conkey,u) <- makeConnKey params (restrictSocket sock,(Local localhost4, Remote addr)) | ||
382 | Socket.close sock | ||
383 | atomically $ writeTChan (serverEvent server) ((conkey,u),ConnectFailure addr) | ||
384 | `onException` return () | ||
385 | atomically $ readTVar retryVar) $ do | ||
386 | connect sock addr | ||
387 | laddr <- Socket.getSocketName sock | ||
388 | (conkey,u) <- makeConnKey params (restrictSocket sock, (Local laddr, Remote addr)) | ||
389 | h <- socketToHandle sock ReadWriteMode | ||
390 | threads <- newConnection server sessionConduits params conkey u h Out | ||
391 | atomically $ do threadsWait threads | ||
392 | readTVar retryVar | ||
393 | fin_utc <- getCurrentTime | ||
394 | when shouldRetry $ do | ||
395 | let elapsed = 1000.0 * (fin_utc `diffUTCTime` utc) | ||
396 | expected = fromIntegral interval | ||
397 | when (shouldRetry && elapsed < expected) $ do | ||
398 | debugNoise $ "Waiting to retry " <> bshow addr | ||
399 | void $ startDelay timer (round $ 1000 * (expected-elapsed)) | ||
400 | debugNoise $ "retry " <> bshow (shouldRetry,addr) | ||
401 | when shouldRetry $ retryLoop | ||
402 | |||
403 | |||
404 | -- INTERNAL ---------------------------------------------------------- | ||
405 | |||
406 | {- | ||
407 | hWriteUntilNothing h outs = | ||
408 | fix $ \loop -> do | ||
409 | mb <- atomically $ takeTMVar outs | ||
410 | case mb of Just bs -> do S.hPutStrLn h bs | ||
411 | warn $ "wrote " <> bs | ||
412 | loop | ||
413 | Nothing -> do warn $ "wrote Nothing" | ||
414 | hClose h | ||
415 | |||
416 | -} | ||
417 | connRead :: ConnectionState -> IO (Maybe ByteString) | ||
418 | connRead (WriteOnlyConnection w) = do | ||
419 | -- atomically $ discardContents (threadsChannel w) | ||
420 | return Nothing | ||
421 | connRead conn = do | ||
422 | c <- atomically $ getThreads | ||
423 | threadsRead c | ||
424 | where | ||
425 | getThreads = | ||
426 | case conn of SaneConnection c -> return c | ||
427 | ReadOnlyConnection c -> return c | ||
428 | ConnectionPair c w -> do | ||
429 | -- discardContents (threadsChannel w) | ||
430 | return c | ||
431 | |||
432 | socketFamily :: SockAddr -> Family | ||
433 | socketFamily (SockAddrInet _ _) = AF_INET | ||
434 | socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | ||
435 | socketFamily (SockAddrUnix _) = AF_UNIX | ||
436 | |||
437 | |||
438 | conevent :: ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) ) | ||
439 | -> ConnectionState | ||
440 | -> ConnectionEvent x | ||
441 | conevent sessionConduits con = Connection pingflag read write | ||
442 | where | ||
443 | pingflag = swapTVar (pingFlag (connPingTimer con)) False | ||
444 | (read,write) = sessionConduits (connRead con) (connWrite con) | ||
445 | |||
446 | newConnection :: Ord a | ||
447 | => Server a u1 releaseKey x | ||
448 | -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) ) | ||
449 | -> ConnectionParameters conkey u | ||
450 | -> a | ||
451 | -> u1 | ||
452 | -> Handle | ||
453 | -> InOrOut | ||
454 | -> IO ConnectionThreads | ||
455 | newConnection server sessionConduits params conkey u h inout = do | ||
456 | hSetBuffering h NoBuffering | ||
457 | let (idle_ms,timeout_ms) = | ||
458 | case (inout,duplex params) of | ||
459 | (Out,False) -> ( 0, 0 ) | ||
460 | _ -> ( pingInterval params | ||
461 | , timeout params ) | ||
462 | |||
463 | new <- do pinglogic <- forkPingMachine "newConnection" idle_ms timeout_ms | ||
464 | connectionThreads h pinglogic | ||
465 | started <- atomically $ newEmptyTMVar | ||
466 | kontvar <- atomically newEmptyTMVar | ||
467 | -- XXX: Why does kontvar store STM (IO ()) instead of just IO () ? | ||
468 | let _ = kontvar :: TMVar (STM (IO ())) | ||
469 | forkLabeled ("connecting...") $ do | ||
470 | getkont <- atomically $ takeTMVar kontvar | ||
471 | kont <- atomically getkont | ||
472 | kont | ||
473 | |||
474 | atomically $ do | ||
475 | current <- fmap (Map.lookup conkey) $ readTVar (conmap server) | ||
476 | case current of | ||
477 | Nothing -> do | ||
478 | (newCon,e) <- return $ | ||
479 | if duplex params | ||
480 | then let newcon = SaneConnection new | ||
481 | in ( newcon, ((conkey,u), conevent sessionConduits newcon) ) | ||
482 | else ( case inout of | ||
483 | In -> ReadOnlyConnection new | ||
484 | Out -> WriteOnlyConnection new | ||
485 | , ((conkey,u), HalfConnection inout) ) | ||
486 | modifyTVar' (conmap server) $ Map.insert conkey | ||
487 | ConnectionRecord { ckont = kontvar | ||
488 | , cstate = newCon | ||
489 | , cdata = u } | ||
490 | announce e | ||
491 | putTMVar kontvar $ return $ do | ||
492 | myThreadId >>= flip labelThread ("connection."++show inout) -- XXX: more info would be nice. | ||
493 | atomically $ putTMVar started () | ||
494 | -- Wait for something interesting. | ||
495 | handleEOF conkey u kontvar newCon | ||
496 | Just what@ConnectionRecord { ckont =mvar }-> do | ||
497 | putTMVar kontvar $ return $ return () -- Kill redundant "connecting..." thread. | ||
498 | putTMVar mvar $ do | ||
499 | -- The action returned by updateConMap, eventually invokes handleEOF, | ||
500 | -- so the sequencer thread will not be terminated. | ||
501 | kont <- updateConMap conkey u new what | ||
502 | putTMVar started () | ||
503 | return kont | ||
504 | return new | ||
505 | where | ||
506 | |||
507 | announce e = writeTChan (serverEvent server) e | ||
508 | |||
509 | -- This function loops and will not quit unless an action is posted to the | ||
510 | -- mvar that does not in turn invoke this function, or if an EOF occurs. | ||
511 | handleEOF conkey u mvar newCon = do | ||
512 | action <- atomically . foldr1 orElse $ | ||
513 | [ takeTMVar mvar >>= id -- passed continuation | ||
514 | , connWait newCon >> return eof | ||
515 | , connWaitPing newCon >>= return . sendPing | ||
516 | -- , pingWait pingTimer >>= return . sendPing | ||
517 | ] | ||
518 | action :: IO () | ||
519 | where | ||
520 | eof = do | ||
521 | -- warn $ "EOF " <>bshow conkey | ||
522 | connCancelPing newCon | ||
523 | atomically $ do connFlush newCon | ||
524 | announce ((conkey,u),EOF) | ||
525 | modifyTVar' (conmap server) | ||
526 | $ Map.delete conkey | ||
527 | -- warn $ "fin-EOF "<>bshow conkey | ||
528 | |||
529 | sendPing PingTimeOut = do | ||
530 | {- | ||
531 | utc <- getCurrentTime | ||
532 | let utc' = formatTime defaultTimeLocale "%s" utc | ||
533 | warn $ "ping:TIMEOUT " <> bshow utc' | ||
534 | -} | ||
535 | atomically (connClose newCon) | ||
536 | eof | ||
537 | |||
538 | sendPing PingIdle = do | ||
539 | {- | ||
540 | utc <- getCurrentTime | ||
541 | let utc' = formatTime defaultTimeLocale "%s" utc | ||
542 | -- warn $ "ping:IDLE " <> bshow utc' | ||
543 | -} | ||
544 | atomically $ announce ((conkey,u),RequiresPing) | ||
545 | handleEOF conkey u mvar newCon | ||
546 | |||
547 | |||
548 | updateConMap conkey u new (ConnectionRecord { ckont=mvar, cstate=replaced, cdata=u0 }) = do | ||
549 | new' <- | ||
550 | if duplex params then do | ||
551 | announce ((conkey,u),EOF) | ||
552 | connClose replaced | ||
553 | let newcon = SaneConnection new | ||
554 | announce $ ((conkey,u),conevent sessionConduits newcon) | ||
555 | return $ newcon | ||
556 | else | ||
557 | case replaced of | ||
558 | WriteOnlyConnection w | inout==In -> | ||
559 | do let newcon = ConnectionPair new w | ||
560 | announce ((conkey,u),conevent sessionConduits newcon) | ||
561 | return newcon | ||
562 | ReadOnlyConnection r | inout==Out -> | ||
563 | do let newcon = ConnectionPair r new | ||
564 | announce ((conkey,u),conevent sessionConduits newcon) | ||
565 | return newcon | ||
566 | _ -> do -- connFlush todo | ||
567 | announce ((conkey,u0), EOF) | ||
568 | connClose replaced | ||
569 | announce ((conkey,u), HalfConnection inout) | ||
570 | return $ case inout of | ||
571 | In -> ReadOnlyConnection new | ||
572 | Out -> WriteOnlyConnection new | ||
573 | modifyTVar' (conmap server) $ Map.insert conkey | ||
574 | ConnectionRecord { ckont = mvar | ||
575 | , cstate = new' | ||
576 | , cdata = u } | ||
577 | return $ handleEOF conkey u mvar new' | ||
578 | |||
579 | |||
580 | getPacket :: Handle -> IO ByteString | ||
581 | getPacket h = do hWaitForInput h (-1) | ||
582 | hGetNonBlocking h 1024 | ||
583 | |||
584 | |||
585 | |||
586 | -- | 'ConnectionThreads' is an interface to a pair of threads | ||
587 | -- that are reading and writing a 'Handle'. | ||
588 | data ConnectionThreads = ConnectionThreads | ||
589 | { threadsWriter :: TMVar (Maybe ByteString) | ||
590 | , threadsChannel :: TChan ByteString | ||
591 | , threadsWait :: STM () -- ^ waits for a 'ConnectionThreads' object to close | ||
592 | , threadsPing :: PingMachine | ||
593 | } | ||
594 | |||
595 | -- | This spawns the reader and writer threads and returns a newly | ||
596 | -- constructed 'ConnectionThreads' object. | ||
597 | connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads | ||
598 | connectionThreads h pinglogic = do | ||
599 | |||
600 | (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar | ||
601 | |||
602 | (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan | ||
603 | readerThread <- forkLabeled "readerThread" $ do | ||
604 | let finished e = do | ||
605 | hClose h | ||
606 | -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e) | ||
607 | -- let _ = fmap ioeGetErrorType e -- type hint | ||
608 | let _ = fmap what e where what (SomeException _) = undefined | ||
609 | atomically $ do tryTakeTMVar outs | ||
610 | putTMVar outs Nothing -- quit writer | ||
611 | putTMVar doner () | ||
612 | handle (finished . Just) $ do | ||
613 | pingBump pinglogic -- start the ping timer | ||
614 | fix $ \loop -> do | ||
615 | packet <- getPacket h | ||
616 | -- warn $ "read: " <> S.take 60 packet | ||
617 | atomically $ writeTChan incomming packet | ||
618 | pingBump pinglogic | ||
619 | -- warn $ "bumped: " <> S.take 60 packet | ||
620 | isEof <- hIsEOF h | ||
621 | if isEof then finished Nothing else loop | ||
622 | |||
623 | writerThread <- forkLabeled "writerThread" . fix $ \loop -> do | ||
624 | let finished = do -- warn $ "finished write" | ||
625 | -- hClose h -- quit reader | ||
626 | throwTo readerThread (ErrorCall "EOF") | ||
627 | atomically $ putTMVar donew () | ||
628 | mb <- atomically $ readTMVar outs | ||
629 | case mb of Just bs -> handle (\(SomeException e)->finished) | ||
630 | (do -- warn $ "writing: " <> S.take 60 bs | ||
631 | S.hPutStr h bs | ||
632 | -- warn $ "wrote: " <> S.take 60 bs | ||
633 | atomically $ takeTMVar outs | ||
634 | loop) | ||
635 | Nothing -> finished | ||
636 | |||
637 | let wait = do readTMVar donew | ||
638 | readTMVar doner | ||
639 | return () | ||
640 | return ConnectionThreads { threadsWriter = outs | ||
641 | , threadsChannel = incomming | ||
642 | , threadsWait = wait | ||
643 | , threadsPing = pinglogic } | ||
644 | |||
645 | |||
646 | -- | 'threadsWrite' writes the given 'ByteString' to the | ||
647 | -- 'ConnectionThreads' object. It blocks until the ByteString | ||
648 | -- is written and 'True' is returned, or the connection is | ||
649 | -- interrupted and 'False' is returned. | ||
650 | threadsWrite :: ConnectionThreads -> ByteString -> IO Bool | ||
651 | threadsWrite c bs = atomically $ | ||
652 | orElse (const False `fmap` threadsWait c) | ||
653 | (const True `fmap` putTMVar (threadsWriter c) (Just bs)) | ||
654 | |||
655 | -- | 'threadsClose' signals for the 'ConnectionThreads' object | ||
656 | -- to quit and close the associated 'Handle'. This operation | ||
657 | -- is non-blocking, follow it with 'threadsWait' if you want | ||
658 | -- to wait for the operation to complete. | ||
659 | threadsClose :: ConnectionThreads -> STM () | ||
660 | threadsClose c = do | ||
661 | let mvar = threadsWriter c | ||
662 | v <- tryReadTMVar mvar | ||
663 | case v of | ||
664 | Just Nothing -> return () -- already closed | ||
665 | _ -> putTMVar mvar Nothing | ||
666 | |||
667 | -- | 'threadsRead' blocks until a 'ByteString' is available which | ||
668 | -- is returned to the caller, or the connection is interrupted and | ||
669 | -- 'Nothing' is returned. | ||
670 | threadsRead :: ConnectionThreads -> IO (Maybe ByteString) | ||
671 | threadsRead c = atomically $ | ||
672 | orElse (const Nothing `fmap` threadsWait c) | ||
673 | (Just `fmap` readTChan (threadsChannel c)) | ||
674 | |||
675 | -- | A 'ConnectionState' is an interface to a single 'ConnectionThreads' | ||
676 | -- or to a pair of 'ConnectionThreads' objects that are considered as one | ||
677 | -- connection. | ||
678 | data ConnectionState = | ||
679 | SaneConnection ConnectionThreads | ||
680 | -- ^ ordinary read/write connection | ||
681 | | WriteOnlyConnection ConnectionThreads | ||
682 | | ReadOnlyConnection ConnectionThreads | ||
683 | | ConnectionPair ConnectionThreads ConnectionThreads | ||
684 | -- ^ Two 'ConnectionThreads' objects, read operations use the | ||
685 | -- first, write operations use the second. | ||
686 | |||
687 | |||
688 | |||
689 | connWrite :: ConnectionState -> ByteString -> IO Bool | ||
690 | connWrite (ReadOnlyConnection _) bs = return False | ||
691 | connWrite conn bs = threadsWrite c bs | ||
692 | where | ||
693 | c = case conn of SaneConnection c -> c | ||
694 | WriteOnlyConnection c -> c | ||
695 | ConnectionPair _ c -> c | ||
696 | |||
697 | |||
698 | mapConn :: Bool -> | ||
699 | (ConnectionThreads -> STM ()) -> ConnectionState -> STM () | ||
700 | mapConn both action c = | ||
701 | case c of | ||
702 | SaneConnection rw -> action rw | ||
703 | ReadOnlyConnection r -> action r | ||
704 | WriteOnlyConnection w -> action w | ||
705 | ConnectionPair r w -> do | ||
706 | rem <- orElse (const w `fmap` action r) | ||
707 | (const r `fmap` action w) | ||
708 | when both $ action rem | ||
709 | |||
710 | connClose :: ConnectionState -> STM () | ||
711 | connClose c = mapConn True threadsClose c | ||
712 | |||
713 | connWait :: ConnectionState -> STM () | ||
714 | connWait c = doit -- mapConn False threadsWait c | ||
715 | where | ||
716 | action = threadsWait | ||
717 | doit = | ||
718 | case c of | ||
719 | SaneConnection rw -> action rw | ||
720 | ReadOnlyConnection r -> action r | ||
721 | WriteOnlyConnection w -> action w | ||
722 | ConnectionPair r w -> do | ||
723 | rem <- orElse (const w `fmap` action r) | ||
724 | (const r `fmap` action w) | ||
725 | threadsClose rem | ||
726 | |||
727 | connPingTimer :: ConnectionState -> PingMachine | ||
728 | connPingTimer c = | ||
729 | case c of | ||
730 | SaneConnection rw -> threadsPing rw | ||
731 | ReadOnlyConnection r -> threadsPing r | ||
732 | WriteOnlyConnection w -> threadsPing w -- should be disabled. | ||
733 | ConnectionPair r w -> threadsPing r | ||
734 | |||
735 | connCancelPing :: ConnectionState -> IO () | ||
736 | connCancelPing c = pingCancel (connPingTimer c) | ||
737 | |||
738 | connWaitPing :: ConnectionState -> STM PingEvent | ||
739 | connWaitPing c = pingWait (connPingTimer c) | ||
740 | |||
741 | connFlush :: ConnectionState -> STM () | ||
742 | connFlush c = | ||
743 | case c of | ||
744 | SaneConnection rw -> waitChan rw | ||
745 | ReadOnlyConnection r -> waitChan r | ||
746 | WriteOnlyConnection w -> return () | ||
747 | ConnectionPair r w -> waitChan r | ||
748 | where | ||
749 | waitChan t = do | ||
750 | b <- isEmptyTChan (threadsChannel t) | ||
751 | when (not b) retry | ||
752 | |||
753 | bshow :: Show a => a -> ByteString | ||
754 | bshow e = S.pack . show $ e | ||
755 | |||
756 | warn :: ByteString -> IO () | ||
757 | warn str =dputB XMisc str | ||
758 | |||
759 | debugNoise :: Monad m => t -> m () | ||
760 | debugNoise str = return () | ||
761 | |||
762 | data TCPStatus = Resolving | AwaitingRead | AwaitingWrite | ||
763 | |||
764 | -- SockAddr -> (SockAddr, ConnectionParameters SockAddr ConnectionData, Miliseconds) | ||
765 | |||
766 | |||
767 | tcpManager :: (PeerAddress -> (SockAddr, ConnectionParameters PeerAddress u, Miliseconds)) | ||
768 | -- -> (String -> Maybe Text) | ||
769 | -- -> (Text -> IO (Maybe PeerAddress)) | ||
770 | -> Server PeerAddress u releaseKey x | ||
771 | -> IO (Manager TCPStatus Text) | ||
772 | tcpManager grokKey sv = do | ||
773 | rmap <- atomically $ newTVar Map.empty -- Map k (Maybe conkey) | ||
774 | nullping <- forkPingMachine "tcpManager" 0 0 | ||
775 | (rslv,rev) <- do | ||
776 | dns <- newDNSCache | ||
777 | let rslv k = map PeerAddress <$> forwardResolve dns k | ||
778 | rev (PeerAddress addr) = reverseResolve dns addr | ||
779 | return (rslv,rev) | ||
780 | return Manager { | ||
781 | setPolicy = \k -> \case | ||
782 | TryingToConnect -> join $ atomically $ do | ||
783 | r <- readTVar rmap | ||
784 | case Map.lookup k r of | ||
785 | Just {} -> return $ return () -- Connection already in progress. | ||
786 | Nothing -> do | ||
787 | modifyTVar' rmap $ Map.insert k Nothing | ||
788 | return $ void $ forkLabeled ("resolve."++show k) $ do | ||
789 | mconkey <- listToMaybe <$> rslv k | ||
790 | case mconkey of | ||
791 | Nothing -> atomically $ modifyTVar' rmap $ Map.delete k | ||
792 | Just conkey -> do | ||
793 | control sv $ case grokKey conkey of | ||
794 | (saddr,params,ms) -> ConnectWithEndlessRetry saddr params ms | ||
795 | OpenToConnect -> dput XMisc "TODO: TCP OpenToConnect" | ||
796 | RefusingToConnect -> dput XMisc "TODO: TCP RefusingToConnect" | ||
797 | , status = \k -> do | ||
798 | c <- readTVar (conmap sv) | ||
799 | ck <- Map.lookup k <$> readTVar rmap | ||
800 | return $ exportConnection c (join ck) | ||
801 | , connections = Map.keys <$> readTVar rmap | ||
802 | , stringToKey = Just . Text.pack | ||
803 | , showProgress = \case | ||
804 | Resolving -> "resolving" | ||
805 | AwaitingRead -> "awaiting inbound" | ||
806 | AwaitingWrite -> "awaiting outbound" | ||
807 | , showKey = show | ||
808 | , resolvePeer = rslv | ||
809 | , reverseAddress = rev | ||
810 | } | ||
811 | |||
812 | exportConnection :: Ord conkey => Map conkey (ConnectionRecord u) -> Maybe conkey -> G.Connection TCPStatus | ||
813 | exportConnection conmap mkey = G.Connection | ||
814 | { G.connStatus = case mkey of | ||
815 | Nothing -> G.Dormant | ||
816 | Just conkey -> case Map.lookup conkey conmap of | ||
817 | Nothing -> G.InProgress Resolving | ||
818 | Just (ConnectionRecord ckont cstate cdata) -> case cstate of | ||
819 | SaneConnection {} -> G.Established | ||
820 | ConnectionPair {} -> G.Established | ||
821 | ReadOnlyConnection {} -> G.InProgress AwaitingWrite | ||
822 | WriteOnlyConnection {} -> G.InProgress AwaitingRead | ||
823 | , G.connPolicy = TryingToConnect | ||
824 | } | ||
diff --git a/dht/Presence/ControlMaybe.hs b/dht/Presence/ControlMaybe.hs deleted file mode 100644 index a101d667..00000000 --- a/dht/Presence/ControlMaybe.hs +++ /dev/null | |||
@@ -1,64 +0,0 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE ScopedTypeVariables #-} | ||
3 | module ControlMaybe | ||
4 | ( module ControlMaybe | ||
5 | , module Data.Functor | ||
6 | ) where | ||
7 | |||
8 | -- import GHC.IO.Exception (IOException(..)) | ||
9 | import Control.Monad | ||
10 | import Data.Functor | ||
11 | import System.IO.Error | ||
12 | |||
13 | |||
14 | -- forM_ with less polymorphism. | ||
15 | withJust :: Monad m => Maybe x -> (x -> m ()) -> m () | ||
16 | withJust m f = forM_ m f | ||
17 | {-# INLINE withJust #-} | ||
18 | |||
19 | whenJust :: Monad m => m (Maybe x) -> (x -> m ()) -> m () | ||
20 | whenJust acn f = acn >>= mapM_ f | ||
21 | {-# INLINE whenJust #-} | ||
22 | |||
23 | |||
24 | catchIO_ :: IO a -> IO a -> IO a | ||
25 | catchIO_ body catcher = catchIOError body (\_ -> catcher) | ||
26 | {-# INLINE catchIO_ #-} | ||
27 | |||
28 | handleIO_ :: IO a -> IO a -> IO a | ||
29 | handleIO_ catcher body = catchIOError body (\_ -> catcher) | ||
30 | {-# INLINE handleIO_ #-} | ||
31 | |||
32 | |||
33 | handleIO :: (IOError -> IO a) -> IO a -> IO a | ||
34 | handleIO catcher body = catchIOError body catcher | ||
35 | {-# INLINE handleIO #-} | ||
36 | |||
37 | #if !MIN_VERSION_base(4,11,0) | ||
38 | -- | Flipped version of '<$>'. | ||
39 | -- | ||
40 | -- @ | ||
41 | -- ('<&>') = 'flip' 'fmap' | ||
42 | -- @ | ||
43 | -- | ||
44 | -- @since 4.11.0.0 | ||
45 | -- | ||
46 | -- ==== __Examples__ | ||
47 | -- Apply @(+1)@ to a list, a 'Data.Maybe.Just' and a 'Data.Either.Right': | ||
48 | -- | ||
49 | -- >>> Just 2 <&> (+1) | ||
50 | -- Just 3 | ||
51 | -- | ||
52 | -- >>> [1,2,3] <&> (+1) | ||
53 | -- [2,3,4] | ||
54 | -- | ||
55 | -- >>> Right 3 <&> (+1) | ||
56 | -- Right 4 | ||
57 | -- | ||
58 | (<&>) :: Functor f => f a -> (a -> b) -> f b | ||
59 | as <&> f = f <$> as | ||
60 | |||
61 | infixl 1 <&> | ||
62 | #endif | ||
63 | |||
64 | |||
diff --git a/dht/Presence/DNSCache.hs b/dht/Presence/DNSCache.hs deleted file mode 100644 index 9bb354e1..00000000 --- a/dht/Presence/DNSCache.hs +++ /dev/null | |||
@@ -1,285 +0,0 @@ | |||
1 | -- | Both 'getAddrInfo' and 'getHostByAddr' have hard-coded timeouts for | ||
2 | -- waiting upon network queries that can be a little too long for some use | ||
3 | -- cases. This module wraps both of them so that they block for at most one | ||
4 | -- second. It caches late-arriving results so that they can be returned by | ||
5 | -- repeated timed-out queries. | ||
6 | -- | ||
7 | -- In order to achieve the shorter timeout, it is likely that the you will need | ||
8 | -- to build with GHC's -threaded option. Otherwise, if the wrapped FFI calls | ||
9 | -- to resolve the address will block Haskell threads. Note: I didn't verify | ||
10 | -- this. | ||
11 | {-# LANGUAGE TupleSections #-} | ||
12 | {-# LANGUAGE RankNTypes #-} | ||
13 | {-# LANGUAGE CPP #-} | ||
14 | module DNSCache | ||
15 | ( DNSCache | ||
16 | , reverseResolve | ||
17 | , forwardResolve | ||
18 | , newDNSCache | ||
19 | , parseAddress | ||
20 | , unsafeParseAddress | ||
21 | , strip_brackets | ||
22 | , withPort | ||
23 | ) where | ||
24 | |||
25 | import Control.Concurrent.ThreadUtil | ||
26 | import Control.Arrow | ||
27 | import Control.Concurrent.STM | ||
28 | import Data.Text ( Text ) | ||
29 | import Network.Socket ( SockAddr(..), AddrInfoFlag(..), defaultHints, getAddrInfo, AddrInfo(..) ) | ||
30 | import Data.Time.Clock ( UTCTime, getCurrentTime, diffUTCTime ) | ||
31 | import System.IO.Error ( isDoesNotExistError ) | ||
32 | import System.Endian ( fromBE32, toBE32 ) | ||
33 | import Control.Exception ( handle ) | ||
34 | import Data.Map ( Map ) | ||
35 | import qualified Data.Map as Map | ||
36 | import qualified Network.BSD as BSD | ||
37 | import qualified Data.Text as Text | ||
38 | import Control.Monad | ||
39 | import Data.Function | ||
40 | import Data.List | ||
41 | import Data.Ord | ||
42 | import Data.Maybe | ||
43 | import System.IO.Error | ||
44 | import System.IO.Unsafe | ||
45 | |||
46 | import SockAddr () | ||
47 | import ControlMaybe ( handleIO_ ) | ||
48 | import GetHostByAddr ( getHostByAddr ) | ||
49 | import Control.Concurrent.Delay | ||
50 | import DPut | ||
51 | import DebugTag | ||
52 | |||
53 | type TimeStamp = UTCTime | ||
54 | |||
55 | data DNSCache = | ||
56 | DNSCache | ||
57 | { fcache :: TVar (Map Text [(TimeStamp, SockAddr)]) | ||
58 | , rcache :: TVar (Map SockAddr [(TimeStamp, Text)]) | ||
59 | } | ||
60 | |||
61 | |||
62 | newDNSCache :: IO DNSCache | ||
63 | newDNSCache = do | ||
64 | fcache <- newTVarIO Map.empty | ||
65 | rcache <- newTVarIO Map.empty | ||
66 | return DNSCache { fcache=fcache, rcache=rcache } | ||
67 | |||
68 | updateCache :: Eq x => | ||
69 | Bool -> TimeStamp -> [x] -> Maybe [(TimeStamp,x)] -> Maybe [(TimeStamp,x)] | ||
70 | updateCache withScrub utc xs mys = do | ||
71 | let ys = maybe [] id mys | ||
72 | ys' = filter scrub ys | ||
73 | ys'' = map (utc,) xs ++ ys' | ||
74 | minute = 60 | ||
75 | scrub (t,x) | withScrub && diffUTCTime utc t < minute = False | ||
76 | scrub (t,x) | x `elem` xs = False | ||
77 | scrub _ = True | ||
78 | guard $ not (null ys'') | ||
79 | return ys'' | ||
80 | |||
81 | dnsObserve :: DNSCache -> Bool -> TimeStamp -> [(Text,SockAddr)] -> STM () | ||
82 | dnsObserve dns withScrub utc obs = do | ||
83 | f <- readTVar $ fcache dns | ||
84 | r <- readTVar $ rcache dns | ||
85 | let obs' = map (\(n,a)->(n,a `withPort` 0)) obs | ||
86 | gs = do | ||
87 | g <- groupBy ((==) `on` fst) $ sortBy (comparing fst) obs' | ||
88 | (n,_) <- take 1 g | ||
89 | return (n,map snd g) | ||
90 | f' = foldl' updatef f gs | ||
91 | hs = do | ||
92 | h <- groupBy ((==) `on` snd) $ sortBy (comparing snd) obs' | ||
93 | (_,a) <- take 1 h | ||
94 | return (a,map fst h) | ||
95 | r' = foldl' updater r hs | ||
96 | writeTVar (fcache dns) f' | ||
97 | writeTVar (rcache dns) r' | ||
98 | where | ||
99 | updatef f (n,addrs) = Map.alter (updateCache withScrub utc addrs) n f | ||
100 | updater r (a,ns) = Map.alter (updateCache withScrub utc ns) a r | ||
101 | |||
102 | make6mapped4 :: SockAddr -> SockAddr | ||
103 | make6mapped4 addr@(SockAddrInet6 {}) = addr | ||
104 | make6mapped4 addr@(SockAddrInet port a) = SockAddrInet6 port 0 (0,0,0xFFFF,fromBE32 a) 0 | ||
105 | |||
106 | tryForkOS :: String -> IO () -> IO ThreadId | ||
107 | tryForkOS lbl action = catchIOError (forkOSLabeled lbl action) $ \e -> do | ||
108 | dput XMisc $ "DNSCache: Link with -threaded to avoid excessively long time-out." | ||
109 | forkLabeled lbl action | ||
110 | |||
111 | |||
112 | -- Attempt to resolve the given domain name. Returns an empty list if the | ||
113 | -- resolve operation takes longer than the timeout, but the 'DNSCache' will be | ||
114 | -- updated when the resolve completes. | ||
115 | -- | ||
116 | -- When the resolve operation does complete, any entries less than a minute old | ||
117 | -- will be overwritten with the new results. Older entries are allowed to | ||
118 | -- persist for reasons I don't understand as of this writing. (See 'updateCache') | ||
119 | rawForwardResolve :: | ||
120 | DNSCache -> (Text -> IO ()) -> Int -> Text -> IO [SockAddr] | ||
121 | rawForwardResolve dns onFail timeout addrtext = do | ||
122 | r <- atomically newEmptyTMVar | ||
123 | mvar <- interruptibleDelay | ||
124 | rt <- tryForkOS ("resolve."++show addrtext) $ do | ||
125 | resolver r mvar | ||
126 | startDelay mvar timeout | ||
127 | did <- atomically $ tryPutTMVar r [] | ||
128 | when did (onFail addrtext) | ||
129 | atomically $ readTMVar r | ||
130 | where | ||
131 | resolver r mvar = do | ||
132 | xs <- handle (\e -> let _ = isDoesNotExistError e in return []) | ||
133 | $ do fmap (nub . map (make6mapped4 . addrAddress)) $ | ||
134 | getAddrInfo (Just $ defaultHints { addrFlags = [ AI_CANONNAME, AI_V4MAPPED ]}) | ||
135 | (Just $ Text.unpack $ strip_brackets addrtext) | ||
136 | (Just "5269") | ||
137 | did <- atomically $ tryPutTMVar r xs | ||
138 | when did $ do | ||
139 | interruptDelay mvar | ||
140 | utc <- getCurrentTime | ||
141 | atomically $ dnsObserve dns True utc $ map (addrtext,) xs | ||
142 | return () | ||
143 | |||
144 | strip_brackets :: Text -> Text | ||
145 | strip_brackets s = | ||
146 | case Text.uncons s of | ||
147 | Just ('[',t) -> Text.takeWhile (/=']') t | ||
148 | _ -> s | ||
149 | |||
150 | |||
151 | reportTimeout :: forall a. Show a => a -> IO () | ||
152 | reportTimeout addrtext = do | ||
153 | dput XMisc $ "timeout resolving: "++show addrtext | ||
154 | -- killThread rt | ||
155 | |||
156 | unmap6mapped4 :: SockAddr -> SockAddr | ||
157 | unmap6mapped4 addr@(SockAddrInet6 port _ (0,0,0xFFFF,a) _) = | ||
158 | SockAddrInet port (toBE32 a) | ||
159 | unmap6mapped4 addr = addr | ||
160 | |||
161 | rawReverseResolve :: | ||
162 | DNSCache -> (SockAddr -> IO ()) -> Int -> SockAddr -> IO [Text] | ||
163 | rawReverseResolve dns onFail timeout addr = do | ||
164 | r <- atomically newEmptyTMVar | ||
165 | mvar <- interruptibleDelay | ||
166 | rt <- forkOS $ resolver r mvar | ||
167 | startDelay mvar timeout | ||
168 | did <- atomically $ tryPutTMVar r [] | ||
169 | when did (onFail addr) | ||
170 | atomically $ readTMVar r | ||
171 | where | ||
172 | resolver r mvar = | ||
173 | handleIO_ (return ()) $ do | ||
174 | ent <- getHostByAddr (unmap6mapped4 addr) -- AF_UNSPEC addr | ||
175 | let names = BSD.hostName ent : BSD.hostAliases ent | ||
176 | xs = map Text.pack $ nub names | ||
177 | forkIO $ do | ||
178 | utc <- getCurrentTime | ||
179 | atomically $ dnsObserve dns False utc $ map (,addr) xs | ||
180 | atomically $ putTMVar r xs | ||
181 | |||
182 | -- Returns expired (older than a minute) cached reverse-dns results | ||
183 | -- and removes them from the cache. | ||
184 | expiredReverse :: DNSCache -> SockAddr -> IO [Text] | ||
185 | expiredReverse dns addr = do | ||
186 | utc <- getCurrentTime | ||
187 | addr <- return $ addr `withPort` 0 | ||
188 | es <- atomically $ do | ||
189 | r <- readTVar $ rcache dns | ||
190 | let ns = maybe [] id $ Map.lookup addr r | ||
191 | minute = 60 -- seconds | ||
192 | -- XXX: Is this right? flip diffUTCTime utc returns the age of the | ||
193 | -- cache entry? | ||
194 | (es0,ns') = partition ( (>=minute) . flip diffUTCTime utc . fst ) ns | ||
195 | es = map snd es0 | ||
196 | modifyTVar' (rcache dns) $ Map.insert addr ns' | ||
197 | f <- readTVar $ fcache dns | ||
198 | let f' = foldl' (flip $ Map.alter (expire utc)) f es | ||
199 | expire utc Nothing = Nothing | ||
200 | expire utc (Just as) = if null as' then Nothing else Just as' | ||
201 | where as' = filter ( (<minute) . flip diffUTCTime utc . fst) as | ||
202 | writeTVar (fcache dns) f' | ||
203 | return es | ||
204 | return es | ||
205 | |||
206 | cachedReverse :: DNSCache -> SockAddr -> IO [Text] | ||
207 | cachedReverse dns addr = do | ||
208 | utc <- getCurrentTime | ||
209 | addr <- return $ addr `withPort` 0 | ||
210 | atomically $ do | ||
211 | r <- readTVar (rcache dns) | ||
212 | let ns = maybe [] id $ Map.lookup addr r | ||
213 | {- | ||
214 | ns' = filter ( (<minute) . flip diffUTCTime utc . fst) ns | ||
215 | minute = 60 -- seconds | ||
216 | modifyTVar' (rcache dns) $ Map.insert addr ns' | ||
217 | return $ map snd ns' | ||
218 | -} | ||
219 | return $ map snd ns | ||
220 | |||
221 | -- Returns any dns query results for the given name that were observed less | ||
222 | -- than a minute ago and updates the forward-cache to remove any results older | ||
223 | -- than that. | ||
224 | cachedForward :: DNSCache -> Text -> IO [SockAddr] | ||
225 | cachedForward dns n = do | ||
226 | utc <- getCurrentTime | ||
227 | atomically $ do | ||
228 | f <- readTVar (fcache dns) | ||
229 | let as = maybe [] id $ Map.lookup n f | ||
230 | as' = filter ( (<minute) . flip diffUTCTime utc . fst) as | ||
231 | minute = 60 -- seconds | ||
232 | modifyTVar' (fcache dns) $ Map.insert n as' | ||
233 | return $ map snd as' | ||
234 | |||
235 | -- Reverse-resolves an address to a domain name. Returns both the result of a | ||
236 | -- new query and any freshly cached results. Cache entries older than a minute | ||
237 | -- will not be returned, but will be refreshed in spawned threads so that they | ||
238 | -- may be available for the next call. | ||
239 | reverseResolve :: DNSCache -> SockAddr -> IO [Text] | ||
240 | reverseResolve dns addr = do | ||
241 | expired <- expiredReverse dns addr | ||
242 | forM_ expired $ \n -> forkIO $ do | ||
243 | rawForwardResolve dns (const $ return ()) 1000000 n | ||
244 | return () | ||
245 | xs <- rawReverseResolve dns (const $ return ()) 1000000 addr | ||
246 | cs <- cachedReverse dns addr | ||
247 | return $ xs ++ filter (not . flip elem xs) cs | ||
248 | |||
249 | -- Resolves a name, if there's no result within one second, then any cached | ||
250 | -- results that are less than a minute old are returned. | ||
251 | forwardResolve :: DNSCache -> Text -> IO [SockAddr] | ||
252 | forwardResolve dns n = do | ||
253 | as <- rawForwardResolve dns (const $ return ()) 1000000 n | ||
254 | if null as | ||
255 | then cachedForward dns n | ||
256 | else return as | ||
257 | |||
258 | parseAddress :: Text -> IO (Maybe SockAddr) | ||
259 | parseAddress addr_str = do | ||
260 | info <- getAddrInfo (Just $ defaultHints { addrFlags = [ AI_NUMERICHOST ] }) | ||
261 | (Just . Text.unpack $ addr_str) | ||
262 | (Just "0") | ||
263 | return . listToMaybe $ map addrAddress info | ||
264 | |||
265 | |||
266 | splitAtPort :: String -> (String,String) | ||
267 | splitAtPort s = second sanitizePort $ case s of | ||
268 | ('[':t) -> break (==']') t | ||
269 | _ -> break (==':') s | ||
270 | where | ||
271 | sanitizePort (']':':':p) = p | ||
272 | sanitizePort (':':p) = p | ||
273 | sanitizePort _ = "0" | ||
274 | |||
275 | unsafeParseAddress :: String -> Maybe SockAddr | ||
276 | unsafeParseAddress addr_str = unsafePerformIO $ do | ||
277 | let (ipstr,portstr) = splitAtPort addr_str | ||
278 | info <- getAddrInfo (Just $ defaultHints { addrFlags = [ AI_NUMERICHOST ] }) | ||
279 | (Just ipstr) | ||
280 | (Just portstr) | ||
281 | return . listToMaybe $ map addrAddress info | ||
282 | |||
283 | withPort :: SockAddr -> Int -> SockAddr | ||
284 | withPort (SockAddrInet _ a) port = SockAddrInet (toEnum port) a | ||
285 | withPort (SockAddrInet6 _ a b c) port = SockAddrInet6 (toEnum port) a b c | ||
diff --git a/dht/Presence/GetHostByAddr.hs b/dht/Presence/GetHostByAddr.hs deleted file mode 100644 index 45bca5e9..00000000 --- a/dht/Presence/GetHostByAddr.hs +++ /dev/null | |||
@@ -1,77 +0,0 @@ | |||
1 | {-# LANGUAGE ForeignFunctionInterface #-} | ||
2 | module GetHostByAddr where | ||
3 | |||
4 | import Network.BSD | ||
5 | import Foreign.Ptr | ||
6 | import Foreign.C.Types | ||
7 | import Foreign.Storable (Storable(..)) | ||
8 | import Foreign.Marshal.Utils (with) | ||
9 | import Foreign.Marshal.Alloc | ||
10 | import Control.Concurrent | ||
11 | import System.IO.Unsafe | ||
12 | import System.IO.Error (ioeSetErrorString, mkIOError) | ||
13 | import Network.Socket | ||
14 | import GHC.IO.Exception | ||
15 | |||
16 | |||
17 | throwNoSuchThingIfNull :: String -> String -> IO (Ptr a) -> IO (Ptr a) | ||
18 | throwNoSuchThingIfNull loc desc act = do | ||
19 | ptr <- act | ||
20 | if (ptr == nullPtr) | ||
21 | then ioError (ioeSetErrorString (mkIOError NoSuchThing loc Nothing Nothing) desc) | ||
22 | else return ptr | ||
23 | |||
24 | {-# NOINLINE lock #-} | ||
25 | lock :: MVar () | ||
26 | lock = unsafePerformIO $ newMVar () | ||
27 | |||
28 | withLock :: IO a -> IO a | ||
29 | withLock act = withMVar lock (\_ -> act) | ||
30 | |||
31 | trySysCall :: IO a -> IO a | ||
32 | trySysCall act = act | ||
33 | |||
34 | {- | ||
35 | -- The locking of gethostbyaddr is similar to gethostbyname. | ||
36 | -- | Get a 'HostEntry' corresponding to the given address and family. | ||
37 | -- Note that only IPv4 is currently supported. | ||
38 | getHostByAddr :: Family -> SockAddr -> IO HostEntry | ||
39 | getHostByAddr family addr = do | ||
40 | withSockAddr addr $ \ ptr_addr len -> withLock $ do | ||
41 | throwNoSuchThingIfNull "getHostByAddr" "no such host entry" | ||
42 | $ trySysCall $ c_gethostbyaddr ptr_addr (fromIntegral len) (packFamily family) | ||
43 | >>= peek | ||
44 | -} | ||
45 | |||
46 | |||
47 | -- The locking of gethostbyaddr is similar to gethostbyname. | ||
48 | -- | Get a 'HostEntry' corresponding to the given address and family. | ||
49 | -- Note that only IPv4 is currently supported. | ||
50 | -- getHostByAddr :: Family -> HostAddress -> IO HostEntry | ||
51 | -- getHostByAddr family addr = do | ||
52 | getHostByAddr :: SockAddr -> IO HostEntry | ||
53 | getHostByAddr (SockAddrInet port addr ) = do | ||
54 | let family = AF_INET | ||
55 | with addr $ \ ptr_addr -> withLock $ do | ||
56 | throwNoSuchThingIfNull "getHostByAddr" "no such host entry" | ||
57 | $ trySysCall $ c_gethostbyaddr ptr_addr (fromIntegral (sizeOf addr)) (packFamily family) | ||
58 | >>= peek | ||
59 | getHostByAddr (SockAddrInet6 port flow (a,b,c,d) scope) = do | ||
60 | let family = AF_INET6 | ||
61 | allocaBytes 16 $ \ ptr_addr -> do | ||
62 | pokeElemOff ptr_addr 0 a | ||
63 | pokeElemOff ptr_addr 1 b | ||
64 | pokeElemOff ptr_addr 2 c | ||
65 | pokeElemOff ptr_addr 3 d | ||
66 | withLock $ do | ||
67 | throwNoSuchThingIfNull "getHostByAddr" "no such host entry" | ||
68 | $ trySysCall $ c_gethostbyaddr ptr_addr 16 (packFamily family) | ||
69 | >>= peek | ||
70 | |||
71 | |||
72 | foreign import ccall safe "gethostbyaddr" | ||
73 | c_gethostbyaddr :: Ptr a -> CInt -> CInt -> IO (Ptr HostEntry) | ||
74 | |||
75 | |||
76 | |||
77 | -- vim:ft=haskell: | ||
diff --git a/dht/Presence/SockAddr.hs b/dht/Presence/SockAddr.hs deleted file mode 100644 index b5fbf16e..00000000 --- a/dht/Presence/SockAddr.hs +++ /dev/null | |||
@@ -1,14 +0,0 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE StandaloneDeriving #-} | ||
3 | module SockAddr () where | ||
4 | |||
5 | #if MIN_VERSION_network(2,4,0) | ||
6 | import Network.Socket () | ||
7 | #else | ||
8 | import Network.Socket ( SockAddr(..) ) | ||
9 | |||
10 | deriving instance Ord SockAddr | ||
11 | #endif | ||
12 | |||
13 | |||
14 | |||
diff --git a/dht/dht-client.cabal b/dht/dht-client.cabal index 64d48f53..0da181df 100644 --- a/dht/dht-client.cabal +++ b/dht/dht-client.cabal | |||
@@ -75,15 +75,10 @@ library | |||
75 | , RecordWildCards | 75 | , RecordWildCards |
76 | , NondecreasingIndentation | 76 | , NondecreasingIndentation |
77 | hs-source-dirs: src, ., Presence | 77 | hs-source-dirs: src, ., Presence |
78 | exposed-modules: Control.Concurrent.ThreadUtil | 78 | exposed-modules: Data.Digest.CRC32C |
79 | Network.SocketLike | ||
80 | Data.Digest.CRC32C | ||
81 | Data.Bits.ByteString | 79 | Data.Bits.ByteString |
82 | Data.TableMethods | ||
83 | Network.BitTorrent.DHT.ContactInfo | 80 | Network.BitTorrent.DHT.ContactInfo |
84 | Network.BitTorrent.DHT.Token | 81 | Network.BitTorrent.DHT.Token |
85 | Network.QueryResponse | ||
86 | Network.StreamServer | ||
87 | Data.BEncode.Pretty | 82 | Data.BEncode.Pretty |
88 | Network.BitTorrent.MainlineDHT | 83 | Network.BitTorrent.MainlineDHT |
89 | Network.BitTorrent.MainlineDHT.Symbols | 84 | Network.BitTorrent.MainlineDHT.Symbols |
@@ -107,7 +102,6 @@ library | |||
107 | Network.Tox.Avahi | 102 | Network.Tox.Avahi |
108 | Network.Tox.RelayPinger | 103 | Network.Tox.RelayPinger |
109 | Network.UPNP | 104 | Network.UPNP |
110 | Network.QueryResponse.TCP | ||
111 | Network.Tox.Relay | 105 | Network.Tox.Relay |
112 | Network.Tox.TCP | 106 | Network.Tox.TCP |
113 | Data.Tox.Msg | 107 | Data.Tox.Msg |
@@ -117,7 +111,6 @@ library | |||
117 | Network.Tox.ContactInfo | 111 | Network.Tox.ContactInfo |
118 | Announcer | 112 | Announcer |
119 | Announcer.Tox | 113 | Announcer.Tox |
120 | Control.Concurrent.Delay | ||
121 | ByteStringOperators | 114 | ByteStringOperators |
122 | ClientState | 115 | ClientState |
123 | ConfigFiles | 116 | ConfigFiles |
@@ -126,19 +119,14 @@ library | |||
126 | Control.Concurrent.STM.StatusCache | 119 | Control.Concurrent.STM.StatusCache |
127 | Control.Concurrent.STM.UpdateStream | 120 | Control.Concurrent.STM.UpdateStream |
128 | Control.Concurrent.STM.Util | 121 | Control.Concurrent.STM.Util |
129 | ControlMaybe | ||
130 | Data.BitSyntax | 122 | Data.BitSyntax |
131 | DNSCache | ||
132 | EventUtil | 123 | EventUtil |
133 | FGConsole | 124 | FGConsole |
134 | GetHostByAddr | ||
135 | LocalPeerCred | 125 | LocalPeerCred |
136 | LockedChan | 126 | LockedChan |
137 | Logging | 127 | Logging |
138 | Nesting | 128 | Nesting |
139 | Paths | 129 | Paths |
140 | Connection.Tcp | ||
141 | SockAddr | ||
142 | UTmp | 130 | UTmp |
143 | MUC | 131 | MUC |
144 | LocalChat | 132 | LocalChat |
@@ -149,13 +137,10 @@ library | |||
149 | XMPPServer | 137 | XMPPServer |
150 | Util | 138 | Util |
151 | Presence | 139 | Presence |
152 | Control.Concurrent.PingMachine | ||
153 | Connection | ||
154 | ToxChat | 140 | ToxChat |
155 | ToxToXMPP | 141 | ToxToXMPP |
156 | ToxManager | 142 | ToxManager |
157 | XMPPToTox | 143 | XMPPToTox |
158 | DebugUtil | ||
159 | Data.IntervalSet | 144 | Data.IntervalSet |
160 | Data.Tox.Message | 145 | Data.Tox.Message |
161 | HandshakeCache | 146 | HandshakeCache |
@@ -233,6 +218,7 @@ library | |||
233 | , kad | 218 | , kad |
234 | , tasks | 219 | , tasks |
235 | , torrent-types | 220 | , torrent-types |
221 | , server | ||
236 | 222 | ||
237 | if impl(ghc < 8) | 223 | if impl(ghc < 8) |
238 | Build-depends: transformers | 224 | Build-depends: transformers |
@@ -319,6 +305,7 @@ executable dhtd | |||
319 | , pretty | 305 | , pretty |
320 | , dependent-sum | 306 | , dependent-sum |
321 | , dht-client | 307 | , dht-client |
308 | , server | ||
322 | , dput-hslogger | 309 | , dput-hslogger |
323 | , word64-map | 310 | , word64-map |
324 | , tox-crypto | 311 | , tox-crypto |
@@ -368,6 +355,7 @@ executable testTox | |||
368 | default-language: Haskell2010 | 355 | default-language: Haskell2010 |
369 | build-depends: base | 356 | build-depends: base |
370 | , dht-client | 357 | , dht-client |
358 | , server | ||
371 | , dput-hslogger | 359 | , dput-hslogger |
372 | , tox-crypto | 360 | , tox-crypto |
373 | , lifted-concurrent | 361 | , lifted-concurrent |
diff --git a/dht/src/Control/Concurrent/Delay.hs b/dht/src/Control/Concurrent/Delay.hs deleted file mode 100644 index 67dcd451..00000000 --- a/dht/src/Control/Concurrent/Delay.hs +++ /dev/null | |||
@@ -1,49 +0,0 @@ | |||
1 | module Control.Concurrent.Delay where | ||
2 | |||
3 | import Control.Concurrent | ||
4 | import Control.Monad | ||
5 | import Control.Exception ({-evaluate,-}handle,finally,throwIO) | ||
6 | import Data.Time.Clock (NominalDiffTime) | ||
7 | import System.IO.Error | ||
8 | |||
9 | type Microseconds = Int | ||
10 | |||
11 | microseconds :: NominalDiffTime -> Microseconds | ||
12 | microseconds d = round $ 1000000 * d | ||
13 | |||
14 | data InterruptibleDelay = InterruptibleDelay | ||
15 | { delayThread :: MVar ThreadId | ||
16 | } | ||
17 | |||
18 | interruptibleDelay :: IO InterruptibleDelay | ||
19 | interruptibleDelay = do | ||
20 | fmap InterruptibleDelay newEmptyMVar | ||
21 | |||
22 | -- | Delay for the given number of microseconds and return 'True' if the delay | ||
23 | -- is not interrupted. | ||
24 | -- | ||
25 | -- Note: If a thread is already waiting on the given 'InterruptibleDelay' | ||
26 | -- object, then this will block until it becomes available and only then start | ||
27 | -- the delay timer. | ||
28 | startDelay :: InterruptibleDelay -> Microseconds -> IO Bool | ||
29 | startDelay d interval = do | ||
30 | thread <- myThreadId | ||
31 | handle (\e -> do when (not $ isUserError e) (throwIO e) | ||
32 | return False) $ do | ||
33 | putMVar (delayThread d) thread | ||
34 | threadDelay interval | ||
35 | void $ takeMVar (delayThread d) | ||
36 | return True | ||
37 | -- The following cleanup shouldn't be necessary, but I'm paranoid. | ||
38 | `finally` tryTakeMVar (delayThread d) | ||
39 | |||
40 | where debugNoise str = return () | ||
41 | |||
42 | -- | Signal the thread waiting on the given 'InterruptibleDelay' object to | ||
43 | -- continue even though the timeout has not elapsed. If no thread is waiting, | ||
44 | -- then this is a no-op. | ||
45 | interruptDelay :: InterruptibleDelay -> IO () | ||
46 | interruptDelay d = do | ||
47 | mthread <- tryTakeMVar (delayThread d) | ||
48 | forM_ mthread $ \thread -> do | ||
49 | throwTo thread (userError "Interrupted delay") | ||
diff --git a/dht/src/Control/Concurrent/PingMachine.hs b/dht/src/Control/Concurrent/PingMachine.hs deleted file mode 100644 index a8f10e83..00000000 --- a/dht/src/Control/Concurrent/PingMachine.hs +++ /dev/null | |||
@@ -1,161 +0,0 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE TupleSections #-} | ||
3 | module Control.Concurrent.PingMachine where | ||
4 | |||
5 | import Control.Monad | ||
6 | import Data.Function | ||
7 | #ifdef THREAD_DEBUG | ||
8 | import Control.Concurrent.Lifted.Instrument | ||
9 | #else | ||
10 | import Control.Concurrent.Lifted | ||
11 | import GHC.Conc (labelThread) | ||
12 | #endif | ||
13 | import Control.Concurrent.STM | ||
14 | |||
15 | import Control.Concurrent.Delay | ||
16 | |||
17 | type Miliseconds = Int | ||
18 | type TimeOut = Miliseconds | ||
19 | type PingInterval = Miliseconds | ||
20 | |||
21 | -- | Events that occur as a result of the 'PingMachine' watchdog. | ||
22 | -- | ||
23 | -- Use 'pingWait' to wait for one of these to occur. | ||
24 | data PingEvent | ||
25 | = PingIdle -- ^ You should send a ping if you observe this event. | ||
26 | | PingTimeOut -- ^ You should give up on the connection in case of this event. | ||
27 | |||
28 | data PingMachine = PingMachine | ||
29 | { pingFlag :: TVar Bool | ||
30 | , pingInterruptible :: InterruptibleDelay | ||
31 | , pingEvent :: TMVar PingEvent | ||
32 | , pingStarted :: TMVar Bool | ||
33 | } | ||
34 | |||
35 | -- | Fork a thread to monitor a connection for a ping timeout. | ||
36 | -- | ||
37 | -- If 'pingBump' is not invoked after a idle is signaled, a timeout event will | ||
38 | -- occur. When that happens, even if the caller chooses to ignore this event, | ||
39 | -- the watchdog thread will be terminated and no more ping events will be | ||
40 | -- signaled. | ||
41 | -- | ||
42 | -- An idle connection will be signaled by: | ||
43 | -- | ||
44 | -- (1) 'pingFlag' is set 'True' | ||
45 | -- | ||
46 | -- (2) 'pingWait' returns 'PingIdle' | ||
47 | -- | ||
48 | -- Either may be tested to determine whether a ping should be sent but | ||
49 | -- 'pingFlag' is difficult to use properly because it is up to the caller to | ||
50 | -- remember that the ping is already in progress. | ||
51 | forkPingMachine | ||
52 | :: String | ||
53 | -> PingInterval -- ^ Milliseconds of idle before a ping is considered necessary. | ||
54 | -> TimeOut -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'. | ||
55 | -> IO PingMachine | ||
56 | forkPingMachine label idle timeout = do | ||
57 | d <- interruptibleDelay | ||
58 | flag <- atomically $ newTVar False | ||
59 | canceled <- atomically $ newTVar False | ||
60 | event <- atomically newEmptyTMVar | ||
61 | started <- atomically $ newEmptyTMVar | ||
62 | when (idle/=0) $ void . forkIO $ do | ||
63 | myThreadId >>= flip labelThread ("Ping." ++ label) -- ("ping.watchdog") | ||
64 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
65 | fix $ \loop -> do | ||
66 | atomically $ writeTVar flag False | ||
67 | fin <- startDelay d (1000*idle) | ||
68 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
69 | if (not fin) then loop | ||
70 | else do | ||
71 | -- Idle event | ||
72 | atomically $ do | ||
73 | tryTakeTMVar event | ||
74 | putTMVar event PingIdle | ||
75 | writeTVar flag True | ||
76 | fin <- startDelay d (1000*timeout) | ||
77 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
78 | me <- myThreadId | ||
79 | if (not fin) then loop | ||
80 | else do | ||
81 | -- Timeout event | ||
82 | atomically $ do | ||
83 | tryTakeTMVar event | ||
84 | writeTVar flag False | ||
85 | putTMVar event PingTimeOut | ||
86 | return PingMachine | ||
87 | { pingFlag = flag | ||
88 | , pingInterruptible = d | ||
89 | , pingEvent = event | ||
90 | , pingStarted = started | ||
91 | } | ||
92 | |||
93 | -- | like 'forkPingMachine' but the timeout and idle parameters can be changed dynamically | ||
94 | -- Unlike 'forkPingMachine', 'forkPingMachineDynamic' always launches a thread | ||
95 | -- regardless of idle value. | ||
96 | forkPingMachineDynamic | ||
97 | :: String | ||
98 | -> TVar PingInterval -- ^ Milliseconds of idle before a ping is considered necessary. | ||
99 | -> TVar TimeOut -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'. | ||
100 | -> IO PingMachine | ||
101 | forkPingMachineDynamic label idleV timeoutV = do | ||
102 | d <- interruptibleDelay | ||
103 | flag <- atomically $ newTVar False | ||
104 | canceled <- atomically $ newTVar False | ||
105 | event <- atomically newEmptyTMVar | ||
106 | started <- atomically $ newEmptyTMVar | ||
107 | void . forkIO $ do | ||
108 | myThreadId >>= flip labelThread ("Ping." ++ label) -- ("ping.watchdog") | ||
109 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
110 | fix $ \loop -> do | ||
111 | atomically $ writeTVar flag False | ||
112 | (idle,timeout) <- atomically $ (,) <$> readTVar idleV <*> readTVar timeoutV | ||
113 | fin <- startDelay d (1000*idle) | ||
114 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
115 | if (not fin) then loop | ||
116 | else do | ||
117 | -- Idle event | ||
118 | atomically $ do | ||
119 | tryTakeTMVar event | ||
120 | putTMVar event PingIdle | ||
121 | writeTVar flag True | ||
122 | fin <- startDelay d (1000*timeout) | ||
123 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
124 | me <- myThreadId | ||
125 | if (not fin) then loop | ||
126 | else do | ||
127 | -- Timeout event | ||
128 | atomically $ do | ||
129 | tryTakeTMVar event | ||
130 | writeTVar flag False | ||
131 | putTMVar event PingTimeOut | ||
132 | return PingMachine | ||
133 | { pingFlag = flag | ||
134 | , pingInterruptible = d | ||
135 | , pingEvent = event | ||
136 | , pingStarted = started | ||
137 | } | ||
138 | |||
139 | -- | Terminate the watchdog thread. Call this upon connection close. | ||
140 | -- | ||
141 | -- You should ensure no threads are waiting on 'pingWait' because there is no | ||
142 | -- 'PingEvent' signaling termination. | ||
143 | pingCancel :: PingMachine -> IO () | ||
144 | pingCancel me = do | ||
145 | atomically $ do tryTakeTMVar (pingStarted me) | ||
146 | putTMVar (pingStarted me) False | ||
147 | interruptDelay (pingInterruptible me) | ||
148 | |||
149 | -- | Reset the ping timer. Call this regularly to prevent 'PingTimeOut'. | ||
150 | pingBump :: PingMachine -> IO () | ||
151 | pingBump me = do | ||
152 | atomically $ do | ||
153 | b <- tryReadTMVar (pingStarted me) | ||
154 | when (b/=Just False) $ do | ||
155 | tryTakeTMVar (pingStarted me) | ||
156 | putTMVar (pingStarted me) True | ||
157 | interruptDelay (pingInterruptible me) | ||
158 | |||
159 | -- | Retries until a 'PingEvent' occurs. | ||
160 | pingWait :: PingMachine -> STM PingEvent | ||
161 | pingWait me = takeTMVar (pingEvent me) | ||
diff --git a/dht/src/Control/Concurrent/ThreadUtil.hs b/dht/src/Control/Concurrent/ThreadUtil.hs deleted file mode 100644 index a258d933..00000000 --- a/dht/src/Control/Concurrent/ThreadUtil.hs +++ /dev/null | |||
@@ -1,31 +0,0 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | module Control.Concurrent.ThreadUtil | ||
3 | ( | ||
4 | #ifdef THREAD_DEBUG | ||
5 | module Control.Concurrent.Lifted.Instrument | ||
6 | #else | ||
7 | module Control.Control.Lifted | ||
8 | , module GHC.Conc | ||
9 | #endif | ||
10 | ) where | ||
11 | |||
12 | #ifdef THREAD_DEBUG | ||
13 | import Control.Concurrent.Lifted.Instrument | ||
14 | #else | ||
15 | import Control.Concurrent.Lifted | ||
16 | import GHC.Conc (labelThread) | ||
17 | |||
18 | forkLabeled :: String -> IO () -> IO ThreadId | ||
19 | forkLabeled lbl action = do | ||
20 | t <- forkIO action | ||
21 | labelThread t lbl | ||
22 | return t | ||
23 | {-# INLINE forkLabeled #-} | ||
24 | |||
25 | forkOSLabeled :: String -> IO () -> IO ThreadId | ||
26 | forkOSLabeled lbl action = do | ||
27 | t <- forkOS action | ||
28 | labelThread t lbl | ||
29 | return t | ||
30 | {-# INLINE forkOSLabeled #-} | ||
31 | #endif | ||
diff --git a/dht/src/Data/TableMethods.hs b/dht/src/Data/TableMethods.hs deleted file mode 100644 index e4208a69..00000000 --- a/dht/src/Data/TableMethods.hs +++ /dev/null | |||
@@ -1,105 +0,0 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE GADTs #-} | ||
3 | {-# LANGUAGE LambdaCase #-} | ||
4 | {-# LANGUAGE PartialTypeSignatures #-} | ||
5 | {-# LANGUAGE RankNTypes #-} | ||
6 | {-# LANGUAGE ScopedTypeVariables #-} | ||
7 | {-# LANGUAGE TupleSections #-} | ||
8 | module Data.TableMethods where | ||
9 | |||
10 | import Data.Functor.Contravariant | ||
11 | import Data.Time.Clock.POSIX | ||
12 | import Data.Word | ||
13 | import qualified Data.IntMap.Strict as IntMap | ||
14 | ;import Data.IntMap.Strict (IntMap) | ||
15 | import qualified Data.Map.Strict as Map | ||
16 | ;import Data.Map.Strict (Map) | ||
17 | import qualified Data.Word64Map as W64Map | ||
18 | ;import Data.Word64Map (Word64Map) | ||
19 | |||
20 | import Data.Wrapper.PSQ as PSQ | ||
21 | |||
22 | type Priority = POSIXTime | ||
23 | |||
24 | data OptionalPriority t tid x | ||
25 | = NoPriority | ||
26 | | HasPriority (Priority -> t x -> ([(tid, Priority, x)], t x)) | ||
27 | |||
28 | -- | The standard lookup table methods. | ||
29 | data TableMethods t tid = TableMethods | ||
30 | { -- | Insert a new /tid/ entry into the transaction table. | ||
31 | tblInsert :: forall a. tid -> a -> Priority -> t a -> t a | ||
32 | -- | Delete transaction /tid/ from the transaction table. | ||
33 | , tblDelete :: forall a. tid -> t a -> t a | ||
34 | -- | Lookup the value associated with transaction /tid/. | ||
35 | , tblLookup :: forall a. tid -> t a -> Maybe a | ||
36 | } | ||
37 | |||
38 | data QMethods t tid x = QMethods | ||
39 | { qTbl :: TableMethods t tid | ||
40 | , qAtMostView :: OptionalPriority t tid x | ||
41 | } | ||
42 | |||
43 | vanillaTable :: TableMethods t tid -> QMethods t tid x | ||
44 | vanillaTable tbl = QMethods tbl NoPriority | ||
45 | |||
46 | priorityTable :: TableMethods t tid | ||
47 | -> (Priority -> t x -> ([(k, Priority, x)], t x)) | ||
48 | -> (k -> x -> tid) | ||
49 | -> QMethods t tid x | ||
50 | priorityTable tbl atmost f = QMethods | ||
51 | { qTbl = tbl | ||
52 | , qAtMostView = HasPriority $ \p t -> case atmost p t of | ||
53 | (es,t') -> (map (\(k,p,a) -> (f k a, p, a)) es, t') | ||
54 | } | ||
55 | |||
56 | -- | Methods for using 'Data.IntMap'. | ||
57 | intMapMethods :: TableMethods IntMap Int | ||
58 | intMapMethods = TableMethods | ||
59 | { tblInsert = \tid a p -> IntMap.insert tid a | ||
60 | , tblDelete = IntMap.delete | ||
61 | , tblLookup = IntMap.lookup | ||
62 | } | ||
63 | |||
64 | -- | Methods for using 'Data.Word64Map'. | ||
65 | w64MapMethods :: TableMethods Word64Map Word64 | ||
66 | w64MapMethods = TableMethods | ||
67 | { tblInsert = \tid a p -> W64Map.insert tid a | ||
68 | , tblDelete = W64Map.delete | ||
69 | , tblLookup = W64Map.lookup | ||
70 | } | ||
71 | |||
72 | -- | Methods for using 'Data.Map' | ||
73 | mapMethods :: Ord tid => TableMethods (Map tid) tid | ||
74 | mapMethods = TableMethods | ||
75 | { tblInsert = \tid a p -> Map.insert tid a | ||
76 | , tblDelete = Map.delete | ||
77 | , tblLookup = Map.lookup | ||
78 | } | ||
79 | |||
80 | -- psqMethods :: PSQKey tid => QMethods (HashPSQ tid Priority) tid x | ||
81 | psqMethods :: PSQKey k => (tid -> k) -> (k -> x -> tid) -> QMethods (PSQ' k Priority) tid x | ||
82 | psqMethods g f = priorityTable (contramap g tbl) PSQ.atMostView f | ||
83 | where | ||
84 | tbl :: PSQKey tid => TableMethods (PSQ' tid Priority) tid | ||
85 | tbl = TableMethods | ||
86 | { tblInsert = PSQ.insert' | ||
87 | , tblDelete = PSQ.delete | ||
88 | , tblLookup = \tid t -> case PSQ.lookup tid t of | ||
89 | Just (p,a) -> Just a | ||
90 | Nothing -> Nothing | ||
91 | } | ||
92 | |||
93 | |||
94 | -- | Change the key type for a lookup table implementation. | ||
95 | -- | ||
96 | -- This can be used with 'intMapMethods' or 'mapMethods' to restrict lookups to | ||
97 | -- only a part of the generated /tid/ value. This is useful for /tid/ types | ||
98 | -- that are especially large due their use for other purposes, such as secure | ||
99 | -- nonces for encryption. | ||
100 | instance Contravariant (TableMethods t) where | ||
101 | -- contramap :: (tid -> t1) -> TableMethods t t1 -> TableMethods t tid | ||
102 | contramap f (TableMethods ins del lookup) = | ||
103 | TableMethods (\k p v t -> ins (f k) p v t) | ||
104 | (\k t -> del (f k) t) | ||
105 | (\k t -> lookup (f k) t) | ||
diff --git a/dht/src/DebugUtil.hs b/dht/src/DebugUtil.hs deleted file mode 100644 index 96ab8cc5..00000000 --- a/dht/src/DebugUtil.hs +++ /dev/null | |||
@@ -1,42 +0,0 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | module DebugUtil where | ||
3 | |||
4 | import Control.Monad | ||
5 | import Data.Time.Clock | ||
6 | import Data.List | ||
7 | import Text.Printf | ||
8 | import GHC.Conc (threadStatus,ThreadStatus(..)) | ||
9 | #ifdef THREAD_DEBUG | ||
10 | import Control.Concurrent.Lifted.Instrument | ||
11 | #else | ||
12 | import Control.Concurrent.Lifted | ||
13 | import GHC.Conc (labelThread) | ||
14 | #endif | ||
15 | |||
16 | showReport :: [(String,String)] -> String | ||
17 | showReport kvs = showColumns $ map (\(x,y)->[x,y]) kvs | ||
18 | |||
19 | showColumns :: [[String]] -> String | ||
20 | showColumns rows = do | ||
21 | let cols = transpose rows | ||
22 | ws = map (maximum . map (succ . length)) cols | ||
23 | fs <- rows | ||
24 | _ <- take 1 fs -- Guard against empty rows so that 'last' is safe. | ||
25 | " " ++ concat (zipWith (printf "%-*s") (init ws) (init fs)) ++ last fs ++ "\n" | ||
26 | |||
27 | |||
28 | threadReport :: Bool -- ^ False to summarize search threads. | ||
29 | -> IO String | ||
30 | threadReport want_ss = do | ||
31 | threads <- threadsInformation | ||
32 | tm <- getCurrentTime | ||
33 | let (ss,ts) = partition (("search" `isPrefixOf`) . lbl . snd) | ||
34 | threads | ||
35 | r <- forM (if want_ss then threads else ts) $ \(tid,PerThread{..}) -> do | ||
36 | stat <- threadStatus tid | ||
37 | let showStat (ThreadBlocked reason) = show reason | ||
38 | showStat stat = show stat | ||
39 | return [show lbl,show (diffUTCTime tm startTime),showStat stat] | ||
40 | return $ unlines [ showColumns r | ||
41 | , (if want_ss then " There are " else " and ") | ||
42 | ++ show (length ss) ++ " search threads." ] | ||
diff --git a/dht/src/Network/QueryResponse.hs b/dht/src/Network/QueryResponse.hs deleted file mode 100644 index 20e7ecf0..00000000 --- a/dht/src/Network/QueryResponse.hs +++ /dev/null | |||
@@ -1,716 +0,0 @@ | |||
1 | -- | This module can implement any query\/response protocol. It was written | ||
2 | -- with Kademlia implementations in mind. | ||
3 | |||
4 | {-# LANGUAGE CPP #-} | ||
5 | {-# LANGUAGE GADTs #-} | ||
6 | {-# LANGUAGE LambdaCase #-} | ||
7 | {-# LANGUAGE PartialTypeSignatures #-} | ||
8 | {-# LANGUAGE RankNTypes #-} | ||
9 | {-# LANGUAGE ScopedTypeVariables #-} | ||
10 | {-# LANGUAGE TupleSections #-} | ||
11 | module Network.QueryResponse where | ||
12 | |||
13 | #ifdef THREAD_DEBUG | ||
14 | import Control.Concurrent.Lifted.Instrument | ||
15 | #else | ||
16 | import Control.Concurrent | ||
17 | import GHC.Conc (labelThread) | ||
18 | #endif | ||
19 | import Control.Concurrent.STM | ||
20 | import Control.Exception | ||
21 | import Control.Monad | ||
22 | import qualified Data.ByteString as B | ||
23 | ;import Data.ByteString (ByteString) | ||
24 | import Data.Dependent.Map as DMap | ||
25 | import Data.Dependent.Sum | ||
26 | import Data.Function | ||
27 | import Data.Functor.Contravariant | ||
28 | import Data.Functor.Identity | ||
29 | import Data.GADT.Show | ||
30 | import qualified Data.IntMap.Strict as IntMap | ||
31 | ;import Data.IntMap.Strict (IntMap) | ||
32 | import qualified Data.Map.Strict as Map | ||
33 | ;import Data.Map.Strict (Map) | ||
34 | import Data.Time.Clock.POSIX | ||
35 | import qualified Data.Word64Map as W64Map | ||
36 | ;import Data.Word64Map (Word64Map) | ||
37 | import Data.Word | ||
38 | import Data.Maybe | ||
39 | import GHC.Conc (closeFdWith) | ||
40 | import GHC.Event | ||
41 | import Network.Socket | ||
42 | import Network.Socket.ByteString as B | ||
43 | import System.Endian | ||
44 | import System.IO | ||
45 | import System.IO.Error | ||
46 | import System.Timeout | ||
47 | |||
48 | import DPut | ||
49 | import DebugTag | ||
50 | import Data.TableMethods | ||
51 | |||
52 | -- | An inbound packet or condition raised while monitoring a connection. | ||
53 | data Arrival err addr x | ||
54 | = Terminated -- ^ Virtual message that signals EOF. | ||
55 | | ParseError !err -- ^ A badly-formed message was received. | ||
56 | | Arrival { arrivedFrom :: !addr , arrivedMsg :: !x } -- ^ Inbound message. | ||
57 | |||
58 | -- | Three methods are required to implement a datagram based query\/response protocol. | ||
59 | data TransportA err addr x y = Transport | ||
60 | { -- | Blocks until an inbound packet is available. Then calls the provided | ||
61 | -- continuation with the packet and origin addres or an error condition. | ||
62 | awaitMessage :: forall a. (Arrival err addr x -> IO a) -> STM (IO a) | ||
63 | -- | Send an /y/ packet to the given destination /addr/. | ||
64 | , sendMessage :: addr -> y -> IO () | ||
65 | -- | Shutdown and clean up any state related to this 'Transport'. | ||
66 | , setActive :: Bool -> IO () | ||
67 | } | ||
68 | |||
69 | type Transport err addr x = TransportA err addr x x | ||
70 | |||
71 | closeTransport :: TransportA err addr x y -> IO () | ||
72 | closeTransport tr = setActive tr False | ||
73 | |||
74 | -- | This function modifies a 'Transport' to use higher-level addresses and | ||
75 | -- packet representations. It could be used to change UDP 'ByteString's into | ||
76 | -- bencoded syntax trees or to add an encryption layer in which addresses have | ||
77 | -- associated public keys. | ||
78 | layerTransportM :: | ||
79 | (x -> addr -> IO (Either err (x', addr'))) | ||
80 | -- ^ Function that attempts to transform a low-level address/packet | ||
81 | -- pair into a higher level representation. | ||
82 | -> (y' -> addr' -> IO (y, addr)) | ||
83 | -- ^ Function to encode a high-level address/packet into a lower level | ||
84 | -- representation. | ||
85 | -> TransportA err addr x y | ||
86 | -- ^ The low-level transport to be transformed. | ||
87 | -> TransportA err addr' x' y' | ||
88 | layerTransportM parse encode tr = | ||
89 | tr { awaitMessage = \kont -> | ||
90 | awaitMessage tr $ \case | ||
91 | Terminated -> kont $ Terminated | ||
92 | ParseError e -> kont $ ParseError e | ||
93 | Arrival addr x -> parse x addr >>= \case | ||
94 | Left e -> kont $ ParseError e | ||
95 | Right (x',addr') -> kont $ Arrival addr' x' | ||
96 | , sendMessage = \addr' msg' -> do | ||
97 | (msg,addr) <- encode msg' addr' | ||
98 | sendMessage tr addr msg | ||
99 | } | ||
100 | |||
101 | |||
102 | -- | This function modifies a 'Transport' to use higher-level addresses and | ||
103 | -- packet representations. It could be used to change UDP 'ByteString's into | ||
104 | -- bencoded syntax trees or to add an encryption layer in which addresses have | ||
105 | -- associated public keys. | ||
106 | layerTransport :: | ||
107 | (x -> addr -> Either err (x', addr')) | ||
108 | -- ^ Function that attempts to transform a low-level address/packet | ||
109 | -- pair into a higher level representation. | ||
110 | -> (y' -> addr' -> (y, addr)) | ||
111 | -- ^ Function to encode a high-level address/packet into a lower level | ||
112 | -- representation. | ||
113 | -> TransportA err addr x y | ||
114 | -- ^ The low-level transport to be transformed. | ||
115 | -> TransportA err addr' x' y' | ||
116 | layerTransport parse encode tr = | ||
117 | layerTransportM (\x addr -> return $ parse x addr) | ||
118 | (\x' addr' -> return $ encode x' addr') | ||
119 | tr | ||
120 | |||
121 | -- | Paritions a 'Transport' into two higher-level transports. Note: A 'TChan' | ||
122 | -- is used to share the same underlying socket, so be sure to fork a thread for | ||
123 | -- both returned 'Transport's to avoid hanging. | ||
124 | partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) | ||
125 | -> ((y,xaddr) -> IO (Maybe (c,a))) | ||
126 | -> TransportA err a b c | ||
127 | -> IO (TransportA err xaddr x y, TransportA err a b c) | ||
128 | partitionTransportM parse encodex tr = do | ||
129 | tchan <- atomically newTChan | ||
130 | let ytr = tr { awaitMessage = \kont -> fix $ \again -> do | ||
131 | awaitMessage tr $ \m -> case m of | ||
132 | Arrival adr msg -> parse (msg,adr) >>= \case | ||
133 | Left x -> atomically (writeTChan tchan (Just x)) >> join (atomically again) | ||
134 | Right (y,yaddr) -> kont $ Arrival yaddr y | ||
135 | ParseError e -> kont $ ParseError e | ||
136 | Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated | ||
137 | , sendMessage = sendMessage tr | ||
138 | } | ||
139 | xtr = Transport | ||
140 | { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case | ||
141 | Nothing -> Terminated | ||
142 | Just (x,xaddr) -> Arrival xaddr x | ||
143 | , sendMessage = \addr' msg' -> do | ||
144 | msg_addr <- encodex (msg',addr') | ||
145 | mapM_ (uncurry . flip $ sendMessage tr) msg_addr | ||
146 | , setActive = const $ return () | ||
147 | } | ||
148 | return (xtr, ytr) | ||
149 | |||
150 | -- | Paritions a 'Transport' into two higher-level transports. Note: An 'TChan' | ||
151 | -- is used to share the same underlying socket, so be sure to fork a thread for | ||
152 | -- both returned 'Transport's to avoid hanging. | ||
153 | partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) | ||
154 | -> ((y,xaddr) -> Maybe (c,a)) | ||
155 | -> TransportA err a b c | ||
156 | -> IO (TransportA err xaddr x y, TransportA err a b c) | ||
157 | partitionTransport parse encodex tr = | ||
158 | partitionTransportM (return . parse) (return . encodex) tr | ||
159 | |||
160 | -- | | ||
161 | -- * f add x --> Nothing, consume x | ||
162 | -- --> Just id, leave x to a different handler | ||
163 | -- --> Just g, apply g to x and leave that to a different handler | ||
164 | -- | ||
165 | -- Note: If you add a handler to one of the branches before applying a | ||
166 | -- 'mergeTransports' combinator, then this handler may not block or return | ||
167 | -- Nothing. | ||
168 | addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> TransportA err addr x y -> TransportA err addr x y | ||
169 | addHandler onParseError f tr = tr | ||
170 | { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case | ||
171 | Arrival addr x -> f addr x >>= maybe (join $ atomically eat) (kont . Arrival addr . ($ x)) | ||
172 | ParseError e -> onParseError e >> kont (ParseError e) | ||
173 | Terminated -> kont Terminated | ||
174 | } | ||
175 | |||
176 | -- | Modify a 'Transport' to invoke an action upon every received packet. | ||
177 | onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x | ||
178 | onInbound f tr = addHandler (const $ return ()) (\addr x -> f addr x >> return (Just id)) tr | ||
179 | |||
180 | -- * Using a query\/response client. | ||
181 | |||
182 | -- | Fork a thread that handles inbound packets. The returned action may be used | ||
183 | -- to terminate the thread and clean up any related state. | ||
184 | -- | ||
185 | -- Example usage: | ||
186 | -- | ||
187 | -- > -- Start client. | ||
188 | -- > quitServer <- forkListener "listener" (clientNet client) | ||
189 | -- > -- Send a query q, recieve a response r. | ||
190 | -- > r <- sendQuery client method q | ||
191 | -- > -- Quit client. | ||
192 | -- > quitServer | ||
193 | forkListener :: String -> Transport err addr x -> IO (IO ()) | ||
194 | forkListener name client = do | ||
195 | setActive client True | ||
196 | thread_id <- forkIO $ do | ||
197 | myThreadId >>= flip labelThread ("listener."++name) | ||
198 | fix $ \loop -> join $ atomically $ awaitMessage client $ \case | ||
199 | Terminated -> return () | ||
200 | _ -> loop | ||
201 | dput XMisc $ "Listener died: " ++ name | ||
202 | return $ do | ||
203 | setActive client False | ||
204 | -- killThread thread_id | ||
205 | |||
206 | -- * Implementing a query\/response 'Client'. | ||
207 | |||
208 | -- | These methods indicate what should be done upon various conditions. Write | ||
209 | -- to a log file, make debug prints, or simply ignore them. | ||
210 | -- | ||
211 | -- [ /addr/ ] Address of remote peer. | ||
212 | -- | ||
213 | -- [ /x/ ] Incoming or outgoing packet. | ||
214 | -- | ||
215 | -- [ /meth/ ] Method id of incoming or outgoing request. | ||
216 | -- | ||
217 | -- [ /tid/ ] Transaction id for outgoing packet. | ||
218 | -- | ||
219 | -- [ /err/ ] Error information, typically a 'String'. | ||
220 | data ErrorReporter addr x meth tid err = ErrorReporter | ||
221 | { -- | Incoming: failed to parse packet. | ||
222 | reportParseError :: err -> IO () | ||
223 | -- | Incoming: no handler for request. | ||
224 | , reportMissingHandler :: meth -> addr -> x -> IO () | ||
225 | -- | Incoming: unable to identify request. | ||
226 | , reportUnknown :: addr -> x -> err -> IO () | ||
227 | } | ||
228 | |||
229 | ignoreErrors :: ErrorReporter addr x meth tid err | ||
230 | ignoreErrors = ErrorReporter | ||
231 | { reportParseError = \_ -> return () | ||
232 | , reportMissingHandler = \_ _ _ -> return () | ||
233 | , reportUnknown = \_ _ _ -> return () | ||
234 | } | ||
235 | |||
236 | logErrors :: ( Show addr | ||
237 | , Show meth | ||
238 | ) => ErrorReporter addr x meth tid String | ||
239 | logErrors = ErrorReporter | ||
240 | { reportParseError = \err -> dput XMisc err | ||
241 | , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")" | ||
242 | , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err | ||
243 | } | ||
244 | |||
245 | printErrors :: ( Show addr | ||
246 | , Show meth | ||
247 | ) => Handle -> ErrorReporter addr x meth tid String | ||
248 | printErrors h = ErrorReporter | ||
249 | { reportParseError = \err -> hPutStrLn h err | ||
250 | , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")" | ||
251 | , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err | ||
252 | } | ||
253 | |||
254 | -- Change the /err/ type for an 'ErrorReporter'. | ||
255 | instance Contravariant (ErrorReporter addr x meth tid) where | ||
256 | -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5 | ||
257 | contramap f (ErrorReporter pe mh unk) | ||
258 | = ErrorReporter (\e -> pe (f e)) | ||
259 | mh | ||
260 | (\addr x e -> unk addr x (f e)) | ||
261 | |||
262 | -- | An incoming message can be classified into three cases. | ||
263 | data MessageClass err meth tid addr x | ||
264 | = IsQuery meth tid -- ^ An unsolicited query is handled based on it's /meth/ value. Any response | ||
265 | -- should include the provided /tid/ value. | ||
266 | | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value. | ||
267 | | IsUnsolicited (addr -> addr -> IO (Maybe (x -> x))) -- ^ Transactionless informative packet. The io action will be invoked | ||
268 | -- with the source and destination address of a message. If it handles the | ||
269 | -- message, it should return Nothing. Otherwise, it should return a transform | ||
270 | -- (usually /id/) to apply before the next handler examines it. | ||
271 | | IsUnknown err -- ^ None of the above. | ||
272 | |||
273 | -- | Handler for an inbound query of type /x/ from an address of type _addr_. | ||
274 | type MethodHandler err tid addr x = MethodHandlerA err tid addr x x | ||
275 | |||
276 | -- | Handler for an inbound query of type /x/ with outbound response of type | ||
277 | -- /y/ to an address of type /addr/. | ||
278 | data MethodHandlerA err tid addr x y = forall a b. MethodHandler | ||
279 | { -- | Parse the query into a more specific type for this method. | ||
280 | methodParse :: x -> Either err a | ||
281 | -- | Serialize the response for transmission, given a context /ctx/ and the origin | ||
282 | -- and destination addresses. | ||
283 | , methodSerialize :: tid -> addr -> addr -> b -> y | ||
284 | -- | Fully typed action to perform upon the query. The remote origin | ||
285 | -- address of the query is provided to the handler. | ||
286 | -- | ||
287 | -- TODO: Allow queries to be ignored? | ||
288 | , methodAction :: addr -> a -> IO b | ||
289 | } | ||
290 | -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary. | ||
291 | | forall a. NoReply | ||
292 | { -- | Parse the query into a more specific type for this method. | ||
293 | methodParse :: x -> Either err a | ||
294 | -- | Fully typed action to perform upon the query. The remote origin | ||
295 | -- address of the query is provided to the handler. | ||
296 | , noreplyAction :: addr -> a -> IO () | ||
297 | } | ||
298 | |||
299 | |||
300 | -- | To dispatch responses to our outbound queries, we require three | ||
301 | -- primitives. See the 'transactionMethods' function to create these | ||
302 | -- primitives out of a lookup table and a generator for transaction ids. | ||
303 | -- | ||
304 | -- The type variable /d/ is used to represent the current state of the | ||
305 | -- transaction generator and the table of pending transactions. | ||
306 | data TransactionMethods d qid addr x = TransactionMethods | ||
307 | { | ||
308 | -- | Before a query is sent, this function stores an 'MVar' to which the | ||
309 | -- response will be written too. The returned /qid/ is a transaction id | ||
310 | -- that can be used to forget the 'MVar' if the remote peer is not | ||
311 | -- responding. | ||
312 | dispatchRegister :: POSIXTime -- time of expiry | ||
313 | -> (Maybe x -> IO ()) -- callback upon response (or timeout) | ||
314 | -> addr | ||
315 | -> d | ||
316 | -> STM (qid, d) | ||
317 | -- | This method is invoked when an incoming packet /x/ indicates it is | ||
318 | -- a response to the transaction with id /qid/. The returned IO action | ||
319 | -- will write the packet to the correct 'MVar' thus completing the | ||
320 | -- dispatch. | ||
321 | , dispatchResponse :: qid -> x -> d -> STM (d, IO ()) | ||
322 | -- | When a timeout interval elapses, this method is called to remove the | ||
323 | -- transaction from the table. | ||
324 | , dispatchCancel :: qid -> d -> STM d | ||
325 | } | ||
326 | |||
327 | -- | A set of methods necessary for dispatching incoming packets. | ||
328 | type DispatchMethods tbl err meth tid addr x = DispatchMethodsA tbl err meth tid addr x x | ||
329 | |||
330 | -- | A set of methods necessary for dispatching incoming packets. | ||
331 | data DispatchMethodsA tbl err meth tid addr x y = DispatchMethods | ||
332 | { -- | Classify an inbound packet as a query or response. | ||
333 | classifyInbound :: x -> MessageClass err meth tid addr x | ||
334 | -- | Lookup the handler for a inbound query. | ||
335 | , lookupHandler :: meth -> Maybe (MethodHandlerA err tid addr x y) | ||
336 | -- | Methods for handling incoming responses. | ||
337 | , tableMethods :: TransactionMethods tbl tid addr x | ||
338 | } | ||
339 | |||
340 | -- | All inputs required to implement a query\/response client. | ||
341 | type Client err meth tid addr x = ClientA err meth tid addr x x | ||
342 | |||
343 | -- | All inputs required to implement a query\/response client. | ||
344 | data ClientA err meth tid addr x y = forall tbl. Client | ||
345 | { -- | The 'Transport' used to dispatch and receive packets. | ||
346 | clientNet :: TransportA err addr x y | ||
347 | -- | Methods for handling inbound packets. | ||
348 | , clientDispatcher :: DispatchMethodsA tbl err meth tid addr x y | ||
349 | -- | Methods for reporting various conditions. | ||
350 | , clientErrorReporter :: ErrorReporter addr x meth tid err | ||
351 | -- | State necessary for routing inbound responses and assigning unique | ||
352 | -- /tid/ values for outgoing queries. | ||
353 | , clientPending :: TVar tbl | ||
354 | -- | An action yielding this client\'s own address. It is invoked once | ||
355 | -- on each outbound and inbound packet. It is valid for this to always | ||
356 | -- return the same value. | ||
357 | -- | ||
358 | -- The argument, if supplied, is the remote address for the transaction. | ||
359 | -- This can be used to maintain consistent aliases for specific peers. | ||
360 | , clientAddress :: Maybe addr -> IO addr | ||
361 | -- | Transform a query /tid/ value to an appropriate response /tid/ | ||
362 | -- value. Normally, this would be the identity transformation, but if | ||
363 | -- /tid/ includes a unique cryptographic nonce, then it should be | ||
364 | -- generated here. | ||
365 | , clientResponseId :: tid -> IO tid | ||
366 | } | ||
367 | |||
368 | -- | These four parameters are required to implement an outgoing query. A | ||
369 | -- peer-to-peer algorithm will define a 'MethodSerializer' for every 'MethodHandler' that | ||
370 | -- might be returned by 'lookupHandler'. | ||
371 | data MethodSerializerA tid addr x y meth a b = MethodSerializer | ||
372 | { -- | Returns the microseconds to wait for a response to this query being | ||
373 | -- sent to the given address. The /addr/ may also be modified to add | ||
374 | -- routing information. | ||
375 | methodTimeout :: addr -> STM (addr,Int) | ||
376 | -- | A method identifier used for error reporting. This needn't be the | ||
377 | -- same as the /meth/ argument to 'MethodHandler', but it is suggested. | ||
378 | , method :: meth | ||
379 | -- | Serialize the outgoing query /a/ into a transmittable packet /x/. | ||
380 | -- The /addr/ arguments are, respectively, our own origin address and the | ||
381 | -- destination of the request. The /tid/ argument is useful for attaching | ||
382 | -- auxiliary notations on all outgoing packets. | ||
383 | , wrapQuery :: tid -> addr -> addr -> a -> x | ||
384 | -- | Parse an inbound packet /x/ into a response /b/ for this query. | ||
385 | , unwrapResponse :: y -> b | ||
386 | } | ||
387 | |||
388 | type MethodSerializer tid addr x meth a b = MethodSerializerA tid addr x x meth a b | ||
389 | |||
390 | microsecondsDiff :: Int -> POSIXTime | ||
391 | microsecondsDiff us = fromIntegral us / 1000000 | ||
392 | |||
393 | asyncQuery_ :: Client err meth tid addr x | ||
394 | -> MethodSerializer tid addr x meth a b | ||
395 | -> a | ||
396 | -> addr | ||
397 | -> (Maybe b -> IO ()) | ||
398 | -> IO (tid,POSIXTime,Int) | ||
399 | asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do | ||
400 | now <- getPOSIXTime | ||
401 | (tid,addr,expiry) <- atomically $ do | ||
402 | tbl <- readTVar pending | ||
403 | (addr,expiry) <- methodTimeout meth addr0 | ||
404 | (tid, tbl') <- dispatchRegister (tableMethods d) | ||
405 | (now + microsecondsDiff expiry) | ||
406 | (withResponse . fmap (unwrapResponse meth)) | ||
407 | addr -- XXX: Should be addr0 or addr? | ||
408 | tbl | ||
409 | -- (addr,expiry) <- methodTimeout meth tid addr0 | ||
410 | writeTVar pending tbl' | ||
411 | return (tid,addr,expiry) | ||
412 | self <- whoami (Just addr) | ||
413 | mres <- do sendMessage net addr (wrapQuery meth tid self addr q) | ||
414 | return $ Just () | ||
415 | `catchIOError` (\e -> return Nothing) | ||
416 | return (tid,now,expiry) | ||
417 | |||
418 | asyncQuery :: Show meth => Client err meth tid addr x | ||
419 | -> MethodSerializer tid addr x meth a b | ||
420 | -> a | ||
421 | -> addr | ||
422 | -> (Maybe b -> IO ()) | ||
423 | -> IO () | ||
424 | asyncQuery client meth q addr withResponse0 = do | ||
425 | tm <- getSystemTimerManager | ||
426 | tidvar <- newEmptyMVar | ||
427 | timedout <- registerTimeout tm 1000000 $ do | ||
428 | dput XMisc $ "async TIMEDOUT " ++ show (method meth) | ||
429 | withResponse0 Nothing | ||
430 | tid <- takeMVar tidvar | ||
431 | dput XMisc $ "async TIMEDOUT mvar " ++ show (method meth) | ||
432 | case client of | ||
433 | Client { clientDispatcher = d, clientPending = pending } -> do | ||
434 | atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending | ||
435 | (tid,now,expiry) <- asyncQuery_ client meth q addr $ \x -> do | ||
436 | unregisterTimeout tm timedout | ||
437 | withResponse0 x | ||
438 | putMVar tidvar tid | ||
439 | updateTimeout tm timedout expiry | ||
440 | dput XMisc $ "FIN asyncQuery "++show (method meth)++" TIMEOUT="++show expiry | ||
441 | |||
442 | -- | Send a query to a remote peer. Note that this function will always time | ||
443 | -- out if 'forkListener' was never invoked to spawn a thread to receive and | ||
444 | -- dispatch the response. | ||
445 | sendQuery :: | ||
446 | forall err a b tbl x meth tid addr. | ||
447 | Client err meth tid addr x -- ^ A query/response implementation. | ||
448 | -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query. | ||
449 | -> a -- ^ The outbound query. | ||
450 | -> addr -- ^ Destination address of query. | ||
451 | -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out. | ||
452 | sendQuery c@(Client net d err pending whoami _) meth q addr0 = do | ||
453 | mvar <- newEmptyMVar | ||
454 | (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar) | ||
455 | mres <- timeout expiry $ takeMVar mvar | ||
456 | case mres of | ||
457 | Just b -> return $ Just b | ||
458 | Nothing -> do | ||
459 | atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending | ||
460 | return Nothing | ||
461 | |||
462 | contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x | ||
463 | contramapAddr f (MethodHandler p s a) | ||
464 | = MethodHandler | ||
465 | p | ||
466 | (\tid src dst result -> s tid (f src) (f dst) result) | ||
467 | (\addr arg -> a (f addr) arg) | ||
468 | contramapAddr f (NoReply p a) | ||
469 | = NoReply p (\addr arg -> a (f addr) arg) | ||
470 | |||
471 | -- | Query handlers can throw this to ignore a query instead of responding to | ||
472 | -- it. | ||
473 | data DropQuery = DropQuery | ||
474 | deriving Show | ||
475 | |||
476 | instance Exception DropQuery | ||
477 | |||
478 | -- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the | ||
479 | -- parse is successful, the returned IO action will construct our reply if | ||
480 | -- there is one. Otherwise, a parse err is returned. | ||
481 | dispatchQuery :: MethodHandlerA err tid addr x y -- ^ Handler to invoke. | ||
482 | -> tid -- ^ The transaction id for this query\/response session. | ||
483 | -> addr -- ^ Our own address, to which the query was sent. | ||
484 | -> x -- ^ The query packet. | ||
485 | -> addr -- ^ The origin address of the query. | ||
486 | -> Either err (IO (Maybe y)) | ||
487 | dispatchQuery (MethodHandler unwrapQ wrapR f) tid self x addr = | ||
488 | fmap (\a -> catch (Just . wrapR tid self addr <$> f addr a) | ||
489 | (\DropQuery -> return Nothing)) | ||
490 | $ unwrapQ x | ||
491 | dispatchQuery (NoReply unwrapQ f) tid self x addr = | ||
492 | fmap (\a -> f addr a >> return Nothing) $ unwrapQ x | ||
493 | |||
494 | -- | Like 'transactionMethods' but allows extra information to be stored in the | ||
495 | -- table of pending transactions. This also enables multiple 'Client's to | ||
496 | -- share a single transaction table. | ||
497 | transactionMethods' :: | ||
498 | ((Maybe x -> IO ()) -> a) -- ^ store MVar into table entry | ||
499 | -> (a -> Maybe x -> IO void) -- ^ load MVar from table entry | ||
500 | -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. | ||
501 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. | ||
502 | -> TransactionMethods (g,t a) tid addr x | ||
503 | transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods | ||
504 | { dispatchCancel = \tid (g,t) -> return (g, delete tid t) | ||
505 | , dispatchRegister = \nowPlusExpiry v a (g,t) -> do | ||
506 | let (tid,g') = generate g | ||
507 | let t' = insert tid (store v) nowPlusExpiry t -- (now + microsecondsDiff expiry) t | ||
508 | return ( tid, (g',t') ) | ||
509 | , dispatchResponse = \tid x (g,t) -> | ||
510 | case lookup tid t of | ||
511 | Just v -> let t' = delete tid t | ||
512 | in return ((g,t'),void $ load v $ Just x) | ||
513 | Nothing -> return ((g,t), return ()) | ||
514 | } | ||
515 | |||
516 | -- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a | ||
517 | -- function for generating unique transaction ids. | ||
518 | transactionMethods :: | ||
519 | TableMethods t tid -- ^ Table methods to lookup values by /tid/. | ||
520 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. | ||
521 | -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x | ||
522 | transactionMethods methods generate = transactionMethods' id id methods generate | ||
523 | |||
524 | -- | Handle a single inbound packet and then invoke the given continuation. | ||
525 | -- The 'forkListener' function is implemented by passing this function to 'fix' | ||
526 | -- in a forked thread that loops until 'awaitMessage' returns 'Nothing' or | ||
527 | -- throws an exception. | ||
528 | handleMessage :: | ||
529 | ClientA err meth tid addr x y | ||
530 | -> addr | ||
531 | -> x | ||
532 | -> IO (Maybe (x -> x)) | ||
533 | handleMessage (Client net d err pending whoami responseID) addr plain = do | ||
534 | -- Just (Left e) -> do reportParseError err e | ||
535 | -- return $! Just id | ||
536 | -- Just (Right (plain, addr)) -> do | ||
537 | case classifyInbound d plain of | ||
538 | IsQuery meth tid -> case lookupHandler d meth of | ||
539 | Nothing -> do reportMissingHandler err meth addr plain | ||
540 | return $! Just id | ||
541 | Just m -> do | ||
542 | self <- whoami (Just addr) | ||
543 | tid' <- responseID tid | ||
544 | either (\e -> do reportParseError err e | ||
545 | return $! Just id) | ||
546 | (>>= \m -> do mapM_ (sendMessage net addr) m | ||
547 | return $! Nothing) | ||
548 | (dispatchQuery m tid' self plain addr) | ||
549 | IsUnsolicited action -> do | ||
550 | self <- whoami (Just addr) | ||
551 | action self addr | ||
552 | return Nothing | ||
553 | IsResponse tid -> do | ||
554 | action <- atomically $ do | ||
555 | ts0 <- readTVar pending | ||
556 | (ts, action) <- dispatchResponse (tableMethods d) tid plain ts0 | ||
557 | writeTVar pending ts | ||
558 | return action | ||
559 | action | ||
560 | return $! Nothing | ||
561 | IsUnknown e -> do reportUnknown err addr plain e | ||
562 | return $! Just id | ||
563 | -- Nothing -> return $! id | ||
564 | |||
565 | -- * UDP Datagrams. | ||
566 | |||
567 | -- | Access the address family of a given 'SockAddr'. This convenient accessor | ||
568 | -- is missing from 'Network.Socket', so I implemented it here. | ||
569 | sockAddrFamily :: SockAddr -> Family | ||
570 | sockAddrFamily (SockAddrInet _ _ ) = AF_INET | ||
571 | sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | ||
572 | sockAddrFamily (SockAddrUnix _ ) = AF_UNIX | ||
573 | #if !MIN_VERSION_network(3,0,0) | ||
574 | sockAddrFamily _ = AF_CAN -- SockAddrCan constructor deprecated | ||
575 | #endif | ||
576 | |||
577 | -- | Packets with an empty payload may trigger EOF exception. | ||
578 | -- 'udpTransport' uses this function to avoid throwing in that | ||
579 | -- case. | ||
580 | ignoreEOF :: Socket -> MVar () -> Arrival e a x -> IOError -> IO (Arrival e a x) | ||
581 | ignoreEOF sock isClosed def e = do | ||
582 | done <- tryReadMVar isClosed | ||
583 | case done of | ||
584 | Just () -> do close sock | ||
585 | dput XMisc "Closing UDP socket." | ||
586 | pure Terminated | ||
587 | _ -> if isEOFError e then pure def | ||
588 | else throwIO e | ||
589 | |||
590 | -- | Hard-coded maximum packet size for incoming UDP Packets received via | ||
591 | -- 'udpTransport'. | ||
592 | udpBufferSize :: Int | ||
593 | udpBufferSize = 65536 | ||
594 | |||
595 | -- | Wrapper around 'B.sendTo' that silently ignores DoesNotExistError. | ||
596 | saferSendTo :: Socket -> ByteString -> SockAddr -> IO () | ||
597 | saferSendTo sock bs saddr = void (B.sendTo sock bs saddr) | ||
598 | `catch` \e -> | ||
599 | -- sendTo: does not exist (Network is unreachable) | ||
600 | -- Occurs when IPv6 or IPv4 network is not available. | ||
601 | -- Currently, we require -threaded to prevent a forever-hang in this case. | ||
602 | if isDoesNotExistError e | ||
603 | then return () | ||
604 | else throw e | ||
605 | |||
606 | -- | Like 'udpTransport' except also returns the raw socket (for broadcast use). | ||
607 | udpTransport' :: Show err => SockAddr -> IO (Transport err SockAddr ByteString, Socket) | ||
608 | udpTransport' bind_address = do | ||
609 | let family = sockAddrFamily bind_address | ||
610 | sock <- socket family Datagram defaultProtocol | ||
611 | when (family == AF_INET6) $ do | ||
612 | setSocketOption sock IPv6Only 0 | ||
613 | setSocketOption sock Broadcast 1 | ||
614 | bind sock bind_address | ||
615 | isClosed <- newEmptyMVar | ||
616 | udpTChan <- atomically newTChan | ||
617 | let tr = Transport { | ||
618 | awaitMessage = \kont -> do | ||
619 | r <- readTChan udpTChan | ||
620 | return $ kont $! r | ||
621 | , sendMessage = case family of | ||
622 | AF_INET6 -> \case | ||
623 | (SockAddrInet port addr) -> \bs -> | ||
624 | -- Change IPv4 to 4mapped6 address. | ||
625 | saferSendTo sock bs $ SockAddrInet6 port 0 (0,0,0x0000ffff,fromBE32 addr) 0 | ||
626 | addr6 -> \bs -> saferSendTo sock bs addr6 | ||
627 | AF_INET -> \case | ||
628 | (SockAddrInet6 port 0 (0,0,0x0000ffff,raw4) 0) -> \bs -> do | ||
629 | let host4 = toBE32 raw4 | ||
630 | -- Change 4mapped6 to ordinary IPv4. | ||
631 | -- dput XMisc $ "4mapped6 -> "++show (SockAddrInet port host4) | ||
632 | saferSendTo sock bs (SockAddrInet port host4) | ||
633 | addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr) | ||
634 | addr4 -> \bs -> saferSendTo sock bs addr4 | ||
635 | _ -> \addr bs -> saferSendTo sock bs addr | ||
636 | , setActive = \case | ||
637 | False -> do | ||
638 | dput XMisc $ "closeTransport for udpTransport' called. " ++ show bind_address | ||
639 | tryPutMVar isClosed () -- signal awaitMessage that the transport is closed. | ||
640 | #if MIN_VERSION_network (3,1,0) | ||
641 | #elif MIN_VERSION_network(3,0,0) | ||
642 | let withFdSocket sock f = fdSocket sock >>= f >>= seq sock . return | ||
643 | #else | ||
644 | let withFdSocket sock f = f (fdSocket sock) >>= seq sock . return | ||
645 | #endif | ||
646 | withFdSocket sock $ \fd -> do | ||
647 | let sorryGHCButIAmNotFuckingClosingTheSocketYet fd = return () | ||
648 | -- This call is necessary to interrupt the blocking recvFrom call in awaitMessage. | ||
649 | closeFdWith sorryGHCButIAmNotFuckingClosingTheSocketYet (fromIntegral fd) | ||
650 | True -> do | ||
651 | udpThread <- forkIO $ fix $ \again -> do | ||
652 | r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do | ||
653 | uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize | ||
654 | atomically $ writeTChan udpTChan r | ||
655 | case r of Terminated -> return () | ||
656 | _ -> again | ||
657 | labelThread udpThread ("udp.io."++show bind_address) | ||
658 | } | ||
659 | return (tr, sock) | ||
660 | |||
661 | -- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The | ||
662 | -- argument is the listen-address for incoming packets. This is a useful | ||
663 | -- low-level 'Transport' that can be transformed for higher-level protocols | ||
664 | -- using 'layerTransport'. | ||
665 | udpTransport :: Show err => SockAddr -> IO (Transport err SockAddr ByteString) | ||
666 | udpTransport bind_address = fst <$> udpTransport' bind_address | ||
667 | |||
668 | chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x | ||
669 | chanTransport chanFromAddr self achan aclosed = Transport | ||
670 | { awaitMessage = \kont -> do | ||
671 | x <- (uncurry (flip Arrival) <$> readTChan achan) | ||
672 | `orElse` | ||
673 | (readTVar aclosed >>= check >> return Terminated) | ||
674 | return $ kont x | ||
675 | , sendMessage = \them bs -> do | ||
676 | atomically $ writeTChan (chanFromAddr them) (bs,self) | ||
677 | , setActive = \case | ||
678 | False -> atomically $ writeTVar aclosed True | ||
679 | True -> return () | ||
680 | } | ||
681 | |||
682 | -- | Returns a pair of transports linked together to simulate two computers talking to each other. | ||
683 | testPairTransport :: IO (Transport err SockAddr ByteString, Transport err SockAddr ByteString) | ||
684 | testPairTransport = do | ||
685 | achan <- atomically newTChan | ||
686 | bchan <- atomically newTChan | ||
687 | aclosed <- atomically $ newTVar False | ||
688 | bclosed <- atomically $ newTVar False | ||
689 | let a = SockAddrInet 1 1 | ||
690 | b = SockAddrInet 2 2 | ||
691 | return ( chanTransport (const bchan) a achan aclosed | ||
692 | , chanTransport (const achan) b bchan bclosed ) | ||
693 | |||
694 | newtype ByAddress err x addr = ByAddress (Transport err addr x) | ||
695 | |||
696 | newtype Tagged x addr = Tagged x | ||
697 | |||
698 | decorateAddr :: tag addr -> Arrival e addr x -> Arrival e (DSum tag Identity) x | ||
699 | decorateAddr tag Terminated = Terminated | ||
700 | decorateAddr tag (ParseError e) = ParseError e | ||
701 | decorateAddr tag (Arrival addr x) = Arrival (tag ==> addr) x | ||
702 | |||
703 | mergeTransports :: GCompare tag => DMap tag (ByAddress err x) -> IO (Transport err (DSum tag Identity) x) | ||
704 | mergeTransports tmap = do | ||
705 | -- vmap <- traverseWithKey (\k v -> Tagged <$> newEmptyMVar) tmap | ||
706 | -- foldrWithKey (\k v n -> forkMergeBranch k v >> n) (return ()) vmap | ||
707 | return Transport | ||
708 | { awaitMessage = \kont -> | ||
709 | foldrWithKey (\k (ByAddress tr) n -> awaitMessage tr (kont . decorateAddr k) `orElse` n) | ||
710 | retry | ||
711 | tmap | ||
712 | , sendMessage = \(tag :=> Identity addr) x -> case DMap.lookup tag tmap of | ||
713 | Just (ByAddress tr) -> sendMessage tr addr x | ||
714 | Nothing -> return () | ||
715 | , setActive = \toggle -> foldrWithKey (\_ (ByAddress tr) next -> setActive tr toggle >> next) (return ()) tmap | ||
716 | } | ||
diff --git a/dht/src/Network/QueryResponse/TCP.hs b/dht/src/Network/QueryResponse/TCP.hs deleted file mode 100644 index 0028a5b6..00000000 --- a/dht/src/Network/QueryResponse/TCP.hs +++ /dev/null | |||
@@ -1,223 +0,0 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
3 | {-# LANGUAGE LambdaCase #-} | ||
4 | module Network.QueryResponse.TCP where | ||
5 | |||
6 | #ifdef THREAD_DEBUG | ||
7 | import Control.Concurrent.Lifted.Instrument | ||
8 | #else | ||
9 | import Control.Concurrent.Lifted | ||
10 | import GHC.Conc (labelThread) | ||
11 | #endif | ||
12 | |||
13 | import Control.Arrow | ||
14 | import Control.Concurrent.STM | ||
15 | import Control.Concurrent.STM.TMVar | ||
16 | import Control.Monad | ||
17 | import Data.ByteString (ByteString,hPut) | ||
18 | import Data.Function | ||
19 | import Data.Hashable | ||
20 | import Data.Maybe | ||
21 | import Data.Ord | ||
22 | import Data.Time.Clock.POSIX | ||
23 | import Data.Word | ||
24 | import Data.String (IsString(..)) | ||
25 | import Network.BSD | ||
26 | import Network.Socket as Socket | ||
27 | import System.Timeout | ||
28 | import System.IO | ||
29 | import System.IO.Error | ||
30 | |||
31 | import DebugTag | ||
32 | import DebugUtil | ||
33 | import DPut | ||
34 | import Connection.Tcp (socketFamily) | ||
35 | import qualified Data.MinMaxPSQ as MM | ||
36 | import Network.QueryResponse | ||
37 | |||
38 | data TCPSession st | ||
39 | = PendingTCPSession | ||
40 | | TCPSession | ||
41 | { tcpHandle :: Handle | ||
42 | , tcpState :: st | ||
43 | , tcpThread :: ThreadId | ||
44 | } | ||
45 | |||
46 | newtype TCPAddress = TCPAddress SockAddr | ||
47 | deriving (Eq,Ord,Show) | ||
48 | |||
49 | instance Hashable TCPAddress where | ||
50 | hashWithSalt salt (TCPAddress x) = case x of | ||
51 | SockAddrInet port addr -> hashWithSalt salt (fromIntegral port :: Word16,addr) | ||
52 | SockAddrInet6 port b c d -> hashWithSalt salt (fromIntegral port :: Word16,b,c,d) | ||
53 | _ -> 0 | ||
54 | |||
55 | data TCPCache st = TCPCache | ||
56 | { lru :: TVar (MM.MinMaxPSQ' TCPAddress (Down POSIXTime) (TCPSession st)) | ||
57 | , tcpMax :: Int | ||
58 | } | ||
59 | |||
60 | -- This is a suitable /st/ parameter to 'TCPCache' | ||
61 | data SessionProtocol x y = SessionProtocol | ||
62 | { streamGoodbye :: IO () -- ^ "Goodbye" protocol upon termination. | ||
63 | , streamDecode :: IO (Maybe x) -- ^ Parse inbound messages. | ||
64 | , streamEncode :: y -> IO () -- ^ Serialize outbound messages. | ||
65 | } | ||
66 | |||
67 | data StreamHandshake addr x y = StreamHandshake | ||
68 | { streamHello :: addr -> Handle -> IO (SessionProtocol x y) -- ^ "Hello" protocol upon fresh connection. | ||
69 | , streamAddr :: addr -> SockAddr | ||
70 | } | ||
71 | |||
72 | killSession :: TCPSession st -> IO () | ||
73 | killSession PendingTCPSession = return () | ||
74 | killSession TCPSession{tcpThread=t} = killThread t | ||
75 | |||
76 | showStat :: IsString p => TCPSession st -> p | ||
77 | showStat r = case r of PendingTCPSession -> "pending." | ||
78 | TCPSession {} -> "established." | ||
79 | |||
80 | tcp_timeout :: Int | ||
81 | tcp_timeout = 10000000 | ||
82 | |||
83 | acquireConnection :: TMVar (Arrival a addr x) | ||
84 | -> TCPCache (SessionProtocol x y) | ||
85 | -> StreamHandshake addr x y | ||
86 | -> addr | ||
87 | -> Bool | ||
88 | -> IO (Maybe (y -> IO ())) | ||
89 | acquireConnection mvar tcpcache stream addr bDoCon = do | ||
90 | now <- getPOSIXTime | ||
91 | -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr) | ||
92 | entry <- atomically $ do | ||
93 | c <- readTVar (lru tcpcache) | ||
94 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | ||
95 | case v of | ||
96 | Nothing | bDoCon -> writeTVar (lru tcpcache) | ||
97 | $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c | ||
98 | | otherwise -> return () | ||
99 | Just (tm, v) -> writeTVar (lru tcpcache) | ||
100 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) c | ||
101 | return v | ||
102 | -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry) | ||
103 | case entry of | ||
104 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do | ||
105 | proto <- getProtocolNumber "tcp" | ||
106 | sock <- socket (socketFamily $ streamAddr stream addr) Stream proto | ||
107 | mh <- catchIOError (do h <- timeout tcp_timeout $ do | ||
108 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) | ||
109 | h <- socketToHandle sock ReadWriteMode | ||
110 | hSetBuffering h NoBuffering | ||
111 | return h | ||
112 | return h) | ||
113 | $ \e -> return Nothing | ||
114 | when (isNothing mh) $ do | ||
115 | atomically $ modifyTVar' (lru tcpcache) | ||
116 | $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
117 | Socket.close sock | ||
118 | ret <- fmap join $ forM mh $ \h -> do | ||
119 | mst <- catchIOError (Just <$> streamHello stream addr h) | ||
120 | (\e -> return Nothing) | ||
121 | case mst of | ||
122 | Nothing -> do | ||
123 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
124 | return Nothing | ||
125 | Just st -> do | ||
126 | dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) | ||
127 | signal <- newTVarIO False | ||
128 | let showAddr a = show (streamAddr stream a) | ||
129 | rthread <- forkLabeled ("tcp:"++showAddr addr) $ do | ||
130 | atomically (readTVar signal >>= check) | ||
131 | fix $ \loop -> do | ||
132 | x <- streamDecode st | ||
133 | dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x | ||
134 | case x of | ||
135 | Just u -> do | ||
136 | m <- timeout tcp_timeout $ atomically (putTMVar mvar $ Arrival addr u) | ||
137 | when (isNothing m) $ do | ||
138 | dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet." | ||
139 | atomically $ tryTakeTMVar mvar | ||
140 | return () | ||
141 | loop | ||
142 | Nothing -> do | ||
143 | dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) | ||
144 | do atomically $ modifyTVar' (lru tcpcache) | ||
145 | $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
146 | c <- atomically $ readTVar (lru tcpcache) | ||
147 | now <- getPOSIXTime | ||
148 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do | ||
149 | dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r] | ||
150 | mreport <- timeout tcp_timeout $ threadReport False -- XXX: Paranoid timeout | ||
151 | case mreport of | ||
152 | Just treport -> dput XTCP treport | ||
153 | Nothing -> dput XTCP "TCP ERROR: threadReport timed out." | ||
154 | hClose h `catchIOError` \e -> return () | ||
155 | let v = TCPSession | ||
156 | { tcpHandle = h | ||
157 | , tcpState = st | ||
158 | , tcpThread = rthread | ||
159 | } | ||
160 | t <- getPOSIXTime | ||
161 | retires <- atomically $ do | ||
162 | c <- readTVar (lru tcpcache) | ||
163 | let (rs,c') = MM.takeView (tcpMax tcpcache) | ||
164 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c | ||
165 | writeTVar (lru tcpcache) c' | ||
166 | writeTVar signal True | ||
167 | return rs | ||
168 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkLabeled ("tcp-close:"++show k) $ do | ||
169 | dput XTCP $ "TCP dropped: " ++ show k | ||
170 | killSession r | ||
171 | case r of TCPSession {tcpState=st,tcpHandle=h} -> do | ||
172 | streamGoodbye st | ||
173 | hClose h | ||
174 | `catchIOError` \e -> return () | ||
175 | _ -> return () | ||
176 | |||
177 | return $ Just $ streamEncode st | ||
178 | when (isNothing ret) $ do | ||
179 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
180 | return ret | ||
181 | Just (tm, PendingTCPSession) | ||
182 | | not bDoCon -> return Nothing | ||
183 | | otherwise -> fmap join $ timeout tcp_timeout $ atomically $ do | ||
184 | c <- readTVar (lru tcpcache) | ||
185 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | ||
186 | case v of | ||
187 | Just (_,TCPSession{tcpState=st}) -> return $ Just $ streamEncode st | ||
188 | Nothing -> return Nothing | ||
189 | _ -> retry | ||
190 | Just (tm, v@TCPSession {tcpState=st}) -> return $ Just $ streamEncode st | ||
191 | |||
192 | closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () | ||
193 | closeAll tcpcache stream = do | ||
194 | dput XTCP "TCP.closeAll called." | ||
195 | cache <- atomically $ swapTVar (lru tcpcache) MM.empty | ||
196 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do | ||
197 | killSession r | ||
198 | case r of TCPSession{tcpState=st,tcpHandle=h} -> catchIOError (streamGoodbye st >> hClose h) | ||
199 | (\e -> return ()) | ||
200 | _ -> return () | ||
201 | |||
202 | -- Use a cache of TCP client connections for sending (and receiving) packets. | ||
203 | -- The boolean value prepended to the message allows the sender to specify | ||
204 | -- whether or not a new connection will be initiated if neccessary. If 'False' | ||
205 | -- is passed, then the packet will be sent only if there already exists a | ||
206 | -- connection. | ||
207 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. | ||
208 | -> StreamHandshake addr x y | ||
209 | -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y)) | ||
210 | tcpTransport maxcon stream = do | ||
211 | msgvar <- atomically newEmptyTMVar | ||
212 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) | ||
213 | return $ (,) tcpcache Transport | ||
214 | { awaitMessage = \f -> takeTMVar msgvar >>= \x -> return $ do | ||
215 | f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Terminated) | ||
216 | , sendMessage = \addr (bDoCon,y) -> do | ||
217 | void . forkLabeled "tcp-send" $ do | ||
218 | msock <- acquireConnection msgvar tcpcache stream addr bDoCon | ||
219 | mapM_ ($ y) msock | ||
220 | `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e | ||
221 | , setActive = \case False -> closeAll tcpcache stream >> atomically (putTMVar msgvar Terminated) | ||
222 | True -> return () | ||
223 | } | ||
diff --git a/dht/src/Network/SocketLike.hs b/dht/src/Network/SocketLike.hs deleted file mode 100644 index 37891cfd..00000000 --- a/dht/src/Network/SocketLike.hs +++ /dev/null | |||
@@ -1,98 +0,0 @@ | |||
1 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
2 | {-# LANGUAGE TupleSections #-} | ||
3 | {-# LANGUAGE CPP #-} | ||
4 | -- | | ||
5 | -- | ||
6 | -- A socket could be used indirectly via a 'System.IO.Handle' or a conduit from | ||
7 | -- Michael Snoyman's conduit package. But doing so presents an encapsulation | ||
8 | -- problem. Do we allow access to the underlying socket and trust that it wont | ||
9 | -- be used in an unsafe way? Or do we protect it at the higher level and deny | ||
10 | -- access to various state information? | ||
11 | -- | ||
12 | -- The 'SocketLike' class enables the approach that provides a safe wrapper to | ||
13 | -- the underlying socket and gives access to various state information without | ||
14 | -- enabling direct reads or writes. | ||
15 | module Network.SocketLike | ||
16 | ( SocketLike(..) | ||
17 | , RestrictedSocket | ||
18 | , restrictSocket | ||
19 | , restrictHandleSocket | ||
20 | -- * Re-exports | ||
21 | -- | ||
22 | -- | To make the 'SocketLike' methods less awkward to use, the types | ||
23 | -- 'CUInt', 'SockAddr', and 'PortNumber' are re-exported. | ||
24 | , CUInt | ||
25 | , PortNumber | ||
26 | , SockAddr(..) | ||
27 | ) where | ||
28 | |||
29 | import Network.Socket | ||
30 | ( PortNumber | ||
31 | , SockAddr | ||
32 | ) | ||
33 | import Foreign.C.Types ( CUInt ) | ||
34 | |||
35 | import qualified Network.Socket as NS | ||
36 | import System.IO (Handle,hClose,hIsOpen) | ||
37 | import Control.Arrow | ||
38 | |||
39 | -- | A safe (mostly read-only) interface to a 'NS.Socket'. Note that despite | ||
40 | -- how this class is named, it provides no access to typical 'NS.Socket' uses | ||
41 | -- like sending or receiving network packets. | ||
42 | class SocketLike sock where | ||
43 | -- | See 'NS.getSocketName' | ||
44 | getSocketName :: sock -> IO SockAddr | ||
45 | -- | See 'NS.getPeerName' | ||
46 | getPeerName :: sock -> IO SockAddr | ||
47 | -- | See 'NS.getPeerCred' | ||
48 | -- getPeerCred :: sock -> IO (CUInt, CUInt, CUInt) | ||
49 | |||
50 | -- | Is the socket still valid? Connected | ||
51 | -- | ||
52 | -- In order to give the instance writer | ||
53 | -- the option to do book-keeping in a pure | ||
54 | -- type, a conceptually modified version of | ||
55 | -- the 'SocketLike' is returned. | ||
56 | -- | ||
57 | isValidSocket :: sock -> IO (sock,Bool) | ||
58 | |||
59 | |||
60 | instance SocketLike NS.Socket where | ||
61 | getSocketName = NS.getSocketName | ||
62 | getPeerName = NS.getPeerName | ||
63 | -- getPeerCred = NS.getPeerCred | ||
64 | #if MIN_VERSION_network(3,1,0) | ||
65 | isValidSocket s = (s,) <$> NS.withFdSocket s (return . (/= (-1))) | ||
66 | #else | ||
67 | #if MIN_VERSION_network(3,0,0) | ||
68 | isValidSocket s = (s,) . (/= (-1)) <$> NS.fdSocket s | ||
69 | #else | ||
70 | #if MIN_VERSION_network(2,4,0) | ||
71 | isValidSocket s = (s,) <$> NS.isConnected s -- warning: this is always False if the socket | ||
72 | -- was converted to a Handle | ||
73 | #else | ||
74 | isValidSocket s = (s,) <$> NS.sIsConnected s -- warning: this is always False if the socket | ||
75 | -- was converted to a Handle | ||
76 | #endif | ||
77 | #endif | ||
78 | #endif | ||
79 | |||
80 | -- | An encapsulated socket. Data reads and writes are not possible. | ||
81 | data RestrictedSocket = Restricted (Maybe Handle) NS.Socket deriving Show | ||
82 | |||
83 | instance SocketLike RestrictedSocket where | ||
84 | getSocketName (Restricted mb sock) = NS.getSocketName sock | ||
85 | getPeerName (Restricted mb sock) = NS.getPeerName sock | ||
86 | -- getPeerCred (Restricted mb sock) = NS.getPeerCred sock | ||
87 | isValidSocket rs@(Restricted mb sock) = maybe (first (Restricted mb) <$> isValidSocket sock) (((rs,) <$>) . hIsOpen) mb | ||
88 | |||
89 | -- | Create a 'RestrictedSocket' that explicitly disallows sending or | ||
90 | -- receiving data. | ||
91 | restrictSocket :: NS.Socket -> RestrictedSocket | ||
92 | restrictSocket socket = Restricted Nothing socket | ||
93 | |||
94 | -- | Build a 'RestrictedSocket' for which 'sClose' will close the given | ||
95 | -- 'Handle'. It is intended that this 'Handle' was obtained via | ||
96 | -- 'NS.socketToHandle'. | ||
97 | restrictHandleSocket :: Handle -> NS.Socket -> RestrictedSocket | ||
98 | restrictHandleSocket h socket = Restricted (Just h) socket | ||
diff --git a/dht/src/Network/StreamServer.hs b/dht/src/Network/StreamServer.hs deleted file mode 100644 index 1da612ce..00000000 --- a/dht/src/Network/StreamServer.hs +++ /dev/null | |||
@@ -1,167 +0,0 @@ | |||
1 | -- | This module implements a bare-bones TCP or Unix socket server. | ||
2 | {-# LANGUAGE CPP #-} | ||
3 | {-# LANGUAGE TypeFamilies #-} | ||
4 | {-# LANGUAGE TypeOperators #-} | ||
5 | {-# LANGUAGE OverloadedStrings #-} | ||
6 | {-# LANGUAGE RankNTypes #-} | ||
7 | module Network.StreamServer | ||
8 | ( streamServer | ||
9 | , ServerHandle | ||
10 | , getAcceptLoopThreadId | ||
11 | , ServerConfig(..) | ||
12 | , withSession | ||
13 | , quitListening | ||
14 | --, dummyServerHandle | ||
15 | , listenSocket | ||
16 | , Local(..) | ||
17 | , Remote(..) | ||
18 | ) where | ||
19 | |||
20 | import Data.Monoid | ||
21 | import Network.Socket as Socket | ||
22 | import System.Directory (removeFile) | ||
23 | import System.IO | ||
24 | ( IOMode(..) | ||
25 | , stderr | ||
26 | , hFlush | ||
27 | ) | ||
28 | import Control.Monad | ||
29 | import Control.Monad.Fix (fix) | ||
30 | #ifdef THREAD_DEBUG | ||
31 | import Control.Concurrent.Lifted.Instrument | ||
32 | ( forkIO, threadDelay, ThreadId, mkWeakThreadId, labelThread, myThreadId | ||
33 | , killThread ) | ||
34 | #else | ||
35 | import GHC.Conc (labelThread) | ||
36 | import Control.Concurrent | ||
37 | ( forkIO, threadDelay, ThreadId, mkWeakThreadId, myThreadId | ||
38 | , killThread ) | ||
39 | #endif | ||
40 | import Control.Exception (handle,finally) | ||
41 | import System.IO.Error (tryIOError) | ||
42 | import System.Mem.Weak | ||
43 | import System.IO.Error | ||
44 | |||
45 | -- import Data.Conduit | ||
46 | import System.IO (Handle) | ||
47 | import Control.Concurrent.MVar (newMVar) | ||
48 | |||
49 | import Network.SocketLike | ||
50 | import DPut | ||
51 | import DebugTag | ||
52 | |||
53 | data ServerHandle = ServerHandle Socket (Weak ThreadId) | ||
54 | |||
55 | -- | Useful for testing. | ||
56 | getAcceptLoopThreadId :: ServerHandle -> IO (Weak ThreadId) | ||
57 | getAcceptLoopThreadId (ServerHandle _ t) = return t | ||
58 | |||
59 | listenSocket :: ServerHandle -> RestrictedSocket | ||
60 | listenSocket (ServerHandle sock _) = restrictSocket sock | ||
61 | |||
62 | {- // Removed, bit-rotted and there are no call sites | ||
63 | -- | Create a useless do-nothing 'ServerHandle'. | ||
64 | dummyServerHandle :: IO ServerHandle | ||
65 | dummyServerHandle = do | ||
66 | mvar <- newMVar Closed | ||
67 | let sock = MkSocket 0 AF_UNSPEC NoSocketType 0 mvar | ||
68 | thread <- mkWeakThreadId <=< forkIO $ return () | ||
69 | return (ServerHandle sock thread) | ||
70 | -} | ||
71 | |||
72 | removeSocketFile :: SockAddr -> IO () | ||
73 | removeSocketFile (SockAddrUnix fname) = removeFile fname | ||
74 | removeSocketFile _ = return () | ||
75 | |||
76 | -- | Terminate the server accept-loop. Call this to shut down the server. | ||
77 | quitListening :: ServerHandle -> IO () | ||
78 | quitListening (ServerHandle socket acceptThread) = | ||
79 | finally (Socket.getSocketName socket >>= removeSocketFile) | ||
80 | (do mapM_ killThread =<< deRefWeak acceptThread | ||
81 | Socket.close socket) | ||
82 | |||
83 | |||
84 | -- | It's 'bshow' instead of 'show' to enable swapping in a 'ByteString' | ||
85 | -- variation. (This is not exported.) | ||
86 | bshow :: Show a => a -> String | ||
87 | bshow e = show e | ||
88 | |||
89 | -- | Send a string to stderr. Not exported. Default 'serverWarn' when | ||
90 | -- 'withSession' is used to configure the server. | ||
91 | warnStderr :: String -> IO () | ||
92 | warnStderr str = dput XMisc str >> hFlush stderr | ||
93 | |||
94 | newtype Local a = Local a deriving (Eq,Ord,Show) | ||
95 | newtype Remote a = Remote a deriving (Eq,Ord,Show) | ||
96 | |||
97 | data ServerConfig = ServerConfig | ||
98 | { serverWarn :: String -> IO () | ||
99 | -- ^ Action to report warnings and errors. | ||
100 | , serverSession :: ( RestrictedSocket, (Local SockAddr, Remote SockAddr)) -> Int -> Handle -> IO () | ||
101 | -- ^ Action to handle interaction with a client | ||
102 | } | ||
103 | |||
104 | -- | Initialize a 'ServerConfig' using the provided session handler. | ||
105 | withSession :: ((RestrictedSocket,(Local SockAddr,Remote SockAddr)) -> Int -> Handle -> IO ()) -> ServerConfig | ||
106 | withSession session = ServerConfig warnStderr session | ||
107 | |||
108 | -- | Launch a thread to listen at the given bind address and dispatch | ||
109 | -- to session handler threads on every incoming connection. Supports | ||
110 | -- IPv4 and IPv6, TCP and unix sockets. | ||
111 | -- | ||
112 | -- The returned handle can be used with 'quitListening' to terminate the | ||
113 | -- thread and prevent any new sessions from starting. Currently active | ||
114 | -- session threads will not be terminated or signaled in any way. | ||
115 | streamServer :: ServerConfig -> [SockAddr] -> IO ServerHandle | ||
116 | streamServer cfg addrs = do | ||
117 | let warn = serverWarn cfg | ||
118 | family = case addrs of | ||
119 | SockAddrInet {}:_ -> AF_INET | ||
120 | SockAddrInet6 {}:_ -> AF_INET6 | ||
121 | SockAddrUnix {}:_ -> AF_UNIX | ||
122 | [] -> AF_INET6 | ||
123 | sock <- socket family Stream 0 | ||
124 | setSocketOption sock ReuseAddr 1 | ||
125 | let tryBind addr next _ = do | ||
126 | tryIOError (removeSocketFile addr) | ||
127 | bind sock addr | ||
128 | `catchIOError` \e -> next (Just e) | ||
129 | fix $ \loop -> let again mbe = do | ||
130 | forM_ mbe $ \e -> warn $ "bind-error: " <> bshow addrs <> " " <> bshow e | ||
131 | threadDelay 5000000 | ||
132 | loop | ||
133 | in foldr tryBind again addrs Nothing | ||
134 | listen sock maxListenQueue | ||
135 | thread <- mkWeakThreadId <=< forkIO $ do | ||
136 | bindaddr <- Socket.getSocketName sock | ||
137 | myThreadId >>= flip labelThread ("StreamServer.acceptLoop." <> bshow bindaddr) | ||
138 | acceptLoop cfg sock 0 | ||
139 | return (ServerHandle sock thread) | ||
140 | |||
141 | -- | Not exported. This, combined with 'acceptException' form a mutually | ||
142 | -- recursive loop that handles incoming connections. To quit the loop, the | ||
143 | -- socket must be closed by 'quitListening'. | ||
144 | acceptLoop :: ServerConfig -> Socket -> Int -> IO () | ||
145 | acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do | ||
146 | (con,raddr) <- accept sock | ||
147 | let conkey = n + 1 | ||
148 | laddr <- Socket.getSocketName con | ||
149 | h <- socketToHandle con ReadWriteMode | ||
150 | forkIO $ do | ||
151 | myThreadId >>= flip labelThread "StreamServer.session" | ||
152 | serverSession cfg (restrictHandleSocket h con, (Local laddr, Remote raddr)) conkey h | ||
153 | acceptLoop cfg sock (n + 1) | ||
154 | |||
155 | acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO () | ||
156 | acceptException cfg n sock ioerror = do | ||
157 | case show (ioeGetErrorType ioerror) of | ||
158 | "resource exhausted" -> do -- try again (ioeGetErrorType ioerror == fullErrorType) | ||
159 | serverWarn cfg $ ("acceptLoop: resource exhasted") | ||
160 | threadDelay 500000 | ||
161 | acceptLoop cfg sock (n + 1) | ||
162 | "invalid argument" -> do -- quit on closed socket | ||
163 | Socket.close sock | ||
164 | message -> do -- unexpected exception | ||
165 | serverWarn cfg $ ("acceptLoop: "<>bshow message) | ||
166 | Socket.close sock | ||
167 | |||
diff --git a/dht/stack.yaml b/dht/stack.yaml index 3ae992c7..5c6013a0 100644 --- a/dht/stack.yaml +++ b/dht/stack.yaml | |||
@@ -18,6 +18,7 @@ extra-deps: | |||
18 | - "../concurrent-supply/" | 18 | - "../concurrent-supply/" |
19 | - "../base32-bytestring/" | 19 | - "../base32-bytestring/" |
20 | - "../dependent-map/" | 20 | - "../dependent-map/" |
21 | - "../server/" | ||
21 | - cryptonite-0.23 | 22 | - cryptonite-0.23 |
22 | - reference-0.1 | 23 | - reference-0.1 |
23 | - avahi-0.2.0@sha256:eb725536d8427548685b531d4bf8271a3104da06f611ed38165a0e08e21c54eb,1799 | 24 | - avahi-0.2.0@sha256:eb725536d8427548685b531d4bf8271a3104da06f611ed38165a0e08e21c54eb,1799 |