diff options
author | James Crayne <jim.crayne@gmail.com> | 2019-09-28 13:43:29 -0400 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 19:27:53 -0500 |
commit | 11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch) | |
tree | 5716463275c2d3e902889db619908ded2a73971c /dht/Presence/DNSCache.hs | |
parent | add2c76bced51fde5e9917e7449ef52be70faf87 (diff) |
Factor out some new libraries
word64-map:
Data.Word64Map
network-addr:
Network.Address
tox-crypto:
Crypto.Tox
lifted-concurrent:
Control.Concurrent.Lifted.Instrument
Control.Concurrent.Async.Lifted.Instrument
psq-wrap:
Data.Wrapper.PSQInt
Data.Wrapper.PSQ
minmax-psq:
Data.MinMaxPSQ
tasks:
Control.Concurrent.Tasks
kad:
Network.Kademlia
Network.Kademlia.Bootstrap
Network.Kademlia.Routing
Network.Kademlia.CommonAPI
Network.Kademlia.Persistence
Network.Kademlia.Search
Diffstat (limited to 'dht/Presence/DNSCache.hs')
-rw-r--r-- | dht/Presence/DNSCache.hs | 291 |
1 files changed, 291 insertions, 0 deletions
diff --git a/dht/Presence/DNSCache.hs b/dht/Presence/DNSCache.hs new file mode 100644 index 00000000..e28655c5 --- /dev/null +++ b/dht/Presence/DNSCache.hs | |||
@@ -0,0 +1,291 @@ | |||
1 | -- | Both 'getAddrInfo' and 'getHostByAddr' have hard-coded timeouts for | ||
2 | -- waiting upon network queries that can be a little too long for some use | ||
3 | -- cases. This module wraps both of them so that they block for at most one | ||
4 | -- second. It caches late-arriving results so that they can be returned by | ||
5 | -- repeated timed-out queries. | ||
6 | -- | ||
7 | -- In order to achieve the shorter timeout, it is likely that the you will need | ||
8 | -- to build with GHC's -threaded option. Otherwise, if the wrapped FFI calls | ||
9 | -- to resolve the address will block Haskell threads. Note: I didn't verify | ||
10 | -- this. | ||
11 | {-# LANGUAGE TupleSections #-} | ||
12 | {-# LANGUAGE RankNTypes #-} | ||
13 | {-# LANGUAGE CPP #-} | ||
14 | module DNSCache | ||
15 | ( DNSCache | ||
16 | , reverseResolve | ||
17 | , forwardResolve | ||
18 | , newDNSCache | ||
19 | , parseAddress | ||
20 | , unsafeParseAddress | ||
21 | , strip_brackets | ||
22 | , withPort | ||
23 | ) where | ||
24 | |||
25 | #ifdef THREAD_DEBUG | ||
26 | import Control.Concurrent.Lifted.Instrument | ||
27 | #else | ||
28 | import Control.Concurrent.Lifted | ||
29 | import GHC.Conc (labelThread) | ||
30 | #endif | ||
31 | import Control.Arrow | ||
32 | import Control.Concurrent.STM | ||
33 | import Data.Text ( Text ) | ||
34 | import Network.Socket ( SockAddr(..), AddrInfoFlag(..), defaultHints, getAddrInfo, AddrInfo(..) ) | ||
35 | import Data.Time.Clock ( UTCTime, getCurrentTime, diffUTCTime ) | ||
36 | import System.IO.Error ( isDoesNotExistError ) | ||
37 | import System.Endian ( fromBE32, toBE32 ) | ||
38 | import Control.Exception ( handle ) | ||
39 | import Data.Map ( Map ) | ||
40 | import qualified Data.Map as Map | ||
41 | import qualified Network.BSD as BSD | ||
42 | import qualified Data.Text as Text | ||
43 | import Control.Monad | ||
44 | import Data.Function | ||
45 | import Data.List | ||
46 | import Data.Ord | ||
47 | import Data.Maybe | ||
48 | import System.IO.Error | ||
49 | import System.IO.Unsafe | ||
50 | |||
51 | import SockAddr () | ||
52 | import ControlMaybe ( handleIO_ ) | ||
53 | import GetHostByAddr ( getHostByAddr ) | ||
54 | import InterruptibleDelay | ||
55 | import DPut | ||
56 | import DebugTag | ||
57 | |||
58 | type TimeStamp = UTCTime | ||
59 | |||
60 | data DNSCache = | ||
61 | DNSCache | ||
62 | { fcache :: TVar (Map Text [(TimeStamp, SockAddr)]) | ||
63 | , rcache :: TVar (Map SockAddr [(TimeStamp, Text)]) | ||
64 | } | ||
65 | |||
66 | |||
67 | newDNSCache :: IO DNSCache | ||
68 | newDNSCache = do | ||
69 | fcache <- newTVarIO Map.empty | ||
70 | rcache <- newTVarIO Map.empty | ||
71 | return DNSCache { fcache=fcache, rcache=rcache } | ||
72 | |||
73 | updateCache :: Eq x => | ||
74 | Bool -> TimeStamp -> [x] -> Maybe [(TimeStamp,x)] -> Maybe [(TimeStamp,x)] | ||
75 | updateCache withScrub utc xs mys = do | ||
76 | let ys = maybe [] id mys | ||
77 | ys' = filter scrub ys | ||
78 | ys'' = map (utc,) xs ++ ys' | ||
79 | minute = 60 | ||
80 | scrub (t,x) | withScrub && diffUTCTime utc t < minute = False | ||
81 | scrub (t,x) | x `elem` xs = False | ||
82 | scrub _ = True | ||
83 | guard $ not (null ys'') | ||
84 | return ys'' | ||
85 | |||
86 | dnsObserve :: DNSCache -> Bool -> TimeStamp -> [(Text,SockAddr)] -> STM () | ||
87 | dnsObserve dns withScrub utc obs = do | ||
88 | f <- readTVar $ fcache dns | ||
89 | r <- readTVar $ rcache dns | ||
90 | let obs' = map (\(n,a)->(n,a `withPort` 0)) obs | ||
91 | gs = do | ||
92 | g <- groupBy ((==) `on` fst) $ sortBy (comparing fst) obs' | ||
93 | (n,_) <- take 1 g | ||
94 | return (n,map snd g) | ||
95 | f' = foldl' updatef f gs | ||
96 | hs = do | ||
97 | h <- groupBy ((==) `on` snd) $ sortBy (comparing snd) obs' | ||
98 | (_,a) <- take 1 h | ||
99 | return (a,map fst h) | ||
100 | r' = foldl' updater r hs | ||
101 | writeTVar (fcache dns) f' | ||
102 | writeTVar (rcache dns) r' | ||
103 | where | ||
104 | updatef f (n,addrs) = Map.alter (updateCache withScrub utc addrs) n f | ||
105 | updater r (a,ns) = Map.alter (updateCache withScrub utc ns) a r | ||
106 | |||
107 | make6mapped4 :: SockAddr -> SockAddr | ||
108 | make6mapped4 addr@(SockAddrInet6 {}) = addr | ||
109 | make6mapped4 addr@(SockAddrInet port a) = SockAddrInet6 port 0 (0,0,0xFFFF,fromBE32 a) 0 | ||
110 | |||
111 | tryForkOS :: IO () -> IO ThreadId | ||
112 | tryForkOS action = catchIOError (forkOS action) $ \e -> do | ||
113 | dput XMisc $ "DNSCache: Link with -threaded to avoid excessively long time-out." | ||
114 | forkIO action | ||
115 | |||
116 | |||
117 | -- Attempt to resolve the given domain name. Returns an empty list if the | ||
118 | -- resolve operation takes longer than the timeout, but the 'DNSCache' will be | ||
119 | -- updated when the resolve completes. | ||
120 | -- | ||
121 | -- When the resolve operation does complete, any entries less than a minute old | ||
122 | -- will be overwritten with the new results. Older entries are allowed to | ||
123 | -- persist for reasons I don't understand as of this writing. (See 'updateCache') | ||
124 | rawForwardResolve :: | ||
125 | DNSCache -> (Text -> IO ()) -> Int -> Text -> IO [SockAddr] | ||
126 | rawForwardResolve dns onFail timeout addrtext = do | ||
127 | r <- atomically newEmptyTMVar | ||
128 | mvar <- interruptibleDelay | ||
129 | rt <- tryForkOS $ do | ||
130 | myThreadId >>= flip labelThread ("resolve."++show addrtext) | ||
131 | resolver r mvar | ||
132 | startDelay mvar timeout | ||
133 | did <- atomically $ tryPutTMVar r [] | ||
134 | when did (onFail addrtext) | ||
135 | atomically $ readTMVar r | ||
136 | where | ||
137 | resolver r mvar = do | ||
138 | xs <- handle (\e -> let _ = isDoesNotExistError e in return []) | ||
139 | $ do fmap (nub . map (make6mapped4 . addrAddress)) $ | ||
140 | getAddrInfo (Just $ defaultHints { addrFlags = [ AI_CANONNAME, AI_V4MAPPED ]}) | ||
141 | (Just $ Text.unpack $ strip_brackets addrtext) | ||
142 | (Just "5269") | ||
143 | did <- atomically $ tryPutTMVar r xs | ||
144 | when did $ do | ||
145 | interruptDelay mvar | ||
146 | utc <- getCurrentTime | ||
147 | atomically $ dnsObserve dns True utc $ map (addrtext,) xs | ||
148 | return () | ||
149 | |||
150 | strip_brackets :: Text -> Text | ||
151 | strip_brackets s = | ||
152 | case Text.uncons s of | ||
153 | Just ('[',t) -> Text.takeWhile (/=']') t | ||
154 | _ -> s | ||
155 | |||
156 | |||
157 | reportTimeout :: forall a. Show a => a -> IO () | ||
158 | reportTimeout addrtext = do | ||
159 | dput XMisc $ "timeout resolving: "++show addrtext | ||
160 | -- killThread rt | ||
161 | |||
162 | unmap6mapped4 :: SockAddr -> SockAddr | ||
163 | unmap6mapped4 addr@(SockAddrInet6 port _ (0,0,0xFFFF,a) _) = | ||
164 | SockAddrInet port (toBE32 a) | ||
165 | unmap6mapped4 addr = addr | ||
166 | |||
167 | rawReverseResolve :: | ||
168 | DNSCache -> (SockAddr -> IO ()) -> Int -> SockAddr -> IO [Text] | ||
169 | rawReverseResolve dns onFail timeout addr = do | ||
170 | r <- atomically newEmptyTMVar | ||
171 | mvar <- interruptibleDelay | ||
172 | rt <- forkOS $ resolver r mvar | ||
173 | startDelay mvar timeout | ||
174 | did <- atomically $ tryPutTMVar r [] | ||
175 | when did (onFail addr) | ||
176 | atomically $ readTMVar r | ||
177 | where | ||
178 | resolver r mvar = | ||
179 | handleIO_ (return ()) $ do | ||
180 | ent <- getHostByAddr (unmap6mapped4 addr) -- AF_UNSPEC addr | ||
181 | let names = BSD.hostName ent : BSD.hostAliases ent | ||
182 | xs = map Text.pack $ nub names | ||
183 | forkIO $ do | ||
184 | utc <- getCurrentTime | ||
185 | atomically $ dnsObserve dns False utc $ map (,addr) xs | ||
186 | atomically $ putTMVar r xs | ||
187 | |||
188 | -- Returns expired (older than a minute) cached reverse-dns results | ||
189 | -- and removes them from the cache. | ||
190 | expiredReverse :: DNSCache -> SockAddr -> IO [Text] | ||
191 | expiredReverse dns addr = do | ||
192 | utc <- getCurrentTime | ||
193 | addr <- return $ addr `withPort` 0 | ||
194 | es <- atomically $ do | ||
195 | r <- readTVar $ rcache dns | ||
196 | let ns = maybe [] id $ Map.lookup addr r | ||
197 | minute = 60 -- seconds | ||
198 | -- XXX: Is this right? flip diffUTCTime utc returns the age of the | ||
199 | -- cache entry? | ||
200 | (es0,ns') = partition ( (>=minute) . flip diffUTCTime utc . fst ) ns | ||
201 | es = map snd es0 | ||
202 | modifyTVar' (rcache dns) $ Map.insert addr ns' | ||
203 | f <- readTVar $ fcache dns | ||
204 | let f' = foldl' (flip $ Map.alter (expire utc)) f es | ||
205 | expire utc Nothing = Nothing | ||
206 | expire utc (Just as) = if null as' then Nothing else Just as' | ||
207 | where as' = filter ( (<minute) . flip diffUTCTime utc . fst) as | ||
208 | writeTVar (fcache dns) f' | ||
209 | return es | ||
210 | return es | ||
211 | |||
212 | cachedReverse :: DNSCache -> SockAddr -> IO [Text] | ||
213 | cachedReverse dns addr = do | ||
214 | utc <- getCurrentTime | ||
215 | addr <- return $ addr `withPort` 0 | ||
216 | atomically $ do | ||
217 | r <- readTVar (rcache dns) | ||
218 | let ns = maybe [] id $ Map.lookup addr r | ||
219 | {- | ||
220 | ns' = filter ( (<minute) . flip diffUTCTime utc . fst) ns | ||
221 | minute = 60 -- seconds | ||
222 | modifyTVar' (rcache dns) $ Map.insert addr ns' | ||
223 | return $ map snd ns' | ||
224 | -} | ||
225 | return $ map snd ns | ||
226 | |||
227 | -- Returns any dns query results for the given name that were observed less | ||
228 | -- than a minute ago and updates the forward-cache to remove any results older | ||
229 | -- than that. | ||
230 | cachedForward :: DNSCache -> Text -> IO [SockAddr] | ||
231 | cachedForward dns n = do | ||
232 | utc <- getCurrentTime | ||
233 | atomically $ do | ||
234 | f <- readTVar (fcache dns) | ||
235 | let as = maybe [] id $ Map.lookup n f | ||
236 | as' = filter ( (<minute) . flip diffUTCTime utc . fst) as | ||
237 | minute = 60 -- seconds | ||
238 | modifyTVar' (fcache dns) $ Map.insert n as' | ||
239 | return $ map snd as' | ||
240 | |||
241 | -- Reverse-resolves an address to a domain name. Returns both the result of a | ||
242 | -- new query and any freshly cached results. Cache entries older than a minute | ||
243 | -- will not be returned, but will be refreshed in spawned threads so that they | ||
244 | -- may be available for the next call. | ||
245 | reverseResolve :: DNSCache -> SockAddr -> IO [Text] | ||
246 | reverseResolve dns addr = do | ||
247 | expired <- expiredReverse dns addr | ||
248 | forM_ expired $ \n -> forkIO $ do | ||
249 | rawForwardResolve dns (const $ return ()) 1000000 n | ||
250 | return () | ||
251 | xs <- rawReverseResolve dns (const $ return ()) 1000000 addr | ||
252 | cs <- cachedReverse dns addr | ||
253 | return $ xs ++ filter (not . flip elem xs) cs | ||
254 | |||
255 | -- Resolves a name, if there's no result within one second, then any cached | ||
256 | -- results that are less than a minute old are returned. | ||
257 | forwardResolve :: DNSCache -> Text -> IO [SockAddr] | ||
258 | forwardResolve dns n = do | ||
259 | as <- rawForwardResolve dns (const $ return ()) 1000000 n | ||
260 | if null as | ||
261 | then cachedForward dns n | ||
262 | else return as | ||
263 | |||
264 | parseAddress :: Text -> IO (Maybe SockAddr) | ||
265 | parseAddress addr_str = do | ||
266 | info <- getAddrInfo (Just $ defaultHints { addrFlags = [ AI_NUMERICHOST ] }) | ||
267 | (Just . Text.unpack $ addr_str) | ||
268 | (Just "0") | ||
269 | return . listToMaybe $ map addrAddress info | ||
270 | |||
271 | |||
272 | splitAtPort :: String -> (String,String) | ||
273 | splitAtPort s = second sanitizePort $ case s of | ||
274 | ('[':t) -> break (==']') t | ||
275 | _ -> break (==':') s | ||
276 | where | ||
277 | sanitizePort (']':':':p) = p | ||
278 | sanitizePort (':':p) = p | ||
279 | sanitizePort _ = "0" | ||
280 | |||
281 | unsafeParseAddress :: String -> Maybe SockAddr | ||
282 | unsafeParseAddress addr_str = unsafePerformIO $ do | ||
283 | let (ipstr,portstr) = splitAtPort addr_str | ||
284 | info <- getAddrInfo (Just $ defaultHints { addrFlags = [ AI_NUMERICHOST ] }) | ||
285 | (Just ipstr) | ||
286 | (Just portstr) | ||
287 | return . listToMaybe $ map addrAddress info | ||
288 | |||
289 | withPort :: SockAddr -> Int -> SockAddr | ||
290 | withPort (SockAddrInet _ a) port = SockAddrInet (toEnum port) a | ||
291 | withPort (SockAddrInet6 _ a b c) port = SockAddrInet6 (toEnum port) a b c | ||