summaryrefslogtreecommitdiff
path: root/dht
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2020-01-03 15:35:23 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-03 17:26:06 -0500
commit31b799222cb76cd0002d9a3cc5b340a7b6fed139 (patch)
tree8b834e455529fb270375e4967d1acad56553544f /dht
parent1e03ed3670a8386ede93a09fa0c67785e7da6478 (diff)
server library.
Diffstat (limited to 'dht')
-rw-r--r--dht/Connection.hs135
-rw-r--r--dht/Connection/Tcp.hs824
-rw-r--r--dht/Presence/ControlMaybe.hs64
-rw-r--r--dht/Presence/DNSCache.hs285
-rw-r--r--dht/Presence/GetHostByAddr.hs77
-rw-r--r--dht/Presence/SockAddr.hs14
-rw-r--r--dht/dht-client.cabal20
-rw-r--r--dht/src/Control/Concurrent/Delay.hs49
-rw-r--r--dht/src/Control/Concurrent/PingMachine.hs161
-rw-r--r--dht/src/Control/Concurrent/ThreadUtil.hs31
-rw-r--r--dht/src/Data/TableMethods.hs105
-rw-r--r--dht/src/DebugUtil.hs42
-rw-r--r--dht/src/Network/QueryResponse.hs716
-rw-r--r--dht/src/Network/QueryResponse/TCP.hs223
-rw-r--r--dht/src/Network/SocketLike.hs98
-rw-r--r--dht/src/Network/StreamServer.hs167
-rw-r--r--dht/stack.yaml1
17 files changed, 5 insertions, 3007 deletions
diff --git a/dht/Connection.hs b/dht/Connection.hs
deleted file mode 100644
index ea86f4bb..00000000
--- a/dht/Connection.hs
+++ /dev/null
@@ -1,135 +0,0 @@
1{-# LANGUAGE DeriveFunctor #-}
2{-# LANGUAGE LambdaCase #-}
3module Connection where
4
5import Control.Applicative
6import Control.Arrow
7import Control.Concurrent.STM
8import Data.Bits
9import Data.Word
10import qualified Data.Map as Map
11 ;import Data.Map (Map)
12import Network.Socket (SockAddr(..))
13
14import Control.Concurrent.PingMachine
15
16-- | This type indicates the current status of a connection. The type
17-- parameter indicates protocol-specific status information. To present
18-- information as a user-comprehensible string, use 'showStatus'.
19data Status status
20 = Dormant
21 | InProgress status
22 | Established
23 deriving (Show,Eq,Ord,Functor)
24
25-- | A policy indicates a desired connection status.
26data Policy
27 = RefusingToConnect -- ^ We desire no connection.
28 | OpenToConnect -- ^ We will cooperate if a remote side initiates.
29 | TryingToConnect -- ^ We desire to be connected.
30 deriving (Eq,Ord,Show)
31
32-- | Information obtained via the 'connectionStatus' interface to
33-- 'Manager'.
34data Connection status = Connection
35 { connStatus :: Status status
36 , connPolicy :: Policy
37 }
38 deriving Functor
39
40-- | A 'PeerAddress' identifies an active session. For inactive sessions, multiple
41-- values may be feasible.
42
43-- We use a 'SockAddr' as it is convenient for TCP and UDP connections. But if
44-- that is not your use case, see 'uniqueAsKey'.
45newtype PeerAddress = PeerAddress { peerAddress :: SockAddr }
46 deriving (Eq,Ord,Show)
47
48-- | A 24-byte word.
49data Uniq24 = Uniq24 !Word64 !Word64 !Word64
50 deriving (Eq,Ord,Show)
51
52-- | Coerce a 'Uniq24' to a useable 'PeerAddress'. Note that this stores the
53-- special value 0 into the port number of the underlying 'SockAddr' and thus
54-- should be compatible for mixing together with TCP/UDP peers.
55uniqueAsKey :: Uniq24 -> PeerAddress
56uniqueAsKey (Uniq24 x y z) = PeerAddress $ SockAddrInet6 (fromIntegral 0) a bcde f
57 where
58 a = fromIntegral (x `shiftR` 32)
59 b = fromIntegral x
60 c = fromIntegral (y `shiftR` 32)
61 d = fromIntegral y
62 e = fromIntegral (z `shiftR` 32)
63 f = fromIntegral z
64 bcde = (b,c,d,e)
65
66-- | Inverse of 'uniqueAsKey'
67keyAsUnique :: PeerAddress -> Maybe Uniq24
68keyAsUnique (PeerAddress (SockAddrInet6 0 a bcde f)) = Just $ Uniq24 x y z
69 where
70 (b,c,d,e) = bcde
71 x = (fromIntegral a `shiftL` 32) .|. fromIntegral b
72 y = (fromIntegral c `shiftL` 32) .|. fromIntegral d
73 z = (fromIntegral e `shiftL` 32) .|. fromIntegral f
74keyAsUniq _ = Nothing
75
76
77-- | This is an interface to make or query status information about connections
78-- of a specific kind.
79--
80-- Type parameters:
81--
82-- /k/ names a connection. It should implement Ord, and can be parsed and
83-- displayed using 'stringToKey' and 'showKey'.
84--
85-- /status/ indicates the progress of a connection. It is intended as a
86-- parameter to the 'InProgress' constructor of 'Status'.
87--
88data Manager status k = Manager
89 { -- | Connect or disconnect a connection.
90 setPolicy :: k -> Policy -> IO ()
91 -- | Lookup a connection status.
92 , status :: k -> STM (Connection status)
93 -- | Obtain a list of all known connections.
94 , connections :: STM [k]
95 -- | Parse a connection key out of a string. Inverse of 'showKey'.
96 , stringToKey :: String -> Maybe k
97 -- | Convert a progress value to a string.
98 , showProgress :: status -> String
99 -- | Show a connection key as a string.
100 , showKey :: k -> String
101 -- | Obtain an address from a human-friendly name. For TCP/UDP
102 -- connections, this might be a forward-resolving DNS query.
103 , resolvePeer :: k -> IO [PeerAddress]
104 -- | This is the reverse of 'resolvePeer'. For TCP/UDP connections, this
105 -- might be a reverse-resolve DNS query.
106 , reverseAddress :: PeerAddress -> IO [k]
107 }
108
109-- | Present status information (visible in a UI) for a connection.
110showStatus :: Manager status k -> Status status -> String
111showStatus mgr Dormant = "dormant"
112showStatus mgr Established = "established"
113showStatus mgr (InProgress s) = "in progress ("++showProgress mgr s++")"
114
115
116-- | Combine two different species of 'Manager' into a single interface using
117-- 'Either' to combine key and status types.
118addManagers :: (Ord kA, Ord kB) =>
119 Manager statusA kA
120 -> Manager statusB kB
121 -> Manager (Either statusA statusB) (Either kA kB)
122addManagers mgrA mgrB = Manager
123 { setPolicy = either (setPolicy mgrA) (setPolicy mgrB)
124 , status = \case
125 Left k -> fmap Left <$> status mgrA k
126 Right k -> fmap Right <$> status mgrB k
127 , connections = do
128 as <- connections mgrA
129 bs <- connections mgrB
130 return $ map Left as ++ map Right bs
131 , stringToKey = \str -> Left <$> stringToKey mgrA str
132 <|> Right <$> stringToKey mgrB str
133 , showProgress = either (showProgress mgrA) (showProgress mgrB)
134 , showKey = either (showKey mgrA) (showKey mgrB)
135 }
diff --git a/dht/Connection/Tcp.hs b/dht/Connection/Tcp.hs
deleted file mode 100644
index 4d50d47f..00000000
--- a/dht/Connection/Tcp.hs
+++ /dev/null
@@ -1,824 +0,0 @@
1{-# OPTIONS_HADDOCK prune #-}
2{-# LANGUAGE CPP #-}
3{-# LANGUAGE DoAndIfThenElse #-}
4{-# LANGUAGE FlexibleInstances #-}
5{-# LANGUAGE OverloadedStrings #-}
6{-# LANGUAGE RankNTypes #-}
7{-# LANGUAGE StandaloneDeriving #-}
8{-# LANGUAGE TupleSections #-}
9{-# LANGUAGE LambdaCase #-}
10-----------------------------------------------------------------------------
11-- |
12-- Module : Connection.Tcp
13--
14-- Maintainer : joe@jerkface.net
15-- Stability : experimental
16--
17-- A TCP client/server library.
18--
19-- TODO:
20--
21-- * interface tweaks
22--
23module Connection.Tcp
24 ( module Connection.Tcp
25 , module Control.Concurrent.PingMachine ) where
26
27import Data.ByteString (ByteString,hGetNonBlocking)
28import qualified Data.ByteString.Char8 as S -- ( hPutStrLn, hPutStr, pack)
29import Data.Conduit ( ConduitT, Void, Flush )
30#if MIN_VERSION_containers(0,5,0)
31import qualified Data.Map.Strict as Map
32import Data.Map.Strict (Map)
33#else
34import qualified Data.Map as Map
35import Data.Map (Map)
36#endif
37import Data.Monoid ( (<>) )
38import Control.Concurrent.ThreadUtil
39
40import Control.Arrow
41import Control.Concurrent.STM
42-- import Control.Concurrent.STM.TMVar
43-- import Control.Concurrent.STM.TChan
44-- import Control.Concurrent.STM.Delay
45import Control.Exception ({-evaluate,-}handle,SomeException(..),ErrorCall(..),onException)
46import Control.Monad
47import Control.Monad.Fix
48-- import Control.Monad.STM
49-- import Control.Monad.Trans.Resource
50import Control.Monad.IO.Class (MonadIO (liftIO))
51import Data.Maybe
52import System.IO.Error (isDoesNotExistError)
53import System.IO
54 ( IOMode(..)
55 , hSetBuffering
56 , BufferMode(..)
57 , hWaitForInput
58 , hClose
59 , hIsEOF
60 , Handle
61 )
62import Network.Socket as Socket
63import Network.BSD
64 ( getProtocolNumber
65 )
66import Debug.Trace
67import Data.Time.Clock (getCurrentTime,diffUTCTime)
68-- import SockAddr ()
69-- import System.Locale (defaultTimeLocale)
70
71import qualified Data.Text as Text
72 ;import Data.Text (Text)
73import DNSCache
74import Control.Concurrent.Delay
75import Control.Concurrent.PingMachine
76import Network.StreamServer
77import Network.SocketLike hiding (sClose)
78import qualified Connection as G
79 ;import Connection (Manager (..), PeerAddress (..), Policy (..))
80import Network.Address (localhost4)
81import DPut
82import DebugTag
83
84
85type Microseconds = Int
86
87-- | This object is passed with the 'Listen' and 'Connect'
88-- instructions in order to control the behavior of the
89-- connections that are established. It is parameterized
90-- by a user-suplied type @conkey@ that is used as a lookup
91-- key for connections.
92data ConnectionParameters conkey u =
93 ConnectionParameters
94 { pingInterval :: PingInterval
95 -- ^ The miliseconds of idle to allow before a 'RequiresPing'
96 -- event is signaled.
97 , timeout :: TimeOut
98 -- ^ The miliseconds of idle after 'RequiresPing' is signaled
99 -- that are necessary for the connection to be considered
100 -- lost and signalling 'EOF'.
101 , makeConnKey :: (RestrictedSocket,(Local SockAddr, Remote SockAddr)) -> IO (conkey,u)
102 -- ^ This action creates a lookup key for a new connection. If 'duplex'
103 -- is 'True' and the result is already assocatied with an established
104 -- connection, then an 'EOF' will be forced before the the new
105 -- connection becomes active.
106 --
107 , duplex :: Bool
108 -- ^ If True, then the connection will be treated as a normal
109 -- two-way socket. Otherwise, a readable socket is established
110 -- with 'Listen' and a writable socket is established with
111 -- 'Connect' and they are associated when 'makeConnKey' yields
112 -- same value for each.
113 }
114
115-- | Use this function to select appropriate default values for
116-- 'ConnectionParameters' other than 'makeConnKey'.
117--
118-- Current defaults:
119--
120-- * 'pingInterval' = 28000
121--
122-- * 'timeout' = 2000
123--
124-- * 'duplex' = True
125--
126connectionDefaults
127 :: ((RestrictedSocket,(Local SockAddr,Remote SockAddr)) -> IO (conkey,u)) -> ConnectionParameters conkey u
128connectionDefaults f = ConnectionParameters
129 { pingInterval = 28000
130 , timeout = 2000
131 , makeConnKey = f
132 , duplex = True
133 }
134
135-- | Instructions for a 'Server' object
136--
137-- To issue a command, put it into the 'serverCommand' TMVar.
138data ServerInstruction conkey u
139 = Quit
140 -- ^ kill the server. This command is automatically issued when
141 -- the server is released.
142 | Listen SockAddr (ConnectionParameters conkey u)
143 -- ^ listen for incoming connections on the given bind address.
144 | Connect SockAddr (ConnectionParameters conkey u)
145 -- ^ connect to addresses
146 | ConnectWithEndlessRetry SockAddr
147 (ConnectionParameters conkey u)
148 Miliseconds
149 -- ^ keep retrying the connection
150 | Ignore SockAddr
151 -- ^ stop listening on specified bind address
152 | Send conkey ByteString
153 -- ^ send bytes to an established connection
154
155#ifdef TEST
156deriving instance Show conkey => Show (ServerInstruction conkey u)
157instance Show (a -> b) where show _ = "<function>"
158deriving instance Show conkey => Show (ConnectionParameters conkey u)
159#endif
160
161-- | This type specifies which which half of a half-duplex
162-- connection is of interest.
163data InOrOut = In | Out
164 deriving (Enum,Eq,Ord,Show,Read)
165
166-- | These events may be read from 'serverEvent' TChannel.
167--
168data ConnectionEvent b
169 = Connection (STM Bool) (ConduitT () b IO ()) (ConduitT (Flush b) Void IO ())
170 -- ^ A new connection was established
171 | ConnectFailure SockAddr
172 -- ^ A 'Connect' command failed.
173 | HalfConnection InOrOut
174 -- ^ Half of a half-duplex connection is avaliable.
175 | EOF
176 -- ^ A connection was terminated
177 | RequiresPing
178 -- ^ 'pingInterval' miliseconds of idle was experienced
179
180#ifdef TEST
181instance Show (IO a) where show _ = "<IO action>"
182instance Show (STM a) where show _ = "<STM action>"
183instance Eq (ByteString -> IO Bool) where (==) _ _ = True
184instance Eq (IO (Maybe ByteString)) where (==) _ _ = True
185instance Eq (STM Bool) where (==) _ _ = True
186deriving instance Show b => Show (ConnectionEvent b)
187deriving instance Eq b => Eq (ConnectionEvent b)
188#endif
189
190-- | This is the per-connection state.
191data ConnectionRecord u
192 = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler
193 , cstate :: ConnectionState -- ^ used to send/receive data to the connection
194 , cdata :: u -- ^ user data, stored in the connection map for convenience
195 }
196
197-- | This object accepts commands and signals events and maintains
198-- the list of currently listening ports and established connections.
199data Server a u releaseKey b
200 = Server { serverCommand :: TMVar (ServerInstruction a u)
201 , serverEvent :: TChan ((a,u), ConnectionEvent b)
202 , serverReleaseKey :: releaseKey
203 , conmap :: TVar (Map a (ConnectionRecord u))
204 , listenmap :: TVar (Map SockAddr ServerHandle)
205 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay))
206 }
207
208control :: Server a u releaseKey b -> ServerInstruction a u -> IO ()
209control sv = atomically . putTMVar (serverCommand sv)
210
211type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b)
212
213noCleanUp :: MonadIO m => Allocate () m
214noCleanUp io _ = ( (,) () ) `liftM` liftIO io
215
216-- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT'
217-- to ensure proper cleanup. For example,
218--
219-- > import Connection.Tcp
220-- > import Control.Monad.Trans.Resource (runResourceT)
221-- > import Control.Monad.IO.Class (liftIO)
222-- > import Control.Monad.STM (atomically)
223-- > import Control.Concurrent.STM.TMVar (putTMVar)
224-- > import Control.Concurrent.STM.TChan (readTChan)
225-- >
226-- > main = runResourceT $ do
227-- > sv <- server allocate
228-- > let params = connectionDefaults (return . snd)
229-- > liftIO . atomically $ putTMVar (serverCommand sv) (Listen 2942 params)
230-- > let loop = do
231-- > (_,event) <- atomically $ readTChan (serverEvent sv)
232-- > case event of
233-- > Connection getPingFlag readData writeData -> do
234-- > forkIO $ do
235-- > fix $ \readLoop -> do
236-- > readData >>= mapM $ \bytes ->
237-- > putStrLn $ "got: " ++ show bytes
238-- > readLoop
239-- > case event of EOF -> return ()
240-- > _ -> loop
241-- > liftIO loop
242--
243-- Using 'Control.Monad.Trans.Resource.ResourceT' is optional. Pass 'noCleanUp'
244-- to do without automatic cleanup and be sure to remember to write 'Quit' to
245-- the 'serverCommand' variable.
246server ::
247 -- forall (m :: * -> *) a u conkey releaseKey.
248 (Show conkey, MonadIO m, Ord conkey) =>
249 Allocate releaseKey m
250 -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) )
251 -> m (Server conkey u releaseKey x)
252server allocate sessionConduits = do
253 (key,cmds) <- allocate (atomically newEmptyTMVar)
254 (atomically . flip putTMVar Quit)
255 server <- liftIO . atomically $ do
256 tchan <- newTChan
257 conmap <- newTVar Map.empty
258 listenmap<- newTVar Map.empty
259 retrymap <- newTVar Map.empty
260 return Server { serverCommand = cmds
261 , serverEvent = tchan
262 , serverReleaseKey = key
263 , conmap = conmap
264 , listenmap = listenmap
265 , retrymap = retrymap
266 }
267 liftIO $ do
268 forkLabeled "server" $ fix $ \loop -> do
269 instr <- atomically $ takeTMVar cmds
270 -- warn $ "instr = " <> bshow instr
271 let again = do doit server instr
272 -- warn $ "finished " <> bshow instr
273 loop
274 case instr of Quit -> closeAll server
275 _ -> again
276 return server
277 where
278 closeAll server = do
279 listening <- atomically . readTVar $ listenmap server
280 mapM_ quitListening (Map.elems listening)
281 let stopRetry (v,d) = do atomically $ writeTVar v False
282 interruptDelay d
283 retriers <- atomically $ do
284 rmap <- readTVar $ retrymap server
285 writeTVar (retrymap server) Map.empty
286 return rmap
287 mapM_ stopRetry (Map.elems retriers)
288 cons <- atomically . readTVar $ conmap server
289 atomically $ mapM_ (connClose . cstate) (Map.elems cons)
290 atomically $ mapM_ (connWait . cstate) (Map.elems cons)
291 atomically $ writeTVar (conmap server) Map.empty
292
293
294 doit server (Listen port params) = do
295
296 listening <- Map.member port
297 `fmap` atomically (readTVar $ listenmap server)
298 when (not listening) $ do
299
300 dput XMisc $ "Started listening on "++show port
301
302 sserv <- flip streamServer [port] ServerConfig
303 { serverWarn = dput XMisc
304 , serverSession = \sock _ h -> do
305 (conkey,u) <- makeConnKey params sock
306 _ <- newConnection server sessionConduits params conkey u h In
307 return ()
308 }
309
310 atomically $ listenmap server `modifyTVar'` Map.insert port sserv
311
312 doit server (Ignore port) = do
313 dput XMisc $ "Stopping listen on "++show port
314 mb <- atomically $ do
315 map <- readTVar $ listenmap server
316 modifyTVar' (listenmap server) $ Map.delete port
317 return $ Map.lookup port map
318 maybe (return ()) quitListening $ mb
319
320 doit server (Send con bs) = do -- . void . forkIO $ do
321 map <- atomically $ readTVar (conmap server)
322 let post False = (trace ("cant send: "++show bs) $ return ())
323 post True = return ()
324 maybe (post False)
325 (post <=< flip connWrite bs . cstate)
326 $ Map.lookup con map
327
328 doit server (Connect addr params) = join $ atomically $ do
329 Map.lookup addr <$> readTVar (retrymap server)
330 >>= return . \case
331 Nothing -> forkit
332 Just (v,d) -> do b <- atomically $ readTVar v
333 interruptDelay d
334 when (not b) forkit
335 where
336 forkit = void . forkLabeled ( "Connect." ++ show addr ) $ do
337 proto <- getProtocolNumber "tcp"
338 sock <- socket (socketFamily addr) Stream proto
339 handle (\e -> do -- let t = ioeGetErrorType e
340 when (isDoesNotExistError e) $ return () -- warn "GOTCHA"
341 -- warn $ "connect-error: " <> bshow e
342 (conkey,u) <- makeConnKey params (restrictSocket sock,(Local localhost4, Remote addr)) -- XXX: ?
343 Socket.close sock
344 atomically
345 $ writeTChan (serverEvent server)
346 $ ((conkey,u),ConnectFailure addr))
347 $ do
348 connect sock addr
349 laddr <- Socket.getSocketName sock
350 (conkey,u) <- makeConnKey params (restrictSocket sock, (Local laddr, Remote addr))
351 h <- socketToHandle sock ReadWriteMode
352 newConnection server sessionConduits params conkey u h Out
353 return ()
354
355 doit server (ConnectWithEndlessRetry addr params interval) = do
356 proto <- getProtocolNumber "tcp"
357 void . forkLabeled ("ConnectWithEndlessRetry." ++ show addr) $ do
358 timer <- interruptibleDelay
359 (retryVar,action) <- atomically $ do
360 map <- readTVar (retrymap server)
361 action <- case Map.lookup addr map of
362 Nothing -> return $ return ()
363 Just (v,d) -> do writeTVar v False
364 return $ interruptDelay d
365 v <- newTVar True
366 writeTVar (retrymap server) $! Map.insert addr (v,timer) map
367 return (v,action :: IO ())
368 action
369 fix $ \retryLoop -> do
370 utc <- getCurrentTime
371 shouldRetry <- do
372 handle (\(SomeException e) -> do
373 -- Exceptions thrown by 'socket' need to be handled specially
374 -- since we don't have enough information to broadcast a ConnectFailure
375 -- on serverEvent.
376 warn $ "Failed to create socket: " <> bshow e
377 atomically $ readTVar retryVar) $ do
378 sock <- socket (socketFamily addr) Stream proto
379 handle (\(SomeException e) -> do
380 -- Any thing else goes wrong and we broadcast ConnectFailure.
381 do (conkey,u) <- makeConnKey params (restrictSocket sock,(Local localhost4, Remote addr))
382 Socket.close sock
383 atomically $ writeTChan (serverEvent server) ((conkey,u),ConnectFailure addr)
384 `onException` return ()
385 atomically $ readTVar retryVar) $ do
386 connect sock addr
387 laddr <- Socket.getSocketName sock
388 (conkey,u) <- makeConnKey params (restrictSocket sock, (Local laddr, Remote addr))
389 h <- socketToHandle sock ReadWriteMode
390 threads <- newConnection server sessionConduits params conkey u h Out
391 atomically $ do threadsWait threads
392 readTVar retryVar
393 fin_utc <- getCurrentTime
394 when shouldRetry $ do
395 let elapsed = 1000.0 * (fin_utc `diffUTCTime` utc)
396 expected = fromIntegral interval
397 when (shouldRetry && elapsed < expected) $ do
398 debugNoise $ "Waiting to retry " <> bshow addr
399 void $ startDelay timer (round $ 1000 * (expected-elapsed))
400 debugNoise $ "retry " <> bshow (shouldRetry,addr)
401 when shouldRetry $ retryLoop
402
403
404-- INTERNAL ----------------------------------------------------------
405
406{-
407hWriteUntilNothing h outs =
408 fix $ \loop -> do
409 mb <- atomically $ takeTMVar outs
410 case mb of Just bs -> do S.hPutStrLn h bs
411 warn $ "wrote " <> bs
412 loop
413 Nothing -> do warn $ "wrote Nothing"
414 hClose h
415
416-}
417connRead :: ConnectionState -> IO (Maybe ByteString)
418connRead (WriteOnlyConnection w) = do
419 -- atomically $ discardContents (threadsChannel w)
420 return Nothing
421connRead conn = do
422 c <- atomically $ getThreads
423 threadsRead c
424 where
425 getThreads =
426 case conn of SaneConnection c -> return c
427 ReadOnlyConnection c -> return c
428 ConnectionPair c w -> do
429 -- discardContents (threadsChannel w)
430 return c
431
432socketFamily :: SockAddr -> Family
433socketFamily (SockAddrInet _ _) = AF_INET
434socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6
435socketFamily (SockAddrUnix _) = AF_UNIX
436
437
438conevent :: ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) )
439 -> ConnectionState
440 -> ConnectionEvent x
441conevent sessionConduits con = Connection pingflag read write
442 where
443 pingflag = swapTVar (pingFlag (connPingTimer con)) False
444 (read,write) = sessionConduits (connRead con) (connWrite con)
445
446newConnection :: Ord a
447 => Server a u1 releaseKey x
448 -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) )
449 -> ConnectionParameters conkey u
450 -> a
451 -> u1
452 -> Handle
453 -> InOrOut
454 -> IO ConnectionThreads
455newConnection server sessionConduits params conkey u h inout = do
456 hSetBuffering h NoBuffering
457 let (idle_ms,timeout_ms) =
458 case (inout,duplex params) of
459 (Out,False) -> ( 0, 0 )
460 _ -> ( pingInterval params
461 , timeout params )
462
463 new <- do pinglogic <- forkPingMachine "newConnection" idle_ms timeout_ms
464 connectionThreads h pinglogic
465 started <- atomically $ newEmptyTMVar
466 kontvar <- atomically newEmptyTMVar
467 -- XXX: Why does kontvar store STM (IO ()) instead of just IO () ?
468 let _ = kontvar :: TMVar (STM (IO ()))
469 forkLabeled ("connecting...") $ do
470 getkont <- atomically $ takeTMVar kontvar
471 kont <- atomically getkont
472 kont
473
474 atomically $ do
475 current <- fmap (Map.lookup conkey) $ readTVar (conmap server)
476 case current of
477 Nothing -> do
478 (newCon,e) <- return $
479 if duplex params
480 then let newcon = SaneConnection new
481 in ( newcon, ((conkey,u), conevent sessionConduits newcon) )
482 else ( case inout of
483 In -> ReadOnlyConnection new
484 Out -> WriteOnlyConnection new
485 , ((conkey,u), HalfConnection inout) )
486 modifyTVar' (conmap server) $ Map.insert conkey
487 ConnectionRecord { ckont = kontvar
488 , cstate = newCon
489 , cdata = u }
490 announce e
491 putTMVar kontvar $ return $ do
492 myThreadId >>= flip labelThread ("connection."++show inout) -- XXX: more info would be nice.
493 atomically $ putTMVar started ()
494 -- Wait for something interesting.
495 handleEOF conkey u kontvar newCon
496 Just what@ConnectionRecord { ckont =mvar }-> do
497 putTMVar kontvar $ return $ return () -- Kill redundant "connecting..." thread.
498 putTMVar mvar $ do
499 -- The action returned by updateConMap, eventually invokes handleEOF,
500 -- so the sequencer thread will not be terminated.
501 kont <- updateConMap conkey u new what
502 putTMVar started ()
503 return kont
504 return new
505 where
506
507 announce e = writeTChan (serverEvent server) e
508
509 -- This function loops and will not quit unless an action is posted to the
510 -- mvar that does not in turn invoke this function, or if an EOF occurs.
511 handleEOF conkey u mvar newCon = do
512 action <- atomically . foldr1 orElse $
513 [ takeTMVar mvar >>= id -- passed continuation
514 , connWait newCon >> return eof
515 , connWaitPing newCon >>= return . sendPing
516 -- , pingWait pingTimer >>= return . sendPing
517 ]
518 action :: IO ()
519 where
520 eof = do
521 -- warn $ "EOF " <>bshow conkey
522 connCancelPing newCon
523 atomically $ do connFlush newCon
524 announce ((conkey,u),EOF)
525 modifyTVar' (conmap server)
526 $ Map.delete conkey
527 -- warn $ "fin-EOF "<>bshow conkey
528
529 sendPing PingTimeOut = do
530 {-
531 utc <- getCurrentTime
532 let utc' = formatTime defaultTimeLocale "%s" utc
533 warn $ "ping:TIMEOUT " <> bshow utc'
534 -}
535 atomically (connClose newCon)
536 eof
537
538 sendPing PingIdle = do
539 {-
540 utc <- getCurrentTime
541 let utc' = formatTime defaultTimeLocale "%s" utc
542 -- warn $ "ping:IDLE " <> bshow utc'
543 -}
544 atomically $ announce ((conkey,u),RequiresPing)
545 handleEOF conkey u mvar newCon
546
547
548 updateConMap conkey u new (ConnectionRecord { ckont=mvar, cstate=replaced, cdata=u0 }) = do
549 new' <-
550 if duplex params then do
551 announce ((conkey,u),EOF)
552 connClose replaced
553 let newcon = SaneConnection new
554 announce $ ((conkey,u),conevent sessionConduits newcon)
555 return $ newcon
556 else
557 case replaced of
558 WriteOnlyConnection w | inout==In ->
559 do let newcon = ConnectionPair new w
560 announce ((conkey,u),conevent sessionConduits newcon)
561 return newcon
562 ReadOnlyConnection r | inout==Out ->
563 do let newcon = ConnectionPair r new
564 announce ((conkey,u),conevent sessionConduits newcon)
565 return newcon
566 _ -> do -- connFlush todo
567 announce ((conkey,u0), EOF)
568 connClose replaced
569 announce ((conkey,u), HalfConnection inout)
570 return $ case inout of
571 In -> ReadOnlyConnection new
572 Out -> WriteOnlyConnection new
573 modifyTVar' (conmap server) $ Map.insert conkey
574 ConnectionRecord { ckont = mvar
575 , cstate = new'
576 , cdata = u }
577 return $ handleEOF conkey u mvar new'
578
579
580getPacket :: Handle -> IO ByteString
581getPacket h = do hWaitForInput h (-1)
582 hGetNonBlocking h 1024
583
584
585
586-- | 'ConnectionThreads' is an interface to a pair of threads
587-- that are reading and writing a 'Handle'.
588data ConnectionThreads = ConnectionThreads
589 { threadsWriter :: TMVar (Maybe ByteString)
590 , threadsChannel :: TChan ByteString
591 , threadsWait :: STM () -- ^ waits for a 'ConnectionThreads' object to close
592 , threadsPing :: PingMachine
593 }
594
595-- | This spawns the reader and writer threads and returns a newly
596-- constructed 'ConnectionThreads' object.
597connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads
598connectionThreads h pinglogic = do
599
600 (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar
601
602 (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan
603 readerThread <- forkLabeled "readerThread" $ do
604 let finished e = do
605 hClose h
606 -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e)
607 -- let _ = fmap ioeGetErrorType e -- type hint
608 let _ = fmap what e where what (SomeException _) = undefined
609 atomically $ do tryTakeTMVar outs
610 putTMVar outs Nothing -- quit writer
611 putTMVar doner ()
612 handle (finished . Just) $ do
613 pingBump pinglogic -- start the ping timer
614 fix $ \loop -> do
615 packet <- getPacket h
616 -- warn $ "read: " <> S.take 60 packet
617 atomically $ writeTChan incomming packet
618 pingBump pinglogic
619 -- warn $ "bumped: " <> S.take 60 packet
620 isEof <- hIsEOF h
621 if isEof then finished Nothing else loop
622
623 writerThread <- forkLabeled "writerThread" . fix $ \loop -> do
624 let finished = do -- warn $ "finished write"
625 -- hClose h -- quit reader
626 throwTo readerThread (ErrorCall "EOF")
627 atomically $ putTMVar donew ()
628 mb <- atomically $ readTMVar outs
629 case mb of Just bs -> handle (\(SomeException e)->finished)
630 (do -- warn $ "writing: " <> S.take 60 bs
631 S.hPutStr h bs
632 -- warn $ "wrote: " <> S.take 60 bs
633 atomically $ takeTMVar outs
634 loop)
635 Nothing -> finished
636
637 let wait = do readTMVar donew
638 readTMVar doner
639 return ()
640 return ConnectionThreads { threadsWriter = outs
641 , threadsChannel = incomming
642 , threadsWait = wait
643 , threadsPing = pinglogic }
644
645
646-- | 'threadsWrite' writes the given 'ByteString' to the
647-- 'ConnectionThreads' object. It blocks until the ByteString
648-- is written and 'True' is returned, or the connection is
649-- interrupted and 'False' is returned.
650threadsWrite :: ConnectionThreads -> ByteString -> IO Bool
651threadsWrite c bs = atomically $
652 orElse (const False `fmap` threadsWait c)
653 (const True `fmap` putTMVar (threadsWriter c) (Just bs))
654
655-- | 'threadsClose' signals for the 'ConnectionThreads' object
656-- to quit and close the associated 'Handle'. This operation
657-- is non-blocking, follow it with 'threadsWait' if you want
658-- to wait for the operation to complete.
659threadsClose :: ConnectionThreads -> STM ()
660threadsClose c = do
661 let mvar = threadsWriter c
662 v <- tryReadTMVar mvar
663 case v of
664 Just Nothing -> return () -- already closed
665 _ -> putTMVar mvar Nothing
666
667-- | 'threadsRead' blocks until a 'ByteString' is available which
668-- is returned to the caller, or the connection is interrupted and
669-- 'Nothing' is returned.
670threadsRead :: ConnectionThreads -> IO (Maybe ByteString)
671threadsRead c = atomically $
672 orElse (const Nothing `fmap` threadsWait c)
673 (Just `fmap` readTChan (threadsChannel c))
674
675-- | A 'ConnectionState' is an interface to a single 'ConnectionThreads'
676-- or to a pair of 'ConnectionThreads' objects that are considered as one
677-- connection.
678data ConnectionState =
679 SaneConnection ConnectionThreads
680 -- ^ ordinary read/write connection
681 | WriteOnlyConnection ConnectionThreads
682 | ReadOnlyConnection ConnectionThreads
683 | ConnectionPair ConnectionThreads ConnectionThreads
684 -- ^ Two 'ConnectionThreads' objects, read operations use the
685 -- first, write operations use the second.
686
687
688
689connWrite :: ConnectionState -> ByteString -> IO Bool
690connWrite (ReadOnlyConnection _) bs = return False
691connWrite conn bs = threadsWrite c bs
692 where
693 c = case conn of SaneConnection c -> c
694 WriteOnlyConnection c -> c
695 ConnectionPair _ c -> c
696
697
698mapConn :: Bool ->
699 (ConnectionThreads -> STM ()) -> ConnectionState -> STM ()
700mapConn both action c =
701 case c of
702 SaneConnection rw -> action rw
703 ReadOnlyConnection r -> action r
704 WriteOnlyConnection w -> action w
705 ConnectionPair r w -> do
706 rem <- orElse (const w `fmap` action r)
707 (const r `fmap` action w)
708 when both $ action rem
709
710connClose :: ConnectionState -> STM ()
711connClose c = mapConn True threadsClose c
712
713connWait :: ConnectionState -> STM ()
714connWait c = doit -- mapConn False threadsWait c
715 where
716 action = threadsWait
717 doit =
718 case c of
719 SaneConnection rw -> action rw
720 ReadOnlyConnection r -> action r
721 WriteOnlyConnection w -> action w
722 ConnectionPair r w -> do
723 rem <- orElse (const w `fmap` action r)
724 (const r `fmap` action w)
725 threadsClose rem
726
727connPingTimer :: ConnectionState -> PingMachine
728connPingTimer c =
729 case c of
730 SaneConnection rw -> threadsPing rw
731 ReadOnlyConnection r -> threadsPing r
732 WriteOnlyConnection w -> threadsPing w -- should be disabled.
733 ConnectionPair r w -> threadsPing r
734
735connCancelPing :: ConnectionState -> IO ()
736connCancelPing c = pingCancel (connPingTimer c)
737
738connWaitPing :: ConnectionState -> STM PingEvent
739connWaitPing c = pingWait (connPingTimer c)
740
741connFlush :: ConnectionState -> STM ()
742connFlush c =
743 case c of
744 SaneConnection rw -> waitChan rw
745 ReadOnlyConnection r -> waitChan r
746 WriteOnlyConnection w -> return ()
747 ConnectionPair r w -> waitChan r
748 where
749 waitChan t = do
750 b <- isEmptyTChan (threadsChannel t)
751 when (not b) retry
752
753bshow :: Show a => a -> ByteString
754bshow e = S.pack . show $ e
755
756warn :: ByteString -> IO ()
757warn str =dputB XMisc str
758
759debugNoise :: Monad m => t -> m ()
760debugNoise str = return ()
761
762data TCPStatus = Resolving | AwaitingRead | AwaitingWrite
763
764-- SockAddr -> (SockAddr, ConnectionParameters SockAddr ConnectionData, Miliseconds)
765
766
767tcpManager :: (PeerAddress -> (SockAddr, ConnectionParameters PeerAddress u, Miliseconds))
768 -- -> (String -> Maybe Text)
769 -- -> (Text -> IO (Maybe PeerAddress))
770 -> Server PeerAddress u releaseKey x
771 -> IO (Manager TCPStatus Text)
772tcpManager grokKey sv = do
773 rmap <- atomically $ newTVar Map.empty -- Map k (Maybe conkey)
774 nullping <- forkPingMachine "tcpManager" 0 0
775 (rslv,rev) <- do
776 dns <- newDNSCache
777 let rslv k = map PeerAddress <$> forwardResolve dns k
778 rev (PeerAddress addr) = reverseResolve dns addr
779 return (rslv,rev)
780 return Manager {
781 setPolicy = \k -> \case
782 TryingToConnect -> join $ atomically $ do
783 r <- readTVar rmap
784 case Map.lookup k r of
785 Just {} -> return $ return () -- Connection already in progress.
786 Nothing -> do
787 modifyTVar' rmap $ Map.insert k Nothing
788 return $ void $ forkLabeled ("resolve."++show k) $ do
789 mconkey <- listToMaybe <$> rslv k
790 case mconkey of
791 Nothing -> atomically $ modifyTVar' rmap $ Map.delete k
792 Just conkey -> do
793 control sv $ case grokKey conkey of
794 (saddr,params,ms) -> ConnectWithEndlessRetry saddr params ms
795 OpenToConnect -> dput XMisc "TODO: TCP OpenToConnect"
796 RefusingToConnect -> dput XMisc "TODO: TCP RefusingToConnect"
797 , status = \k -> do
798 c <- readTVar (conmap sv)
799 ck <- Map.lookup k <$> readTVar rmap
800 return $ exportConnection c (join ck)
801 , connections = Map.keys <$> readTVar rmap
802 , stringToKey = Just . Text.pack
803 , showProgress = \case
804 Resolving -> "resolving"
805 AwaitingRead -> "awaiting inbound"
806 AwaitingWrite -> "awaiting outbound"
807 , showKey = show
808 , resolvePeer = rslv
809 , reverseAddress = rev
810 }
811
812exportConnection :: Ord conkey => Map conkey (ConnectionRecord u) -> Maybe conkey -> G.Connection TCPStatus
813exportConnection conmap mkey = G.Connection
814 { G.connStatus = case mkey of
815 Nothing -> G.Dormant
816 Just conkey -> case Map.lookup conkey conmap of
817 Nothing -> G.InProgress Resolving
818 Just (ConnectionRecord ckont cstate cdata) -> case cstate of
819 SaneConnection {} -> G.Established
820 ConnectionPair {} -> G.Established
821 ReadOnlyConnection {} -> G.InProgress AwaitingWrite
822 WriteOnlyConnection {} -> G.InProgress AwaitingRead
823 , G.connPolicy = TryingToConnect
824 }
diff --git a/dht/Presence/ControlMaybe.hs b/dht/Presence/ControlMaybe.hs
deleted file mode 100644
index a101d667..00000000
--- a/dht/Presence/ControlMaybe.hs
+++ /dev/null
@@ -1,64 +0,0 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE ScopedTypeVariables #-}
3module ControlMaybe
4 ( module ControlMaybe
5 , module Data.Functor
6 ) where
7
8-- import GHC.IO.Exception (IOException(..))
9import Control.Monad
10import Data.Functor
11import System.IO.Error
12
13
14-- forM_ with less polymorphism.
15withJust :: Monad m => Maybe x -> (x -> m ()) -> m ()
16withJust m f = forM_ m f
17{-# INLINE withJust #-}
18
19whenJust :: Monad m => m (Maybe x) -> (x -> m ()) -> m ()
20whenJust acn f = acn >>= mapM_ f
21{-# INLINE whenJust #-}
22
23
24catchIO_ :: IO a -> IO a -> IO a
25catchIO_ body catcher = catchIOError body (\_ -> catcher)
26{-# INLINE catchIO_ #-}
27
28handleIO_ :: IO a -> IO a -> IO a
29handleIO_ catcher body = catchIOError body (\_ -> catcher)
30{-# INLINE handleIO_ #-}
31
32
33handleIO :: (IOError -> IO a) -> IO a -> IO a
34handleIO catcher body = catchIOError body catcher
35{-# INLINE handleIO #-}
36
37#if !MIN_VERSION_base(4,11,0)
38-- | Flipped version of '<$>'.
39--
40-- @
41-- ('<&>') = 'flip' 'fmap'
42-- @
43--
44-- @since 4.11.0.0
45--
46-- ==== __Examples__
47-- Apply @(+1)@ to a list, a 'Data.Maybe.Just' and a 'Data.Either.Right':
48--
49-- >>> Just 2 <&> (+1)
50-- Just 3
51--
52-- >>> [1,2,3] <&> (+1)
53-- [2,3,4]
54--
55-- >>> Right 3 <&> (+1)
56-- Right 4
57--
58(<&>) :: Functor f => f a -> (a -> b) -> f b
59as <&> f = f <$> as
60
61infixl 1 <&>
62#endif
63
64
diff --git a/dht/Presence/DNSCache.hs b/dht/Presence/DNSCache.hs
deleted file mode 100644
index 9bb354e1..00000000
--- a/dht/Presence/DNSCache.hs
+++ /dev/null
@@ -1,285 +0,0 @@
1-- | Both 'getAddrInfo' and 'getHostByAddr' have hard-coded timeouts for
2-- waiting upon network queries that can be a little too long for some use
3-- cases. This module wraps both of them so that they block for at most one
4-- second. It caches late-arriving results so that they can be returned by
5-- repeated timed-out queries.
6--
7-- In order to achieve the shorter timeout, it is likely that the you will need
8-- to build with GHC's -threaded option. Otherwise, if the wrapped FFI calls
9-- to resolve the address will block Haskell threads. Note: I didn't verify
10-- this.
11{-# LANGUAGE TupleSections #-}
12{-# LANGUAGE RankNTypes #-}
13{-# LANGUAGE CPP #-}
14module DNSCache
15 ( DNSCache
16 , reverseResolve
17 , forwardResolve
18 , newDNSCache
19 , parseAddress
20 , unsafeParseAddress
21 , strip_brackets
22 , withPort
23 ) where
24
25import Control.Concurrent.ThreadUtil
26import Control.Arrow
27import Control.Concurrent.STM
28import Data.Text ( Text )
29import Network.Socket ( SockAddr(..), AddrInfoFlag(..), defaultHints, getAddrInfo, AddrInfo(..) )
30import Data.Time.Clock ( UTCTime, getCurrentTime, diffUTCTime )
31import System.IO.Error ( isDoesNotExistError )
32import System.Endian ( fromBE32, toBE32 )
33import Control.Exception ( handle )
34import Data.Map ( Map )
35import qualified Data.Map as Map
36import qualified Network.BSD as BSD
37import qualified Data.Text as Text
38import Control.Monad
39import Data.Function
40import Data.List
41import Data.Ord
42import Data.Maybe
43import System.IO.Error
44import System.IO.Unsafe
45
46import SockAddr ()
47import ControlMaybe ( handleIO_ )
48import GetHostByAddr ( getHostByAddr )
49import Control.Concurrent.Delay
50import DPut
51import DebugTag
52
53type TimeStamp = UTCTime
54
55data DNSCache =
56 DNSCache
57 { fcache :: TVar (Map Text [(TimeStamp, SockAddr)])
58 , rcache :: TVar (Map SockAddr [(TimeStamp, Text)])
59 }
60
61
62newDNSCache :: IO DNSCache
63newDNSCache = do
64 fcache <- newTVarIO Map.empty
65 rcache <- newTVarIO Map.empty
66 return DNSCache { fcache=fcache, rcache=rcache }
67
68updateCache :: Eq x =>
69 Bool -> TimeStamp -> [x] -> Maybe [(TimeStamp,x)] -> Maybe [(TimeStamp,x)]
70updateCache withScrub utc xs mys = do
71 let ys = maybe [] id mys
72 ys' = filter scrub ys
73 ys'' = map (utc,) xs ++ ys'
74 minute = 60
75 scrub (t,x) | withScrub && diffUTCTime utc t < minute = False
76 scrub (t,x) | x `elem` xs = False
77 scrub _ = True
78 guard $ not (null ys'')
79 return ys''
80
81dnsObserve :: DNSCache -> Bool -> TimeStamp -> [(Text,SockAddr)] -> STM ()
82dnsObserve dns withScrub utc obs = do
83 f <- readTVar $ fcache dns
84 r <- readTVar $ rcache dns
85 let obs' = map (\(n,a)->(n,a `withPort` 0)) obs
86 gs = do
87 g <- groupBy ((==) `on` fst) $ sortBy (comparing fst) obs'
88 (n,_) <- take 1 g
89 return (n,map snd g)
90 f' = foldl' updatef f gs
91 hs = do
92 h <- groupBy ((==) `on` snd) $ sortBy (comparing snd) obs'
93 (_,a) <- take 1 h
94 return (a,map fst h)
95 r' = foldl' updater r hs
96 writeTVar (fcache dns) f'
97 writeTVar (rcache dns) r'
98 where
99 updatef f (n,addrs) = Map.alter (updateCache withScrub utc addrs) n f
100 updater r (a,ns) = Map.alter (updateCache withScrub utc ns) a r
101
102make6mapped4 :: SockAddr -> SockAddr
103make6mapped4 addr@(SockAddrInet6 {}) = addr
104make6mapped4 addr@(SockAddrInet port a) = SockAddrInet6 port 0 (0,0,0xFFFF,fromBE32 a) 0
105
106tryForkOS :: String -> IO () -> IO ThreadId
107tryForkOS lbl action = catchIOError (forkOSLabeled lbl action) $ \e -> do
108 dput XMisc $ "DNSCache: Link with -threaded to avoid excessively long time-out."
109 forkLabeled lbl action
110
111
112-- Attempt to resolve the given domain name. Returns an empty list if the
113-- resolve operation takes longer than the timeout, but the 'DNSCache' will be
114-- updated when the resolve completes.
115--
116-- When the resolve operation does complete, any entries less than a minute old
117-- will be overwritten with the new results. Older entries are allowed to
118-- persist for reasons I don't understand as of this writing. (See 'updateCache')
119rawForwardResolve ::
120 DNSCache -> (Text -> IO ()) -> Int -> Text -> IO [SockAddr]
121rawForwardResolve dns onFail timeout addrtext = do
122 r <- atomically newEmptyTMVar
123 mvar <- interruptibleDelay
124 rt <- tryForkOS ("resolve."++show addrtext) $ do
125 resolver r mvar
126 startDelay mvar timeout
127 did <- atomically $ tryPutTMVar r []
128 when did (onFail addrtext)
129 atomically $ readTMVar r
130 where
131 resolver r mvar = do
132 xs <- handle (\e -> let _ = isDoesNotExistError e in return [])
133 $ do fmap (nub . map (make6mapped4 . addrAddress)) $
134 getAddrInfo (Just $ defaultHints { addrFlags = [ AI_CANONNAME, AI_V4MAPPED ]})
135 (Just $ Text.unpack $ strip_brackets addrtext)
136 (Just "5269")
137 did <- atomically $ tryPutTMVar r xs
138 when did $ do
139 interruptDelay mvar
140 utc <- getCurrentTime
141 atomically $ dnsObserve dns True utc $ map (addrtext,) xs
142 return ()
143
144strip_brackets :: Text -> Text
145strip_brackets s =
146 case Text.uncons s of
147 Just ('[',t) -> Text.takeWhile (/=']') t
148 _ -> s
149
150
151reportTimeout :: forall a. Show a => a -> IO ()
152reportTimeout addrtext = do
153 dput XMisc $ "timeout resolving: "++show addrtext
154 -- killThread rt
155
156unmap6mapped4 :: SockAddr -> SockAddr
157unmap6mapped4 addr@(SockAddrInet6 port _ (0,0,0xFFFF,a) _) =
158 SockAddrInet port (toBE32 a)
159unmap6mapped4 addr = addr
160
161rawReverseResolve ::
162 DNSCache -> (SockAddr -> IO ()) -> Int -> SockAddr -> IO [Text]
163rawReverseResolve dns onFail timeout addr = do
164 r <- atomically newEmptyTMVar
165 mvar <- interruptibleDelay
166 rt <- forkOS $ resolver r mvar
167 startDelay mvar timeout
168 did <- atomically $ tryPutTMVar r []
169 when did (onFail addr)
170 atomically $ readTMVar r
171 where
172 resolver r mvar =
173 handleIO_ (return ()) $ do
174 ent <- getHostByAddr (unmap6mapped4 addr) -- AF_UNSPEC addr
175 let names = BSD.hostName ent : BSD.hostAliases ent
176 xs = map Text.pack $ nub names
177 forkIO $ do
178 utc <- getCurrentTime
179 atomically $ dnsObserve dns False utc $ map (,addr) xs
180 atomically $ putTMVar r xs
181
182-- Returns expired (older than a minute) cached reverse-dns results
183-- and removes them from the cache.
184expiredReverse :: DNSCache -> SockAddr -> IO [Text]
185expiredReverse dns addr = do
186 utc <- getCurrentTime
187 addr <- return $ addr `withPort` 0
188 es <- atomically $ do
189 r <- readTVar $ rcache dns
190 let ns = maybe [] id $ Map.lookup addr r
191 minute = 60 -- seconds
192 -- XXX: Is this right? flip diffUTCTime utc returns the age of the
193 -- cache entry?
194 (es0,ns') = partition ( (>=minute) . flip diffUTCTime utc . fst ) ns
195 es = map snd es0
196 modifyTVar' (rcache dns) $ Map.insert addr ns'
197 f <- readTVar $ fcache dns
198 let f' = foldl' (flip $ Map.alter (expire utc)) f es
199 expire utc Nothing = Nothing
200 expire utc (Just as) = if null as' then Nothing else Just as'
201 where as' = filter ( (<minute) . flip diffUTCTime utc . fst) as
202 writeTVar (fcache dns) f'
203 return es
204 return es
205
206cachedReverse :: DNSCache -> SockAddr -> IO [Text]
207cachedReverse dns addr = do
208 utc <- getCurrentTime
209 addr <- return $ addr `withPort` 0
210 atomically $ do
211 r <- readTVar (rcache dns)
212 let ns = maybe [] id $ Map.lookup addr r
213 {-
214 ns' = filter ( (<minute) . flip diffUTCTime utc . fst) ns
215 minute = 60 -- seconds
216 modifyTVar' (rcache dns) $ Map.insert addr ns'
217 return $ map snd ns'
218 -}
219 return $ map snd ns
220
221-- Returns any dns query results for the given name that were observed less
222-- than a minute ago and updates the forward-cache to remove any results older
223-- than that.
224cachedForward :: DNSCache -> Text -> IO [SockAddr]
225cachedForward dns n = do
226 utc <- getCurrentTime
227 atomically $ do
228 f <- readTVar (fcache dns)
229 let as = maybe [] id $ Map.lookup n f
230 as' = filter ( (<minute) . flip diffUTCTime utc . fst) as
231 minute = 60 -- seconds
232 modifyTVar' (fcache dns) $ Map.insert n as'
233 return $ map snd as'
234
235-- Reverse-resolves an address to a domain name. Returns both the result of a
236-- new query and any freshly cached results. Cache entries older than a minute
237-- will not be returned, but will be refreshed in spawned threads so that they
238-- may be available for the next call.
239reverseResolve :: DNSCache -> SockAddr -> IO [Text]
240reverseResolve dns addr = do
241 expired <- expiredReverse dns addr
242 forM_ expired $ \n -> forkIO $ do
243 rawForwardResolve dns (const $ return ()) 1000000 n
244 return ()
245 xs <- rawReverseResolve dns (const $ return ()) 1000000 addr
246 cs <- cachedReverse dns addr
247 return $ xs ++ filter (not . flip elem xs) cs
248
249-- Resolves a name, if there's no result within one second, then any cached
250-- results that are less than a minute old are returned.
251forwardResolve :: DNSCache -> Text -> IO [SockAddr]
252forwardResolve dns n = do
253 as <- rawForwardResolve dns (const $ return ()) 1000000 n
254 if null as
255 then cachedForward dns n
256 else return as
257
258parseAddress :: Text -> IO (Maybe SockAddr)
259parseAddress addr_str = do
260 info <- getAddrInfo (Just $ defaultHints { addrFlags = [ AI_NUMERICHOST ] })
261 (Just . Text.unpack $ addr_str)
262 (Just "0")
263 return . listToMaybe $ map addrAddress info
264
265
266splitAtPort :: String -> (String,String)
267splitAtPort s = second sanitizePort $ case s of
268 ('[':t) -> break (==']') t
269 _ -> break (==':') s
270 where
271 sanitizePort (']':':':p) = p
272 sanitizePort (':':p) = p
273 sanitizePort _ = "0"
274
275unsafeParseAddress :: String -> Maybe SockAddr
276unsafeParseAddress addr_str = unsafePerformIO $ do
277 let (ipstr,portstr) = splitAtPort addr_str
278 info <- getAddrInfo (Just $ defaultHints { addrFlags = [ AI_NUMERICHOST ] })
279 (Just ipstr)
280 (Just portstr)
281 return . listToMaybe $ map addrAddress info
282
283withPort :: SockAddr -> Int -> SockAddr
284withPort (SockAddrInet _ a) port = SockAddrInet (toEnum port) a
285withPort (SockAddrInet6 _ a b c) port = SockAddrInet6 (toEnum port) a b c
diff --git a/dht/Presence/GetHostByAddr.hs b/dht/Presence/GetHostByAddr.hs
deleted file mode 100644
index 45bca5e9..00000000
--- a/dht/Presence/GetHostByAddr.hs
+++ /dev/null
@@ -1,77 +0,0 @@
1{-# LANGUAGE ForeignFunctionInterface #-}
2module GetHostByAddr where
3
4import Network.BSD
5import Foreign.Ptr
6import Foreign.C.Types
7import Foreign.Storable (Storable(..))
8import Foreign.Marshal.Utils (with)
9import Foreign.Marshal.Alloc
10import Control.Concurrent
11import System.IO.Unsafe
12import System.IO.Error (ioeSetErrorString, mkIOError)
13import Network.Socket
14import GHC.IO.Exception
15
16
17throwNoSuchThingIfNull :: String -> String -> IO (Ptr a) -> IO (Ptr a)
18throwNoSuchThingIfNull loc desc act = do
19 ptr <- act
20 if (ptr == nullPtr)
21 then ioError (ioeSetErrorString (mkIOError NoSuchThing loc Nothing Nothing) desc)
22 else return ptr
23
24{-# NOINLINE lock #-}
25lock :: MVar ()
26lock = unsafePerformIO $ newMVar ()
27
28withLock :: IO a -> IO a
29withLock act = withMVar lock (\_ -> act)
30
31trySysCall :: IO a -> IO a
32trySysCall act = act
33
34{-
35-- The locking of gethostbyaddr is similar to gethostbyname.
36-- | Get a 'HostEntry' corresponding to the given address and family.
37-- Note that only IPv4 is currently supported.
38getHostByAddr :: Family -> SockAddr -> IO HostEntry
39getHostByAddr family addr = do
40 withSockAddr addr $ \ ptr_addr len -> withLock $ do
41 throwNoSuchThingIfNull "getHostByAddr" "no such host entry"
42 $ trySysCall $ c_gethostbyaddr ptr_addr (fromIntegral len) (packFamily family)
43 >>= peek
44-}
45
46
47-- The locking of gethostbyaddr is similar to gethostbyname.
48-- | Get a 'HostEntry' corresponding to the given address and family.
49-- Note that only IPv4 is currently supported.
50-- getHostByAddr :: Family -> HostAddress -> IO HostEntry
51-- getHostByAddr family addr = do
52getHostByAddr :: SockAddr -> IO HostEntry
53getHostByAddr (SockAddrInet port addr ) = do
54 let family = AF_INET
55 with addr $ \ ptr_addr -> withLock $ do
56 throwNoSuchThingIfNull "getHostByAddr" "no such host entry"
57 $ trySysCall $ c_gethostbyaddr ptr_addr (fromIntegral (sizeOf addr)) (packFamily family)
58 >>= peek
59getHostByAddr (SockAddrInet6 port flow (a,b,c,d) scope) = do
60 let family = AF_INET6
61 allocaBytes 16 $ \ ptr_addr -> do
62 pokeElemOff ptr_addr 0 a
63 pokeElemOff ptr_addr 1 b
64 pokeElemOff ptr_addr 2 c
65 pokeElemOff ptr_addr 3 d
66 withLock $ do
67 throwNoSuchThingIfNull "getHostByAddr" "no such host entry"
68 $ trySysCall $ c_gethostbyaddr ptr_addr 16 (packFamily family)
69 >>= peek
70
71
72foreign import ccall safe "gethostbyaddr"
73 c_gethostbyaddr :: Ptr a -> CInt -> CInt -> IO (Ptr HostEntry)
74
75
76
77-- vim:ft=haskell:
diff --git a/dht/Presence/SockAddr.hs b/dht/Presence/SockAddr.hs
deleted file mode 100644
index b5fbf16e..00000000
--- a/dht/Presence/SockAddr.hs
+++ /dev/null
@@ -1,14 +0,0 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE StandaloneDeriving #-}
3module SockAddr () where
4
5#if MIN_VERSION_network(2,4,0)
6import Network.Socket ()
7#else
8import Network.Socket ( SockAddr(..) )
9
10deriving instance Ord SockAddr
11#endif
12
13
14
diff --git a/dht/dht-client.cabal b/dht/dht-client.cabal
index 64d48f53..0da181df 100644
--- a/dht/dht-client.cabal
+++ b/dht/dht-client.cabal
@@ -75,15 +75,10 @@ library
75 , RecordWildCards 75 , RecordWildCards
76 , NondecreasingIndentation 76 , NondecreasingIndentation
77 hs-source-dirs: src, ., Presence 77 hs-source-dirs: src, ., Presence
78 exposed-modules: Control.Concurrent.ThreadUtil 78 exposed-modules: Data.Digest.CRC32C
79 Network.SocketLike
80 Data.Digest.CRC32C
81 Data.Bits.ByteString 79 Data.Bits.ByteString
82 Data.TableMethods
83 Network.BitTorrent.DHT.ContactInfo 80 Network.BitTorrent.DHT.ContactInfo
84 Network.BitTorrent.DHT.Token 81 Network.BitTorrent.DHT.Token
85 Network.QueryResponse
86 Network.StreamServer
87 Data.BEncode.Pretty 82 Data.BEncode.Pretty
88 Network.BitTorrent.MainlineDHT 83 Network.BitTorrent.MainlineDHT
89 Network.BitTorrent.MainlineDHT.Symbols 84 Network.BitTorrent.MainlineDHT.Symbols
@@ -107,7 +102,6 @@ library
107 Network.Tox.Avahi 102 Network.Tox.Avahi
108 Network.Tox.RelayPinger 103 Network.Tox.RelayPinger
109 Network.UPNP 104 Network.UPNP
110 Network.QueryResponse.TCP
111 Network.Tox.Relay 105 Network.Tox.Relay
112 Network.Tox.TCP 106 Network.Tox.TCP
113 Data.Tox.Msg 107 Data.Tox.Msg
@@ -117,7 +111,6 @@ library
117 Network.Tox.ContactInfo 111 Network.Tox.ContactInfo
118 Announcer 112 Announcer
119 Announcer.Tox 113 Announcer.Tox
120 Control.Concurrent.Delay
121 ByteStringOperators 114 ByteStringOperators
122 ClientState 115 ClientState
123 ConfigFiles 116 ConfigFiles
@@ -126,19 +119,14 @@ library
126 Control.Concurrent.STM.StatusCache 119 Control.Concurrent.STM.StatusCache
127 Control.Concurrent.STM.UpdateStream 120 Control.Concurrent.STM.UpdateStream
128 Control.Concurrent.STM.Util 121 Control.Concurrent.STM.Util
129 ControlMaybe
130 Data.BitSyntax 122 Data.BitSyntax
131 DNSCache
132 EventUtil 123 EventUtil
133 FGConsole 124 FGConsole
134 GetHostByAddr
135 LocalPeerCred 125 LocalPeerCred
136 LockedChan 126 LockedChan
137 Logging 127 Logging
138 Nesting 128 Nesting
139 Paths 129 Paths
140 Connection.Tcp
141 SockAddr
142 UTmp 130 UTmp
143 MUC 131 MUC
144 LocalChat 132 LocalChat
@@ -149,13 +137,10 @@ library
149 XMPPServer 137 XMPPServer
150 Util 138 Util
151 Presence 139 Presence
152 Control.Concurrent.PingMachine
153 Connection
154 ToxChat 140 ToxChat
155 ToxToXMPP 141 ToxToXMPP
156 ToxManager 142 ToxManager
157 XMPPToTox 143 XMPPToTox
158 DebugUtil
159 Data.IntervalSet 144 Data.IntervalSet
160 Data.Tox.Message 145 Data.Tox.Message
161 HandshakeCache 146 HandshakeCache
@@ -233,6 +218,7 @@ library
233 , kad 218 , kad
234 , tasks 219 , tasks
235 , torrent-types 220 , torrent-types
221 , server
236 222
237 if impl(ghc < 8) 223 if impl(ghc < 8)
238 Build-depends: transformers 224 Build-depends: transformers
@@ -319,6 +305,7 @@ executable dhtd
319 , pretty 305 , pretty
320 , dependent-sum 306 , dependent-sum
321 , dht-client 307 , dht-client
308 , server
322 , dput-hslogger 309 , dput-hslogger
323 , word64-map 310 , word64-map
324 , tox-crypto 311 , tox-crypto
@@ -368,6 +355,7 @@ executable testTox
368 default-language: Haskell2010 355 default-language: Haskell2010
369 build-depends: base 356 build-depends: base
370 , dht-client 357 , dht-client
358 , server
371 , dput-hslogger 359 , dput-hslogger
372 , tox-crypto 360 , tox-crypto
373 , lifted-concurrent 361 , lifted-concurrent
diff --git a/dht/src/Control/Concurrent/Delay.hs b/dht/src/Control/Concurrent/Delay.hs
deleted file mode 100644
index 67dcd451..00000000
--- a/dht/src/Control/Concurrent/Delay.hs
+++ /dev/null
@@ -1,49 +0,0 @@
1module Control.Concurrent.Delay where
2
3import Control.Concurrent
4import Control.Monad
5import Control.Exception ({-evaluate,-}handle,finally,throwIO)
6import Data.Time.Clock (NominalDiffTime)
7import System.IO.Error
8
9type Microseconds = Int
10
11microseconds :: NominalDiffTime -> Microseconds
12microseconds d = round $ 1000000 * d
13
14data InterruptibleDelay = InterruptibleDelay
15 { delayThread :: MVar ThreadId
16 }
17
18interruptibleDelay :: IO InterruptibleDelay
19interruptibleDelay = do
20 fmap InterruptibleDelay newEmptyMVar
21
22-- | Delay for the given number of microseconds and return 'True' if the delay
23-- is not interrupted.
24--
25-- Note: If a thread is already waiting on the given 'InterruptibleDelay'
26-- object, then this will block until it becomes available and only then start
27-- the delay timer.
28startDelay :: InterruptibleDelay -> Microseconds -> IO Bool
29startDelay d interval = do
30 thread <- myThreadId
31 handle (\e -> do when (not $ isUserError e) (throwIO e)
32 return False) $ do
33 putMVar (delayThread d) thread
34 threadDelay interval
35 void $ takeMVar (delayThread d)
36 return True
37 -- The following cleanup shouldn't be necessary, but I'm paranoid.
38 `finally` tryTakeMVar (delayThread d)
39
40 where debugNoise str = return ()
41
42-- | Signal the thread waiting on the given 'InterruptibleDelay' object to
43-- continue even though the timeout has not elapsed. If no thread is waiting,
44-- then this is a no-op.
45interruptDelay :: InterruptibleDelay -> IO ()
46interruptDelay d = do
47 mthread <- tryTakeMVar (delayThread d)
48 forM_ mthread $ \thread -> do
49 throwTo thread (userError "Interrupted delay")
diff --git a/dht/src/Control/Concurrent/PingMachine.hs b/dht/src/Control/Concurrent/PingMachine.hs
deleted file mode 100644
index a8f10e83..00000000
--- a/dht/src/Control/Concurrent/PingMachine.hs
+++ /dev/null
@@ -1,161 +0,0 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE TupleSections #-}
3module Control.Concurrent.PingMachine where
4
5import Control.Monad
6import Data.Function
7#ifdef THREAD_DEBUG
8import Control.Concurrent.Lifted.Instrument
9#else
10import Control.Concurrent.Lifted
11import GHC.Conc (labelThread)
12#endif
13import Control.Concurrent.STM
14
15import Control.Concurrent.Delay
16
17type Miliseconds = Int
18type TimeOut = Miliseconds
19type PingInterval = Miliseconds
20
21-- | Events that occur as a result of the 'PingMachine' watchdog.
22--
23-- Use 'pingWait' to wait for one of these to occur.
24data PingEvent
25 = PingIdle -- ^ You should send a ping if you observe this event.
26 | PingTimeOut -- ^ You should give up on the connection in case of this event.
27
28data PingMachine = PingMachine
29 { pingFlag :: TVar Bool
30 , pingInterruptible :: InterruptibleDelay
31 , pingEvent :: TMVar PingEvent
32 , pingStarted :: TMVar Bool
33 }
34
35-- | Fork a thread to monitor a connection for a ping timeout.
36--
37-- If 'pingBump' is not invoked after a idle is signaled, a timeout event will
38-- occur. When that happens, even if the caller chooses to ignore this event,
39-- the watchdog thread will be terminated and no more ping events will be
40-- signaled.
41--
42-- An idle connection will be signaled by:
43--
44-- (1) 'pingFlag' is set 'True'
45--
46-- (2) 'pingWait' returns 'PingIdle'
47--
48-- Either may be tested to determine whether a ping should be sent but
49-- 'pingFlag' is difficult to use properly because it is up to the caller to
50-- remember that the ping is already in progress.
51forkPingMachine
52 :: String
53 -> PingInterval -- ^ Milliseconds of idle before a ping is considered necessary.
54 -> TimeOut -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'.
55 -> IO PingMachine
56forkPingMachine label idle timeout = do
57 d <- interruptibleDelay
58 flag <- atomically $ newTVar False
59 canceled <- atomically $ newTVar False
60 event <- atomically newEmptyTMVar
61 started <- atomically $ newEmptyTMVar
62 when (idle/=0) $ void . forkIO $ do
63 myThreadId >>= flip labelThread ("Ping." ++ label) -- ("ping.watchdog")
64 (>>=) (atomically (readTMVar started)) $ flip when $ do
65 fix $ \loop -> do
66 atomically $ writeTVar flag False
67 fin <- startDelay d (1000*idle)
68 (>>=) (atomically (readTMVar started)) $ flip when $ do
69 if (not fin) then loop
70 else do
71 -- Idle event
72 atomically $ do
73 tryTakeTMVar event
74 putTMVar event PingIdle
75 writeTVar flag True
76 fin <- startDelay d (1000*timeout)
77 (>>=) (atomically (readTMVar started)) $ flip when $ do
78 me <- myThreadId
79 if (not fin) then loop
80 else do
81 -- Timeout event
82 atomically $ do
83 tryTakeTMVar event
84 writeTVar flag False
85 putTMVar event PingTimeOut
86 return PingMachine
87 { pingFlag = flag
88 , pingInterruptible = d
89 , pingEvent = event
90 , pingStarted = started
91 }
92
93-- | like 'forkPingMachine' but the timeout and idle parameters can be changed dynamically
94-- Unlike 'forkPingMachine', 'forkPingMachineDynamic' always launches a thread
95-- regardless of idle value.
96forkPingMachineDynamic
97 :: String
98 -> TVar PingInterval -- ^ Milliseconds of idle before a ping is considered necessary.
99 -> TVar TimeOut -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'.
100 -> IO PingMachine
101forkPingMachineDynamic label idleV timeoutV = do
102 d <- interruptibleDelay
103 flag <- atomically $ newTVar False
104 canceled <- atomically $ newTVar False
105 event <- atomically newEmptyTMVar
106 started <- atomically $ newEmptyTMVar
107 void . forkIO $ do
108 myThreadId >>= flip labelThread ("Ping." ++ label) -- ("ping.watchdog")
109 (>>=) (atomically (readTMVar started)) $ flip when $ do
110 fix $ \loop -> do
111 atomically $ writeTVar flag False
112 (idle,timeout) <- atomically $ (,) <$> readTVar idleV <*> readTVar timeoutV
113 fin <- startDelay d (1000*idle)
114 (>>=) (atomically (readTMVar started)) $ flip when $ do
115 if (not fin) then loop
116 else do
117 -- Idle event
118 atomically $ do
119 tryTakeTMVar event
120 putTMVar event PingIdle
121 writeTVar flag True
122 fin <- startDelay d (1000*timeout)
123 (>>=) (atomically (readTMVar started)) $ flip when $ do
124 me <- myThreadId
125 if (not fin) then loop
126 else do
127 -- Timeout event
128 atomically $ do
129 tryTakeTMVar event
130 writeTVar flag False
131 putTMVar event PingTimeOut
132 return PingMachine
133 { pingFlag = flag
134 , pingInterruptible = d
135 , pingEvent = event
136 , pingStarted = started
137 }
138
139-- | Terminate the watchdog thread. Call this upon connection close.
140--
141-- You should ensure no threads are waiting on 'pingWait' because there is no
142-- 'PingEvent' signaling termination.
143pingCancel :: PingMachine -> IO ()
144pingCancel me = do
145 atomically $ do tryTakeTMVar (pingStarted me)
146 putTMVar (pingStarted me) False
147 interruptDelay (pingInterruptible me)
148
149-- | Reset the ping timer. Call this regularly to prevent 'PingTimeOut'.
150pingBump :: PingMachine -> IO ()
151pingBump me = do
152 atomically $ do
153 b <- tryReadTMVar (pingStarted me)
154 when (b/=Just False) $ do
155 tryTakeTMVar (pingStarted me)
156 putTMVar (pingStarted me) True
157 interruptDelay (pingInterruptible me)
158
159-- | Retries until a 'PingEvent' occurs.
160pingWait :: PingMachine -> STM PingEvent
161pingWait me = takeTMVar (pingEvent me)
diff --git a/dht/src/Control/Concurrent/ThreadUtil.hs b/dht/src/Control/Concurrent/ThreadUtil.hs
deleted file mode 100644
index a258d933..00000000
--- a/dht/src/Control/Concurrent/ThreadUtil.hs
+++ /dev/null
@@ -1,31 +0,0 @@
1{-# LANGUAGE CPP #-}
2module Control.Concurrent.ThreadUtil
3 (
4#ifdef THREAD_DEBUG
5 module Control.Concurrent.Lifted.Instrument
6#else
7 module Control.Control.Lifted
8 , module GHC.Conc
9#endif
10 ) where
11
12#ifdef THREAD_DEBUG
13import Control.Concurrent.Lifted.Instrument
14#else
15import Control.Concurrent.Lifted
16import GHC.Conc (labelThread)
17
18forkLabeled :: String -> IO () -> IO ThreadId
19forkLabeled lbl action = do
20 t <- forkIO action
21 labelThread t lbl
22 return t
23{-# INLINE forkLabeled #-}
24
25forkOSLabeled :: String -> IO () -> IO ThreadId
26forkOSLabeled lbl action = do
27 t <- forkOS action
28 labelThread t lbl
29 return t
30{-# INLINE forkOSLabeled #-}
31#endif
diff --git a/dht/src/Data/TableMethods.hs b/dht/src/Data/TableMethods.hs
deleted file mode 100644
index e4208a69..00000000
--- a/dht/src/Data/TableMethods.hs
+++ /dev/null
@@ -1,105 +0,0 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE GADTs #-}
3{-# LANGUAGE LambdaCase #-}
4{-# LANGUAGE PartialTypeSignatures #-}
5{-# LANGUAGE RankNTypes #-}
6{-# LANGUAGE ScopedTypeVariables #-}
7{-# LANGUAGE TupleSections #-}
8module Data.TableMethods where
9
10import Data.Functor.Contravariant
11import Data.Time.Clock.POSIX
12import Data.Word
13import qualified Data.IntMap.Strict as IntMap
14 ;import Data.IntMap.Strict (IntMap)
15import qualified Data.Map.Strict as Map
16 ;import Data.Map.Strict (Map)
17import qualified Data.Word64Map as W64Map
18 ;import Data.Word64Map (Word64Map)
19
20import Data.Wrapper.PSQ as PSQ
21
22type Priority = POSIXTime
23
24data OptionalPriority t tid x
25 = NoPriority
26 | HasPriority (Priority -> t x -> ([(tid, Priority, x)], t x))
27
28-- | The standard lookup table methods.
29data TableMethods t tid = TableMethods
30 { -- | Insert a new /tid/ entry into the transaction table.
31 tblInsert :: forall a. tid -> a -> Priority -> t a -> t a
32 -- | Delete transaction /tid/ from the transaction table.
33 , tblDelete :: forall a. tid -> t a -> t a
34 -- | Lookup the value associated with transaction /tid/.
35 , tblLookup :: forall a. tid -> t a -> Maybe a
36 }
37
38data QMethods t tid x = QMethods
39 { qTbl :: TableMethods t tid
40 , qAtMostView :: OptionalPriority t tid x
41 }
42
43vanillaTable :: TableMethods t tid -> QMethods t tid x
44vanillaTable tbl = QMethods tbl NoPriority
45
46priorityTable :: TableMethods t tid
47 -> (Priority -> t x -> ([(k, Priority, x)], t x))
48 -> (k -> x -> tid)
49 -> QMethods t tid x
50priorityTable tbl atmost f = QMethods
51 { qTbl = tbl
52 , qAtMostView = HasPriority $ \p t -> case atmost p t of
53 (es,t') -> (map (\(k,p,a) -> (f k a, p, a)) es, t')
54 }
55
56-- | Methods for using 'Data.IntMap'.
57intMapMethods :: TableMethods IntMap Int
58intMapMethods = TableMethods
59 { tblInsert = \tid a p -> IntMap.insert tid a
60 , tblDelete = IntMap.delete
61 , tblLookup = IntMap.lookup
62 }
63
64-- | Methods for using 'Data.Word64Map'.
65w64MapMethods :: TableMethods Word64Map Word64
66w64MapMethods = TableMethods
67 { tblInsert = \tid a p -> W64Map.insert tid a
68 , tblDelete = W64Map.delete
69 , tblLookup = W64Map.lookup
70 }
71
72-- | Methods for using 'Data.Map'
73mapMethods :: Ord tid => TableMethods (Map tid) tid
74mapMethods = TableMethods
75 { tblInsert = \tid a p -> Map.insert tid a
76 , tblDelete = Map.delete
77 , tblLookup = Map.lookup
78 }
79
80-- psqMethods :: PSQKey tid => QMethods (HashPSQ tid Priority) tid x
81psqMethods :: PSQKey k => (tid -> k) -> (k -> x -> tid) -> QMethods (PSQ' k Priority) tid x
82psqMethods g f = priorityTable (contramap g tbl) PSQ.atMostView f
83 where
84 tbl :: PSQKey tid => TableMethods (PSQ' tid Priority) tid
85 tbl = TableMethods
86 { tblInsert = PSQ.insert'
87 , tblDelete = PSQ.delete
88 , tblLookup = \tid t -> case PSQ.lookup tid t of
89 Just (p,a) -> Just a
90 Nothing -> Nothing
91 }
92
93
94-- | Change the key type for a lookup table implementation.
95--
96-- This can be used with 'intMapMethods' or 'mapMethods' to restrict lookups to
97-- only a part of the generated /tid/ value. This is useful for /tid/ types
98-- that are especially large due their use for other purposes, such as secure
99-- nonces for encryption.
100instance Contravariant (TableMethods t) where
101 -- contramap :: (tid -> t1) -> TableMethods t t1 -> TableMethods t tid
102 contramap f (TableMethods ins del lookup) =
103 TableMethods (\k p v t -> ins (f k) p v t)
104 (\k t -> del (f k) t)
105 (\k t -> lookup (f k) t)
diff --git a/dht/src/DebugUtil.hs b/dht/src/DebugUtil.hs
deleted file mode 100644
index 96ab8cc5..00000000
--- a/dht/src/DebugUtil.hs
+++ /dev/null
@@ -1,42 +0,0 @@
1{-# LANGUAGE CPP #-}
2module DebugUtil where
3
4import Control.Monad
5import Data.Time.Clock
6import Data.List
7import Text.Printf
8import GHC.Conc (threadStatus,ThreadStatus(..))
9#ifdef THREAD_DEBUG
10import Control.Concurrent.Lifted.Instrument
11#else
12import Control.Concurrent.Lifted
13import GHC.Conc (labelThread)
14#endif
15
16showReport :: [(String,String)] -> String
17showReport kvs = showColumns $ map (\(x,y)->[x,y]) kvs
18
19showColumns :: [[String]] -> String
20showColumns rows = do
21 let cols = transpose rows
22 ws = map (maximum . map (succ . length)) cols
23 fs <- rows
24 _ <- take 1 fs -- Guard against empty rows so that 'last' is safe.
25 " " ++ concat (zipWith (printf "%-*s") (init ws) (init fs)) ++ last fs ++ "\n"
26
27
28threadReport :: Bool -- ^ False to summarize search threads.
29 -> IO String
30threadReport want_ss = do
31 threads <- threadsInformation
32 tm <- getCurrentTime
33 let (ss,ts) = partition (("search" `isPrefixOf`) . lbl . snd)
34 threads
35 r <- forM (if want_ss then threads else ts) $ \(tid,PerThread{..}) -> do
36 stat <- threadStatus tid
37 let showStat (ThreadBlocked reason) = show reason
38 showStat stat = show stat
39 return [show lbl,show (diffUTCTime tm startTime),showStat stat]
40 return $ unlines [ showColumns r
41 , (if want_ss then " There are " else " and ")
42 ++ show (length ss) ++ " search threads." ]
diff --git a/dht/src/Network/QueryResponse.hs b/dht/src/Network/QueryResponse.hs
deleted file mode 100644
index 20e7ecf0..00000000
--- a/dht/src/Network/QueryResponse.hs
+++ /dev/null
@@ -1,716 +0,0 @@
1-- | This module can implement any query\/response protocol. It was written
2-- with Kademlia implementations in mind.
3
4{-# LANGUAGE CPP #-}
5{-# LANGUAGE GADTs #-}
6{-# LANGUAGE LambdaCase #-}
7{-# LANGUAGE PartialTypeSignatures #-}
8{-# LANGUAGE RankNTypes #-}
9{-# LANGUAGE ScopedTypeVariables #-}
10{-# LANGUAGE TupleSections #-}
11module Network.QueryResponse where
12
13#ifdef THREAD_DEBUG
14import Control.Concurrent.Lifted.Instrument
15#else
16import Control.Concurrent
17import GHC.Conc (labelThread)
18#endif
19import Control.Concurrent.STM
20import Control.Exception
21import Control.Monad
22import qualified Data.ByteString as B
23 ;import Data.ByteString (ByteString)
24import Data.Dependent.Map as DMap
25import Data.Dependent.Sum
26import Data.Function
27import Data.Functor.Contravariant
28import Data.Functor.Identity
29import Data.GADT.Show
30import qualified Data.IntMap.Strict as IntMap
31 ;import Data.IntMap.Strict (IntMap)
32import qualified Data.Map.Strict as Map
33 ;import Data.Map.Strict (Map)
34import Data.Time.Clock.POSIX
35import qualified Data.Word64Map as W64Map
36 ;import Data.Word64Map (Word64Map)
37import Data.Word
38import Data.Maybe
39import GHC.Conc (closeFdWith)
40import GHC.Event
41import Network.Socket
42import Network.Socket.ByteString as B
43import System.Endian
44import System.IO
45import System.IO.Error
46import System.Timeout
47
48import DPut
49import DebugTag
50import Data.TableMethods
51
52-- | An inbound packet or condition raised while monitoring a connection.
53data Arrival err addr x
54 = Terminated -- ^ Virtual message that signals EOF.
55 | ParseError !err -- ^ A badly-formed message was received.
56 | Arrival { arrivedFrom :: !addr , arrivedMsg :: !x } -- ^ Inbound message.
57
58-- | Three methods are required to implement a datagram based query\/response protocol.
59data TransportA err addr x y = Transport
60 { -- | Blocks until an inbound packet is available. Then calls the provided
61 -- continuation with the packet and origin addres or an error condition.
62 awaitMessage :: forall a. (Arrival err addr x -> IO a) -> STM (IO a)
63 -- | Send an /y/ packet to the given destination /addr/.
64 , sendMessage :: addr -> y -> IO ()
65 -- | Shutdown and clean up any state related to this 'Transport'.
66 , setActive :: Bool -> IO ()
67 }
68
69type Transport err addr x = TransportA err addr x x
70
71closeTransport :: TransportA err addr x y -> IO ()
72closeTransport tr = setActive tr False
73
74-- | This function modifies a 'Transport' to use higher-level addresses and
75-- packet representations. It could be used to change UDP 'ByteString's into
76-- bencoded syntax trees or to add an encryption layer in which addresses have
77-- associated public keys.
78layerTransportM ::
79 (x -> addr -> IO (Either err (x', addr')))
80 -- ^ Function that attempts to transform a low-level address/packet
81 -- pair into a higher level representation.
82 -> (y' -> addr' -> IO (y, addr))
83 -- ^ Function to encode a high-level address/packet into a lower level
84 -- representation.
85 -> TransportA err addr x y
86 -- ^ The low-level transport to be transformed.
87 -> TransportA err addr' x' y'
88layerTransportM parse encode tr =
89 tr { awaitMessage = \kont ->
90 awaitMessage tr $ \case
91 Terminated -> kont $ Terminated
92 ParseError e -> kont $ ParseError e
93 Arrival addr x -> parse x addr >>= \case
94 Left e -> kont $ ParseError e
95 Right (x',addr') -> kont $ Arrival addr' x'
96 , sendMessage = \addr' msg' -> do
97 (msg,addr) <- encode msg' addr'
98 sendMessage tr addr msg
99 }
100
101
102-- | This function modifies a 'Transport' to use higher-level addresses and
103-- packet representations. It could be used to change UDP 'ByteString's into
104-- bencoded syntax trees or to add an encryption layer in which addresses have
105-- associated public keys.
106layerTransport ::
107 (x -> addr -> Either err (x', addr'))
108 -- ^ Function that attempts to transform a low-level address/packet
109 -- pair into a higher level representation.
110 -> (y' -> addr' -> (y, addr))
111 -- ^ Function to encode a high-level address/packet into a lower level
112 -- representation.
113 -> TransportA err addr x y
114 -- ^ The low-level transport to be transformed.
115 -> TransportA err addr' x' y'
116layerTransport parse encode tr =
117 layerTransportM (\x addr -> return $ parse x addr)
118 (\x' addr' -> return $ encode x' addr')
119 tr
120
121-- | Paritions a 'Transport' into two higher-level transports. Note: A 'TChan'
122-- is used to share the same underlying socket, so be sure to fork a thread for
123-- both returned 'Transport's to avoid hanging.
124partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a)))
125 -> ((y,xaddr) -> IO (Maybe (c,a)))
126 -> TransportA err a b c
127 -> IO (TransportA err xaddr x y, TransportA err a b c)
128partitionTransportM parse encodex tr = do
129 tchan <- atomically newTChan
130 let ytr = tr { awaitMessage = \kont -> fix $ \again -> do
131 awaitMessage tr $ \m -> case m of
132 Arrival adr msg -> parse (msg,adr) >>= \case
133 Left x -> atomically (writeTChan tchan (Just x)) >> join (atomically again)
134 Right (y,yaddr) -> kont $ Arrival yaddr y
135 ParseError e -> kont $ ParseError e
136 Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated
137 , sendMessage = sendMessage tr
138 }
139 xtr = Transport
140 { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case
141 Nothing -> Terminated
142 Just (x,xaddr) -> Arrival xaddr x
143 , sendMessage = \addr' msg' -> do
144 msg_addr <- encodex (msg',addr')
145 mapM_ (uncurry . flip $ sendMessage tr) msg_addr
146 , setActive = const $ return ()
147 }
148 return (xtr, ytr)
149
150-- | Paritions a 'Transport' into two higher-level transports. Note: An 'TChan'
151-- is used to share the same underlying socket, so be sure to fork a thread for
152-- both returned 'Transport's to avoid hanging.
153partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a))
154 -> ((y,xaddr) -> Maybe (c,a))
155 -> TransportA err a b c
156 -> IO (TransportA err xaddr x y, TransportA err a b c)
157partitionTransport parse encodex tr =
158 partitionTransportM (return . parse) (return . encodex) tr
159
160-- |
161-- * f add x --> Nothing, consume x
162-- --> Just id, leave x to a different handler
163-- --> Just g, apply g to x and leave that to a different handler
164--
165-- Note: If you add a handler to one of the branches before applying a
166-- 'mergeTransports' combinator, then this handler may not block or return
167-- Nothing.
168addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> TransportA err addr x y -> TransportA err addr x y
169addHandler onParseError f tr = tr
170 { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case
171 Arrival addr x -> f addr x >>= maybe (join $ atomically eat) (kont . Arrival addr . ($ x))
172 ParseError e -> onParseError e >> kont (ParseError e)
173 Terminated -> kont Terminated
174 }
175
176-- | Modify a 'Transport' to invoke an action upon every received packet.
177onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x
178onInbound f tr = addHandler (const $ return ()) (\addr x -> f addr x >> return (Just id)) tr
179
180-- * Using a query\/response client.
181
182-- | Fork a thread that handles inbound packets. The returned action may be used
183-- to terminate the thread and clean up any related state.
184--
185-- Example usage:
186--
187-- > -- Start client.
188-- > quitServer <- forkListener "listener" (clientNet client)
189-- > -- Send a query q, recieve a response r.
190-- > r <- sendQuery client method q
191-- > -- Quit client.
192-- > quitServer
193forkListener :: String -> Transport err addr x -> IO (IO ())
194forkListener name client = do
195 setActive client True
196 thread_id <- forkIO $ do
197 myThreadId >>= flip labelThread ("listener."++name)
198 fix $ \loop -> join $ atomically $ awaitMessage client $ \case
199 Terminated -> return ()
200 _ -> loop
201 dput XMisc $ "Listener died: " ++ name
202 return $ do
203 setActive client False
204 -- killThread thread_id
205
206-- * Implementing a query\/response 'Client'.
207
208-- | These methods indicate what should be done upon various conditions. Write
209-- to a log file, make debug prints, or simply ignore them.
210--
211-- [ /addr/ ] Address of remote peer.
212--
213-- [ /x/ ] Incoming or outgoing packet.
214--
215-- [ /meth/ ] Method id of incoming or outgoing request.
216--
217-- [ /tid/ ] Transaction id for outgoing packet.
218--
219-- [ /err/ ] Error information, typically a 'String'.
220data ErrorReporter addr x meth tid err = ErrorReporter
221 { -- | Incoming: failed to parse packet.
222 reportParseError :: err -> IO ()
223 -- | Incoming: no handler for request.
224 , reportMissingHandler :: meth -> addr -> x -> IO ()
225 -- | Incoming: unable to identify request.
226 , reportUnknown :: addr -> x -> err -> IO ()
227 }
228
229ignoreErrors :: ErrorReporter addr x meth tid err
230ignoreErrors = ErrorReporter
231 { reportParseError = \_ -> return ()
232 , reportMissingHandler = \_ _ _ -> return ()
233 , reportUnknown = \_ _ _ -> return ()
234 }
235
236logErrors :: ( Show addr
237 , Show meth
238 ) => ErrorReporter addr x meth tid String
239logErrors = ErrorReporter
240 { reportParseError = \err -> dput XMisc err
241 , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")"
242 , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err
243 }
244
245printErrors :: ( Show addr
246 , Show meth
247 ) => Handle -> ErrorReporter addr x meth tid String
248printErrors h = ErrorReporter
249 { reportParseError = \err -> hPutStrLn h err
250 , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")"
251 , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err
252 }
253
254-- Change the /err/ type for an 'ErrorReporter'.
255instance Contravariant (ErrorReporter addr x meth tid) where
256 -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5
257 contramap f (ErrorReporter pe mh unk)
258 = ErrorReporter (\e -> pe (f e))
259 mh
260 (\addr x e -> unk addr x (f e))
261
262-- | An incoming message can be classified into three cases.
263data MessageClass err meth tid addr x
264 = IsQuery meth tid -- ^ An unsolicited query is handled based on it's /meth/ value. Any response
265 -- should include the provided /tid/ value.
266 | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value.
267 | IsUnsolicited (addr -> addr -> IO (Maybe (x -> x))) -- ^ Transactionless informative packet. The io action will be invoked
268 -- with the source and destination address of a message. If it handles the
269 -- message, it should return Nothing. Otherwise, it should return a transform
270 -- (usually /id/) to apply before the next handler examines it.
271 | IsUnknown err -- ^ None of the above.
272
273-- | Handler for an inbound query of type /x/ from an address of type _addr_.
274type MethodHandler err tid addr x = MethodHandlerA err tid addr x x
275
276-- | Handler for an inbound query of type /x/ with outbound response of type
277-- /y/ to an address of type /addr/.
278data MethodHandlerA err tid addr x y = forall a b. MethodHandler
279 { -- | Parse the query into a more specific type for this method.
280 methodParse :: x -> Either err a
281 -- | Serialize the response for transmission, given a context /ctx/ and the origin
282 -- and destination addresses.
283 , methodSerialize :: tid -> addr -> addr -> b -> y
284 -- | Fully typed action to perform upon the query. The remote origin
285 -- address of the query is provided to the handler.
286 --
287 -- TODO: Allow queries to be ignored?
288 , methodAction :: addr -> a -> IO b
289 }
290 -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary.
291 | forall a. NoReply
292 { -- | Parse the query into a more specific type for this method.
293 methodParse :: x -> Either err a
294 -- | Fully typed action to perform upon the query. The remote origin
295 -- address of the query is provided to the handler.
296 , noreplyAction :: addr -> a -> IO ()
297 }
298
299
300-- | To dispatch responses to our outbound queries, we require three
301-- primitives. See the 'transactionMethods' function to create these
302-- primitives out of a lookup table and a generator for transaction ids.
303--
304-- The type variable /d/ is used to represent the current state of the
305-- transaction generator and the table of pending transactions.
306data TransactionMethods d qid addr x = TransactionMethods
307 {
308 -- | Before a query is sent, this function stores an 'MVar' to which the
309 -- response will be written too. The returned /qid/ is a transaction id
310 -- that can be used to forget the 'MVar' if the remote peer is not
311 -- responding.
312 dispatchRegister :: POSIXTime -- time of expiry
313 -> (Maybe x -> IO ()) -- callback upon response (or timeout)
314 -> addr
315 -> d
316 -> STM (qid, d)
317 -- | This method is invoked when an incoming packet /x/ indicates it is
318 -- a response to the transaction with id /qid/. The returned IO action
319 -- will write the packet to the correct 'MVar' thus completing the
320 -- dispatch.
321 , dispatchResponse :: qid -> x -> d -> STM (d, IO ())
322 -- | When a timeout interval elapses, this method is called to remove the
323 -- transaction from the table.
324 , dispatchCancel :: qid -> d -> STM d
325 }
326
327-- | A set of methods necessary for dispatching incoming packets.
328type DispatchMethods tbl err meth tid addr x = DispatchMethodsA tbl err meth tid addr x x
329
330-- | A set of methods necessary for dispatching incoming packets.
331data DispatchMethodsA tbl err meth tid addr x y = DispatchMethods
332 { -- | Classify an inbound packet as a query or response.
333 classifyInbound :: x -> MessageClass err meth tid addr x
334 -- | Lookup the handler for a inbound query.
335 , lookupHandler :: meth -> Maybe (MethodHandlerA err tid addr x y)
336 -- | Methods for handling incoming responses.
337 , tableMethods :: TransactionMethods tbl tid addr x
338 }
339
340-- | All inputs required to implement a query\/response client.
341type Client err meth tid addr x = ClientA err meth tid addr x x
342
343-- | All inputs required to implement a query\/response client.
344data ClientA err meth tid addr x y = forall tbl. Client
345 { -- | The 'Transport' used to dispatch and receive packets.
346 clientNet :: TransportA err addr x y
347 -- | Methods for handling inbound packets.
348 , clientDispatcher :: DispatchMethodsA tbl err meth tid addr x y
349 -- | Methods for reporting various conditions.
350 , clientErrorReporter :: ErrorReporter addr x meth tid err
351 -- | State necessary for routing inbound responses and assigning unique
352 -- /tid/ values for outgoing queries.
353 , clientPending :: TVar tbl
354 -- | An action yielding this client\'s own address. It is invoked once
355 -- on each outbound and inbound packet. It is valid for this to always
356 -- return the same value.
357 --
358 -- The argument, if supplied, is the remote address for the transaction.
359 -- This can be used to maintain consistent aliases for specific peers.
360 , clientAddress :: Maybe addr -> IO addr
361 -- | Transform a query /tid/ value to an appropriate response /tid/
362 -- value. Normally, this would be the identity transformation, but if
363 -- /tid/ includes a unique cryptographic nonce, then it should be
364 -- generated here.
365 , clientResponseId :: tid -> IO tid
366 }
367
368-- | These four parameters are required to implement an outgoing query. A
369-- peer-to-peer algorithm will define a 'MethodSerializer' for every 'MethodHandler' that
370-- might be returned by 'lookupHandler'.
371data MethodSerializerA tid addr x y meth a b = MethodSerializer
372 { -- | Returns the microseconds to wait for a response to this query being
373 -- sent to the given address. The /addr/ may also be modified to add
374 -- routing information.
375 methodTimeout :: addr -> STM (addr,Int)
376 -- | A method identifier used for error reporting. This needn't be the
377 -- same as the /meth/ argument to 'MethodHandler', but it is suggested.
378 , method :: meth
379 -- | Serialize the outgoing query /a/ into a transmittable packet /x/.
380 -- The /addr/ arguments are, respectively, our own origin address and the
381 -- destination of the request. The /tid/ argument is useful for attaching
382 -- auxiliary notations on all outgoing packets.
383 , wrapQuery :: tid -> addr -> addr -> a -> x
384 -- | Parse an inbound packet /x/ into a response /b/ for this query.
385 , unwrapResponse :: y -> b
386 }
387
388type MethodSerializer tid addr x meth a b = MethodSerializerA tid addr x x meth a b
389
390microsecondsDiff :: Int -> POSIXTime
391microsecondsDiff us = fromIntegral us / 1000000
392
393asyncQuery_ :: Client err meth tid addr x
394 -> MethodSerializer tid addr x meth a b
395 -> a
396 -> addr
397 -> (Maybe b -> IO ())
398 -> IO (tid,POSIXTime,Int)
399asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do
400 now <- getPOSIXTime
401 (tid,addr,expiry) <- atomically $ do
402 tbl <- readTVar pending
403 (addr,expiry) <- methodTimeout meth addr0
404 (tid, tbl') <- dispatchRegister (tableMethods d)
405 (now + microsecondsDiff expiry)
406 (withResponse . fmap (unwrapResponse meth))
407 addr -- XXX: Should be addr0 or addr?
408 tbl
409 -- (addr,expiry) <- methodTimeout meth tid addr0
410 writeTVar pending tbl'
411 return (tid,addr,expiry)
412 self <- whoami (Just addr)
413 mres <- do sendMessage net addr (wrapQuery meth tid self addr q)
414 return $ Just ()
415 `catchIOError` (\e -> return Nothing)
416 return (tid,now,expiry)
417
418asyncQuery :: Show meth => Client err meth tid addr x
419 -> MethodSerializer tid addr x meth a b
420 -> a
421 -> addr
422 -> (Maybe b -> IO ())
423 -> IO ()
424asyncQuery client meth q addr withResponse0 = do
425 tm <- getSystemTimerManager
426 tidvar <- newEmptyMVar
427 timedout <- registerTimeout tm 1000000 $ do
428 dput XMisc $ "async TIMEDOUT " ++ show (method meth)
429 withResponse0 Nothing
430 tid <- takeMVar tidvar
431 dput XMisc $ "async TIMEDOUT mvar " ++ show (method meth)
432 case client of
433 Client { clientDispatcher = d, clientPending = pending } -> do
434 atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending
435 (tid,now,expiry) <- asyncQuery_ client meth q addr $ \x -> do
436 unregisterTimeout tm timedout
437 withResponse0 x
438 putMVar tidvar tid
439 updateTimeout tm timedout expiry
440 dput XMisc $ "FIN asyncQuery "++show (method meth)++" TIMEOUT="++show expiry
441
442-- | Send a query to a remote peer. Note that this function will always time
443-- out if 'forkListener' was never invoked to spawn a thread to receive and
444-- dispatch the response.
445sendQuery ::
446 forall err a b tbl x meth tid addr.
447 Client err meth tid addr x -- ^ A query/response implementation.
448 -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query.
449 -> a -- ^ The outbound query.
450 -> addr -- ^ Destination address of query.
451 -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out.
452sendQuery c@(Client net d err pending whoami _) meth q addr0 = do
453 mvar <- newEmptyMVar
454 (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar)
455 mres <- timeout expiry $ takeMVar mvar
456 case mres of
457 Just b -> return $ Just b
458 Nothing -> do
459 atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending
460 return Nothing
461
462contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x
463contramapAddr f (MethodHandler p s a)
464 = MethodHandler
465 p
466 (\tid src dst result -> s tid (f src) (f dst) result)
467 (\addr arg -> a (f addr) arg)
468contramapAddr f (NoReply p a)
469 = NoReply p (\addr arg -> a (f addr) arg)
470
471-- | Query handlers can throw this to ignore a query instead of responding to
472-- it.
473data DropQuery = DropQuery
474 deriving Show
475
476instance Exception DropQuery
477
478-- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the
479-- parse is successful, the returned IO action will construct our reply if
480-- there is one. Otherwise, a parse err is returned.
481dispatchQuery :: MethodHandlerA err tid addr x y -- ^ Handler to invoke.
482 -> tid -- ^ The transaction id for this query\/response session.
483 -> addr -- ^ Our own address, to which the query was sent.
484 -> x -- ^ The query packet.
485 -> addr -- ^ The origin address of the query.
486 -> Either err (IO (Maybe y))
487dispatchQuery (MethodHandler unwrapQ wrapR f) tid self x addr =
488 fmap (\a -> catch (Just . wrapR tid self addr <$> f addr a)
489 (\DropQuery -> return Nothing))
490 $ unwrapQ x
491dispatchQuery (NoReply unwrapQ f) tid self x addr =
492 fmap (\a -> f addr a >> return Nothing) $ unwrapQ x
493
494-- | Like 'transactionMethods' but allows extra information to be stored in the
495-- table of pending transactions. This also enables multiple 'Client's to
496-- share a single transaction table.
497transactionMethods' ::
498 ((Maybe x -> IO ()) -> a) -- ^ store MVar into table entry
499 -> (a -> Maybe x -> IO void) -- ^ load MVar from table entry
500 -> TableMethods t tid -- ^ Table methods to lookup values by /tid/.
501 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/.
502 -> TransactionMethods (g,t a) tid addr x
503transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods
504 { dispatchCancel = \tid (g,t) -> return (g, delete tid t)
505 , dispatchRegister = \nowPlusExpiry v a (g,t) -> do
506 let (tid,g') = generate g
507 let t' = insert tid (store v) nowPlusExpiry t -- (now + microsecondsDiff expiry) t
508 return ( tid, (g',t') )
509 , dispatchResponse = \tid x (g,t) ->
510 case lookup tid t of
511 Just v -> let t' = delete tid t
512 in return ((g,t'),void $ load v $ Just x)
513 Nothing -> return ((g,t), return ())
514 }
515
516-- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a
517-- function for generating unique transaction ids.
518transactionMethods ::
519 TableMethods t tid -- ^ Table methods to lookup values by /tid/.
520 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/.
521 -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x
522transactionMethods methods generate = transactionMethods' id id methods generate
523
524-- | Handle a single inbound packet and then invoke the given continuation.
525-- The 'forkListener' function is implemented by passing this function to 'fix'
526-- in a forked thread that loops until 'awaitMessage' returns 'Nothing' or
527-- throws an exception.
528handleMessage ::
529 ClientA err meth tid addr x y
530 -> addr
531 -> x
532 -> IO (Maybe (x -> x))
533handleMessage (Client net d err pending whoami responseID) addr plain = do
534 -- Just (Left e) -> do reportParseError err e
535 -- return $! Just id
536 -- Just (Right (plain, addr)) -> do
537 case classifyInbound d plain of
538 IsQuery meth tid -> case lookupHandler d meth of
539 Nothing -> do reportMissingHandler err meth addr plain
540 return $! Just id
541 Just m -> do
542 self <- whoami (Just addr)
543 tid' <- responseID tid
544 either (\e -> do reportParseError err e
545 return $! Just id)
546 (>>= \m -> do mapM_ (sendMessage net addr) m
547 return $! Nothing)
548 (dispatchQuery m tid' self plain addr)
549 IsUnsolicited action -> do
550 self <- whoami (Just addr)
551 action self addr
552 return Nothing
553 IsResponse tid -> do
554 action <- atomically $ do
555 ts0 <- readTVar pending
556 (ts, action) <- dispatchResponse (tableMethods d) tid plain ts0
557 writeTVar pending ts
558 return action
559 action
560 return $! Nothing
561 IsUnknown e -> do reportUnknown err addr plain e
562 return $! Just id
563 -- Nothing -> return $! id
564
565-- * UDP Datagrams.
566
567-- | Access the address family of a given 'SockAddr'. This convenient accessor
568-- is missing from 'Network.Socket', so I implemented it here.
569sockAddrFamily :: SockAddr -> Family
570sockAddrFamily (SockAddrInet _ _ ) = AF_INET
571sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6
572sockAddrFamily (SockAddrUnix _ ) = AF_UNIX
573#if !MIN_VERSION_network(3,0,0)
574sockAddrFamily _ = AF_CAN -- SockAddrCan constructor deprecated
575#endif
576
577-- | Packets with an empty payload may trigger EOF exception.
578-- 'udpTransport' uses this function to avoid throwing in that
579-- case.
580ignoreEOF :: Socket -> MVar () -> Arrival e a x -> IOError -> IO (Arrival e a x)
581ignoreEOF sock isClosed def e = do
582 done <- tryReadMVar isClosed
583 case done of
584 Just () -> do close sock
585 dput XMisc "Closing UDP socket."
586 pure Terminated
587 _ -> if isEOFError e then pure def
588 else throwIO e
589
590-- | Hard-coded maximum packet size for incoming UDP Packets received via
591-- 'udpTransport'.
592udpBufferSize :: Int
593udpBufferSize = 65536
594
595-- | Wrapper around 'B.sendTo' that silently ignores DoesNotExistError.
596saferSendTo :: Socket -> ByteString -> SockAddr -> IO ()
597saferSendTo sock bs saddr = void (B.sendTo sock bs saddr)
598 `catch` \e ->
599 -- sendTo: does not exist (Network is unreachable)
600 -- Occurs when IPv6 or IPv4 network is not available.
601 -- Currently, we require -threaded to prevent a forever-hang in this case.
602 if isDoesNotExistError e
603 then return ()
604 else throw e
605
606-- | Like 'udpTransport' except also returns the raw socket (for broadcast use).
607udpTransport' :: Show err => SockAddr -> IO (Transport err SockAddr ByteString, Socket)
608udpTransport' bind_address = do
609 let family = sockAddrFamily bind_address
610 sock <- socket family Datagram defaultProtocol
611 when (family == AF_INET6) $ do
612 setSocketOption sock IPv6Only 0
613 setSocketOption sock Broadcast 1
614 bind sock bind_address
615 isClosed <- newEmptyMVar
616 udpTChan <- atomically newTChan
617 let tr = Transport {
618 awaitMessage = \kont -> do
619 r <- readTChan udpTChan
620 return $ kont $! r
621 , sendMessage = case family of
622 AF_INET6 -> \case
623 (SockAddrInet port addr) -> \bs ->
624 -- Change IPv4 to 4mapped6 address.
625 saferSendTo sock bs $ SockAddrInet6 port 0 (0,0,0x0000ffff,fromBE32 addr) 0
626 addr6 -> \bs -> saferSendTo sock bs addr6
627 AF_INET -> \case
628 (SockAddrInet6 port 0 (0,0,0x0000ffff,raw4) 0) -> \bs -> do
629 let host4 = toBE32 raw4
630 -- Change 4mapped6 to ordinary IPv4.
631 -- dput XMisc $ "4mapped6 -> "++show (SockAddrInet port host4)
632 saferSendTo sock bs (SockAddrInet port host4)
633 addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr)
634 addr4 -> \bs -> saferSendTo sock bs addr4
635 _ -> \addr bs -> saferSendTo sock bs addr
636 , setActive = \case
637 False -> do
638 dput XMisc $ "closeTransport for udpTransport' called. " ++ show bind_address
639 tryPutMVar isClosed () -- signal awaitMessage that the transport is closed.
640#if MIN_VERSION_network (3,1,0)
641#elif MIN_VERSION_network(3,0,0)
642 let withFdSocket sock f = fdSocket sock >>= f >>= seq sock . return
643#else
644 let withFdSocket sock f = f (fdSocket sock) >>= seq sock . return
645#endif
646 withFdSocket sock $ \fd -> do
647 let sorryGHCButIAmNotFuckingClosingTheSocketYet fd = return ()
648 -- This call is necessary to interrupt the blocking recvFrom call in awaitMessage.
649 closeFdWith sorryGHCButIAmNotFuckingClosingTheSocketYet (fromIntegral fd)
650 True -> do
651 udpThread <- forkIO $ fix $ \again -> do
652 r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do
653 uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize
654 atomically $ writeTChan udpTChan r
655 case r of Terminated -> return ()
656 _ -> again
657 labelThread udpThread ("udp.io."++show bind_address)
658 }
659 return (tr, sock)
660
661-- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The
662-- argument is the listen-address for incoming packets. This is a useful
663-- low-level 'Transport' that can be transformed for higher-level protocols
664-- using 'layerTransport'.
665udpTransport :: Show err => SockAddr -> IO (Transport err SockAddr ByteString)
666udpTransport bind_address = fst <$> udpTransport' bind_address
667
668chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x
669chanTransport chanFromAddr self achan aclosed = Transport
670 { awaitMessage = \kont -> do
671 x <- (uncurry (flip Arrival) <$> readTChan achan)
672 `orElse`
673 (readTVar aclosed >>= check >> return Terminated)
674 return $ kont x
675 , sendMessage = \them bs -> do
676 atomically $ writeTChan (chanFromAddr them) (bs,self)
677 , setActive = \case
678 False -> atomically $ writeTVar aclosed True
679 True -> return ()
680 }
681
682-- | Returns a pair of transports linked together to simulate two computers talking to each other.
683testPairTransport :: IO (Transport err SockAddr ByteString, Transport err SockAddr ByteString)
684testPairTransport = do
685 achan <- atomically newTChan
686 bchan <- atomically newTChan
687 aclosed <- atomically $ newTVar False
688 bclosed <- atomically $ newTVar False
689 let a = SockAddrInet 1 1
690 b = SockAddrInet 2 2
691 return ( chanTransport (const bchan) a achan aclosed
692 , chanTransport (const achan) b bchan bclosed )
693
694newtype ByAddress err x addr = ByAddress (Transport err addr x)
695
696newtype Tagged x addr = Tagged x
697
698decorateAddr :: tag addr -> Arrival e addr x -> Arrival e (DSum tag Identity) x
699decorateAddr tag Terminated = Terminated
700decorateAddr tag (ParseError e) = ParseError e
701decorateAddr tag (Arrival addr x) = Arrival (tag ==> addr) x
702
703mergeTransports :: GCompare tag => DMap tag (ByAddress err x) -> IO (Transport err (DSum tag Identity) x)
704mergeTransports tmap = do
705 -- vmap <- traverseWithKey (\k v -> Tagged <$> newEmptyMVar) tmap
706 -- foldrWithKey (\k v n -> forkMergeBranch k v >> n) (return ()) vmap
707 return Transport
708 { awaitMessage = \kont ->
709 foldrWithKey (\k (ByAddress tr) n -> awaitMessage tr (kont . decorateAddr k) `orElse` n)
710 retry
711 tmap
712 , sendMessage = \(tag :=> Identity addr) x -> case DMap.lookup tag tmap of
713 Just (ByAddress tr) -> sendMessage tr addr x
714 Nothing -> return ()
715 , setActive = \toggle -> foldrWithKey (\_ (ByAddress tr) next -> setActive tr toggle >> next) (return ()) tmap
716 }
diff --git a/dht/src/Network/QueryResponse/TCP.hs b/dht/src/Network/QueryResponse/TCP.hs
deleted file mode 100644
index 0028a5b6..00000000
--- a/dht/src/Network/QueryResponse/TCP.hs
+++ /dev/null
@@ -1,223 +0,0 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE GeneralizedNewtypeDeriving #-}
3{-# LANGUAGE LambdaCase #-}
4module Network.QueryResponse.TCP where
5
6#ifdef THREAD_DEBUG
7import Control.Concurrent.Lifted.Instrument
8#else
9import Control.Concurrent.Lifted
10import GHC.Conc (labelThread)
11#endif
12
13import Control.Arrow
14import Control.Concurrent.STM
15import Control.Concurrent.STM.TMVar
16import Control.Monad
17import Data.ByteString (ByteString,hPut)
18import Data.Function
19import Data.Hashable
20import Data.Maybe
21import Data.Ord
22import Data.Time.Clock.POSIX
23import Data.Word
24import Data.String (IsString(..))
25import Network.BSD
26import Network.Socket as Socket
27import System.Timeout
28import System.IO
29import System.IO.Error
30
31import DebugTag
32import DebugUtil
33import DPut
34import Connection.Tcp (socketFamily)
35import qualified Data.MinMaxPSQ as MM
36import Network.QueryResponse
37
38data TCPSession st
39 = PendingTCPSession
40 | TCPSession
41 { tcpHandle :: Handle
42 , tcpState :: st
43 , tcpThread :: ThreadId
44 }
45
46newtype TCPAddress = TCPAddress SockAddr
47 deriving (Eq,Ord,Show)
48
49instance Hashable TCPAddress where
50 hashWithSalt salt (TCPAddress x) = case x of
51 SockAddrInet port addr -> hashWithSalt salt (fromIntegral port :: Word16,addr)
52 SockAddrInet6 port b c d -> hashWithSalt salt (fromIntegral port :: Word16,b,c,d)
53 _ -> 0
54
55data TCPCache st = TCPCache
56 { lru :: TVar (MM.MinMaxPSQ' TCPAddress (Down POSIXTime) (TCPSession st))
57 , tcpMax :: Int
58 }
59
60-- This is a suitable /st/ parameter to 'TCPCache'
61data SessionProtocol x y = SessionProtocol
62 { streamGoodbye :: IO () -- ^ "Goodbye" protocol upon termination.
63 , streamDecode :: IO (Maybe x) -- ^ Parse inbound messages.
64 , streamEncode :: y -> IO () -- ^ Serialize outbound messages.
65 }
66
67data StreamHandshake addr x y = StreamHandshake
68 { streamHello :: addr -> Handle -> IO (SessionProtocol x y) -- ^ "Hello" protocol upon fresh connection.
69 , streamAddr :: addr -> SockAddr
70 }
71
72killSession :: TCPSession st -> IO ()
73killSession PendingTCPSession = return ()
74killSession TCPSession{tcpThread=t} = killThread t
75
76showStat :: IsString p => TCPSession st -> p
77showStat r = case r of PendingTCPSession -> "pending."
78 TCPSession {} -> "established."
79
80tcp_timeout :: Int
81tcp_timeout = 10000000
82
83acquireConnection :: TMVar (Arrival a addr x)
84 -> TCPCache (SessionProtocol x y)
85 -> StreamHandshake addr x y
86 -> addr
87 -> Bool
88 -> IO (Maybe (y -> IO ()))
89acquireConnection mvar tcpcache stream addr bDoCon = do
90 now <- getPOSIXTime
91 -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr)
92 entry <- atomically $ do
93 c <- readTVar (lru tcpcache)
94 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
95 case v of
96 Nothing | bDoCon -> writeTVar (lru tcpcache)
97 $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c
98 | otherwise -> return ()
99 Just (tm, v) -> writeTVar (lru tcpcache)
100 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) c
101 return v
102 -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry)
103 case entry of
104 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do
105 proto <- getProtocolNumber "tcp"
106 sock <- socket (socketFamily $ streamAddr stream addr) Stream proto
107 mh <- catchIOError (do h <- timeout tcp_timeout $ do
108 connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock)
109 h <- socketToHandle sock ReadWriteMode
110 hSetBuffering h NoBuffering
111 return h
112 return h)
113 $ \e -> return Nothing
114 when (isNothing mh) $ do
115 atomically $ modifyTVar' (lru tcpcache)
116 $ MM.delete (TCPAddress $ streamAddr stream addr)
117 Socket.close sock
118 ret <- fmap join $ forM mh $ \h -> do
119 mst <- catchIOError (Just <$> streamHello stream addr h)
120 (\e -> return Nothing)
121 case mst of
122 Nothing -> do
123 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr)
124 return Nothing
125 Just st -> do
126 dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr)
127 signal <- newTVarIO False
128 let showAddr a = show (streamAddr stream a)
129 rthread <- forkLabeled ("tcp:"++showAddr addr) $ do
130 atomically (readTVar signal >>= check)
131 fix $ \loop -> do
132 x <- streamDecode st
133 dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x
134 case x of
135 Just u -> do
136 m <- timeout tcp_timeout $ atomically (putTMVar mvar $ Arrival addr u)
137 when (isNothing m) $ do
138 dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet."
139 atomically $ tryTakeTMVar mvar
140 return ()
141 loop
142 Nothing -> do
143 dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr)
144 do atomically $ modifyTVar' (lru tcpcache)
145 $ MM.delete (TCPAddress $ streamAddr stream addr)
146 c <- atomically $ readTVar (lru tcpcache)
147 now <- getPOSIXTime
148 forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do
149 dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r]
150 mreport <- timeout tcp_timeout $ threadReport False -- XXX: Paranoid timeout
151 case mreport of
152 Just treport -> dput XTCP treport
153 Nothing -> dput XTCP "TCP ERROR: threadReport timed out."
154 hClose h `catchIOError` \e -> return ()
155 let v = TCPSession
156 { tcpHandle = h
157 , tcpState = st
158 , tcpThread = rthread
159 }
160 t <- getPOSIXTime
161 retires <- atomically $ do
162 c <- readTVar (lru tcpcache)
163 let (rs,c') = MM.takeView (tcpMax tcpcache)
164 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c
165 writeTVar (lru tcpcache) c'
166 writeTVar signal True
167 return rs
168 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkLabeled ("tcp-close:"++show k) $ do
169 dput XTCP $ "TCP dropped: " ++ show k
170 killSession r
171 case r of TCPSession {tcpState=st,tcpHandle=h} -> do
172 streamGoodbye st
173 hClose h
174 `catchIOError` \e -> return ()
175 _ -> return ()
176
177 return $ Just $ streamEncode st
178 when (isNothing ret) $ do
179 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr)
180 return ret
181 Just (tm, PendingTCPSession)
182 | not bDoCon -> return Nothing
183 | otherwise -> fmap join $ timeout tcp_timeout $ atomically $ do
184 c <- readTVar (lru tcpcache)
185 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
186 case v of
187 Just (_,TCPSession{tcpState=st}) -> return $ Just $ streamEncode st
188 Nothing -> return Nothing
189 _ -> retry
190 Just (tm, v@TCPSession {tcpState=st}) -> return $ Just $ streamEncode st
191
192closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO ()
193closeAll tcpcache stream = do
194 dput XTCP "TCP.closeAll called."
195 cache <- atomically $ swapTVar (lru tcpcache) MM.empty
196 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do
197 killSession r
198 case r of TCPSession{tcpState=st,tcpHandle=h} -> catchIOError (streamGoodbye st >> hClose h)
199 (\e -> return ())
200 _ -> return ()
201
202-- Use a cache of TCP client connections for sending (and receiving) packets.
203-- The boolean value prepended to the message allows the sender to specify
204-- whether or not a new connection will be initiated if neccessary. If 'False'
205-- is passed, then the packet will be sent only if there already exists a
206-- connection.
207tcpTransport :: Int -- ^ maximum number of TCP links to maintain.
208 -> StreamHandshake addr x y
209 -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y))
210tcpTransport maxcon stream = do
211 msgvar <- atomically newEmptyTMVar
212 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty)
213 return $ (,) tcpcache Transport
214 { awaitMessage = \f -> takeTMVar msgvar >>= \x -> return $ do
215 f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Terminated)
216 , sendMessage = \addr (bDoCon,y) -> do
217 void . forkLabeled "tcp-send" $ do
218 msock <- acquireConnection msgvar tcpcache stream addr bDoCon
219 mapM_ ($ y) msock
220 `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e
221 , setActive = \case False -> closeAll tcpcache stream >> atomically (putTMVar msgvar Terminated)
222 True -> return ()
223 }
diff --git a/dht/src/Network/SocketLike.hs b/dht/src/Network/SocketLike.hs
deleted file mode 100644
index 37891cfd..00000000
--- a/dht/src/Network/SocketLike.hs
+++ /dev/null
@@ -1,98 +0,0 @@
1{-# LANGUAGE GeneralizedNewtypeDeriving #-}
2{-# LANGUAGE TupleSections #-}
3{-# LANGUAGE CPP #-}
4-- |
5--
6-- A socket could be used indirectly via a 'System.IO.Handle' or a conduit from
7-- Michael Snoyman's conduit package. But doing so presents an encapsulation
8-- problem. Do we allow access to the underlying socket and trust that it wont
9-- be used in an unsafe way? Or do we protect it at the higher level and deny
10-- access to various state information?
11--
12-- The 'SocketLike' class enables the approach that provides a safe wrapper to
13-- the underlying socket and gives access to various state information without
14-- enabling direct reads or writes.
15module Network.SocketLike
16 ( SocketLike(..)
17 , RestrictedSocket
18 , restrictSocket
19 , restrictHandleSocket
20 -- * Re-exports
21 --
22 -- | To make the 'SocketLike' methods less awkward to use, the types
23 -- 'CUInt', 'SockAddr', and 'PortNumber' are re-exported.
24 , CUInt
25 , PortNumber
26 , SockAddr(..)
27 ) where
28
29import Network.Socket
30 ( PortNumber
31 , SockAddr
32 )
33import Foreign.C.Types ( CUInt )
34
35import qualified Network.Socket as NS
36import System.IO (Handle,hClose,hIsOpen)
37import Control.Arrow
38
39-- | A safe (mostly read-only) interface to a 'NS.Socket'. Note that despite
40-- how this class is named, it provides no access to typical 'NS.Socket' uses
41-- like sending or receiving network packets.
42class SocketLike sock where
43 -- | See 'NS.getSocketName'
44 getSocketName :: sock -> IO SockAddr
45 -- | See 'NS.getPeerName'
46 getPeerName :: sock -> IO SockAddr
47 -- | See 'NS.getPeerCred'
48-- getPeerCred :: sock -> IO (CUInt, CUInt, CUInt)
49
50 -- | Is the socket still valid? Connected
51 --
52 -- In order to give the instance writer
53 -- the option to do book-keeping in a pure
54 -- type, a conceptually modified version of
55 -- the 'SocketLike' is returned.
56 --
57 isValidSocket :: sock -> IO (sock,Bool)
58
59
60instance SocketLike NS.Socket where
61 getSocketName = NS.getSocketName
62 getPeerName = NS.getPeerName
63-- getPeerCred = NS.getPeerCred
64#if MIN_VERSION_network(3,1,0)
65 isValidSocket s = (s,) <$> NS.withFdSocket s (return . (/= (-1)))
66#else
67#if MIN_VERSION_network(3,0,0)
68 isValidSocket s = (s,) . (/= (-1)) <$> NS.fdSocket s
69#else
70#if MIN_VERSION_network(2,4,0)
71 isValidSocket s = (s,) <$> NS.isConnected s -- warning: this is always False if the socket
72 -- was converted to a Handle
73#else
74 isValidSocket s = (s,) <$> NS.sIsConnected s -- warning: this is always False if the socket
75 -- was converted to a Handle
76#endif
77#endif
78#endif
79
80-- | An encapsulated socket. Data reads and writes are not possible.
81data RestrictedSocket = Restricted (Maybe Handle) NS.Socket deriving Show
82
83instance SocketLike RestrictedSocket where
84 getSocketName (Restricted mb sock) = NS.getSocketName sock
85 getPeerName (Restricted mb sock) = NS.getPeerName sock
86-- getPeerCred (Restricted mb sock) = NS.getPeerCred sock
87 isValidSocket rs@(Restricted mb sock) = maybe (first (Restricted mb) <$> isValidSocket sock) (((rs,) <$>) . hIsOpen) mb
88
89-- | Create a 'RestrictedSocket' that explicitly disallows sending or
90-- receiving data.
91restrictSocket :: NS.Socket -> RestrictedSocket
92restrictSocket socket = Restricted Nothing socket
93
94-- | Build a 'RestrictedSocket' for which 'sClose' will close the given
95-- 'Handle'. It is intended that this 'Handle' was obtained via
96-- 'NS.socketToHandle'.
97restrictHandleSocket :: Handle -> NS.Socket -> RestrictedSocket
98restrictHandleSocket h socket = Restricted (Just h) socket
diff --git a/dht/src/Network/StreamServer.hs b/dht/src/Network/StreamServer.hs
deleted file mode 100644
index 1da612ce..00000000
--- a/dht/src/Network/StreamServer.hs
+++ /dev/null
@@ -1,167 +0,0 @@
1-- | This module implements a bare-bones TCP or Unix socket server.
2{-# LANGUAGE CPP #-}
3{-# LANGUAGE TypeFamilies #-}
4{-# LANGUAGE TypeOperators #-}
5{-# LANGUAGE OverloadedStrings #-}
6{-# LANGUAGE RankNTypes #-}
7module Network.StreamServer
8 ( streamServer
9 , ServerHandle
10 , getAcceptLoopThreadId
11 , ServerConfig(..)
12 , withSession
13 , quitListening
14 --, dummyServerHandle
15 , listenSocket
16 , Local(..)
17 , Remote(..)
18 ) where
19
20import Data.Monoid
21import Network.Socket as Socket
22import System.Directory (removeFile)
23import System.IO
24 ( IOMode(..)
25 , stderr
26 , hFlush
27 )
28import Control.Monad
29import Control.Monad.Fix (fix)
30#ifdef THREAD_DEBUG
31import Control.Concurrent.Lifted.Instrument
32 ( forkIO, threadDelay, ThreadId, mkWeakThreadId, labelThread, myThreadId
33 , killThread )
34#else
35import GHC.Conc (labelThread)
36import Control.Concurrent
37 ( forkIO, threadDelay, ThreadId, mkWeakThreadId, myThreadId
38 , killThread )
39#endif
40import Control.Exception (handle,finally)
41import System.IO.Error (tryIOError)
42import System.Mem.Weak
43import System.IO.Error
44
45-- import Data.Conduit
46import System.IO (Handle)
47import Control.Concurrent.MVar (newMVar)
48
49import Network.SocketLike
50import DPut
51import DebugTag
52
53data ServerHandle = ServerHandle Socket (Weak ThreadId)
54
55-- | Useful for testing.
56getAcceptLoopThreadId :: ServerHandle -> IO (Weak ThreadId)
57getAcceptLoopThreadId (ServerHandle _ t) = return t
58
59listenSocket :: ServerHandle -> RestrictedSocket
60listenSocket (ServerHandle sock _) = restrictSocket sock
61
62{- // Removed, bit-rotted and there are no call sites
63-- | Create a useless do-nothing 'ServerHandle'.
64dummyServerHandle :: IO ServerHandle
65dummyServerHandle = do
66 mvar <- newMVar Closed
67 let sock = MkSocket 0 AF_UNSPEC NoSocketType 0 mvar
68 thread <- mkWeakThreadId <=< forkIO $ return ()
69 return (ServerHandle sock thread)
70-}
71
72removeSocketFile :: SockAddr -> IO ()
73removeSocketFile (SockAddrUnix fname) = removeFile fname
74removeSocketFile _ = return ()
75
76-- | Terminate the server accept-loop. Call this to shut down the server.
77quitListening :: ServerHandle -> IO ()
78quitListening (ServerHandle socket acceptThread) =
79 finally (Socket.getSocketName socket >>= removeSocketFile)
80 (do mapM_ killThread =<< deRefWeak acceptThread
81 Socket.close socket)
82
83
84-- | It's 'bshow' instead of 'show' to enable swapping in a 'ByteString'
85-- variation. (This is not exported.)
86bshow :: Show a => a -> String
87bshow e = show e
88
89-- | Send a string to stderr. Not exported. Default 'serverWarn' when
90-- 'withSession' is used to configure the server.
91warnStderr :: String -> IO ()
92warnStderr str = dput XMisc str >> hFlush stderr
93
94newtype Local a = Local a deriving (Eq,Ord,Show)
95newtype Remote a = Remote a deriving (Eq,Ord,Show)
96
97data ServerConfig = ServerConfig
98 { serverWarn :: String -> IO ()
99 -- ^ Action to report warnings and errors.
100 , serverSession :: ( RestrictedSocket, (Local SockAddr, Remote SockAddr)) -> Int -> Handle -> IO ()
101 -- ^ Action to handle interaction with a client
102 }
103
104-- | Initialize a 'ServerConfig' using the provided session handler.
105withSession :: ((RestrictedSocket,(Local SockAddr,Remote SockAddr)) -> Int -> Handle -> IO ()) -> ServerConfig
106withSession session = ServerConfig warnStderr session
107
108-- | Launch a thread to listen at the given bind address and dispatch
109-- to session handler threads on every incoming connection. Supports
110-- IPv4 and IPv6, TCP and unix sockets.
111--
112-- The returned handle can be used with 'quitListening' to terminate the
113-- thread and prevent any new sessions from starting. Currently active
114-- session threads will not be terminated or signaled in any way.
115streamServer :: ServerConfig -> [SockAddr] -> IO ServerHandle
116streamServer cfg addrs = do
117 let warn = serverWarn cfg
118 family = case addrs of
119 SockAddrInet {}:_ -> AF_INET
120 SockAddrInet6 {}:_ -> AF_INET6
121 SockAddrUnix {}:_ -> AF_UNIX
122 [] -> AF_INET6
123 sock <- socket family Stream 0
124 setSocketOption sock ReuseAddr 1
125 let tryBind addr next _ = do
126 tryIOError (removeSocketFile addr)
127 bind sock addr
128 `catchIOError` \e -> next (Just e)
129 fix $ \loop -> let again mbe = do
130 forM_ mbe $ \e -> warn $ "bind-error: " <> bshow addrs <> " " <> bshow e
131 threadDelay 5000000
132 loop
133 in foldr tryBind again addrs Nothing
134 listen sock maxListenQueue
135 thread <- mkWeakThreadId <=< forkIO $ do
136 bindaddr <- Socket.getSocketName sock
137 myThreadId >>= flip labelThread ("StreamServer.acceptLoop." <> bshow bindaddr)
138 acceptLoop cfg sock 0
139 return (ServerHandle sock thread)
140
141-- | Not exported. This, combined with 'acceptException' form a mutually
142-- recursive loop that handles incoming connections. To quit the loop, the
143-- socket must be closed by 'quitListening'.
144acceptLoop :: ServerConfig -> Socket -> Int -> IO ()
145acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do
146 (con,raddr) <- accept sock
147 let conkey = n + 1
148 laddr <- Socket.getSocketName con
149 h <- socketToHandle con ReadWriteMode
150 forkIO $ do
151 myThreadId >>= flip labelThread "StreamServer.session"
152 serverSession cfg (restrictHandleSocket h con, (Local laddr, Remote raddr)) conkey h
153 acceptLoop cfg sock (n + 1)
154
155acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO ()
156acceptException cfg n sock ioerror = do
157 case show (ioeGetErrorType ioerror) of
158 "resource exhausted" -> do -- try again (ioeGetErrorType ioerror == fullErrorType)
159 serverWarn cfg $ ("acceptLoop: resource exhasted")
160 threadDelay 500000
161 acceptLoop cfg sock (n + 1)
162 "invalid argument" -> do -- quit on closed socket
163 Socket.close sock
164 message -> do -- unexpected exception
165 serverWarn cfg $ ("acceptLoop: "<>bshow message)
166 Socket.close sock
167
diff --git a/dht/stack.yaml b/dht/stack.yaml
index 3ae992c7..5c6013a0 100644
--- a/dht/stack.yaml
+++ b/dht/stack.yaml
@@ -18,6 +18,7 @@ extra-deps:
18- "../concurrent-supply/" 18- "../concurrent-supply/"
19- "../base32-bytestring/" 19- "../base32-bytestring/"
20- "../dependent-map/" 20- "../dependent-map/"
21- "../server/"
21- cryptonite-0.23 22- cryptonite-0.23
22- reference-0.1 23- reference-0.1
23- avahi-0.2.0@sha256:eb725536d8427548685b531d4bf8271a3104da06f611ed38165a0e08e21c54eb,1799 24- avahi-0.2.0@sha256:eb725536d8427548685b531d4bf8271a3104da06f611ed38165a0e08e21c54eb,1799