diff options
Diffstat (limited to 'dht/src/Network')
-rw-r--r-- | dht/src/Network/QueryResponse.hs | 716 | ||||
-rw-r--r-- | dht/src/Network/QueryResponse/TCP.hs | 223 | ||||
-rw-r--r-- | dht/src/Network/SocketLike.hs | 98 | ||||
-rw-r--r-- | dht/src/Network/StreamServer.hs | 167 |
4 files changed, 0 insertions, 1204 deletions
diff --git a/dht/src/Network/QueryResponse.hs b/dht/src/Network/QueryResponse.hs deleted file mode 100644 index 20e7ecf0..00000000 --- a/dht/src/Network/QueryResponse.hs +++ /dev/null | |||
@@ -1,716 +0,0 @@ | |||
1 | -- | This module can implement any query\/response protocol. It was written | ||
2 | -- with Kademlia implementations in mind. | ||
3 | |||
4 | {-# LANGUAGE CPP #-} | ||
5 | {-# LANGUAGE GADTs #-} | ||
6 | {-# LANGUAGE LambdaCase #-} | ||
7 | {-# LANGUAGE PartialTypeSignatures #-} | ||
8 | {-# LANGUAGE RankNTypes #-} | ||
9 | {-# LANGUAGE ScopedTypeVariables #-} | ||
10 | {-# LANGUAGE TupleSections #-} | ||
11 | module Network.QueryResponse where | ||
12 | |||
13 | #ifdef THREAD_DEBUG | ||
14 | import Control.Concurrent.Lifted.Instrument | ||
15 | #else | ||
16 | import Control.Concurrent | ||
17 | import GHC.Conc (labelThread) | ||
18 | #endif | ||
19 | import Control.Concurrent.STM | ||
20 | import Control.Exception | ||
21 | import Control.Monad | ||
22 | import qualified Data.ByteString as B | ||
23 | ;import Data.ByteString (ByteString) | ||
24 | import Data.Dependent.Map as DMap | ||
25 | import Data.Dependent.Sum | ||
26 | import Data.Function | ||
27 | import Data.Functor.Contravariant | ||
28 | import Data.Functor.Identity | ||
29 | import Data.GADT.Show | ||
30 | import qualified Data.IntMap.Strict as IntMap | ||
31 | ;import Data.IntMap.Strict (IntMap) | ||
32 | import qualified Data.Map.Strict as Map | ||
33 | ;import Data.Map.Strict (Map) | ||
34 | import Data.Time.Clock.POSIX | ||
35 | import qualified Data.Word64Map as W64Map | ||
36 | ;import Data.Word64Map (Word64Map) | ||
37 | import Data.Word | ||
38 | import Data.Maybe | ||
39 | import GHC.Conc (closeFdWith) | ||
40 | import GHC.Event | ||
41 | import Network.Socket | ||
42 | import Network.Socket.ByteString as B | ||
43 | import System.Endian | ||
44 | import System.IO | ||
45 | import System.IO.Error | ||
46 | import System.Timeout | ||
47 | |||
48 | import DPut | ||
49 | import DebugTag | ||
50 | import Data.TableMethods | ||
51 | |||
52 | -- | An inbound packet or condition raised while monitoring a connection. | ||
53 | data Arrival err addr x | ||
54 | = Terminated -- ^ Virtual message that signals EOF. | ||
55 | | ParseError !err -- ^ A badly-formed message was received. | ||
56 | | Arrival { arrivedFrom :: !addr , arrivedMsg :: !x } -- ^ Inbound message. | ||
57 | |||
58 | -- | Three methods are required to implement a datagram based query\/response protocol. | ||
59 | data TransportA err addr x y = Transport | ||
60 | { -- | Blocks until an inbound packet is available. Then calls the provided | ||
61 | -- continuation with the packet and origin addres or an error condition. | ||
62 | awaitMessage :: forall a. (Arrival err addr x -> IO a) -> STM (IO a) | ||
63 | -- | Send an /y/ packet to the given destination /addr/. | ||
64 | , sendMessage :: addr -> y -> IO () | ||
65 | -- | Shutdown and clean up any state related to this 'Transport'. | ||
66 | , setActive :: Bool -> IO () | ||
67 | } | ||
68 | |||
69 | type Transport err addr x = TransportA err addr x x | ||
70 | |||
71 | closeTransport :: TransportA err addr x y -> IO () | ||
72 | closeTransport tr = setActive tr False | ||
73 | |||
74 | -- | This function modifies a 'Transport' to use higher-level addresses and | ||
75 | -- packet representations. It could be used to change UDP 'ByteString's into | ||
76 | -- bencoded syntax trees or to add an encryption layer in which addresses have | ||
77 | -- associated public keys. | ||
78 | layerTransportM :: | ||
79 | (x -> addr -> IO (Either err (x', addr'))) | ||
80 | -- ^ Function that attempts to transform a low-level address/packet | ||
81 | -- pair into a higher level representation. | ||
82 | -> (y' -> addr' -> IO (y, addr)) | ||
83 | -- ^ Function to encode a high-level address/packet into a lower level | ||
84 | -- representation. | ||
85 | -> TransportA err addr x y | ||
86 | -- ^ The low-level transport to be transformed. | ||
87 | -> TransportA err addr' x' y' | ||
88 | layerTransportM parse encode tr = | ||
89 | tr { awaitMessage = \kont -> | ||
90 | awaitMessage tr $ \case | ||
91 | Terminated -> kont $ Terminated | ||
92 | ParseError e -> kont $ ParseError e | ||
93 | Arrival addr x -> parse x addr >>= \case | ||
94 | Left e -> kont $ ParseError e | ||
95 | Right (x',addr') -> kont $ Arrival addr' x' | ||
96 | , sendMessage = \addr' msg' -> do | ||
97 | (msg,addr) <- encode msg' addr' | ||
98 | sendMessage tr addr msg | ||
99 | } | ||
100 | |||
101 | |||
102 | -- | This function modifies a 'Transport' to use higher-level addresses and | ||
103 | -- packet representations. It could be used to change UDP 'ByteString's into | ||
104 | -- bencoded syntax trees or to add an encryption layer in which addresses have | ||
105 | -- associated public keys. | ||
106 | layerTransport :: | ||
107 | (x -> addr -> Either err (x', addr')) | ||
108 | -- ^ Function that attempts to transform a low-level address/packet | ||
109 | -- pair into a higher level representation. | ||
110 | -> (y' -> addr' -> (y, addr)) | ||
111 | -- ^ Function to encode a high-level address/packet into a lower level | ||
112 | -- representation. | ||
113 | -> TransportA err addr x y | ||
114 | -- ^ The low-level transport to be transformed. | ||
115 | -> TransportA err addr' x' y' | ||
116 | layerTransport parse encode tr = | ||
117 | layerTransportM (\x addr -> return $ parse x addr) | ||
118 | (\x' addr' -> return $ encode x' addr') | ||
119 | tr | ||
120 | |||
121 | -- | Paritions a 'Transport' into two higher-level transports. Note: A 'TChan' | ||
122 | -- is used to share the same underlying socket, so be sure to fork a thread for | ||
123 | -- both returned 'Transport's to avoid hanging. | ||
124 | partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) | ||
125 | -> ((y,xaddr) -> IO (Maybe (c,a))) | ||
126 | -> TransportA err a b c | ||
127 | -> IO (TransportA err xaddr x y, TransportA err a b c) | ||
128 | partitionTransportM parse encodex tr = do | ||
129 | tchan <- atomically newTChan | ||
130 | let ytr = tr { awaitMessage = \kont -> fix $ \again -> do | ||
131 | awaitMessage tr $ \m -> case m of | ||
132 | Arrival adr msg -> parse (msg,adr) >>= \case | ||
133 | Left x -> atomically (writeTChan tchan (Just x)) >> join (atomically again) | ||
134 | Right (y,yaddr) -> kont $ Arrival yaddr y | ||
135 | ParseError e -> kont $ ParseError e | ||
136 | Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated | ||
137 | , sendMessage = sendMessage tr | ||
138 | } | ||
139 | xtr = Transport | ||
140 | { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case | ||
141 | Nothing -> Terminated | ||
142 | Just (x,xaddr) -> Arrival xaddr x | ||
143 | , sendMessage = \addr' msg' -> do | ||
144 | msg_addr <- encodex (msg',addr') | ||
145 | mapM_ (uncurry . flip $ sendMessage tr) msg_addr | ||
146 | , setActive = const $ return () | ||
147 | } | ||
148 | return (xtr, ytr) | ||
149 | |||
150 | -- | Paritions a 'Transport' into two higher-level transports. Note: An 'TChan' | ||
151 | -- is used to share the same underlying socket, so be sure to fork a thread for | ||
152 | -- both returned 'Transport's to avoid hanging. | ||
153 | partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) | ||
154 | -> ((y,xaddr) -> Maybe (c,a)) | ||
155 | -> TransportA err a b c | ||
156 | -> IO (TransportA err xaddr x y, TransportA err a b c) | ||
157 | partitionTransport parse encodex tr = | ||
158 | partitionTransportM (return . parse) (return . encodex) tr | ||
159 | |||
160 | -- | | ||
161 | -- * f add x --> Nothing, consume x | ||
162 | -- --> Just id, leave x to a different handler | ||
163 | -- --> Just g, apply g to x and leave that to a different handler | ||
164 | -- | ||
165 | -- Note: If you add a handler to one of the branches before applying a | ||
166 | -- 'mergeTransports' combinator, then this handler may not block or return | ||
167 | -- Nothing. | ||
168 | addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> TransportA err addr x y -> TransportA err addr x y | ||
169 | addHandler onParseError f tr = tr | ||
170 | { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case | ||
171 | Arrival addr x -> f addr x >>= maybe (join $ atomically eat) (kont . Arrival addr . ($ x)) | ||
172 | ParseError e -> onParseError e >> kont (ParseError e) | ||
173 | Terminated -> kont Terminated | ||
174 | } | ||
175 | |||
176 | -- | Modify a 'Transport' to invoke an action upon every received packet. | ||
177 | onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x | ||
178 | onInbound f tr = addHandler (const $ return ()) (\addr x -> f addr x >> return (Just id)) tr | ||
179 | |||
180 | -- * Using a query\/response client. | ||
181 | |||
182 | -- | Fork a thread that handles inbound packets. The returned action may be used | ||
183 | -- to terminate the thread and clean up any related state. | ||
184 | -- | ||
185 | -- Example usage: | ||
186 | -- | ||
187 | -- > -- Start client. | ||
188 | -- > quitServer <- forkListener "listener" (clientNet client) | ||
189 | -- > -- Send a query q, recieve a response r. | ||
190 | -- > r <- sendQuery client method q | ||
191 | -- > -- Quit client. | ||
192 | -- > quitServer | ||
193 | forkListener :: String -> Transport err addr x -> IO (IO ()) | ||
194 | forkListener name client = do | ||
195 | setActive client True | ||
196 | thread_id <- forkIO $ do | ||
197 | myThreadId >>= flip labelThread ("listener."++name) | ||
198 | fix $ \loop -> join $ atomically $ awaitMessage client $ \case | ||
199 | Terminated -> return () | ||
200 | _ -> loop | ||
201 | dput XMisc $ "Listener died: " ++ name | ||
202 | return $ do | ||
203 | setActive client False | ||
204 | -- killThread thread_id | ||
205 | |||
206 | -- * Implementing a query\/response 'Client'. | ||
207 | |||
208 | -- | These methods indicate what should be done upon various conditions. Write | ||
209 | -- to a log file, make debug prints, or simply ignore them. | ||
210 | -- | ||
211 | -- [ /addr/ ] Address of remote peer. | ||
212 | -- | ||
213 | -- [ /x/ ] Incoming or outgoing packet. | ||
214 | -- | ||
215 | -- [ /meth/ ] Method id of incoming or outgoing request. | ||
216 | -- | ||
217 | -- [ /tid/ ] Transaction id for outgoing packet. | ||
218 | -- | ||
219 | -- [ /err/ ] Error information, typically a 'String'. | ||
220 | data ErrorReporter addr x meth tid err = ErrorReporter | ||
221 | { -- | Incoming: failed to parse packet. | ||
222 | reportParseError :: err -> IO () | ||
223 | -- | Incoming: no handler for request. | ||
224 | , reportMissingHandler :: meth -> addr -> x -> IO () | ||
225 | -- | Incoming: unable to identify request. | ||
226 | , reportUnknown :: addr -> x -> err -> IO () | ||
227 | } | ||
228 | |||
229 | ignoreErrors :: ErrorReporter addr x meth tid err | ||
230 | ignoreErrors = ErrorReporter | ||
231 | { reportParseError = \_ -> return () | ||
232 | , reportMissingHandler = \_ _ _ -> return () | ||
233 | , reportUnknown = \_ _ _ -> return () | ||
234 | } | ||
235 | |||
236 | logErrors :: ( Show addr | ||
237 | , Show meth | ||
238 | ) => ErrorReporter addr x meth tid String | ||
239 | logErrors = ErrorReporter | ||
240 | { reportParseError = \err -> dput XMisc err | ||
241 | , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")" | ||
242 | , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err | ||
243 | } | ||
244 | |||
245 | printErrors :: ( Show addr | ||
246 | , Show meth | ||
247 | ) => Handle -> ErrorReporter addr x meth tid String | ||
248 | printErrors h = ErrorReporter | ||
249 | { reportParseError = \err -> hPutStrLn h err | ||
250 | , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")" | ||
251 | , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err | ||
252 | } | ||
253 | |||
254 | -- Change the /err/ type for an 'ErrorReporter'. | ||
255 | instance Contravariant (ErrorReporter addr x meth tid) where | ||
256 | -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5 | ||
257 | contramap f (ErrorReporter pe mh unk) | ||
258 | = ErrorReporter (\e -> pe (f e)) | ||
259 | mh | ||
260 | (\addr x e -> unk addr x (f e)) | ||
261 | |||
262 | -- | An incoming message can be classified into three cases. | ||
263 | data MessageClass err meth tid addr x | ||
264 | = IsQuery meth tid -- ^ An unsolicited query is handled based on it's /meth/ value. Any response | ||
265 | -- should include the provided /tid/ value. | ||
266 | | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value. | ||
267 | | IsUnsolicited (addr -> addr -> IO (Maybe (x -> x))) -- ^ Transactionless informative packet. The io action will be invoked | ||
268 | -- with the source and destination address of a message. If it handles the | ||
269 | -- message, it should return Nothing. Otherwise, it should return a transform | ||
270 | -- (usually /id/) to apply before the next handler examines it. | ||
271 | | IsUnknown err -- ^ None of the above. | ||
272 | |||
273 | -- | Handler for an inbound query of type /x/ from an address of type _addr_. | ||
274 | type MethodHandler err tid addr x = MethodHandlerA err tid addr x x | ||
275 | |||
276 | -- | Handler for an inbound query of type /x/ with outbound response of type | ||
277 | -- /y/ to an address of type /addr/. | ||
278 | data MethodHandlerA err tid addr x y = forall a b. MethodHandler | ||
279 | { -- | Parse the query into a more specific type for this method. | ||
280 | methodParse :: x -> Either err a | ||
281 | -- | Serialize the response for transmission, given a context /ctx/ and the origin | ||
282 | -- and destination addresses. | ||
283 | , methodSerialize :: tid -> addr -> addr -> b -> y | ||
284 | -- | Fully typed action to perform upon the query. The remote origin | ||
285 | -- address of the query is provided to the handler. | ||
286 | -- | ||
287 | -- TODO: Allow queries to be ignored? | ||
288 | , methodAction :: addr -> a -> IO b | ||
289 | } | ||
290 | -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary. | ||
291 | | forall a. NoReply | ||
292 | { -- | Parse the query into a more specific type for this method. | ||
293 | methodParse :: x -> Either err a | ||
294 | -- | Fully typed action to perform upon the query. The remote origin | ||
295 | -- address of the query is provided to the handler. | ||
296 | , noreplyAction :: addr -> a -> IO () | ||
297 | } | ||
298 | |||
299 | |||
300 | -- | To dispatch responses to our outbound queries, we require three | ||
301 | -- primitives. See the 'transactionMethods' function to create these | ||
302 | -- primitives out of a lookup table and a generator for transaction ids. | ||
303 | -- | ||
304 | -- The type variable /d/ is used to represent the current state of the | ||
305 | -- transaction generator and the table of pending transactions. | ||
306 | data TransactionMethods d qid addr x = TransactionMethods | ||
307 | { | ||
308 | -- | Before a query is sent, this function stores an 'MVar' to which the | ||
309 | -- response will be written too. The returned /qid/ is a transaction id | ||
310 | -- that can be used to forget the 'MVar' if the remote peer is not | ||
311 | -- responding. | ||
312 | dispatchRegister :: POSIXTime -- time of expiry | ||
313 | -> (Maybe x -> IO ()) -- callback upon response (or timeout) | ||
314 | -> addr | ||
315 | -> d | ||
316 | -> STM (qid, d) | ||
317 | -- | This method is invoked when an incoming packet /x/ indicates it is | ||
318 | -- a response to the transaction with id /qid/. The returned IO action | ||
319 | -- will write the packet to the correct 'MVar' thus completing the | ||
320 | -- dispatch. | ||
321 | , dispatchResponse :: qid -> x -> d -> STM (d, IO ()) | ||
322 | -- | When a timeout interval elapses, this method is called to remove the | ||
323 | -- transaction from the table. | ||
324 | , dispatchCancel :: qid -> d -> STM d | ||
325 | } | ||
326 | |||
327 | -- | A set of methods necessary for dispatching incoming packets. | ||
328 | type DispatchMethods tbl err meth tid addr x = DispatchMethodsA tbl err meth tid addr x x | ||
329 | |||
330 | -- | A set of methods necessary for dispatching incoming packets. | ||
331 | data DispatchMethodsA tbl err meth tid addr x y = DispatchMethods | ||
332 | { -- | Classify an inbound packet as a query or response. | ||
333 | classifyInbound :: x -> MessageClass err meth tid addr x | ||
334 | -- | Lookup the handler for a inbound query. | ||
335 | , lookupHandler :: meth -> Maybe (MethodHandlerA err tid addr x y) | ||
336 | -- | Methods for handling incoming responses. | ||
337 | , tableMethods :: TransactionMethods tbl tid addr x | ||
338 | } | ||
339 | |||
340 | -- | All inputs required to implement a query\/response client. | ||
341 | type Client err meth tid addr x = ClientA err meth tid addr x x | ||
342 | |||
343 | -- | All inputs required to implement a query\/response client. | ||
344 | data ClientA err meth tid addr x y = forall tbl. Client | ||
345 | { -- | The 'Transport' used to dispatch and receive packets. | ||
346 | clientNet :: TransportA err addr x y | ||
347 | -- | Methods for handling inbound packets. | ||
348 | , clientDispatcher :: DispatchMethodsA tbl err meth tid addr x y | ||
349 | -- | Methods for reporting various conditions. | ||
350 | , clientErrorReporter :: ErrorReporter addr x meth tid err | ||
351 | -- | State necessary for routing inbound responses and assigning unique | ||
352 | -- /tid/ values for outgoing queries. | ||
353 | , clientPending :: TVar tbl | ||
354 | -- | An action yielding this client\'s own address. It is invoked once | ||
355 | -- on each outbound and inbound packet. It is valid for this to always | ||
356 | -- return the same value. | ||
357 | -- | ||
358 | -- The argument, if supplied, is the remote address for the transaction. | ||
359 | -- This can be used to maintain consistent aliases for specific peers. | ||
360 | , clientAddress :: Maybe addr -> IO addr | ||
361 | -- | Transform a query /tid/ value to an appropriate response /tid/ | ||
362 | -- value. Normally, this would be the identity transformation, but if | ||
363 | -- /tid/ includes a unique cryptographic nonce, then it should be | ||
364 | -- generated here. | ||
365 | , clientResponseId :: tid -> IO tid | ||
366 | } | ||
367 | |||
368 | -- | These four parameters are required to implement an outgoing query. A | ||
369 | -- peer-to-peer algorithm will define a 'MethodSerializer' for every 'MethodHandler' that | ||
370 | -- might be returned by 'lookupHandler'. | ||
371 | data MethodSerializerA tid addr x y meth a b = MethodSerializer | ||
372 | { -- | Returns the microseconds to wait for a response to this query being | ||
373 | -- sent to the given address. The /addr/ may also be modified to add | ||
374 | -- routing information. | ||
375 | methodTimeout :: addr -> STM (addr,Int) | ||
376 | -- | A method identifier used for error reporting. This needn't be the | ||
377 | -- same as the /meth/ argument to 'MethodHandler', but it is suggested. | ||
378 | , method :: meth | ||
379 | -- | Serialize the outgoing query /a/ into a transmittable packet /x/. | ||
380 | -- The /addr/ arguments are, respectively, our own origin address and the | ||
381 | -- destination of the request. The /tid/ argument is useful for attaching | ||
382 | -- auxiliary notations on all outgoing packets. | ||
383 | , wrapQuery :: tid -> addr -> addr -> a -> x | ||
384 | -- | Parse an inbound packet /x/ into a response /b/ for this query. | ||
385 | , unwrapResponse :: y -> b | ||
386 | } | ||
387 | |||
388 | type MethodSerializer tid addr x meth a b = MethodSerializerA tid addr x x meth a b | ||
389 | |||
390 | microsecondsDiff :: Int -> POSIXTime | ||
391 | microsecondsDiff us = fromIntegral us / 1000000 | ||
392 | |||
393 | asyncQuery_ :: Client err meth tid addr x | ||
394 | -> MethodSerializer tid addr x meth a b | ||
395 | -> a | ||
396 | -> addr | ||
397 | -> (Maybe b -> IO ()) | ||
398 | -> IO (tid,POSIXTime,Int) | ||
399 | asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do | ||
400 | now <- getPOSIXTime | ||
401 | (tid,addr,expiry) <- atomically $ do | ||
402 | tbl <- readTVar pending | ||
403 | (addr,expiry) <- methodTimeout meth addr0 | ||
404 | (tid, tbl') <- dispatchRegister (tableMethods d) | ||
405 | (now + microsecondsDiff expiry) | ||
406 | (withResponse . fmap (unwrapResponse meth)) | ||
407 | addr -- XXX: Should be addr0 or addr? | ||
408 | tbl | ||
409 | -- (addr,expiry) <- methodTimeout meth tid addr0 | ||
410 | writeTVar pending tbl' | ||
411 | return (tid,addr,expiry) | ||
412 | self <- whoami (Just addr) | ||
413 | mres <- do sendMessage net addr (wrapQuery meth tid self addr q) | ||
414 | return $ Just () | ||
415 | `catchIOError` (\e -> return Nothing) | ||
416 | return (tid,now,expiry) | ||
417 | |||
418 | asyncQuery :: Show meth => Client err meth tid addr x | ||
419 | -> MethodSerializer tid addr x meth a b | ||
420 | -> a | ||
421 | -> addr | ||
422 | -> (Maybe b -> IO ()) | ||
423 | -> IO () | ||
424 | asyncQuery client meth q addr withResponse0 = do | ||
425 | tm <- getSystemTimerManager | ||
426 | tidvar <- newEmptyMVar | ||
427 | timedout <- registerTimeout tm 1000000 $ do | ||
428 | dput XMisc $ "async TIMEDOUT " ++ show (method meth) | ||
429 | withResponse0 Nothing | ||
430 | tid <- takeMVar tidvar | ||
431 | dput XMisc $ "async TIMEDOUT mvar " ++ show (method meth) | ||
432 | case client of | ||
433 | Client { clientDispatcher = d, clientPending = pending } -> do | ||
434 | atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending | ||
435 | (tid,now,expiry) <- asyncQuery_ client meth q addr $ \x -> do | ||
436 | unregisterTimeout tm timedout | ||
437 | withResponse0 x | ||
438 | putMVar tidvar tid | ||
439 | updateTimeout tm timedout expiry | ||
440 | dput XMisc $ "FIN asyncQuery "++show (method meth)++" TIMEOUT="++show expiry | ||
441 | |||
442 | -- | Send a query to a remote peer. Note that this function will always time | ||
443 | -- out if 'forkListener' was never invoked to spawn a thread to receive and | ||
444 | -- dispatch the response. | ||
445 | sendQuery :: | ||
446 | forall err a b tbl x meth tid addr. | ||
447 | Client err meth tid addr x -- ^ A query/response implementation. | ||
448 | -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query. | ||
449 | -> a -- ^ The outbound query. | ||
450 | -> addr -- ^ Destination address of query. | ||
451 | -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out. | ||
452 | sendQuery c@(Client net d err pending whoami _) meth q addr0 = do | ||
453 | mvar <- newEmptyMVar | ||
454 | (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar) | ||
455 | mres <- timeout expiry $ takeMVar mvar | ||
456 | case mres of | ||
457 | Just b -> return $ Just b | ||
458 | Nothing -> do | ||
459 | atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending | ||
460 | return Nothing | ||
461 | |||
462 | contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x | ||
463 | contramapAddr f (MethodHandler p s a) | ||
464 | = MethodHandler | ||
465 | p | ||
466 | (\tid src dst result -> s tid (f src) (f dst) result) | ||
467 | (\addr arg -> a (f addr) arg) | ||
468 | contramapAddr f (NoReply p a) | ||
469 | = NoReply p (\addr arg -> a (f addr) arg) | ||
470 | |||
471 | -- | Query handlers can throw this to ignore a query instead of responding to | ||
472 | -- it. | ||
473 | data DropQuery = DropQuery | ||
474 | deriving Show | ||
475 | |||
476 | instance Exception DropQuery | ||
477 | |||
478 | -- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the | ||
479 | -- parse is successful, the returned IO action will construct our reply if | ||
480 | -- there is one. Otherwise, a parse err is returned. | ||
481 | dispatchQuery :: MethodHandlerA err tid addr x y -- ^ Handler to invoke. | ||
482 | -> tid -- ^ The transaction id for this query\/response session. | ||
483 | -> addr -- ^ Our own address, to which the query was sent. | ||
484 | -> x -- ^ The query packet. | ||
485 | -> addr -- ^ The origin address of the query. | ||
486 | -> Either err (IO (Maybe y)) | ||
487 | dispatchQuery (MethodHandler unwrapQ wrapR f) tid self x addr = | ||
488 | fmap (\a -> catch (Just . wrapR tid self addr <$> f addr a) | ||
489 | (\DropQuery -> return Nothing)) | ||
490 | $ unwrapQ x | ||
491 | dispatchQuery (NoReply unwrapQ f) tid self x addr = | ||
492 | fmap (\a -> f addr a >> return Nothing) $ unwrapQ x | ||
493 | |||
494 | -- | Like 'transactionMethods' but allows extra information to be stored in the | ||
495 | -- table of pending transactions. This also enables multiple 'Client's to | ||
496 | -- share a single transaction table. | ||
497 | transactionMethods' :: | ||
498 | ((Maybe x -> IO ()) -> a) -- ^ store MVar into table entry | ||
499 | -> (a -> Maybe x -> IO void) -- ^ load MVar from table entry | ||
500 | -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. | ||
501 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. | ||
502 | -> TransactionMethods (g,t a) tid addr x | ||
503 | transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods | ||
504 | { dispatchCancel = \tid (g,t) -> return (g, delete tid t) | ||
505 | , dispatchRegister = \nowPlusExpiry v a (g,t) -> do | ||
506 | let (tid,g') = generate g | ||
507 | let t' = insert tid (store v) nowPlusExpiry t -- (now + microsecondsDiff expiry) t | ||
508 | return ( tid, (g',t') ) | ||
509 | , dispatchResponse = \tid x (g,t) -> | ||
510 | case lookup tid t of | ||
511 | Just v -> let t' = delete tid t | ||
512 | in return ((g,t'),void $ load v $ Just x) | ||
513 | Nothing -> return ((g,t), return ()) | ||
514 | } | ||
515 | |||
516 | -- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a | ||
517 | -- function for generating unique transaction ids. | ||
518 | transactionMethods :: | ||
519 | TableMethods t tid -- ^ Table methods to lookup values by /tid/. | ||
520 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. | ||
521 | -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x | ||
522 | transactionMethods methods generate = transactionMethods' id id methods generate | ||
523 | |||
524 | -- | Handle a single inbound packet and then invoke the given continuation. | ||
525 | -- The 'forkListener' function is implemented by passing this function to 'fix' | ||
526 | -- in a forked thread that loops until 'awaitMessage' returns 'Nothing' or | ||
527 | -- throws an exception. | ||
528 | handleMessage :: | ||
529 | ClientA err meth tid addr x y | ||
530 | -> addr | ||
531 | -> x | ||
532 | -> IO (Maybe (x -> x)) | ||
533 | handleMessage (Client net d err pending whoami responseID) addr plain = do | ||
534 | -- Just (Left e) -> do reportParseError err e | ||
535 | -- return $! Just id | ||
536 | -- Just (Right (plain, addr)) -> do | ||
537 | case classifyInbound d plain of | ||
538 | IsQuery meth tid -> case lookupHandler d meth of | ||
539 | Nothing -> do reportMissingHandler err meth addr plain | ||
540 | return $! Just id | ||
541 | Just m -> do | ||
542 | self <- whoami (Just addr) | ||
543 | tid' <- responseID tid | ||
544 | either (\e -> do reportParseError err e | ||
545 | return $! Just id) | ||
546 | (>>= \m -> do mapM_ (sendMessage net addr) m | ||
547 | return $! Nothing) | ||
548 | (dispatchQuery m tid' self plain addr) | ||
549 | IsUnsolicited action -> do | ||
550 | self <- whoami (Just addr) | ||
551 | action self addr | ||
552 | return Nothing | ||
553 | IsResponse tid -> do | ||
554 | action <- atomically $ do | ||
555 | ts0 <- readTVar pending | ||
556 | (ts, action) <- dispatchResponse (tableMethods d) tid plain ts0 | ||
557 | writeTVar pending ts | ||
558 | return action | ||
559 | action | ||
560 | return $! Nothing | ||
561 | IsUnknown e -> do reportUnknown err addr plain e | ||
562 | return $! Just id | ||
563 | -- Nothing -> return $! id | ||
564 | |||
565 | -- * UDP Datagrams. | ||
566 | |||
567 | -- | Access the address family of a given 'SockAddr'. This convenient accessor | ||
568 | -- is missing from 'Network.Socket', so I implemented it here. | ||
569 | sockAddrFamily :: SockAddr -> Family | ||
570 | sockAddrFamily (SockAddrInet _ _ ) = AF_INET | ||
571 | sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | ||
572 | sockAddrFamily (SockAddrUnix _ ) = AF_UNIX | ||
573 | #if !MIN_VERSION_network(3,0,0) | ||
574 | sockAddrFamily _ = AF_CAN -- SockAddrCan constructor deprecated | ||
575 | #endif | ||
576 | |||
577 | -- | Packets with an empty payload may trigger EOF exception. | ||
578 | -- 'udpTransport' uses this function to avoid throwing in that | ||
579 | -- case. | ||
580 | ignoreEOF :: Socket -> MVar () -> Arrival e a x -> IOError -> IO (Arrival e a x) | ||
581 | ignoreEOF sock isClosed def e = do | ||
582 | done <- tryReadMVar isClosed | ||
583 | case done of | ||
584 | Just () -> do close sock | ||
585 | dput XMisc "Closing UDP socket." | ||
586 | pure Terminated | ||
587 | _ -> if isEOFError e then pure def | ||
588 | else throwIO e | ||
589 | |||
590 | -- | Hard-coded maximum packet size for incoming UDP Packets received via | ||
591 | -- 'udpTransport'. | ||
592 | udpBufferSize :: Int | ||
593 | udpBufferSize = 65536 | ||
594 | |||
595 | -- | Wrapper around 'B.sendTo' that silently ignores DoesNotExistError. | ||
596 | saferSendTo :: Socket -> ByteString -> SockAddr -> IO () | ||
597 | saferSendTo sock bs saddr = void (B.sendTo sock bs saddr) | ||
598 | `catch` \e -> | ||
599 | -- sendTo: does not exist (Network is unreachable) | ||
600 | -- Occurs when IPv6 or IPv4 network is not available. | ||
601 | -- Currently, we require -threaded to prevent a forever-hang in this case. | ||
602 | if isDoesNotExistError e | ||
603 | then return () | ||
604 | else throw e | ||
605 | |||
606 | -- | Like 'udpTransport' except also returns the raw socket (for broadcast use). | ||
607 | udpTransport' :: Show err => SockAddr -> IO (Transport err SockAddr ByteString, Socket) | ||
608 | udpTransport' bind_address = do | ||
609 | let family = sockAddrFamily bind_address | ||
610 | sock <- socket family Datagram defaultProtocol | ||
611 | when (family == AF_INET6) $ do | ||
612 | setSocketOption sock IPv6Only 0 | ||
613 | setSocketOption sock Broadcast 1 | ||
614 | bind sock bind_address | ||
615 | isClosed <- newEmptyMVar | ||
616 | udpTChan <- atomically newTChan | ||
617 | let tr = Transport { | ||
618 | awaitMessage = \kont -> do | ||
619 | r <- readTChan udpTChan | ||
620 | return $ kont $! r | ||
621 | , sendMessage = case family of | ||
622 | AF_INET6 -> \case | ||
623 | (SockAddrInet port addr) -> \bs -> | ||
624 | -- Change IPv4 to 4mapped6 address. | ||
625 | saferSendTo sock bs $ SockAddrInet6 port 0 (0,0,0x0000ffff,fromBE32 addr) 0 | ||
626 | addr6 -> \bs -> saferSendTo sock bs addr6 | ||
627 | AF_INET -> \case | ||
628 | (SockAddrInet6 port 0 (0,0,0x0000ffff,raw4) 0) -> \bs -> do | ||
629 | let host4 = toBE32 raw4 | ||
630 | -- Change 4mapped6 to ordinary IPv4. | ||
631 | -- dput XMisc $ "4mapped6 -> "++show (SockAddrInet port host4) | ||
632 | saferSendTo sock bs (SockAddrInet port host4) | ||
633 | addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr) | ||
634 | addr4 -> \bs -> saferSendTo sock bs addr4 | ||
635 | _ -> \addr bs -> saferSendTo sock bs addr | ||
636 | , setActive = \case | ||
637 | False -> do | ||
638 | dput XMisc $ "closeTransport for udpTransport' called. " ++ show bind_address | ||
639 | tryPutMVar isClosed () -- signal awaitMessage that the transport is closed. | ||
640 | #if MIN_VERSION_network (3,1,0) | ||
641 | #elif MIN_VERSION_network(3,0,0) | ||
642 | let withFdSocket sock f = fdSocket sock >>= f >>= seq sock . return | ||
643 | #else | ||
644 | let withFdSocket sock f = f (fdSocket sock) >>= seq sock . return | ||
645 | #endif | ||
646 | withFdSocket sock $ \fd -> do | ||
647 | let sorryGHCButIAmNotFuckingClosingTheSocketYet fd = return () | ||
648 | -- This call is necessary to interrupt the blocking recvFrom call in awaitMessage. | ||
649 | closeFdWith sorryGHCButIAmNotFuckingClosingTheSocketYet (fromIntegral fd) | ||
650 | True -> do | ||
651 | udpThread <- forkIO $ fix $ \again -> do | ||
652 | r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do | ||
653 | uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize | ||
654 | atomically $ writeTChan udpTChan r | ||
655 | case r of Terminated -> return () | ||
656 | _ -> again | ||
657 | labelThread udpThread ("udp.io."++show bind_address) | ||
658 | } | ||
659 | return (tr, sock) | ||
660 | |||
661 | -- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The | ||
662 | -- argument is the listen-address for incoming packets. This is a useful | ||
663 | -- low-level 'Transport' that can be transformed for higher-level protocols | ||
664 | -- using 'layerTransport'. | ||
665 | udpTransport :: Show err => SockAddr -> IO (Transport err SockAddr ByteString) | ||
666 | udpTransport bind_address = fst <$> udpTransport' bind_address | ||
667 | |||
668 | chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x | ||
669 | chanTransport chanFromAddr self achan aclosed = Transport | ||
670 | { awaitMessage = \kont -> do | ||
671 | x <- (uncurry (flip Arrival) <$> readTChan achan) | ||
672 | `orElse` | ||
673 | (readTVar aclosed >>= check >> return Terminated) | ||
674 | return $ kont x | ||
675 | , sendMessage = \them bs -> do | ||
676 | atomically $ writeTChan (chanFromAddr them) (bs,self) | ||
677 | , setActive = \case | ||
678 | False -> atomically $ writeTVar aclosed True | ||
679 | True -> return () | ||
680 | } | ||
681 | |||
682 | -- | Returns a pair of transports linked together to simulate two computers talking to each other. | ||
683 | testPairTransport :: IO (Transport err SockAddr ByteString, Transport err SockAddr ByteString) | ||
684 | testPairTransport = do | ||
685 | achan <- atomically newTChan | ||
686 | bchan <- atomically newTChan | ||
687 | aclosed <- atomically $ newTVar False | ||
688 | bclosed <- atomically $ newTVar False | ||
689 | let a = SockAddrInet 1 1 | ||
690 | b = SockAddrInet 2 2 | ||
691 | return ( chanTransport (const bchan) a achan aclosed | ||
692 | , chanTransport (const achan) b bchan bclosed ) | ||
693 | |||
694 | newtype ByAddress err x addr = ByAddress (Transport err addr x) | ||
695 | |||
696 | newtype Tagged x addr = Tagged x | ||
697 | |||
698 | decorateAddr :: tag addr -> Arrival e addr x -> Arrival e (DSum tag Identity) x | ||
699 | decorateAddr tag Terminated = Terminated | ||
700 | decorateAddr tag (ParseError e) = ParseError e | ||
701 | decorateAddr tag (Arrival addr x) = Arrival (tag ==> addr) x | ||
702 | |||
703 | mergeTransports :: GCompare tag => DMap tag (ByAddress err x) -> IO (Transport err (DSum tag Identity) x) | ||
704 | mergeTransports tmap = do | ||
705 | -- vmap <- traverseWithKey (\k v -> Tagged <$> newEmptyMVar) tmap | ||
706 | -- foldrWithKey (\k v n -> forkMergeBranch k v >> n) (return ()) vmap | ||
707 | return Transport | ||
708 | { awaitMessage = \kont -> | ||
709 | foldrWithKey (\k (ByAddress tr) n -> awaitMessage tr (kont . decorateAddr k) `orElse` n) | ||
710 | retry | ||
711 | tmap | ||
712 | , sendMessage = \(tag :=> Identity addr) x -> case DMap.lookup tag tmap of | ||
713 | Just (ByAddress tr) -> sendMessage tr addr x | ||
714 | Nothing -> return () | ||
715 | , setActive = \toggle -> foldrWithKey (\_ (ByAddress tr) next -> setActive tr toggle >> next) (return ()) tmap | ||
716 | } | ||
diff --git a/dht/src/Network/QueryResponse/TCP.hs b/dht/src/Network/QueryResponse/TCP.hs deleted file mode 100644 index 0028a5b6..00000000 --- a/dht/src/Network/QueryResponse/TCP.hs +++ /dev/null | |||
@@ -1,223 +0,0 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
3 | {-# LANGUAGE LambdaCase #-} | ||
4 | module Network.QueryResponse.TCP where | ||
5 | |||
6 | #ifdef THREAD_DEBUG | ||
7 | import Control.Concurrent.Lifted.Instrument | ||
8 | #else | ||
9 | import Control.Concurrent.Lifted | ||
10 | import GHC.Conc (labelThread) | ||
11 | #endif | ||
12 | |||
13 | import Control.Arrow | ||
14 | import Control.Concurrent.STM | ||
15 | import Control.Concurrent.STM.TMVar | ||
16 | import Control.Monad | ||
17 | import Data.ByteString (ByteString,hPut) | ||
18 | import Data.Function | ||
19 | import Data.Hashable | ||
20 | import Data.Maybe | ||
21 | import Data.Ord | ||
22 | import Data.Time.Clock.POSIX | ||
23 | import Data.Word | ||
24 | import Data.String (IsString(..)) | ||
25 | import Network.BSD | ||
26 | import Network.Socket as Socket | ||
27 | import System.Timeout | ||
28 | import System.IO | ||
29 | import System.IO.Error | ||
30 | |||
31 | import DebugTag | ||
32 | import DebugUtil | ||
33 | import DPut | ||
34 | import Connection.Tcp (socketFamily) | ||
35 | import qualified Data.MinMaxPSQ as MM | ||
36 | import Network.QueryResponse | ||
37 | |||
38 | data TCPSession st | ||
39 | = PendingTCPSession | ||
40 | | TCPSession | ||
41 | { tcpHandle :: Handle | ||
42 | , tcpState :: st | ||
43 | , tcpThread :: ThreadId | ||
44 | } | ||
45 | |||
46 | newtype TCPAddress = TCPAddress SockAddr | ||
47 | deriving (Eq,Ord,Show) | ||
48 | |||
49 | instance Hashable TCPAddress where | ||
50 | hashWithSalt salt (TCPAddress x) = case x of | ||
51 | SockAddrInet port addr -> hashWithSalt salt (fromIntegral port :: Word16,addr) | ||
52 | SockAddrInet6 port b c d -> hashWithSalt salt (fromIntegral port :: Word16,b,c,d) | ||
53 | _ -> 0 | ||
54 | |||
55 | data TCPCache st = TCPCache | ||
56 | { lru :: TVar (MM.MinMaxPSQ' TCPAddress (Down POSIXTime) (TCPSession st)) | ||
57 | , tcpMax :: Int | ||
58 | } | ||
59 | |||
60 | -- This is a suitable /st/ parameter to 'TCPCache' | ||
61 | data SessionProtocol x y = SessionProtocol | ||
62 | { streamGoodbye :: IO () -- ^ "Goodbye" protocol upon termination. | ||
63 | , streamDecode :: IO (Maybe x) -- ^ Parse inbound messages. | ||
64 | , streamEncode :: y -> IO () -- ^ Serialize outbound messages. | ||
65 | } | ||
66 | |||
67 | data StreamHandshake addr x y = StreamHandshake | ||
68 | { streamHello :: addr -> Handle -> IO (SessionProtocol x y) -- ^ "Hello" protocol upon fresh connection. | ||
69 | , streamAddr :: addr -> SockAddr | ||
70 | } | ||
71 | |||
72 | killSession :: TCPSession st -> IO () | ||
73 | killSession PendingTCPSession = return () | ||
74 | killSession TCPSession{tcpThread=t} = killThread t | ||
75 | |||
76 | showStat :: IsString p => TCPSession st -> p | ||
77 | showStat r = case r of PendingTCPSession -> "pending." | ||
78 | TCPSession {} -> "established." | ||
79 | |||
80 | tcp_timeout :: Int | ||
81 | tcp_timeout = 10000000 | ||
82 | |||
83 | acquireConnection :: TMVar (Arrival a addr x) | ||
84 | -> TCPCache (SessionProtocol x y) | ||
85 | -> StreamHandshake addr x y | ||
86 | -> addr | ||
87 | -> Bool | ||
88 | -> IO (Maybe (y -> IO ())) | ||
89 | acquireConnection mvar tcpcache stream addr bDoCon = do | ||
90 | now <- getPOSIXTime | ||
91 | -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr) | ||
92 | entry <- atomically $ do | ||
93 | c <- readTVar (lru tcpcache) | ||
94 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | ||
95 | case v of | ||
96 | Nothing | bDoCon -> writeTVar (lru tcpcache) | ||
97 | $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c | ||
98 | | otherwise -> return () | ||
99 | Just (tm, v) -> writeTVar (lru tcpcache) | ||
100 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) c | ||
101 | return v | ||
102 | -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry) | ||
103 | case entry of | ||
104 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do | ||
105 | proto <- getProtocolNumber "tcp" | ||
106 | sock <- socket (socketFamily $ streamAddr stream addr) Stream proto | ||
107 | mh <- catchIOError (do h <- timeout tcp_timeout $ do | ||
108 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) | ||
109 | h <- socketToHandle sock ReadWriteMode | ||
110 | hSetBuffering h NoBuffering | ||
111 | return h | ||
112 | return h) | ||
113 | $ \e -> return Nothing | ||
114 | when (isNothing mh) $ do | ||
115 | atomically $ modifyTVar' (lru tcpcache) | ||
116 | $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
117 | Socket.close sock | ||
118 | ret <- fmap join $ forM mh $ \h -> do | ||
119 | mst <- catchIOError (Just <$> streamHello stream addr h) | ||
120 | (\e -> return Nothing) | ||
121 | case mst of | ||
122 | Nothing -> do | ||
123 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
124 | return Nothing | ||
125 | Just st -> do | ||
126 | dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) | ||
127 | signal <- newTVarIO False | ||
128 | let showAddr a = show (streamAddr stream a) | ||
129 | rthread <- forkLabeled ("tcp:"++showAddr addr) $ do | ||
130 | atomically (readTVar signal >>= check) | ||
131 | fix $ \loop -> do | ||
132 | x <- streamDecode st | ||
133 | dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x | ||
134 | case x of | ||
135 | Just u -> do | ||
136 | m <- timeout tcp_timeout $ atomically (putTMVar mvar $ Arrival addr u) | ||
137 | when (isNothing m) $ do | ||
138 | dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet." | ||
139 | atomically $ tryTakeTMVar mvar | ||
140 | return () | ||
141 | loop | ||
142 | Nothing -> do | ||
143 | dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) | ||
144 | do atomically $ modifyTVar' (lru tcpcache) | ||
145 | $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
146 | c <- atomically $ readTVar (lru tcpcache) | ||
147 | now <- getPOSIXTime | ||
148 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do | ||
149 | dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r] | ||
150 | mreport <- timeout tcp_timeout $ threadReport False -- XXX: Paranoid timeout | ||
151 | case mreport of | ||
152 | Just treport -> dput XTCP treport | ||
153 | Nothing -> dput XTCP "TCP ERROR: threadReport timed out." | ||
154 | hClose h `catchIOError` \e -> return () | ||
155 | let v = TCPSession | ||
156 | { tcpHandle = h | ||
157 | , tcpState = st | ||
158 | , tcpThread = rthread | ||
159 | } | ||
160 | t <- getPOSIXTime | ||
161 | retires <- atomically $ do | ||
162 | c <- readTVar (lru tcpcache) | ||
163 | let (rs,c') = MM.takeView (tcpMax tcpcache) | ||
164 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c | ||
165 | writeTVar (lru tcpcache) c' | ||
166 | writeTVar signal True | ||
167 | return rs | ||
168 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkLabeled ("tcp-close:"++show k) $ do | ||
169 | dput XTCP $ "TCP dropped: " ++ show k | ||
170 | killSession r | ||
171 | case r of TCPSession {tcpState=st,tcpHandle=h} -> do | ||
172 | streamGoodbye st | ||
173 | hClose h | ||
174 | `catchIOError` \e -> return () | ||
175 | _ -> return () | ||
176 | |||
177 | return $ Just $ streamEncode st | ||
178 | when (isNothing ret) $ do | ||
179 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
180 | return ret | ||
181 | Just (tm, PendingTCPSession) | ||
182 | | not bDoCon -> return Nothing | ||
183 | | otherwise -> fmap join $ timeout tcp_timeout $ atomically $ do | ||
184 | c <- readTVar (lru tcpcache) | ||
185 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | ||
186 | case v of | ||
187 | Just (_,TCPSession{tcpState=st}) -> return $ Just $ streamEncode st | ||
188 | Nothing -> return Nothing | ||
189 | _ -> retry | ||
190 | Just (tm, v@TCPSession {tcpState=st}) -> return $ Just $ streamEncode st | ||
191 | |||
192 | closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () | ||
193 | closeAll tcpcache stream = do | ||
194 | dput XTCP "TCP.closeAll called." | ||
195 | cache <- atomically $ swapTVar (lru tcpcache) MM.empty | ||
196 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do | ||
197 | killSession r | ||
198 | case r of TCPSession{tcpState=st,tcpHandle=h} -> catchIOError (streamGoodbye st >> hClose h) | ||
199 | (\e -> return ()) | ||
200 | _ -> return () | ||
201 | |||
202 | -- Use a cache of TCP client connections for sending (and receiving) packets. | ||
203 | -- The boolean value prepended to the message allows the sender to specify | ||
204 | -- whether or not a new connection will be initiated if neccessary. If 'False' | ||
205 | -- is passed, then the packet will be sent only if there already exists a | ||
206 | -- connection. | ||
207 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. | ||
208 | -> StreamHandshake addr x y | ||
209 | -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y)) | ||
210 | tcpTransport maxcon stream = do | ||
211 | msgvar <- atomically newEmptyTMVar | ||
212 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) | ||
213 | return $ (,) tcpcache Transport | ||
214 | { awaitMessage = \f -> takeTMVar msgvar >>= \x -> return $ do | ||
215 | f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Terminated) | ||
216 | , sendMessage = \addr (bDoCon,y) -> do | ||
217 | void . forkLabeled "tcp-send" $ do | ||
218 | msock <- acquireConnection msgvar tcpcache stream addr bDoCon | ||
219 | mapM_ ($ y) msock | ||
220 | `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e | ||
221 | , setActive = \case False -> closeAll tcpcache stream >> atomically (putTMVar msgvar Terminated) | ||
222 | True -> return () | ||
223 | } | ||
diff --git a/dht/src/Network/SocketLike.hs b/dht/src/Network/SocketLike.hs deleted file mode 100644 index 37891cfd..00000000 --- a/dht/src/Network/SocketLike.hs +++ /dev/null | |||
@@ -1,98 +0,0 @@ | |||
1 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
2 | {-# LANGUAGE TupleSections #-} | ||
3 | {-# LANGUAGE CPP #-} | ||
4 | -- | | ||
5 | -- | ||
6 | -- A socket could be used indirectly via a 'System.IO.Handle' or a conduit from | ||
7 | -- Michael Snoyman's conduit package. But doing so presents an encapsulation | ||
8 | -- problem. Do we allow access to the underlying socket and trust that it wont | ||
9 | -- be used in an unsafe way? Or do we protect it at the higher level and deny | ||
10 | -- access to various state information? | ||
11 | -- | ||
12 | -- The 'SocketLike' class enables the approach that provides a safe wrapper to | ||
13 | -- the underlying socket and gives access to various state information without | ||
14 | -- enabling direct reads or writes. | ||
15 | module Network.SocketLike | ||
16 | ( SocketLike(..) | ||
17 | , RestrictedSocket | ||
18 | , restrictSocket | ||
19 | , restrictHandleSocket | ||
20 | -- * Re-exports | ||
21 | -- | ||
22 | -- | To make the 'SocketLike' methods less awkward to use, the types | ||
23 | -- 'CUInt', 'SockAddr', and 'PortNumber' are re-exported. | ||
24 | , CUInt | ||
25 | , PortNumber | ||
26 | , SockAddr(..) | ||
27 | ) where | ||
28 | |||
29 | import Network.Socket | ||
30 | ( PortNumber | ||
31 | , SockAddr | ||
32 | ) | ||
33 | import Foreign.C.Types ( CUInt ) | ||
34 | |||
35 | import qualified Network.Socket as NS | ||
36 | import System.IO (Handle,hClose,hIsOpen) | ||
37 | import Control.Arrow | ||
38 | |||
39 | -- | A safe (mostly read-only) interface to a 'NS.Socket'. Note that despite | ||
40 | -- how this class is named, it provides no access to typical 'NS.Socket' uses | ||
41 | -- like sending or receiving network packets. | ||
42 | class SocketLike sock where | ||
43 | -- | See 'NS.getSocketName' | ||
44 | getSocketName :: sock -> IO SockAddr | ||
45 | -- | See 'NS.getPeerName' | ||
46 | getPeerName :: sock -> IO SockAddr | ||
47 | -- | See 'NS.getPeerCred' | ||
48 | -- getPeerCred :: sock -> IO (CUInt, CUInt, CUInt) | ||
49 | |||
50 | -- | Is the socket still valid? Connected | ||
51 | -- | ||
52 | -- In order to give the instance writer | ||
53 | -- the option to do book-keeping in a pure | ||
54 | -- type, a conceptually modified version of | ||
55 | -- the 'SocketLike' is returned. | ||
56 | -- | ||
57 | isValidSocket :: sock -> IO (sock,Bool) | ||
58 | |||
59 | |||
60 | instance SocketLike NS.Socket where | ||
61 | getSocketName = NS.getSocketName | ||
62 | getPeerName = NS.getPeerName | ||
63 | -- getPeerCred = NS.getPeerCred | ||
64 | #if MIN_VERSION_network(3,1,0) | ||
65 | isValidSocket s = (s,) <$> NS.withFdSocket s (return . (/= (-1))) | ||
66 | #else | ||
67 | #if MIN_VERSION_network(3,0,0) | ||
68 | isValidSocket s = (s,) . (/= (-1)) <$> NS.fdSocket s | ||
69 | #else | ||
70 | #if MIN_VERSION_network(2,4,0) | ||
71 | isValidSocket s = (s,) <$> NS.isConnected s -- warning: this is always False if the socket | ||
72 | -- was converted to a Handle | ||
73 | #else | ||
74 | isValidSocket s = (s,) <$> NS.sIsConnected s -- warning: this is always False if the socket | ||
75 | -- was converted to a Handle | ||
76 | #endif | ||
77 | #endif | ||
78 | #endif | ||
79 | |||
80 | -- | An encapsulated socket. Data reads and writes are not possible. | ||
81 | data RestrictedSocket = Restricted (Maybe Handle) NS.Socket deriving Show | ||
82 | |||
83 | instance SocketLike RestrictedSocket where | ||
84 | getSocketName (Restricted mb sock) = NS.getSocketName sock | ||
85 | getPeerName (Restricted mb sock) = NS.getPeerName sock | ||
86 | -- getPeerCred (Restricted mb sock) = NS.getPeerCred sock | ||
87 | isValidSocket rs@(Restricted mb sock) = maybe (first (Restricted mb) <$> isValidSocket sock) (((rs,) <$>) . hIsOpen) mb | ||
88 | |||
89 | -- | Create a 'RestrictedSocket' that explicitly disallows sending or | ||
90 | -- receiving data. | ||
91 | restrictSocket :: NS.Socket -> RestrictedSocket | ||
92 | restrictSocket socket = Restricted Nothing socket | ||
93 | |||
94 | -- | Build a 'RestrictedSocket' for which 'sClose' will close the given | ||
95 | -- 'Handle'. It is intended that this 'Handle' was obtained via | ||
96 | -- 'NS.socketToHandle'. | ||
97 | restrictHandleSocket :: Handle -> NS.Socket -> RestrictedSocket | ||
98 | restrictHandleSocket h socket = Restricted (Just h) socket | ||
diff --git a/dht/src/Network/StreamServer.hs b/dht/src/Network/StreamServer.hs deleted file mode 100644 index 1da612ce..00000000 --- a/dht/src/Network/StreamServer.hs +++ /dev/null | |||
@@ -1,167 +0,0 @@ | |||
1 | -- | This module implements a bare-bones TCP or Unix socket server. | ||
2 | {-# LANGUAGE CPP #-} | ||
3 | {-# LANGUAGE TypeFamilies #-} | ||
4 | {-# LANGUAGE TypeOperators #-} | ||
5 | {-# LANGUAGE OverloadedStrings #-} | ||
6 | {-# LANGUAGE RankNTypes #-} | ||
7 | module Network.StreamServer | ||
8 | ( streamServer | ||
9 | , ServerHandle | ||
10 | , getAcceptLoopThreadId | ||
11 | , ServerConfig(..) | ||
12 | , withSession | ||
13 | , quitListening | ||
14 | --, dummyServerHandle | ||
15 | , listenSocket | ||
16 | , Local(..) | ||
17 | , Remote(..) | ||
18 | ) where | ||
19 | |||
20 | import Data.Monoid | ||
21 | import Network.Socket as Socket | ||
22 | import System.Directory (removeFile) | ||
23 | import System.IO | ||
24 | ( IOMode(..) | ||
25 | , stderr | ||
26 | , hFlush | ||
27 | ) | ||
28 | import Control.Monad | ||
29 | import Control.Monad.Fix (fix) | ||
30 | #ifdef THREAD_DEBUG | ||
31 | import Control.Concurrent.Lifted.Instrument | ||
32 | ( forkIO, threadDelay, ThreadId, mkWeakThreadId, labelThread, myThreadId | ||
33 | , killThread ) | ||
34 | #else | ||
35 | import GHC.Conc (labelThread) | ||
36 | import Control.Concurrent | ||
37 | ( forkIO, threadDelay, ThreadId, mkWeakThreadId, myThreadId | ||
38 | , killThread ) | ||
39 | #endif | ||
40 | import Control.Exception (handle,finally) | ||
41 | import System.IO.Error (tryIOError) | ||
42 | import System.Mem.Weak | ||
43 | import System.IO.Error | ||
44 | |||
45 | -- import Data.Conduit | ||
46 | import System.IO (Handle) | ||
47 | import Control.Concurrent.MVar (newMVar) | ||
48 | |||
49 | import Network.SocketLike | ||
50 | import DPut | ||
51 | import DebugTag | ||
52 | |||
53 | data ServerHandle = ServerHandle Socket (Weak ThreadId) | ||
54 | |||
55 | -- | Useful for testing. | ||
56 | getAcceptLoopThreadId :: ServerHandle -> IO (Weak ThreadId) | ||
57 | getAcceptLoopThreadId (ServerHandle _ t) = return t | ||
58 | |||
59 | listenSocket :: ServerHandle -> RestrictedSocket | ||
60 | listenSocket (ServerHandle sock _) = restrictSocket sock | ||
61 | |||
62 | {- // Removed, bit-rotted and there are no call sites | ||
63 | -- | Create a useless do-nothing 'ServerHandle'. | ||
64 | dummyServerHandle :: IO ServerHandle | ||
65 | dummyServerHandle = do | ||
66 | mvar <- newMVar Closed | ||
67 | let sock = MkSocket 0 AF_UNSPEC NoSocketType 0 mvar | ||
68 | thread <- mkWeakThreadId <=< forkIO $ return () | ||
69 | return (ServerHandle sock thread) | ||
70 | -} | ||
71 | |||
72 | removeSocketFile :: SockAddr -> IO () | ||
73 | removeSocketFile (SockAddrUnix fname) = removeFile fname | ||
74 | removeSocketFile _ = return () | ||
75 | |||
76 | -- | Terminate the server accept-loop. Call this to shut down the server. | ||
77 | quitListening :: ServerHandle -> IO () | ||
78 | quitListening (ServerHandle socket acceptThread) = | ||
79 | finally (Socket.getSocketName socket >>= removeSocketFile) | ||
80 | (do mapM_ killThread =<< deRefWeak acceptThread | ||
81 | Socket.close socket) | ||
82 | |||
83 | |||
84 | -- | It's 'bshow' instead of 'show' to enable swapping in a 'ByteString' | ||
85 | -- variation. (This is not exported.) | ||
86 | bshow :: Show a => a -> String | ||
87 | bshow e = show e | ||
88 | |||
89 | -- | Send a string to stderr. Not exported. Default 'serverWarn' when | ||
90 | -- 'withSession' is used to configure the server. | ||
91 | warnStderr :: String -> IO () | ||
92 | warnStderr str = dput XMisc str >> hFlush stderr | ||
93 | |||
94 | newtype Local a = Local a deriving (Eq,Ord,Show) | ||
95 | newtype Remote a = Remote a deriving (Eq,Ord,Show) | ||
96 | |||
97 | data ServerConfig = ServerConfig | ||
98 | { serverWarn :: String -> IO () | ||
99 | -- ^ Action to report warnings and errors. | ||
100 | , serverSession :: ( RestrictedSocket, (Local SockAddr, Remote SockAddr)) -> Int -> Handle -> IO () | ||
101 | -- ^ Action to handle interaction with a client | ||
102 | } | ||
103 | |||
104 | -- | Initialize a 'ServerConfig' using the provided session handler. | ||
105 | withSession :: ((RestrictedSocket,(Local SockAddr,Remote SockAddr)) -> Int -> Handle -> IO ()) -> ServerConfig | ||
106 | withSession session = ServerConfig warnStderr session | ||
107 | |||
108 | -- | Launch a thread to listen at the given bind address and dispatch | ||
109 | -- to session handler threads on every incoming connection. Supports | ||
110 | -- IPv4 and IPv6, TCP and unix sockets. | ||
111 | -- | ||
112 | -- The returned handle can be used with 'quitListening' to terminate the | ||
113 | -- thread and prevent any new sessions from starting. Currently active | ||
114 | -- session threads will not be terminated or signaled in any way. | ||
115 | streamServer :: ServerConfig -> [SockAddr] -> IO ServerHandle | ||
116 | streamServer cfg addrs = do | ||
117 | let warn = serverWarn cfg | ||
118 | family = case addrs of | ||
119 | SockAddrInet {}:_ -> AF_INET | ||
120 | SockAddrInet6 {}:_ -> AF_INET6 | ||
121 | SockAddrUnix {}:_ -> AF_UNIX | ||
122 | [] -> AF_INET6 | ||
123 | sock <- socket family Stream 0 | ||
124 | setSocketOption sock ReuseAddr 1 | ||
125 | let tryBind addr next _ = do | ||
126 | tryIOError (removeSocketFile addr) | ||
127 | bind sock addr | ||
128 | `catchIOError` \e -> next (Just e) | ||
129 | fix $ \loop -> let again mbe = do | ||
130 | forM_ mbe $ \e -> warn $ "bind-error: " <> bshow addrs <> " " <> bshow e | ||
131 | threadDelay 5000000 | ||
132 | loop | ||
133 | in foldr tryBind again addrs Nothing | ||
134 | listen sock maxListenQueue | ||
135 | thread <- mkWeakThreadId <=< forkIO $ do | ||
136 | bindaddr <- Socket.getSocketName sock | ||
137 | myThreadId >>= flip labelThread ("StreamServer.acceptLoop." <> bshow bindaddr) | ||
138 | acceptLoop cfg sock 0 | ||
139 | return (ServerHandle sock thread) | ||
140 | |||
141 | -- | Not exported. This, combined with 'acceptException' form a mutually | ||
142 | -- recursive loop that handles incoming connections. To quit the loop, the | ||
143 | -- socket must be closed by 'quitListening'. | ||
144 | acceptLoop :: ServerConfig -> Socket -> Int -> IO () | ||
145 | acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do | ||
146 | (con,raddr) <- accept sock | ||
147 | let conkey = n + 1 | ||
148 | laddr <- Socket.getSocketName con | ||
149 | h <- socketToHandle con ReadWriteMode | ||
150 | forkIO $ do | ||
151 | myThreadId >>= flip labelThread "StreamServer.session" | ||
152 | serverSession cfg (restrictHandleSocket h con, (Local laddr, Remote raddr)) conkey h | ||
153 | acceptLoop cfg sock (n + 1) | ||
154 | |||
155 | acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO () | ||
156 | acceptException cfg n sock ioerror = do | ||
157 | case show (ioeGetErrorType ioerror) of | ||
158 | "resource exhausted" -> do -- try again (ioeGetErrorType ioerror == fullErrorType) | ||
159 | serverWarn cfg $ ("acceptLoop: resource exhasted") | ||
160 | threadDelay 500000 | ||
161 | acceptLoop cfg sock (n + 1) | ||
162 | "invalid argument" -> do -- quit on closed socket | ||
163 | Socket.close sock | ||
164 | message -> do -- unexpected exception | ||
165 | serverWarn cfg $ ("acceptLoop: "<>bshow message) | ||
166 | Socket.close sock | ||
167 | |||