summaryrefslogtreecommitdiff
path: root/dht/src
diff options
context:
space:
mode:
Diffstat (limited to 'dht/src')
-rw-r--r--dht/src/Control/Concurrent/Delay.hs49
-rw-r--r--dht/src/Control/Concurrent/PingMachine.hs161
-rw-r--r--dht/src/Control/Concurrent/ThreadUtil.hs31
-rw-r--r--dht/src/Data/TableMethods.hs105
-rw-r--r--dht/src/DebugUtil.hs42
-rw-r--r--dht/src/Network/QueryResponse.hs716
-rw-r--r--dht/src/Network/QueryResponse/TCP.hs223
-rw-r--r--dht/src/Network/SocketLike.hs98
-rw-r--r--dht/src/Network/StreamServer.hs167
9 files changed, 0 insertions, 1592 deletions
diff --git a/dht/src/Control/Concurrent/Delay.hs b/dht/src/Control/Concurrent/Delay.hs
deleted file mode 100644
index 67dcd451..00000000
--- a/dht/src/Control/Concurrent/Delay.hs
+++ /dev/null
@@ -1,49 +0,0 @@
1module Control.Concurrent.Delay where
2
3import Control.Concurrent
4import Control.Monad
5import Control.Exception ({-evaluate,-}handle,finally,throwIO)
6import Data.Time.Clock (NominalDiffTime)
7import System.IO.Error
8
9type Microseconds = Int
10
11microseconds :: NominalDiffTime -> Microseconds
12microseconds d = round $ 1000000 * d
13
14data InterruptibleDelay = InterruptibleDelay
15 { delayThread :: MVar ThreadId
16 }
17
18interruptibleDelay :: IO InterruptibleDelay
19interruptibleDelay = do
20 fmap InterruptibleDelay newEmptyMVar
21
22-- | Delay for the given number of microseconds and return 'True' if the delay
23-- is not interrupted.
24--
25-- Note: If a thread is already waiting on the given 'InterruptibleDelay'
26-- object, then this will block until it becomes available and only then start
27-- the delay timer.
28startDelay :: InterruptibleDelay -> Microseconds -> IO Bool
29startDelay d interval = do
30 thread <- myThreadId
31 handle (\e -> do when (not $ isUserError e) (throwIO e)
32 return False) $ do
33 putMVar (delayThread d) thread
34 threadDelay interval
35 void $ takeMVar (delayThread d)
36 return True
37 -- The following cleanup shouldn't be necessary, but I'm paranoid.
38 `finally` tryTakeMVar (delayThread d)
39
40 where debugNoise str = return ()
41
42-- | Signal the thread waiting on the given 'InterruptibleDelay' object to
43-- continue even though the timeout has not elapsed. If no thread is waiting,
44-- then this is a no-op.
45interruptDelay :: InterruptibleDelay -> IO ()
46interruptDelay d = do
47 mthread <- tryTakeMVar (delayThread d)
48 forM_ mthread $ \thread -> do
49 throwTo thread (userError "Interrupted delay")
diff --git a/dht/src/Control/Concurrent/PingMachine.hs b/dht/src/Control/Concurrent/PingMachine.hs
deleted file mode 100644
index a8f10e83..00000000
--- a/dht/src/Control/Concurrent/PingMachine.hs
+++ /dev/null
@@ -1,161 +0,0 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE TupleSections #-}
3module Control.Concurrent.PingMachine where
4
5import Control.Monad
6import Data.Function
7#ifdef THREAD_DEBUG
8import Control.Concurrent.Lifted.Instrument
9#else
10import Control.Concurrent.Lifted
11import GHC.Conc (labelThread)
12#endif
13import Control.Concurrent.STM
14
15import Control.Concurrent.Delay
16
17type Miliseconds = Int
18type TimeOut = Miliseconds
19type PingInterval = Miliseconds
20
21-- | Events that occur as a result of the 'PingMachine' watchdog.
22--
23-- Use 'pingWait' to wait for one of these to occur.
24data PingEvent
25 = PingIdle -- ^ You should send a ping if you observe this event.
26 | PingTimeOut -- ^ You should give up on the connection in case of this event.
27
28data PingMachine = PingMachine
29 { pingFlag :: TVar Bool
30 , pingInterruptible :: InterruptibleDelay
31 , pingEvent :: TMVar PingEvent
32 , pingStarted :: TMVar Bool
33 }
34
35-- | Fork a thread to monitor a connection for a ping timeout.
36--
37-- If 'pingBump' is not invoked after a idle is signaled, a timeout event will
38-- occur. When that happens, even if the caller chooses to ignore this event,
39-- the watchdog thread will be terminated and no more ping events will be
40-- signaled.
41--
42-- An idle connection will be signaled by:
43--
44-- (1) 'pingFlag' is set 'True'
45--
46-- (2) 'pingWait' returns 'PingIdle'
47--
48-- Either may be tested to determine whether a ping should be sent but
49-- 'pingFlag' is difficult to use properly because it is up to the caller to
50-- remember that the ping is already in progress.
51forkPingMachine
52 :: String
53 -> PingInterval -- ^ Milliseconds of idle before a ping is considered necessary.
54 -> TimeOut -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'.
55 -> IO PingMachine
56forkPingMachine label idle timeout = do
57 d <- interruptibleDelay
58 flag <- atomically $ newTVar False
59 canceled <- atomically $ newTVar False
60 event <- atomically newEmptyTMVar
61 started <- atomically $ newEmptyTMVar
62 when (idle/=0) $ void . forkIO $ do
63 myThreadId >>= flip labelThread ("Ping." ++ label) -- ("ping.watchdog")
64 (>>=) (atomically (readTMVar started)) $ flip when $ do
65 fix $ \loop -> do
66 atomically $ writeTVar flag False
67 fin <- startDelay d (1000*idle)
68 (>>=) (atomically (readTMVar started)) $ flip when $ do
69 if (not fin) then loop
70 else do
71 -- Idle event
72 atomically $ do
73 tryTakeTMVar event
74 putTMVar event PingIdle
75 writeTVar flag True
76 fin <- startDelay d (1000*timeout)
77 (>>=) (atomically (readTMVar started)) $ flip when $ do
78 me <- myThreadId
79 if (not fin) then loop
80 else do
81 -- Timeout event
82 atomically $ do
83 tryTakeTMVar event
84 writeTVar flag False
85 putTMVar event PingTimeOut
86 return PingMachine
87 { pingFlag = flag
88 , pingInterruptible = d
89 , pingEvent = event
90 , pingStarted = started
91 }
92
93-- | like 'forkPingMachine' but the timeout and idle parameters can be changed dynamically
94-- Unlike 'forkPingMachine', 'forkPingMachineDynamic' always launches a thread
95-- regardless of idle value.
96forkPingMachineDynamic
97 :: String
98 -> TVar PingInterval -- ^ Milliseconds of idle before a ping is considered necessary.
99 -> TVar TimeOut -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'.
100 -> IO PingMachine
101forkPingMachineDynamic label idleV timeoutV = do
102 d <- interruptibleDelay
103 flag <- atomically $ newTVar False
104 canceled <- atomically $ newTVar False
105 event <- atomically newEmptyTMVar
106 started <- atomically $ newEmptyTMVar
107 void . forkIO $ do
108 myThreadId >>= flip labelThread ("Ping." ++ label) -- ("ping.watchdog")
109 (>>=) (atomically (readTMVar started)) $ flip when $ do
110 fix $ \loop -> do
111 atomically $ writeTVar flag False
112 (idle,timeout) <- atomically $ (,) <$> readTVar idleV <*> readTVar timeoutV
113 fin <- startDelay d (1000*idle)
114 (>>=) (atomically (readTMVar started)) $ flip when $ do
115 if (not fin) then loop
116 else do
117 -- Idle event
118 atomically $ do
119 tryTakeTMVar event
120 putTMVar event PingIdle
121 writeTVar flag True
122 fin <- startDelay d (1000*timeout)
123 (>>=) (atomically (readTMVar started)) $ flip when $ do
124 me <- myThreadId
125 if (not fin) then loop
126 else do
127 -- Timeout event
128 atomically $ do
129 tryTakeTMVar event
130 writeTVar flag False
131 putTMVar event PingTimeOut
132 return PingMachine
133 { pingFlag = flag
134 , pingInterruptible = d
135 , pingEvent = event
136 , pingStarted = started
137 }
138
139-- | Terminate the watchdog thread. Call this upon connection close.
140--
141-- You should ensure no threads are waiting on 'pingWait' because there is no
142-- 'PingEvent' signaling termination.
143pingCancel :: PingMachine -> IO ()
144pingCancel me = do
145 atomically $ do tryTakeTMVar (pingStarted me)
146 putTMVar (pingStarted me) False
147 interruptDelay (pingInterruptible me)
148
149-- | Reset the ping timer. Call this regularly to prevent 'PingTimeOut'.
150pingBump :: PingMachine -> IO ()
151pingBump me = do
152 atomically $ do
153 b <- tryReadTMVar (pingStarted me)
154 when (b/=Just False) $ do
155 tryTakeTMVar (pingStarted me)
156 putTMVar (pingStarted me) True
157 interruptDelay (pingInterruptible me)
158
159-- | Retries until a 'PingEvent' occurs.
160pingWait :: PingMachine -> STM PingEvent
161pingWait me = takeTMVar (pingEvent me)
diff --git a/dht/src/Control/Concurrent/ThreadUtil.hs b/dht/src/Control/Concurrent/ThreadUtil.hs
deleted file mode 100644
index a258d933..00000000
--- a/dht/src/Control/Concurrent/ThreadUtil.hs
+++ /dev/null
@@ -1,31 +0,0 @@
1{-# LANGUAGE CPP #-}
2module Control.Concurrent.ThreadUtil
3 (
4#ifdef THREAD_DEBUG
5 module Control.Concurrent.Lifted.Instrument
6#else
7 module Control.Control.Lifted
8 , module GHC.Conc
9#endif
10 ) where
11
12#ifdef THREAD_DEBUG
13import Control.Concurrent.Lifted.Instrument
14#else
15import Control.Concurrent.Lifted
16import GHC.Conc (labelThread)
17
18forkLabeled :: String -> IO () -> IO ThreadId
19forkLabeled lbl action = do
20 t <- forkIO action
21 labelThread t lbl
22 return t
23{-# INLINE forkLabeled #-}
24
25forkOSLabeled :: String -> IO () -> IO ThreadId
26forkOSLabeled lbl action = do
27 t <- forkOS action
28 labelThread t lbl
29 return t
30{-# INLINE forkOSLabeled #-}
31#endif
diff --git a/dht/src/Data/TableMethods.hs b/dht/src/Data/TableMethods.hs
deleted file mode 100644
index e4208a69..00000000
--- a/dht/src/Data/TableMethods.hs
+++ /dev/null
@@ -1,105 +0,0 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE GADTs #-}
3{-# LANGUAGE LambdaCase #-}
4{-# LANGUAGE PartialTypeSignatures #-}
5{-# LANGUAGE RankNTypes #-}
6{-# LANGUAGE ScopedTypeVariables #-}
7{-# LANGUAGE TupleSections #-}
8module Data.TableMethods where
9
10import Data.Functor.Contravariant
11import Data.Time.Clock.POSIX
12import Data.Word
13import qualified Data.IntMap.Strict as IntMap
14 ;import Data.IntMap.Strict (IntMap)
15import qualified Data.Map.Strict as Map
16 ;import Data.Map.Strict (Map)
17import qualified Data.Word64Map as W64Map
18 ;import Data.Word64Map (Word64Map)
19
20import Data.Wrapper.PSQ as PSQ
21
22type Priority = POSIXTime
23
24data OptionalPriority t tid x
25 = NoPriority
26 | HasPriority (Priority -> t x -> ([(tid, Priority, x)], t x))
27
28-- | The standard lookup table methods.
29data TableMethods t tid = TableMethods
30 { -- | Insert a new /tid/ entry into the transaction table.
31 tblInsert :: forall a. tid -> a -> Priority -> t a -> t a
32 -- | Delete transaction /tid/ from the transaction table.
33 , tblDelete :: forall a. tid -> t a -> t a
34 -- | Lookup the value associated with transaction /tid/.
35 , tblLookup :: forall a. tid -> t a -> Maybe a
36 }
37
38data QMethods t tid x = QMethods
39 { qTbl :: TableMethods t tid
40 , qAtMostView :: OptionalPriority t tid x
41 }
42
43vanillaTable :: TableMethods t tid -> QMethods t tid x
44vanillaTable tbl = QMethods tbl NoPriority
45
46priorityTable :: TableMethods t tid
47 -> (Priority -> t x -> ([(k, Priority, x)], t x))
48 -> (k -> x -> tid)
49 -> QMethods t tid x
50priorityTable tbl atmost f = QMethods
51 { qTbl = tbl
52 , qAtMostView = HasPriority $ \p t -> case atmost p t of
53 (es,t') -> (map (\(k,p,a) -> (f k a, p, a)) es, t')
54 }
55
56-- | Methods for using 'Data.IntMap'.
57intMapMethods :: TableMethods IntMap Int
58intMapMethods = TableMethods
59 { tblInsert = \tid a p -> IntMap.insert tid a
60 , tblDelete = IntMap.delete
61 , tblLookup = IntMap.lookup
62 }
63
64-- | Methods for using 'Data.Word64Map'.
65w64MapMethods :: TableMethods Word64Map Word64
66w64MapMethods = TableMethods
67 { tblInsert = \tid a p -> W64Map.insert tid a
68 , tblDelete = W64Map.delete
69 , tblLookup = W64Map.lookup
70 }
71
72-- | Methods for using 'Data.Map'
73mapMethods :: Ord tid => TableMethods (Map tid) tid
74mapMethods = TableMethods
75 { tblInsert = \tid a p -> Map.insert tid a
76 , tblDelete = Map.delete
77 , tblLookup = Map.lookup
78 }
79
80-- psqMethods :: PSQKey tid => QMethods (HashPSQ tid Priority) tid x
81psqMethods :: PSQKey k => (tid -> k) -> (k -> x -> tid) -> QMethods (PSQ' k Priority) tid x
82psqMethods g f = priorityTable (contramap g tbl) PSQ.atMostView f
83 where
84 tbl :: PSQKey tid => TableMethods (PSQ' tid Priority) tid
85 tbl = TableMethods
86 { tblInsert = PSQ.insert'
87 , tblDelete = PSQ.delete
88 , tblLookup = \tid t -> case PSQ.lookup tid t of
89 Just (p,a) -> Just a
90 Nothing -> Nothing
91 }
92
93
94-- | Change the key type for a lookup table implementation.
95--
96-- This can be used with 'intMapMethods' or 'mapMethods' to restrict lookups to
97-- only a part of the generated /tid/ value. This is useful for /tid/ types
98-- that are especially large due their use for other purposes, such as secure
99-- nonces for encryption.
100instance Contravariant (TableMethods t) where
101 -- contramap :: (tid -> t1) -> TableMethods t t1 -> TableMethods t tid
102 contramap f (TableMethods ins del lookup) =
103 TableMethods (\k p v t -> ins (f k) p v t)
104 (\k t -> del (f k) t)
105 (\k t -> lookup (f k) t)
diff --git a/dht/src/DebugUtil.hs b/dht/src/DebugUtil.hs
deleted file mode 100644
index 96ab8cc5..00000000
--- a/dht/src/DebugUtil.hs
+++ /dev/null
@@ -1,42 +0,0 @@
1{-# LANGUAGE CPP #-}
2module DebugUtil where
3
4import Control.Monad
5import Data.Time.Clock
6import Data.List
7import Text.Printf
8import GHC.Conc (threadStatus,ThreadStatus(..))
9#ifdef THREAD_DEBUG
10import Control.Concurrent.Lifted.Instrument
11#else
12import Control.Concurrent.Lifted
13import GHC.Conc (labelThread)
14#endif
15
16showReport :: [(String,String)] -> String
17showReport kvs = showColumns $ map (\(x,y)->[x,y]) kvs
18
19showColumns :: [[String]] -> String
20showColumns rows = do
21 let cols = transpose rows
22 ws = map (maximum . map (succ . length)) cols
23 fs <- rows
24 _ <- take 1 fs -- Guard against empty rows so that 'last' is safe.
25 " " ++ concat (zipWith (printf "%-*s") (init ws) (init fs)) ++ last fs ++ "\n"
26
27
28threadReport :: Bool -- ^ False to summarize search threads.
29 -> IO String
30threadReport want_ss = do
31 threads <- threadsInformation
32 tm <- getCurrentTime
33 let (ss,ts) = partition (("search" `isPrefixOf`) . lbl . snd)
34 threads
35 r <- forM (if want_ss then threads else ts) $ \(tid,PerThread{..}) -> do
36 stat <- threadStatus tid
37 let showStat (ThreadBlocked reason) = show reason
38 showStat stat = show stat
39 return [show lbl,show (diffUTCTime tm startTime),showStat stat]
40 return $ unlines [ showColumns r
41 , (if want_ss then " There are " else " and ")
42 ++ show (length ss) ++ " search threads." ]
diff --git a/dht/src/Network/QueryResponse.hs b/dht/src/Network/QueryResponse.hs
deleted file mode 100644
index 20e7ecf0..00000000
--- a/dht/src/Network/QueryResponse.hs
+++ /dev/null
@@ -1,716 +0,0 @@
1-- | This module can implement any query\/response protocol. It was written
2-- with Kademlia implementations in mind.
3
4{-# LANGUAGE CPP #-}
5{-# LANGUAGE GADTs #-}
6{-# LANGUAGE LambdaCase #-}
7{-# LANGUAGE PartialTypeSignatures #-}
8{-# LANGUAGE RankNTypes #-}
9{-# LANGUAGE ScopedTypeVariables #-}
10{-# LANGUAGE TupleSections #-}
11module Network.QueryResponse where
12
13#ifdef THREAD_DEBUG
14import Control.Concurrent.Lifted.Instrument
15#else
16import Control.Concurrent
17import GHC.Conc (labelThread)
18#endif
19import Control.Concurrent.STM
20import Control.Exception
21import Control.Monad
22import qualified Data.ByteString as B
23 ;import Data.ByteString (ByteString)
24import Data.Dependent.Map as DMap
25import Data.Dependent.Sum
26import Data.Function
27import Data.Functor.Contravariant
28import Data.Functor.Identity
29import Data.GADT.Show
30import qualified Data.IntMap.Strict as IntMap
31 ;import Data.IntMap.Strict (IntMap)
32import qualified Data.Map.Strict as Map
33 ;import Data.Map.Strict (Map)
34import Data.Time.Clock.POSIX
35import qualified Data.Word64Map as W64Map
36 ;import Data.Word64Map (Word64Map)
37import Data.Word
38import Data.Maybe
39import GHC.Conc (closeFdWith)
40import GHC.Event
41import Network.Socket
42import Network.Socket.ByteString as B
43import System.Endian
44import System.IO
45import System.IO.Error
46import System.Timeout
47
48import DPut
49import DebugTag
50import Data.TableMethods
51
52-- | An inbound packet or condition raised while monitoring a connection.
53data Arrival err addr x
54 = Terminated -- ^ Virtual message that signals EOF.
55 | ParseError !err -- ^ A badly-formed message was received.
56 | Arrival { arrivedFrom :: !addr , arrivedMsg :: !x } -- ^ Inbound message.
57
58-- | Three methods are required to implement a datagram based query\/response protocol.
59data TransportA err addr x y = Transport
60 { -- | Blocks until an inbound packet is available. Then calls the provided
61 -- continuation with the packet and origin addres or an error condition.
62 awaitMessage :: forall a. (Arrival err addr x -> IO a) -> STM (IO a)
63 -- | Send an /y/ packet to the given destination /addr/.
64 , sendMessage :: addr -> y -> IO ()
65 -- | Shutdown and clean up any state related to this 'Transport'.
66 , setActive :: Bool -> IO ()
67 }
68
69type Transport err addr x = TransportA err addr x x
70
71closeTransport :: TransportA err addr x y -> IO ()
72closeTransport tr = setActive tr False
73
74-- | This function modifies a 'Transport' to use higher-level addresses and
75-- packet representations. It could be used to change UDP 'ByteString's into
76-- bencoded syntax trees or to add an encryption layer in which addresses have
77-- associated public keys.
78layerTransportM ::
79 (x -> addr -> IO (Either err (x', addr')))
80 -- ^ Function that attempts to transform a low-level address/packet
81 -- pair into a higher level representation.
82 -> (y' -> addr' -> IO (y, addr))
83 -- ^ Function to encode a high-level address/packet into a lower level
84 -- representation.
85 -> TransportA err addr x y
86 -- ^ The low-level transport to be transformed.
87 -> TransportA err addr' x' y'
88layerTransportM parse encode tr =
89 tr { awaitMessage = \kont ->
90 awaitMessage tr $ \case
91 Terminated -> kont $ Terminated
92 ParseError e -> kont $ ParseError e
93 Arrival addr x -> parse x addr >>= \case
94 Left e -> kont $ ParseError e
95 Right (x',addr') -> kont $ Arrival addr' x'
96 , sendMessage = \addr' msg' -> do
97 (msg,addr) <- encode msg' addr'
98 sendMessage tr addr msg
99 }
100
101
102-- | This function modifies a 'Transport' to use higher-level addresses and
103-- packet representations. It could be used to change UDP 'ByteString's into
104-- bencoded syntax trees or to add an encryption layer in which addresses have
105-- associated public keys.
106layerTransport ::
107 (x -> addr -> Either err (x', addr'))
108 -- ^ Function that attempts to transform a low-level address/packet
109 -- pair into a higher level representation.
110 -> (y' -> addr' -> (y, addr))
111 -- ^ Function to encode a high-level address/packet into a lower level
112 -- representation.
113 -> TransportA err addr x y
114 -- ^ The low-level transport to be transformed.
115 -> TransportA err addr' x' y'
116layerTransport parse encode tr =
117 layerTransportM (\x addr -> return $ parse x addr)
118 (\x' addr' -> return $ encode x' addr')
119 tr
120
121-- | Paritions a 'Transport' into two higher-level transports. Note: A 'TChan'
122-- is used to share the same underlying socket, so be sure to fork a thread for
123-- both returned 'Transport's to avoid hanging.
124partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a)))
125 -> ((y,xaddr) -> IO (Maybe (c,a)))
126 -> TransportA err a b c
127 -> IO (TransportA err xaddr x y, TransportA err a b c)
128partitionTransportM parse encodex tr = do
129 tchan <- atomically newTChan
130 let ytr = tr { awaitMessage = \kont -> fix $ \again -> do
131 awaitMessage tr $ \m -> case m of
132 Arrival adr msg -> parse (msg,adr) >>= \case
133 Left x -> atomically (writeTChan tchan (Just x)) >> join (atomically again)
134 Right (y,yaddr) -> kont $ Arrival yaddr y
135 ParseError e -> kont $ ParseError e
136 Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated
137 , sendMessage = sendMessage tr
138 }
139 xtr = Transport
140 { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case
141 Nothing -> Terminated
142 Just (x,xaddr) -> Arrival xaddr x
143 , sendMessage = \addr' msg' -> do
144 msg_addr <- encodex (msg',addr')
145 mapM_ (uncurry . flip $ sendMessage tr) msg_addr
146 , setActive = const $ return ()
147 }
148 return (xtr, ytr)
149
150-- | Paritions a 'Transport' into two higher-level transports. Note: An 'TChan'
151-- is used to share the same underlying socket, so be sure to fork a thread for
152-- both returned 'Transport's to avoid hanging.
153partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a))
154 -> ((y,xaddr) -> Maybe (c,a))
155 -> TransportA err a b c
156 -> IO (TransportA err xaddr x y, TransportA err a b c)
157partitionTransport parse encodex tr =
158 partitionTransportM (return . parse) (return . encodex) tr
159
160-- |
161-- * f add x --> Nothing, consume x
162-- --> Just id, leave x to a different handler
163-- --> Just g, apply g to x and leave that to a different handler
164--
165-- Note: If you add a handler to one of the branches before applying a
166-- 'mergeTransports' combinator, then this handler may not block or return
167-- Nothing.
168addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> TransportA err addr x y -> TransportA err addr x y
169addHandler onParseError f tr = tr
170 { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case
171 Arrival addr x -> f addr x >>= maybe (join $ atomically eat) (kont . Arrival addr . ($ x))
172 ParseError e -> onParseError e >> kont (ParseError e)
173 Terminated -> kont Terminated
174 }
175
176-- | Modify a 'Transport' to invoke an action upon every received packet.
177onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x
178onInbound f tr = addHandler (const $ return ()) (\addr x -> f addr x >> return (Just id)) tr
179
180-- * Using a query\/response client.
181
182-- | Fork a thread that handles inbound packets. The returned action may be used
183-- to terminate the thread and clean up any related state.
184--
185-- Example usage:
186--
187-- > -- Start client.
188-- > quitServer <- forkListener "listener" (clientNet client)
189-- > -- Send a query q, recieve a response r.
190-- > r <- sendQuery client method q
191-- > -- Quit client.
192-- > quitServer
193forkListener :: String -> Transport err addr x -> IO (IO ())
194forkListener name client = do
195 setActive client True
196 thread_id <- forkIO $ do
197 myThreadId >>= flip labelThread ("listener."++name)
198 fix $ \loop -> join $ atomically $ awaitMessage client $ \case
199 Terminated -> return ()
200 _ -> loop
201 dput XMisc $ "Listener died: " ++ name
202 return $ do
203 setActive client False
204 -- killThread thread_id
205
206-- * Implementing a query\/response 'Client'.
207
208-- | These methods indicate what should be done upon various conditions. Write
209-- to a log file, make debug prints, or simply ignore them.
210--
211-- [ /addr/ ] Address of remote peer.
212--
213-- [ /x/ ] Incoming or outgoing packet.
214--
215-- [ /meth/ ] Method id of incoming or outgoing request.
216--
217-- [ /tid/ ] Transaction id for outgoing packet.
218--
219-- [ /err/ ] Error information, typically a 'String'.
220data ErrorReporter addr x meth tid err = ErrorReporter
221 { -- | Incoming: failed to parse packet.
222 reportParseError :: err -> IO ()
223 -- | Incoming: no handler for request.
224 , reportMissingHandler :: meth -> addr -> x -> IO ()
225 -- | Incoming: unable to identify request.
226 , reportUnknown :: addr -> x -> err -> IO ()
227 }
228
229ignoreErrors :: ErrorReporter addr x meth tid err
230ignoreErrors = ErrorReporter
231 { reportParseError = \_ -> return ()
232 , reportMissingHandler = \_ _ _ -> return ()
233 , reportUnknown = \_ _ _ -> return ()
234 }
235
236logErrors :: ( Show addr
237 , Show meth
238 ) => ErrorReporter addr x meth tid String
239logErrors = ErrorReporter
240 { reportParseError = \err -> dput XMisc err
241 , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")"
242 , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err
243 }
244
245printErrors :: ( Show addr
246 , Show meth
247 ) => Handle -> ErrorReporter addr x meth tid String
248printErrors h = ErrorReporter
249 { reportParseError = \err -> hPutStrLn h err
250 , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")"
251 , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err
252 }
253
254-- Change the /err/ type for an 'ErrorReporter'.
255instance Contravariant (ErrorReporter addr x meth tid) where
256 -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5
257 contramap f (ErrorReporter pe mh unk)
258 = ErrorReporter (\e -> pe (f e))
259 mh
260 (\addr x e -> unk addr x (f e))
261
262-- | An incoming message can be classified into three cases.
263data MessageClass err meth tid addr x
264 = IsQuery meth tid -- ^ An unsolicited query is handled based on it's /meth/ value. Any response
265 -- should include the provided /tid/ value.
266 | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value.
267 | IsUnsolicited (addr -> addr -> IO (Maybe (x -> x))) -- ^ Transactionless informative packet. The io action will be invoked
268 -- with the source and destination address of a message. If it handles the
269 -- message, it should return Nothing. Otherwise, it should return a transform
270 -- (usually /id/) to apply before the next handler examines it.
271 | IsUnknown err -- ^ None of the above.
272
273-- | Handler for an inbound query of type /x/ from an address of type _addr_.
274type MethodHandler err tid addr x = MethodHandlerA err tid addr x x
275
276-- | Handler for an inbound query of type /x/ with outbound response of type
277-- /y/ to an address of type /addr/.
278data MethodHandlerA err tid addr x y = forall a b. MethodHandler
279 { -- | Parse the query into a more specific type for this method.
280 methodParse :: x -> Either err a
281 -- | Serialize the response for transmission, given a context /ctx/ and the origin
282 -- and destination addresses.
283 , methodSerialize :: tid -> addr -> addr -> b -> y
284 -- | Fully typed action to perform upon the query. The remote origin
285 -- address of the query is provided to the handler.
286 --
287 -- TODO: Allow queries to be ignored?
288 , methodAction :: addr -> a -> IO b
289 }
290 -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary.
291 | forall a. NoReply
292 { -- | Parse the query into a more specific type for this method.
293 methodParse :: x -> Either err a
294 -- | Fully typed action to perform upon the query. The remote origin
295 -- address of the query is provided to the handler.
296 , noreplyAction :: addr -> a -> IO ()
297 }
298
299
300-- | To dispatch responses to our outbound queries, we require three
301-- primitives. See the 'transactionMethods' function to create these
302-- primitives out of a lookup table and a generator for transaction ids.
303--
304-- The type variable /d/ is used to represent the current state of the
305-- transaction generator and the table of pending transactions.
306data TransactionMethods d qid addr x = TransactionMethods
307 {
308 -- | Before a query is sent, this function stores an 'MVar' to which the
309 -- response will be written too. The returned /qid/ is a transaction id
310 -- that can be used to forget the 'MVar' if the remote peer is not
311 -- responding.
312 dispatchRegister :: POSIXTime -- time of expiry
313 -> (Maybe x -> IO ()) -- callback upon response (or timeout)
314 -> addr
315 -> d
316 -> STM (qid, d)
317 -- | This method is invoked when an incoming packet /x/ indicates it is
318 -- a response to the transaction with id /qid/. The returned IO action
319 -- will write the packet to the correct 'MVar' thus completing the
320 -- dispatch.
321 , dispatchResponse :: qid -> x -> d -> STM (d, IO ())
322 -- | When a timeout interval elapses, this method is called to remove the
323 -- transaction from the table.
324 , dispatchCancel :: qid -> d -> STM d
325 }
326
327-- | A set of methods necessary for dispatching incoming packets.
328type DispatchMethods tbl err meth tid addr x = DispatchMethodsA tbl err meth tid addr x x
329
330-- | A set of methods necessary for dispatching incoming packets.
331data DispatchMethodsA tbl err meth tid addr x y = DispatchMethods
332 { -- | Classify an inbound packet as a query or response.
333 classifyInbound :: x -> MessageClass err meth tid addr x
334 -- | Lookup the handler for a inbound query.
335 , lookupHandler :: meth -> Maybe (MethodHandlerA err tid addr x y)
336 -- | Methods for handling incoming responses.
337 , tableMethods :: TransactionMethods tbl tid addr x
338 }
339
340-- | All inputs required to implement a query\/response client.
341type Client err meth tid addr x = ClientA err meth tid addr x x
342
343-- | All inputs required to implement a query\/response client.
344data ClientA err meth tid addr x y = forall tbl. Client
345 { -- | The 'Transport' used to dispatch and receive packets.
346 clientNet :: TransportA err addr x y
347 -- | Methods for handling inbound packets.
348 , clientDispatcher :: DispatchMethodsA tbl err meth tid addr x y
349 -- | Methods for reporting various conditions.
350 , clientErrorReporter :: ErrorReporter addr x meth tid err
351 -- | State necessary for routing inbound responses and assigning unique
352 -- /tid/ values for outgoing queries.
353 , clientPending :: TVar tbl
354 -- | An action yielding this client\'s own address. It is invoked once
355 -- on each outbound and inbound packet. It is valid for this to always
356 -- return the same value.
357 --
358 -- The argument, if supplied, is the remote address for the transaction.
359 -- This can be used to maintain consistent aliases for specific peers.
360 , clientAddress :: Maybe addr -> IO addr
361 -- | Transform a query /tid/ value to an appropriate response /tid/
362 -- value. Normally, this would be the identity transformation, but if
363 -- /tid/ includes a unique cryptographic nonce, then it should be
364 -- generated here.
365 , clientResponseId :: tid -> IO tid
366 }
367
368-- | These four parameters are required to implement an outgoing query. A
369-- peer-to-peer algorithm will define a 'MethodSerializer' for every 'MethodHandler' that
370-- might be returned by 'lookupHandler'.
371data MethodSerializerA tid addr x y meth a b = MethodSerializer
372 { -- | Returns the microseconds to wait for a response to this query being
373 -- sent to the given address. The /addr/ may also be modified to add
374 -- routing information.
375 methodTimeout :: addr -> STM (addr,Int)
376 -- | A method identifier used for error reporting. This needn't be the
377 -- same as the /meth/ argument to 'MethodHandler', but it is suggested.
378 , method :: meth
379 -- | Serialize the outgoing query /a/ into a transmittable packet /x/.
380 -- The /addr/ arguments are, respectively, our own origin address and the
381 -- destination of the request. The /tid/ argument is useful for attaching
382 -- auxiliary notations on all outgoing packets.
383 , wrapQuery :: tid -> addr -> addr -> a -> x
384 -- | Parse an inbound packet /x/ into a response /b/ for this query.
385 , unwrapResponse :: y -> b
386 }
387
388type MethodSerializer tid addr x meth a b = MethodSerializerA tid addr x x meth a b
389
390microsecondsDiff :: Int -> POSIXTime
391microsecondsDiff us = fromIntegral us / 1000000
392
393asyncQuery_ :: Client err meth tid addr x
394 -> MethodSerializer tid addr x meth a b
395 -> a
396 -> addr
397 -> (Maybe b -> IO ())
398 -> IO (tid,POSIXTime,Int)
399asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do
400 now <- getPOSIXTime
401 (tid,addr,expiry) <- atomically $ do
402 tbl <- readTVar pending
403 (addr,expiry) <- methodTimeout meth addr0
404 (tid, tbl') <- dispatchRegister (tableMethods d)
405 (now + microsecondsDiff expiry)
406 (withResponse . fmap (unwrapResponse meth))
407 addr -- XXX: Should be addr0 or addr?
408 tbl
409 -- (addr,expiry) <- methodTimeout meth tid addr0
410 writeTVar pending tbl'
411 return (tid,addr,expiry)
412 self <- whoami (Just addr)
413 mres <- do sendMessage net addr (wrapQuery meth tid self addr q)
414 return $ Just ()
415 `catchIOError` (\e -> return Nothing)
416 return (tid,now,expiry)
417
418asyncQuery :: Show meth => Client err meth tid addr x
419 -> MethodSerializer tid addr x meth a b
420 -> a
421 -> addr
422 -> (Maybe b -> IO ())
423 -> IO ()
424asyncQuery client meth q addr withResponse0 = do
425 tm <- getSystemTimerManager
426 tidvar <- newEmptyMVar
427 timedout <- registerTimeout tm 1000000 $ do
428 dput XMisc $ "async TIMEDOUT " ++ show (method meth)
429 withResponse0 Nothing
430 tid <- takeMVar tidvar
431 dput XMisc $ "async TIMEDOUT mvar " ++ show (method meth)
432 case client of
433 Client { clientDispatcher = d, clientPending = pending } -> do
434 atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending
435 (tid,now,expiry) <- asyncQuery_ client meth q addr $ \x -> do
436 unregisterTimeout tm timedout
437 withResponse0 x
438 putMVar tidvar tid
439 updateTimeout tm timedout expiry
440 dput XMisc $ "FIN asyncQuery "++show (method meth)++" TIMEOUT="++show expiry
441
442-- | Send a query to a remote peer. Note that this function will always time
443-- out if 'forkListener' was never invoked to spawn a thread to receive and
444-- dispatch the response.
445sendQuery ::
446 forall err a b tbl x meth tid addr.
447 Client err meth tid addr x -- ^ A query/response implementation.
448 -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query.
449 -> a -- ^ The outbound query.
450 -> addr -- ^ Destination address of query.
451 -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out.
452sendQuery c@(Client net d err pending whoami _) meth q addr0 = do
453 mvar <- newEmptyMVar
454 (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar)
455 mres <- timeout expiry $ takeMVar mvar
456 case mres of
457 Just b -> return $ Just b
458 Nothing -> do
459 atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending
460 return Nothing
461
462contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x
463contramapAddr f (MethodHandler p s a)
464 = MethodHandler
465 p
466 (\tid src dst result -> s tid (f src) (f dst) result)
467 (\addr arg -> a (f addr) arg)
468contramapAddr f (NoReply p a)
469 = NoReply p (\addr arg -> a (f addr) arg)
470
471-- | Query handlers can throw this to ignore a query instead of responding to
472-- it.
473data DropQuery = DropQuery
474 deriving Show
475
476instance Exception DropQuery
477
478-- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the
479-- parse is successful, the returned IO action will construct our reply if
480-- there is one. Otherwise, a parse err is returned.
481dispatchQuery :: MethodHandlerA err tid addr x y -- ^ Handler to invoke.
482 -> tid -- ^ The transaction id for this query\/response session.
483 -> addr -- ^ Our own address, to which the query was sent.
484 -> x -- ^ The query packet.
485 -> addr -- ^ The origin address of the query.
486 -> Either err (IO (Maybe y))
487dispatchQuery (MethodHandler unwrapQ wrapR f) tid self x addr =
488 fmap (\a -> catch (Just . wrapR tid self addr <$> f addr a)
489 (\DropQuery -> return Nothing))
490 $ unwrapQ x
491dispatchQuery (NoReply unwrapQ f) tid self x addr =
492 fmap (\a -> f addr a >> return Nothing) $ unwrapQ x
493
494-- | Like 'transactionMethods' but allows extra information to be stored in the
495-- table of pending transactions. This also enables multiple 'Client's to
496-- share a single transaction table.
497transactionMethods' ::
498 ((Maybe x -> IO ()) -> a) -- ^ store MVar into table entry
499 -> (a -> Maybe x -> IO void) -- ^ load MVar from table entry
500 -> TableMethods t tid -- ^ Table methods to lookup values by /tid/.
501 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/.
502 -> TransactionMethods (g,t a) tid addr x
503transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods
504 { dispatchCancel = \tid (g,t) -> return (g, delete tid t)
505 , dispatchRegister = \nowPlusExpiry v a (g,t) -> do
506 let (tid,g') = generate g
507 let t' = insert tid (store v) nowPlusExpiry t -- (now + microsecondsDiff expiry) t
508 return ( tid, (g',t') )
509 , dispatchResponse = \tid x (g,t) ->
510 case lookup tid t of
511 Just v -> let t' = delete tid t
512 in return ((g,t'),void $ load v $ Just x)
513 Nothing -> return ((g,t), return ())
514 }
515
516-- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a
517-- function for generating unique transaction ids.
518transactionMethods ::
519 TableMethods t tid -- ^ Table methods to lookup values by /tid/.
520 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/.
521 -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x
522transactionMethods methods generate = transactionMethods' id id methods generate
523
524-- | Handle a single inbound packet and then invoke the given continuation.
525-- The 'forkListener' function is implemented by passing this function to 'fix'
526-- in a forked thread that loops until 'awaitMessage' returns 'Nothing' or
527-- throws an exception.
528handleMessage ::
529 ClientA err meth tid addr x y
530 -> addr
531 -> x
532 -> IO (Maybe (x -> x))
533handleMessage (Client net d err pending whoami responseID) addr plain = do
534 -- Just (Left e) -> do reportParseError err e
535 -- return $! Just id
536 -- Just (Right (plain, addr)) -> do
537 case classifyInbound d plain of
538 IsQuery meth tid -> case lookupHandler d meth of
539 Nothing -> do reportMissingHandler err meth addr plain
540 return $! Just id
541 Just m -> do
542 self <- whoami (Just addr)
543 tid' <- responseID tid
544 either (\e -> do reportParseError err e
545 return $! Just id)
546 (>>= \m -> do mapM_ (sendMessage net addr) m
547 return $! Nothing)
548 (dispatchQuery m tid' self plain addr)
549 IsUnsolicited action -> do
550 self <- whoami (Just addr)
551 action self addr
552 return Nothing
553 IsResponse tid -> do
554 action <- atomically $ do
555 ts0 <- readTVar pending
556 (ts, action) <- dispatchResponse (tableMethods d) tid plain ts0
557 writeTVar pending ts
558 return action
559 action
560 return $! Nothing
561 IsUnknown e -> do reportUnknown err addr plain e
562 return $! Just id
563 -- Nothing -> return $! id
564
565-- * UDP Datagrams.
566
567-- | Access the address family of a given 'SockAddr'. This convenient accessor
568-- is missing from 'Network.Socket', so I implemented it here.
569sockAddrFamily :: SockAddr -> Family
570sockAddrFamily (SockAddrInet _ _ ) = AF_INET
571sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6
572sockAddrFamily (SockAddrUnix _ ) = AF_UNIX
573#if !MIN_VERSION_network(3,0,0)
574sockAddrFamily _ = AF_CAN -- SockAddrCan constructor deprecated
575#endif
576
577-- | Packets with an empty payload may trigger EOF exception.
578-- 'udpTransport' uses this function to avoid throwing in that
579-- case.
580ignoreEOF :: Socket -> MVar () -> Arrival e a x -> IOError -> IO (Arrival e a x)
581ignoreEOF sock isClosed def e = do
582 done <- tryReadMVar isClosed
583 case done of
584 Just () -> do close sock
585 dput XMisc "Closing UDP socket."
586 pure Terminated
587 _ -> if isEOFError e then pure def
588 else throwIO e
589
590-- | Hard-coded maximum packet size for incoming UDP Packets received via
591-- 'udpTransport'.
592udpBufferSize :: Int
593udpBufferSize = 65536
594
595-- | Wrapper around 'B.sendTo' that silently ignores DoesNotExistError.
596saferSendTo :: Socket -> ByteString -> SockAddr -> IO ()
597saferSendTo sock bs saddr = void (B.sendTo sock bs saddr)
598 `catch` \e ->
599 -- sendTo: does not exist (Network is unreachable)
600 -- Occurs when IPv6 or IPv4 network is not available.
601 -- Currently, we require -threaded to prevent a forever-hang in this case.
602 if isDoesNotExistError e
603 then return ()
604 else throw e
605
606-- | Like 'udpTransport' except also returns the raw socket (for broadcast use).
607udpTransport' :: Show err => SockAddr -> IO (Transport err SockAddr ByteString, Socket)
608udpTransport' bind_address = do
609 let family = sockAddrFamily bind_address
610 sock <- socket family Datagram defaultProtocol
611 when (family == AF_INET6) $ do
612 setSocketOption sock IPv6Only 0
613 setSocketOption sock Broadcast 1
614 bind sock bind_address
615 isClosed <- newEmptyMVar
616 udpTChan <- atomically newTChan
617 let tr = Transport {
618 awaitMessage = \kont -> do
619 r <- readTChan udpTChan
620 return $ kont $! r
621 , sendMessage = case family of
622 AF_INET6 -> \case
623 (SockAddrInet port addr) -> \bs ->
624 -- Change IPv4 to 4mapped6 address.
625 saferSendTo sock bs $ SockAddrInet6 port 0 (0,0,0x0000ffff,fromBE32 addr) 0
626 addr6 -> \bs -> saferSendTo sock bs addr6
627 AF_INET -> \case
628 (SockAddrInet6 port 0 (0,0,0x0000ffff,raw4) 0) -> \bs -> do
629 let host4 = toBE32 raw4
630 -- Change 4mapped6 to ordinary IPv4.
631 -- dput XMisc $ "4mapped6 -> "++show (SockAddrInet port host4)
632 saferSendTo sock bs (SockAddrInet port host4)
633 addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr)
634 addr4 -> \bs -> saferSendTo sock bs addr4
635 _ -> \addr bs -> saferSendTo sock bs addr
636 , setActive = \case
637 False -> do
638 dput XMisc $ "closeTransport for udpTransport' called. " ++ show bind_address
639 tryPutMVar isClosed () -- signal awaitMessage that the transport is closed.
640#if MIN_VERSION_network (3,1,0)
641#elif MIN_VERSION_network(3,0,0)
642 let withFdSocket sock f = fdSocket sock >>= f >>= seq sock . return
643#else
644 let withFdSocket sock f = f (fdSocket sock) >>= seq sock . return
645#endif
646 withFdSocket sock $ \fd -> do
647 let sorryGHCButIAmNotFuckingClosingTheSocketYet fd = return ()
648 -- This call is necessary to interrupt the blocking recvFrom call in awaitMessage.
649 closeFdWith sorryGHCButIAmNotFuckingClosingTheSocketYet (fromIntegral fd)
650 True -> do
651 udpThread <- forkIO $ fix $ \again -> do
652 r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do
653 uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize
654 atomically $ writeTChan udpTChan r
655 case r of Terminated -> return ()
656 _ -> again
657 labelThread udpThread ("udp.io."++show bind_address)
658 }
659 return (tr, sock)
660
661-- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The
662-- argument is the listen-address for incoming packets. This is a useful
663-- low-level 'Transport' that can be transformed for higher-level protocols
664-- using 'layerTransport'.
665udpTransport :: Show err => SockAddr -> IO (Transport err SockAddr ByteString)
666udpTransport bind_address = fst <$> udpTransport' bind_address
667
668chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x
669chanTransport chanFromAddr self achan aclosed = Transport
670 { awaitMessage = \kont -> do
671 x <- (uncurry (flip Arrival) <$> readTChan achan)
672 `orElse`
673 (readTVar aclosed >>= check >> return Terminated)
674 return $ kont x
675 , sendMessage = \them bs -> do
676 atomically $ writeTChan (chanFromAddr them) (bs,self)
677 , setActive = \case
678 False -> atomically $ writeTVar aclosed True
679 True -> return ()
680 }
681
682-- | Returns a pair of transports linked together to simulate two computers talking to each other.
683testPairTransport :: IO (Transport err SockAddr ByteString, Transport err SockAddr ByteString)
684testPairTransport = do
685 achan <- atomically newTChan
686 bchan <- atomically newTChan
687 aclosed <- atomically $ newTVar False
688 bclosed <- atomically $ newTVar False
689 let a = SockAddrInet 1 1
690 b = SockAddrInet 2 2
691 return ( chanTransport (const bchan) a achan aclosed
692 , chanTransport (const achan) b bchan bclosed )
693
694newtype ByAddress err x addr = ByAddress (Transport err addr x)
695
696newtype Tagged x addr = Tagged x
697
698decorateAddr :: tag addr -> Arrival e addr x -> Arrival e (DSum tag Identity) x
699decorateAddr tag Terminated = Terminated
700decorateAddr tag (ParseError e) = ParseError e
701decorateAddr tag (Arrival addr x) = Arrival (tag ==> addr) x
702
703mergeTransports :: GCompare tag => DMap tag (ByAddress err x) -> IO (Transport err (DSum tag Identity) x)
704mergeTransports tmap = do
705 -- vmap <- traverseWithKey (\k v -> Tagged <$> newEmptyMVar) tmap
706 -- foldrWithKey (\k v n -> forkMergeBranch k v >> n) (return ()) vmap
707 return Transport
708 { awaitMessage = \kont ->
709 foldrWithKey (\k (ByAddress tr) n -> awaitMessage tr (kont . decorateAddr k) `orElse` n)
710 retry
711 tmap
712 , sendMessage = \(tag :=> Identity addr) x -> case DMap.lookup tag tmap of
713 Just (ByAddress tr) -> sendMessage tr addr x
714 Nothing -> return ()
715 , setActive = \toggle -> foldrWithKey (\_ (ByAddress tr) next -> setActive tr toggle >> next) (return ()) tmap
716 }
diff --git a/dht/src/Network/QueryResponse/TCP.hs b/dht/src/Network/QueryResponse/TCP.hs
deleted file mode 100644
index 0028a5b6..00000000
--- a/dht/src/Network/QueryResponse/TCP.hs
+++ /dev/null
@@ -1,223 +0,0 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE GeneralizedNewtypeDeriving #-}
3{-# LANGUAGE LambdaCase #-}
4module Network.QueryResponse.TCP where
5
6#ifdef THREAD_DEBUG
7import Control.Concurrent.Lifted.Instrument
8#else
9import Control.Concurrent.Lifted
10import GHC.Conc (labelThread)
11#endif
12
13import Control.Arrow
14import Control.Concurrent.STM
15import Control.Concurrent.STM.TMVar
16import Control.Monad
17import Data.ByteString (ByteString,hPut)
18import Data.Function
19import Data.Hashable
20import Data.Maybe
21import Data.Ord
22import Data.Time.Clock.POSIX
23import Data.Word
24import Data.String (IsString(..))
25import Network.BSD
26import Network.Socket as Socket
27import System.Timeout
28import System.IO
29import System.IO.Error
30
31import DebugTag
32import DebugUtil
33import DPut
34import Connection.Tcp (socketFamily)
35import qualified Data.MinMaxPSQ as MM
36import Network.QueryResponse
37
38data TCPSession st
39 = PendingTCPSession
40 | TCPSession
41 { tcpHandle :: Handle
42 , tcpState :: st
43 , tcpThread :: ThreadId
44 }
45
46newtype TCPAddress = TCPAddress SockAddr
47 deriving (Eq,Ord,Show)
48
49instance Hashable TCPAddress where
50 hashWithSalt salt (TCPAddress x) = case x of
51 SockAddrInet port addr -> hashWithSalt salt (fromIntegral port :: Word16,addr)
52 SockAddrInet6 port b c d -> hashWithSalt salt (fromIntegral port :: Word16,b,c,d)
53 _ -> 0
54
55data TCPCache st = TCPCache
56 { lru :: TVar (MM.MinMaxPSQ' TCPAddress (Down POSIXTime) (TCPSession st))
57 , tcpMax :: Int
58 }
59
60-- This is a suitable /st/ parameter to 'TCPCache'
61data SessionProtocol x y = SessionProtocol
62 { streamGoodbye :: IO () -- ^ "Goodbye" protocol upon termination.
63 , streamDecode :: IO (Maybe x) -- ^ Parse inbound messages.
64 , streamEncode :: y -> IO () -- ^ Serialize outbound messages.
65 }
66
67data StreamHandshake addr x y = StreamHandshake
68 { streamHello :: addr -> Handle -> IO (SessionProtocol x y) -- ^ "Hello" protocol upon fresh connection.
69 , streamAddr :: addr -> SockAddr
70 }
71
72killSession :: TCPSession st -> IO ()
73killSession PendingTCPSession = return ()
74killSession TCPSession{tcpThread=t} = killThread t
75
76showStat :: IsString p => TCPSession st -> p
77showStat r = case r of PendingTCPSession -> "pending."
78 TCPSession {} -> "established."
79
80tcp_timeout :: Int
81tcp_timeout = 10000000
82
83acquireConnection :: TMVar (Arrival a addr x)
84 -> TCPCache (SessionProtocol x y)
85 -> StreamHandshake addr x y
86 -> addr
87 -> Bool
88 -> IO (Maybe (y -> IO ()))
89acquireConnection mvar tcpcache stream addr bDoCon = do
90 now <- getPOSIXTime
91 -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr)
92 entry <- atomically $ do
93 c <- readTVar (lru tcpcache)
94 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
95 case v of
96 Nothing | bDoCon -> writeTVar (lru tcpcache)
97 $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c
98 | otherwise -> return ()
99 Just (tm, v) -> writeTVar (lru tcpcache)
100 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) c
101 return v
102 -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry)
103 case entry of
104 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do
105 proto <- getProtocolNumber "tcp"
106 sock <- socket (socketFamily $ streamAddr stream addr) Stream proto
107 mh <- catchIOError (do h <- timeout tcp_timeout $ do
108 connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock)
109 h <- socketToHandle sock ReadWriteMode
110 hSetBuffering h NoBuffering
111 return h
112 return h)
113 $ \e -> return Nothing
114 when (isNothing mh) $ do
115 atomically $ modifyTVar' (lru tcpcache)
116 $ MM.delete (TCPAddress $ streamAddr stream addr)
117 Socket.close sock
118 ret <- fmap join $ forM mh $ \h -> do
119 mst <- catchIOError (Just <$> streamHello stream addr h)
120 (\e -> return Nothing)
121 case mst of
122 Nothing -> do
123 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr)
124 return Nothing
125 Just st -> do
126 dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr)
127 signal <- newTVarIO False
128 let showAddr a = show (streamAddr stream a)
129 rthread <- forkLabeled ("tcp:"++showAddr addr) $ do
130 atomically (readTVar signal >>= check)
131 fix $ \loop -> do
132 x <- streamDecode st
133 dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x
134 case x of
135 Just u -> do
136 m <- timeout tcp_timeout $ atomically (putTMVar mvar $ Arrival addr u)
137 when (isNothing m) $ do
138 dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet."
139 atomically $ tryTakeTMVar mvar
140 return ()
141 loop
142 Nothing -> do
143 dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr)
144 do atomically $ modifyTVar' (lru tcpcache)
145 $ MM.delete (TCPAddress $ streamAddr stream addr)
146 c <- atomically $ readTVar (lru tcpcache)
147 now <- getPOSIXTime
148 forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do
149 dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r]
150 mreport <- timeout tcp_timeout $ threadReport False -- XXX: Paranoid timeout
151 case mreport of
152 Just treport -> dput XTCP treport
153 Nothing -> dput XTCP "TCP ERROR: threadReport timed out."
154 hClose h `catchIOError` \e -> return ()
155 let v = TCPSession
156 { tcpHandle = h
157 , tcpState = st
158 , tcpThread = rthread
159 }
160 t <- getPOSIXTime
161 retires <- atomically $ do
162 c <- readTVar (lru tcpcache)
163 let (rs,c') = MM.takeView (tcpMax tcpcache)
164 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c
165 writeTVar (lru tcpcache) c'
166 writeTVar signal True
167 return rs
168 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkLabeled ("tcp-close:"++show k) $ do
169 dput XTCP $ "TCP dropped: " ++ show k
170 killSession r
171 case r of TCPSession {tcpState=st,tcpHandle=h} -> do
172 streamGoodbye st
173 hClose h
174 `catchIOError` \e -> return ()
175 _ -> return ()
176
177 return $ Just $ streamEncode st
178 when (isNothing ret) $ do
179 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr)
180 return ret
181 Just (tm, PendingTCPSession)
182 | not bDoCon -> return Nothing
183 | otherwise -> fmap join $ timeout tcp_timeout $ atomically $ do
184 c <- readTVar (lru tcpcache)
185 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
186 case v of
187 Just (_,TCPSession{tcpState=st}) -> return $ Just $ streamEncode st
188 Nothing -> return Nothing
189 _ -> retry
190 Just (tm, v@TCPSession {tcpState=st}) -> return $ Just $ streamEncode st
191
192closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO ()
193closeAll tcpcache stream = do
194 dput XTCP "TCP.closeAll called."
195 cache <- atomically $ swapTVar (lru tcpcache) MM.empty
196 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do
197 killSession r
198 case r of TCPSession{tcpState=st,tcpHandle=h} -> catchIOError (streamGoodbye st >> hClose h)
199 (\e -> return ())
200 _ -> return ()
201
202-- Use a cache of TCP client connections for sending (and receiving) packets.
203-- The boolean value prepended to the message allows the sender to specify
204-- whether or not a new connection will be initiated if neccessary. If 'False'
205-- is passed, then the packet will be sent only if there already exists a
206-- connection.
207tcpTransport :: Int -- ^ maximum number of TCP links to maintain.
208 -> StreamHandshake addr x y
209 -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y))
210tcpTransport maxcon stream = do
211 msgvar <- atomically newEmptyTMVar
212 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty)
213 return $ (,) tcpcache Transport
214 { awaitMessage = \f -> takeTMVar msgvar >>= \x -> return $ do
215 f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Terminated)
216 , sendMessage = \addr (bDoCon,y) -> do
217 void . forkLabeled "tcp-send" $ do
218 msock <- acquireConnection msgvar tcpcache stream addr bDoCon
219 mapM_ ($ y) msock
220 `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e
221 , setActive = \case False -> closeAll tcpcache stream >> atomically (putTMVar msgvar Terminated)
222 True -> return ()
223 }
diff --git a/dht/src/Network/SocketLike.hs b/dht/src/Network/SocketLike.hs
deleted file mode 100644
index 37891cfd..00000000
--- a/dht/src/Network/SocketLike.hs
+++ /dev/null
@@ -1,98 +0,0 @@
1{-# LANGUAGE GeneralizedNewtypeDeriving #-}
2{-# LANGUAGE TupleSections #-}
3{-# LANGUAGE CPP #-}
4-- |
5--
6-- A socket could be used indirectly via a 'System.IO.Handle' or a conduit from
7-- Michael Snoyman's conduit package. But doing so presents an encapsulation
8-- problem. Do we allow access to the underlying socket and trust that it wont
9-- be used in an unsafe way? Or do we protect it at the higher level and deny
10-- access to various state information?
11--
12-- The 'SocketLike' class enables the approach that provides a safe wrapper to
13-- the underlying socket and gives access to various state information without
14-- enabling direct reads or writes.
15module Network.SocketLike
16 ( SocketLike(..)
17 , RestrictedSocket
18 , restrictSocket
19 , restrictHandleSocket
20 -- * Re-exports
21 --
22 -- | To make the 'SocketLike' methods less awkward to use, the types
23 -- 'CUInt', 'SockAddr', and 'PortNumber' are re-exported.
24 , CUInt
25 , PortNumber
26 , SockAddr(..)
27 ) where
28
29import Network.Socket
30 ( PortNumber
31 , SockAddr
32 )
33import Foreign.C.Types ( CUInt )
34
35import qualified Network.Socket as NS
36import System.IO (Handle,hClose,hIsOpen)
37import Control.Arrow
38
39-- | A safe (mostly read-only) interface to a 'NS.Socket'. Note that despite
40-- how this class is named, it provides no access to typical 'NS.Socket' uses
41-- like sending or receiving network packets.
42class SocketLike sock where
43 -- | See 'NS.getSocketName'
44 getSocketName :: sock -> IO SockAddr
45 -- | See 'NS.getPeerName'
46 getPeerName :: sock -> IO SockAddr
47 -- | See 'NS.getPeerCred'
48-- getPeerCred :: sock -> IO (CUInt, CUInt, CUInt)
49
50 -- | Is the socket still valid? Connected
51 --
52 -- In order to give the instance writer
53 -- the option to do book-keeping in a pure
54 -- type, a conceptually modified version of
55 -- the 'SocketLike' is returned.
56 --
57 isValidSocket :: sock -> IO (sock,Bool)
58
59
60instance SocketLike NS.Socket where
61 getSocketName = NS.getSocketName
62 getPeerName = NS.getPeerName
63-- getPeerCred = NS.getPeerCred
64#if MIN_VERSION_network(3,1,0)
65 isValidSocket s = (s,) <$> NS.withFdSocket s (return . (/= (-1)))
66#else
67#if MIN_VERSION_network(3,0,0)
68 isValidSocket s = (s,) . (/= (-1)) <$> NS.fdSocket s
69#else
70#if MIN_VERSION_network(2,4,0)
71 isValidSocket s = (s,) <$> NS.isConnected s -- warning: this is always False if the socket
72 -- was converted to a Handle
73#else
74 isValidSocket s = (s,) <$> NS.sIsConnected s -- warning: this is always False if the socket
75 -- was converted to a Handle
76#endif
77#endif
78#endif
79
80-- | An encapsulated socket. Data reads and writes are not possible.
81data RestrictedSocket = Restricted (Maybe Handle) NS.Socket deriving Show
82
83instance SocketLike RestrictedSocket where
84 getSocketName (Restricted mb sock) = NS.getSocketName sock
85 getPeerName (Restricted mb sock) = NS.getPeerName sock
86-- getPeerCred (Restricted mb sock) = NS.getPeerCred sock
87 isValidSocket rs@(Restricted mb sock) = maybe (first (Restricted mb) <$> isValidSocket sock) (((rs,) <$>) . hIsOpen) mb
88
89-- | Create a 'RestrictedSocket' that explicitly disallows sending or
90-- receiving data.
91restrictSocket :: NS.Socket -> RestrictedSocket
92restrictSocket socket = Restricted Nothing socket
93
94-- | Build a 'RestrictedSocket' for which 'sClose' will close the given
95-- 'Handle'. It is intended that this 'Handle' was obtained via
96-- 'NS.socketToHandle'.
97restrictHandleSocket :: Handle -> NS.Socket -> RestrictedSocket
98restrictHandleSocket h socket = Restricted (Just h) socket
diff --git a/dht/src/Network/StreamServer.hs b/dht/src/Network/StreamServer.hs
deleted file mode 100644
index 1da612ce..00000000
--- a/dht/src/Network/StreamServer.hs
+++ /dev/null
@@ -1,167 +0,0 @@
1-- | This module implements a bare-bones TCP or Unix socket server.
2{-# LANGUAGE CPP #-}
3{-# LANGUAGE TypeFamilies #-}
4{-# LANGUAGE TypeOperators #-}
5{-# LANGUAGE OverloadedStrings #-}
6{-# LANGUAGE RankNTypes #-}
7module Network.StreamServer
8 ( streamServer
9 , ServerHandle
10 , getAcceptLoopThreadId
11 , ServerConfig(..)
12 , withSession
13 , quitListening
14 --, dummyServerHandle
15 , listenSocket
16 , Local(..)
17 , Remote(..)
18 ) where
19
20import Data.Monoid
21import Network.Socket as Socket
22import System.Directory (removeFile)
23import System.IO
24 ( IOMode(..)
25 , stderr
26 , hFlush
27 )
28import Control.Monad
29import Control.Monad.Fix (fix)
30#ifdef THREAD_DEBUG
31import Control.Concurrent.Lifted.Instrument
32 ( forkIO, threadDelay, ThreadId, mkWeakThreadId, labelThread, myThreadId
33 , killThread )
34#else
35import GHC.Conc (labelThread)
36import Control.Concurrent
37 ( forkIO, threadDelay, ThreadId, mkWeakThreadId, myThreadId
38 , killThread )
39#endif
40import Control.Exception (handle,finally)
41import System.IO.Error (tryIOError)
42import System.Mem.Weak
43import System.IO.Error
44
45-- import Data.Conduit
46import System.IO (Handle)
47import Control.Concurrent.MVar (newMVar)
48
49import Network.SocketLike
50import DPut
51import DebugTag
52
53data ServerHandle = ServerHandle Socket (Weak ThreadId)
54
55-- | Useful for testing.
56getAcceptLoopThreadId :: ServerHandle -> IO (Weak ThreadId)
57getAcceptLoopThreadId (ServerHandle _ t) = return t
58
59listenSocket :: ServerHandle -> RestrictedSocket
60listenSocket (ServerHandle sock _) = restrictSocket sock
61
62{- // Removed, bit-rotted and there are no call sites
63-- | Create a useless do-nothing 'ServerHandle'.
64dummyServerHandle :: IO ServerHandle
65dummyServerHandle = do
66 mvar <- newMVar Closed
67 let sock = MkSocket 0 AF_UNSPEC NoSocketType 0 mvar
68 thread <- mkWeakThreadId <=< forkIO $ return ()
69 return (ServerHandle sock thread)
70-}
71
72removeSocketFile :: SockAddr -> IO ()
73removeSocketFile (SockAddrUnix fname) = removeFile fname
74removeSocketFile _ = return ()
75
76-- | Terminate the server accept-loop. Call this to shut down the server.
77quitListening :: ServerHandle -> IO ()
78quitListening (ServerHandle socket acceptThread) =
79 finally (Socket.getSocketName socket >>= removeSocketFile)
80 (do mapM_ killThread =<< deRefWeak acceptThread
81 Socket.close socket)
82
83
84-- | It's 'bshow' instead of 'show' to enable swapping in a 'ByteString'
85-- variation. (This is not exported.)
86bshow :: Show a => a -> String
87bshow e = show e
88
89-- | Send a string to stderr. Not exported. Default 'serverWarn' when
90-- 'withSession' is used to configure the server.
91warnStderr :: String -> IO ()
92warnStderr str = dput XMisc str >> hFlush stderr
93
94newtype Local a = Local a deriving (Eq,Ord,Show)
95newtype Remote a = Remote a deriving (Eq,Ord,Show)
96
97data ServerConfig = ServerConfig
98 { serverWarn :: String -> IO ()
99 -- ^ Action to report warnings and errors.
100 , serverSession :: ( RestrictedSocket, (Local SockAddr, Remote SockAddr)) -> Int -> Handle -> IO ()
101 -- ^ Action to handle interaction with a client
102 }
103
104-- | Initialize a 'ServerConfig' using the provided session handler.
105withSession :: ((RestrictedSocket,(Local SockAddr,Remote SockAddr)) -> Int -> Handle -> IO ()) -> ServerConfig
106withSession session = ServerConfig warnStderr session
107
108-- | Launch a thread to listen at the given bind address and dispatch
109-- to session handler threads on every incoming connection. Supports
110-- IPv4 and IPv6, TCP and unix sockets.
111--
112-- The returned handle can be used with 'quitListening' to terminate the
113-- thread and prevent any new sessions from starting. Currently active
114-- session threads will not be terminated or signaled in any way.
115streamServer :: ServerConfig -> [SockAddr] -> IO ServerHandle
116streamServer cfg addrs = do
117 let warn = serverWarn cfg
118 family = case addrs of
119 SockAddrInet {}:_ -> AF_INET
120 SockAddrInet6 {}:_ -> AF_INET6
121 SockAddrUnix {}:_ -> AF_UNIX
122 [] -> AF_INET6
123 sock <- socket family Stream 0
124 setSocketOption sock ReuseAddr 1
125 let tryBind addr next _ = do
126 tryIOError (removeSocketFile addr)
127 bind sock addr
128 `catchIOError` \e -> next (Just e)
129 fix $ \loop -> let again mbe = do
130 forM_ mbe $ \e -> warn $ "bind-error: " <> bshow addrs <> " " <> bshow e
131 threadDelay 5000000
132 loop
133 in foldr tryBind again addrs Nothing
134 listen sock maxListenQueue
135 thread <- mkWeakThreadId <=< forkIO $ do
136 bindaddr <- Socket.getSocketName sock
137 myThreadId >>= flip labelThread ("StreamServer.acceptLoop." <> bshow bindaddr)
138 acceptLoop cfg sock 0
139 return (ServerHandle sock thread)
140
141-- | Not exported. This, combined with 'acceptException' form a mutually
142-- recursive loop that handles incoming connections. To quit the loop, the
143-- socket must be closed by 'quitListening'.
144acceptLoop :: ServerConfig -> Socket -> Int -> IO ()
145acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do
146 (con,raddr) <- accept sock
147 let conkey = n + 1
148 laddr <- Socket.getSocketName con
149 h <- socketToHandle con ReadWriteMode
150 forkIO $ do
151 myThreadId >>= flip labelThread "StreamServer.session"
152 serverSession cfg (restrictHandleSocket h con, (Local laddr, Remote raddr)) conkey h
153 acceptLoop cfg sock (n + 1)
154
155acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO ()
156acceptException cfg n sock ioerror = do
157 case show (ioeGetErrorType ioerror) of
158 "resource exhausted" -> do -- try again (ioeGetErrorType ioerror == fullErrorType)
159 serverWarn cfg $ ("acceptLoop: resource exhasted")
160 threadDelay 500000
161 acceptLoop cfg sock (n + 1)
162 "invalid argument" -> do -- quit on closed socket
163 Socket.close sock
164 message -> do -- unexpected exception
165 serverWarn cfg $ ("acceptLoop: "<>bshow message)
166 Socket.close sock
167