diff options
-rw-r--r-- | OnionRouter.hs | 14 | ||||
-rw-r--r-- | src/Network/BitTorrent/MainlineDHT.hs | 3 | ||||
-rw-r--r-- | src/Network/QueryResponse.hs | 160 | ||||
-rw-r--r-- | src/Network/Tox.hs | 18 | ||||
-rw-r--r-- | src/Network/Tox/TCP.hs | 9 |
5 files changed, 89 insertions, 115 deletions
diff --git a/OnionRouter.hs b/OnionRouter.hs index 55a08d48..57c8ba35 100644 --- a/OnionRouter.hs +++ b/OnionRouter.hs | |||
@@ -155,8 +155,8 @@ newOnionRouter :: TransportCrypto | |||
155 | -> (String -> IO ()) | 155 | -> (String -> IO ()) |
156 | -> IO ( OnionRouter | 156 | -> IO ( OnionRouter |
157 | , TVar ( ChaChaDRG | 157 | , TVar ( ChaChaDRG |
158 | , Word64Map (Either (MVar (Bool,TCP.RelayPacket)) | 158 | , Word64Map (Either (Maybe (Bool,TCP.RelayPacket) -> IO ()) |
159 | (MVar (OnionMessage Identity))))) | 159 | (Maybe (OnionMessage Identity) -> IO ())))) |
160 | newOnionRouter crypto perror = do | 160 | newOnionRouter crypto perror = do |
161 | drg0 <- drgNew | 161 | drg0 <- drgNew |
162 | (rlog,pq,rm) <- atomically $ do | 162 | (rlog,pq,rm) <- atomically $ do |
@@ -166,7 +166,7 @@ newOnionRouter crypto perror = do | |||
166 | return (rlog,pq,rm) | 166 | return (rlog,pq,rm) |
167 | ((tbl,(tcptbl,tcpcons)),tcp) <- do | 167 | ((tbl,(tcptbl,tcpcons)),tcp) <- do |
168 | (tcptbl, client) <- TCP.newClient crypto Left $ \case | 168 | (tcptbl, client) <- TCP.newClient crypto Left $ \case |
169 | Left v -> void . tryPutMVar v . (,) False | 169 | Left v -> void . v . Just . (,) False |
170 | Right v -> \case | 170 | Right v -> \case |
171 | TCP.OnionPacketResponse x@(OnionAnnounceResponse n8 n24 _) -> do | 171 | TCP.OnionPacketResponse x@(OnionAnnounceResponse n8 n24 _) -> do |
172 | mod <- lookupSender' pq rlog localhost4 n8 | 172 | mod <- lookupSender' pq rlog localhost4 n8 |
@@ -181,7 +181,7 @@ newOnionRouter crypto perror = do | |||
181 | atomically $ do | 181 | atomically $ do |
182 | modifyTVar' pq (W64.delete w8) | 182 | modifyTVar' pq (W64.delete w8) |
183 | modifyArray rm (fmap gotResponse) rid | 183 | modifyArray rm (fmap gotResponse) rid |
184 | void $ tryPutMVar v y | 184 | void $ v $ Just y |
185 | _ -> return () | 185 | _ -> return () |
186 | x -> perror $ "Unexpected TCP query result: " ++ show x | 186 | x -> perror $ "Unexpected TCP query result: " ++ show x |
187 | 187 | ||
@@ -578,7 +578,7 @@ hookQueries :: OnionRouter -> (tid -> Nonce8) | |||
578 | -> TransactionMethods d tid (OnionDestination RouteId) x | 578 | -> TransactionMethods d tid (OnionDestination RouteId) x |
579 | -> TransactionMethods d tid (OnionDestination RouteId) x | 579 | -> TransactionMethods d tid (OnionDestination RouteId) x |
580 | hookQueries or t8 tmethods = TransactionMethods | 580 | hookQueries or t8 tmethods = TransactionMethods |
581 | { dispatchRegister = \mvar od d -> {-# SCC "hookQ.dispatchRegister" #-} do -- :: MVar x -> d -> STM (tid, d) | 581 | { dispatchRegister = \getTimeout now mvar od d -> {-# SCC "hookQ.dispatchRegister" #-} do -- :: MVar x -> d -> STM (tid, d) |
582 | let ni = onionNodeInfo od | 582 | let ni = onionNodeInfo od |
583 | rid@(RouteId ridn) = fromMaybe (routeId (nodeId ni)) $ onionRouteSpec od | 583 | rid@(RouteId ridn) = fromMaybe (routeId (nodeId ni)) $ onionRouteSpec od |
584 | wanted <- {-# SCC "hookQ.wanted" #-} (readArray (pendingRoutes or) ridn) | 584 | wanted <- {-# SCC "hookQ.wanted" #-} (readArray (pendingRoutes or) ridn) |
@@ -587,7 +587,7 @@ hookQueries or t8 tmethods = TransactionMethods | |||
587 | check $ fromMaybe False $ do | 587 | check $ fromMaybe False $ do |
588 | RouteRecord{routeVersion=rv} <- {-# SCC "hookQ.mr" #-} mr | 588 | RouteRecord{routeVersion=rv} <- {-# SCC "hookQ.mr" #-} mr |
589 | return $ wanted <= rv | 589 | return $ wanted <= rv |
590 | (tid,d') <- dispatchRegister tmethods mvar od d | 590 | ((tid,a,expiry),d') <- dispatchRegister tmethods getTimeout now mvar od d |
591 | let Nonce8 w8 = t8 tid | 591 | let Nonce8 w8 = t8 tid |
592 | od' = case od of | 592 | od' = case od of |
593 | OnionDestination {} -> od { onionRouteSpec = Just rid } | 593 | OnionDestination {} -> od { onionRouteSpec = Just rid } |
@@ -599,7 +599,7 @@ hookQueries or t8 tmethods = TransactionMethods | |||
599 | -- check $ W64.size pqs < 20 | 599 | -- check $ W64.size pqs < 20 |
600 | modifyTVar' (pendingQueries or) (W64.insert w8 pq) | 600 | modifyTVar' (pendingQueries or) (W64.insert w8 pq) |
601 | writeTChan (routeLog or) $ "ONION query add " ++ unwords [ show (Just $ pendingVersion pq,w8), ":=", show ni ] | 601 | writeTChan (routeLog or) $ "ONION query add " ++ unwords [ show (Just $ pendingVersion pq,w8), ":=", show ni ] |
602 | return (tid,d') | 602 | return ((tid,a,expiry),d') |
603 | , dispatchResponse = \tid x d -> {-# SCC "hookQ.dispatchResponse" #-} do -- :: tid -> x -> d -> STM (d, IO ()) | 603 | , dispatchResponse = \tid x d -> {-# SCC "hookQ.dispatchResponse" #-} do -- :: tid -> x -> d -> STM (d, IO ()) |
604 | let Nonce8 w8 = t8 tid | 604 | let Nonce8 w8 = t8 tid |
605 | mb <- W64.lookup w8 <$> readTVar (pendingQueries or) | 605 | mb <- W64.lookup w8 <$> readTVar (pendingQueries or) |
diff --git a/src/Network/BitTorrent/MainlineDHT.hs b/src/Network/BitTorrent/MainlineDHT.hs index 6f47e38f..180ae82d 100644 --- a/src/Network/BitTorrent/MainlineDHT.hs +++ b/src/Network/BitTorrent/MainlineDHT.hs | |||
@@ -82,6 +82,7 @@ import qualified Data.Aeson as JSON | |||
82 | import Text.Read | 82 | import Text.Read |
83 | import System.Global6 | 83 | import System.Global6 |
84 | import Control.TriadCommittee | 84 | import Control.TriadCommittee |
85 | import Data.TableMethods | ||
85 | import DPut | 86 | import DPut |
86 | import DebugTag | 87 | import DebugTag |
87 | 88 | ||
@@ -630,8 +631,6 @@ newClient swarms addr = do | |||
630 | _ -> routing4 routing | 631 | _ -> routing4 routing |
631 | R.thisNode <$> readTVar var | 632 | R.thisNode <$> readTVar var |
632 | , clientResponseId = return | 633 | , clientResponseId = return |
633 | , clientEnterQuery = \_ -> return () | ||
634 | , clientLeaveQuery = \_ _ -> return () | ||
635 | } | 634 | } |
636 | 635 | ||
637 | -- TODO: Provide some means of shutting down these five auxillary threads: | 636 | -- TODO: Provide some means of shutting down these five auxillary threads: |
diff --git a/src/Network/QueryResponse.hs b/src/Network/QueryResponse.hs index 13160d31..01981cc8 100644 --- a/src/Network/QueryResponse.hs +++ b/src/Network/QueryResponse.hs | |||
@@ -27,10 +27,12 @@ import qualified Data.IntMap.Strict as IntMap | |||
27 | ;import Data.IntMap.Strict (IntMap) | 27 | ;import Data.IntMap.Strict (IntMap) |
28 | import qualified Data.Map.Strict as Map | 28 | import qualified Data.Map.Strict as Map |
29 | ;import Data.Map.Strict (Map) | 29 | ;import Data.Map.Strict (Map) |
30 | import Data.Time.Clock.POSIX | ||
30 | import qualified Data.Word64Map as W64Map | 31 | import qualified Data.Word64Map as W64Map |
31 | ;import Data.Word64Map (Word64Map) | 32 | ;import Data.Word64Map (Word64Map) |
32 | import Data.Word | 33 | import Data.Word |
33 | import Data.Maybe | 34 | import Data.Maybe |
35 | import GHC.Event | ||
34 | import Network.Socket | 36 | import Network.Socket |
35 | import Network.Socket.ByteString as B | 37 | import Network.Socket.ByteString as B |
36 | import System.Endian | 38 | import System.Endian |
@@ -39,6 +41,7 @@ import System.IO.Error | |||
39 | import System.Timeout | 41 | import System.Timeout |
40 | import DPut | 42 | import DPut |
41 | import DebugTag | 43 | import DebugTag |
44 | import Data.TableMethods | ||
42 | 45 | ||
43 | -- | Three methods are required to implement a datagram based query\/response protocol. | 46 | -- | Three methods are required to implement a datagram based query\/response protocol. |
44 | data TransportA err addr x y = Transport | 47 | data TransportA err addr x y = Transport |
@@ -203,6 +206,52 @@ forkListener name client = do | |||
203 | closeTransport client | 206 | closeTransport client |
204 | killThread thread_id | 207 | killThread thread_id |
205 | 208 | ||
209 | asyncQuery_ :: Client err meth tid addr x | ||
210 | -> MethodSerializer tid addr x meth a b | ||
211 | -> a | ||
212 | -> addr | ||
213 | -> (Maybe b -> IO ()) | ||
214 | -> IO (tid,POSIXTime,Int) | ||
215 | asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do | ||
216 | now <- getPOSIXTime | ||
217 | (tid,addr,expiry) <- atomically $ do | ||
218 | tbl <- readTVar pending | ||
219 | ((tid,addr,expiry), tbl') <- dispatchRegister (tableMethods d) | ||
220 | (methodTimeout meth) | ||
221 | now | ||
222 | (withResponse . fmap (unwrapResponse meth)) | ||
223 | addr0 | ||
224 | tbl | ||
225 | -- (addr,expiry) <- methodTimeout meth tid addr0 | ||
226 | writeTVar pending tbl' | ||
227 | return (tid,addr,expiry) | ||
228 | self <- whoami (Just addr) | ||
229 | mres <- do sendMessage net addr (wrapQuery meth tid self addr q) | ||
230 | return $ Just () | ||
231 | `catchIOError` (\e -> return Nothing) | ||
232 | return (tid,now,expiry) | ||
233 | |||
234 | asyncQuery :: Client err meth tid addr x | ||
235 | -> MethodSerializer tid addr x meth a b | ||
236 | -> a | ||
237 | -> addr | ||
238 | -> (Maybe b -> IO ()) | ||
239 | -> IO () | ||
240 | asyncQuery client meth q addr withResponse0 = do | ||
241 | tm <- getSystemTimerManager | ||
242 | tidvar <- newEmptyMVar | ||
243 | timedout <- registerTimeout tm 300000000 $ do | ||
244 | withResponse0 Nothing | ||
245 | tid <- takeMVar tidvar | ||
246 | case client of | ||
247 | Client { clientDispatcher = d, clientPending = pending } -> do | ||
248 | atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending | ||
249 | (tid,now,expiry) <- asyncQuery_ client meth q addr $ \x -> do | ||
250 | unregisterTimeout tm timedout | ||
251 | withResponse0 x | ||
252 | putMVar tidvar tid | ||
253 | updateTimeout tm timedout expiry | ||
254 | |||
206 | -- | Send a query to a remote peer. Note that this function will always time | 255 | -- | Send a query to a remote peer. Note that this function will always time |
207 | -- out if 'forkListener' was never invoked to spawn a thread to receive and | 256 | -- out if 'forkListener' was never invoked to spawn a thread to receive and |
208 | -- dispatch the response. | 257 | -- dispatch the response. |
@@ -213,25 +262,14 @@ sendQuery :: | |||
213 | -> a -- ^ The outbound query. | 262 | -> a -- ^ The outbound query. |
214 | -> addr -- ^ Destination address of query. | 263 | -> addr -- ^ Destination address of query. |
215 | -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out. | 264 | -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out. |
216 | sendQuery (Client net d err pending whoami _ enterQuery leaveQuery) meth q addr0 = do | 265 | sendQuery c@(Client net d err pending whoami _) meth q addr0 = do |
217 | mvar <- newEmptyMVar | 266 | mvar <- newEmptyMVar |
218 | (tid,addr,expiry) <- atomically $ do | 267 | (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar) |
219 | tbl <- readTVar pending | 268 | mres <- timeout expiry $ takeMVar mvar |
220 | (tid, tbl') <- dispatchRegister (tableMethods d) mvar addr0 tbl | ||
221 | (addr,expiry) <- methodTimeout meth tid addr0 | ||
222 | writeTVar pending tbl' | ||
223 | return (tid,addr,expiry) | ||
224 | self <- whoami (Just addr) | ||
225 | enterQuery tid | ||
226 | mres <- do sendMessage net addr (wrapQuery meth tid self addr q) | ||
227 | timeout expiry $ takeMVar mvar | ||
228 | `catchIOError` (\e -> return Nothing) | ||
229 | leaveQuery tid (isJust mres) | ||
230 | case mres of | 269 | case mres of |
231 | Just x -> return $ Just $ unwrapResponse meth x | 270 | Just b -> return $ Just b |
232 | Nothing -> do | 271 | Nothing -> do |
233 | atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending | 272 | atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending |
234 | reportTimeout err (method meth) tid addr | ||
235 | return Nothing | 273 | return Nothing |
236 | 274 | ||
237 | -- * Implementing a query\/response 'Client'. | 275 | -- * Implementing a query\/response 'Client'. |
@@ -259,10 +297,6 @@ data Client err meth tid addr x = forall tbl. Client | |||
259 | -- /tid/ includes a unique cryptographic nonce, then it should be | 297 | -- /tid/ includes a unique cryptographic nonce, then it should be |
260 | -- generated here. | 298 | -- generated here. |
261 | , clientResponseId :: tid -> IO tid | 299 | , clientResponseId :: tid -> IO tid |
262 | -- | The enter/leave methods are no-ops by default. They are useful for | ||
263 | -- serializing all queries for debugging purposes. | ||
264 | , clientEnterQuery :: tid -> IO () | ||
265 | , clientLeaveQuery :: tid -> Bool -> IO () | ||
266 | } | 300 | } |
267 | 301 | ||
268 | -- | An incoming message can be classified into three cases. | 302 | -- | An incoming message can be classified into three cases. |
@@ -353,7 +387,7 @@ data TransactionMethods d tid addr x = TransactionMethods | |||
353 | -- response will be written too. The returned /tid/ is a transaction id | 387 | -- response will be written too. The returned /tid/ is a transaction id |
354 | -- that can be used to forget the 'MVar' if the remote peer is not | 388 | -- that can be used to forget the 'MVar' if the remote peer is not |
355 | -- responding. | 389 | -- responding. |
356 | dispatchRegister :: MVar x -> addr -> d -> STM (tid, d) | 390 | dispatchRegister :: (tid -> addr -> STM (addr,Int)) -> POSIXTime -> (Maybe x -> IO ()) -> addr -> d -> STM ((tid,addr,Int), d) |
357 | -- | This method is invoked when an incoming packet /x/ indicates it is | 391 | -- | This method is invoked when an incoming packet /x/ indicates it is |
358 | -- a response to the transaction with id /tid/. The returned IO action | 392 | -- a response to the transaction with id /tid/. The returned IO action |
359 | -- will write the packet to the correct 'MVar' thus completing the | 393 | -- will write the packet to the correct 'MVar' thus completing the |
@@ -364,69 +398,37 @@ data TransactionMethods d tid addr x = TransactionMethods | |||
364 | , dispatchCancel :: tid -> d -> STM d | 398 | , dispatchCancel :: tid -> d -> STM d |
365 | } | 399 | } |
366 | 400 | ||
367 | -- | The standard lookup table methods for use as input to 'transactionMethods' | ||
368 | -- in lieu of directly implementing 'TransactionMethods'. | ||
369 | data TableMethods t tid = TableMethods | ||
370 | { -- | Insert a new /tid/ entry into the transaction table. | ||
371 | tblInsert :: forall a. tid -> a -> t a -> t a | ||
372 | -- | Delete transaction /tid/ from the transaction table. | ||
373 | , tblDelete :: forall a. tid -> t a -> t a | ||
374 | -- | Lookup the value associated with transaction /tid/. | ||
375 | , tblLookup :: forall a. tid -> t a -> Maybe a | ||
376 | } | ||
377 | |||
378 | -- | Methods for using 'Data.IntMap'. | ||
379 | intMapMethods :: TableMethods IntMap Int | ||
380 | intMapMethods = TableMethods IntMap.insert IntMap.delete IntMap.lookup | ||
381 | |||
382 | -- | Methods for using 'Data.Word64Map'. | ||
383 | w64MapMethods :: TableMethods Word64Map Word64 | ||
384 | w64MapMethods = TableMethods W64Map.insert W64Map.delete W64Map.lookup | ||
385 | |||
386 | -- | Methods for using 'Data.Map' | ||
387 | mapMethods :: Ord tid => TableMethods (Map tid) tid | ||
388 | mapMethods = TableMethods Map.insert Map.delete Map.lookup | ||
389 | |||
390 | -- | Change the key type for a lookup table implementation. | ||
391 | -- | ||
392 | -- This can be used with 'intMapMethods' or 'mapMethods' to restrict lookups to | ||
393 | -- only a part of the generated /tid/ value. This is useful for /tid/ types | ||
394 | -- that are especially large due their use for other purposes, such as secure | ||
395 | -- nonces for encryption. | ||
396 | instance Contravariant (TableMethods t) where | ||
397 | -- contramap :: (tid -> t1) -> TableMethods t t1 -> TableMethods t tid | ||
398 | contramap f (TableMethods ins del lookup) = | ||
399 | TableMethods (\k v t -> ins (f k) v t) | ||
400 | (\k t -> del (f k) t) | ||
401 | (\k t -> lookup (f k) t) | ||
402 | |||
403 | -- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a | 401 | -- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a |
404 | -- function for generating unique transaction ids. | 402 | -- function for generating unique transaction ids. |
405 | transactionMethods :: | 403 | transactionMethods :: |
406 | TableMethods t tid -- ^ Table methods to lookup values by /tid/. | 404 | TableMethods t tid -- ^ Table methods to lookup values by /tid/. |
407 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. | 405 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. |
408 | -> TransactionMethods (g,t (MVar x)) tid addr x | 406 | -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x |
409 | transactionMethods methods generate = transactionMethods' id tryPutMVar methods generate | 407 | transactionMethods methods generate = transactionMethods' id id methods generate |
408 | |||
409 | microsecondsDiff :: Int -> POSIXTime | ||
410 | microsecondsDiff us = fromIntegral us / 1000000 | ||
410 | 411 | ||
411 | -- | Like 'transactionMethods' but allows extra information to be stored in the | 412 | -- | Like 'transactionMethods' but allows extra information to be stored in the |
412 | -- table of pending transactions. This also enables multiple 'Client's to | 413 | -- table of pending transactions. This also enables multiple 'Client's to |
413 | -- share a single transaction table. | 414 | -- share a single transaction table. |
414 | transactionMethods' :: | 415 | transactionMethods' :: |
415 | (MVar x -> a) -- ^ store MVar into table entry | 416 | ((Maybe x -> IO ()) -> a) -- ^ store MVar into table entry |
416 | -> (a -> x -> IO void) -- ^ load MVar from table entry | 417 | -> (a -> Maybe x -> IO void) -- ^ load MVar from table entry |
417 | -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. | 418 | -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. |
418 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. | 419 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. |
419 | -> TransactionMethods (g,t a) tid addr x | 420 | -> TransactionMethods (g,t a) tid addr x |
420 | transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods | 421 | transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods |
421 | { dispatchCancel = \tid (g,t) -> return (g, delete tid t) | 422 | { dispatchCancel = \tid (g,t) -> return (g, delete tid t) |
422 | , dispatchRegister = \v _ (g,t) -> | 423 | , dispatchRegister = \getTimeout now v a0 (g,t) -> do |
423 | let (tid,g') = generate g | 424 | let (tid,g') = generate g |
424 | t' = insert tid (store v) t | 425 | (a,expiry) <- getTimeout tid a0 |
425 | in return ( tid, (g',t') ) | 426 | let t' = insert tid (store v) (now + microsecondsDiff expiry) t |
427 | return ( (tid,a,expiry), (g',t') ) | ||
426 | , dispatchResponse = \tid x (g,t) -> | 428 | , dispatchResponse = \tid x (g,t) -> |
427 | case lookup tid t of | 429 | case lookup tid t of |
428 | Just v -> let t' = delete tid t | 430 | Just v -> let t' = delete tid t |
429 | in return ((g,t'),void $ load v x) | 431 | in return ((g,t'),void $ load v $ Just x) |
430 | Nothing -> return ((g,t), return ()) | 432 | Nothing -> return ((g,t), return ()) |
431 | } | 433 | } |
432 | 434 | ||
@@ -459,8 +461,6 @@ data ErrorReporter addr x meth tid err = ErrorReporter | |||
459 | , reportMissingHandler :: meth -> addr -> x -> IO () | 461 | , reportMissingHandler :: meth -> addr -> x -> IO () |
460 | -- | Incoming: unable to identify request. | 462 | -- | Incoming: unable to identify request. |
461 | , reportUnknown :: addr -> x -> err -> IO () | 463 | , reportUnknown :: addr -> x -> err -> IO () |
462 | -- | Outgoing: remote peer is not responding. | ||
463 | , reportTimeout :: meth -> tid -> addr -> IO () | ||
464 | } | 464 | } |
465 | 465 | ||
466 | ignoreErrors :: ErrorReporter addr x meth tid err | 466 | ignoreErrors :: ErrorReporter addr x meth tid err |
@@ -468,7 +468,6 @@ ignoreErrors = ErrorReporter | |||
468 | { reportParseError = \_ -> return () | 468 | { reportParseError = \_ -> return () |
469 | , reportMissingHandler = \_ _ _ -> return () | 469 | , reportMissingHandler = \_ _ _ -> return () |
470 | , reportUnknown = \_ _ _ -> return () | 470 | , reportUnknown = \_ _ _ -> return () |
471 | , reportTimeout = \_ _ _ -> return () | ||
472 | } | 471 | } |
473 | 472 | ||
474 | logErrors :: ( Show addr | 473 | logErrors :: ( Show addr |
@@ -478,7 +477,6 @@ logErrors = ErrorReporter | |||
478 | { reportParseError = \err -> dput XMisc err | 477 | { reportParseError = \err -> dput XMisc err |
479 | , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")" | 478 | , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")" |
480 | , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err | 479 | , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err |
481 | , reportTimeout = \meth tid addr -> dput XMisc $ show addr ++ " --> Timeout ("++show meth++")" | ||
482 | } | 480 | } |
483 | 481 | ||
484 | printErrors :: ( Show addr | 482 | printErrors :: ( Show addr |
@@ -488,17 +486,15 @@ printErrors h = ErrorReporter | |||
488 | { reportParseError = \err -> hPutStrLn h err | 486 | { reportParseError = \err -> hPutStrLn h err |
489 | , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")" | 487 | , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")" |
490 | , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err | 488 | , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err |
491 | , reportTimeout = \meth tid addr -> hPutStrLn h $ show addr ++ " --> Timeout ("++show meth++")" | ||
492 | } | 489 | } |
493 | 490 | ||
494 | -- Change the /err/ type for an 'ErrorReporter'. | 491 | -- Change the /err/ type for an 'ErrorReporter'. |
495 | instance Contravariant (ErrorReporter addr x meth tid) where | 492 | instance Contravariant (ErrorReporter addr x meth tid) where |
496 | -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5 | 493 | -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5 |
497 | contramap f (ErrorReporter pe mh unk tim) | 494 | contramap f (ErrorReporter pe mh unk) |
498 | = ErrorReporter (\e -> pe (f e)) | 495 | = ErrorReporter (\e -> pe (f e)) |
499 | mh | 496 | mh |
500 | (\addr x e -> unk addr x (f e)) | 497 | (\addr x e -> unk addr x (f e)) |
501 | tim | ||
502 | 498 | ||
503 | -- | Handle a single inbound packet and then invoke the given continuation. | 499 | -- | Handle a single inbound packet and then invoke the given continuation. |
504 | -- The 'forkListener' function is implemented by passing this function to 'fix' | 500 | -- The 'forkListener' function is implemented by passing this function to 'fix' |
@@ -509,7 +505,7 @@ handleMessage :: | |||
509 | -> addr | 505 | -> addr |
510 | -> x | 506 | -> x |
511 | -> IO (Maybe (x -> x)) | 507 | -> IO (Maybe (x -> x)) |
512 | handleMessage (Client net d err pending whoami responseID _ _) addr plain = do | 508 | handleMessage (Client net d err pending whoami responseID) addr plain = do |
513 | -- Just (Left e) -> do reportParseError err e | 509 | -- Just (Left e) -> do reportParseError err e |
514 | -- return $! Just id | 510 | -- return $! Just id |
515 | -- Just (Right (plain, addr)) -> do | 511 | -- Just (Right (plain, addr)) -> do |
@@ -637,19 +633,3 @@ testPairTransport = do | |||
637 | b = SockAddrInet 2 2 | 633 | b = SockAddrInet 2 2 |
638 | return ( chanTransport (const bchan) a achan aclosed | 634 | return ( chanTransport (const bchan) a achan aclosed |
639 | , chanTransport (const achan) b bchan bclosed ) | 635 | , chanTransport (const achan) b bchan bclosed ) |
640 | |||
641 | serializeClient :: Client err meth tid addr x -> IO (Client err meth tid addr x) | ||
642 | serializeClient c = do | ||
643 | mvar <- newMVar () | ||
644 | return $ c { clientEnterQuery = \tid -> takeMVar mvar | ||
645 | , clientLeaveQuery = \tid didRespond -> putMVar mvar () | ||
646 | } | ||
647 | |||
648 | retardSend :: Int -> Client err meth tid addr x -> IO (Client err meth tid addr x) | ||
649 | retardSend micros client = do | ||
650 | mvar <- newMVar () :: IO (MVar ()) | ||
651 | return client { clientEnterQuery = \tid -> do | ||
652 | takeMVar mvar | ||
653 | threadDelay micros | ||
654 | putMVar mvar () | ||
655 | } | ||
diff --git a/src/Network/Tox.hs b/src/Network/Tox.hs index c14339e4..98c03b80 100644 --- a/src/Network/Tox.hs +++ b/src/Network/Tox.hs | |||
@@ -44,6 +44,7 @@ import Network.Socket | |||
44 | import System.Endian | 44 | import System.Endian |
45 | import System.IO.Error | 45 | import System.IO.Error |
46 | 46 | ||
47 | import Data.TableMethods | ||
47 | import qualified Data.Word64Map | 48 | import qualified Data.Word64Map |
48 | import Network.BitTorrent.DHT.Token as Token | 49 | import Network.BitTorrent.DHT.Token as Token |
49 | import qualified Data.Wrapper.PSQ as PSQ | 50 | import qualified Data.Wrapper.PSQ as PSQ |
@@ -159,12 +160,10 @@ newClient drg net classify selfAddr handlers modifytbl modifynet = do | |||
159 | let client = Client | 160 | let client = Client |
160 | { clientNet = addHandler (reportParseError eprinter) (handleMessage client) $ modifynet client net | 161 | { clientNet = addHandler (reportParseError eprinter) (handleMessage client) $ modifynet client net |
161 | , clientDispatcher = dispatch tbl var (handlers client) client | 162 | , clientDispatcher = dispatch tbl var (handlers client) client |
162 | , clientErrorReporter = eprinter { reportTimeout = reportTimeout ignoreErrors } | 163 | , clientErrorReporter = eprinter |
163 | , clientPending = var | 164 | , clientPending = var |
164 | , clientAddress = selfAddr | 165 | , clientAddress = selfAddr |
165 | , clientResponseId = genNonce24 var | 166 | , clientResponseId = genNonce24 var |
166 | , clientEnterQuery = \_ -> return () | ||
167 | , clientLeaveQuery = \_ _ -> return () | ||
168 | } | 167 | } |
169 | in client | 168 | in client |
170 | return $ either mkclient mkclient tblvar handlers | 169 | return $ either mkclient mkclient tblvar handlers |
@@ -250,8 +249,8 @@ newOnionClient :: DRG g => | |||
250 | -> TVar Onion.AnnouncedKeys | 249 | -> TVar Onion.AnnouncedKeys |
251 | -> OnionRouter | 250 | -> OnionRouter |
252 | -> TVar (g, Data.Word64Map.Word64Map a) | 251 | -> TVar (g, Data.Word64Map.Word64Map a) |
253 | -> (MVar Onion.Message -> a) | 252 | -> ((Maybe Onion.Message -> IO ()) -> a) |
254 | -> (a -> Onion.Message -> IO void) | 253 | -> (a -> Maybe Onion.Message -> IO void) |
255 | -> Client String | 254 | -> Client String |
256 | DHT.PacketKind | 255 | DHT.PacketKind |
257 | DHT.TransactionId | 256 | DHT.TransactionId |
@@ -268,12 +267,10 @@ newOnionClient crypto net r toks keydb orouter map_var store load = c | |||
268 | , tableMethods = hookQueries orouter DHT.transactionKey | 267 | , tableMethods = hookQueries orouter DHT.transactionKey |
269 | $ transactionMethods' store load (contramap w64Key w64MapMethods) gen | 268 | $ transactionMethods' store load (contramap w64Key w64MapMethods) gen |
270 | } | 269 | } |
271 | , clientErrorReporter = eprinter { reportTimeout = reportTimeout ignoreErrors } | 270 | , clientErrorReporter = eprinter |
272 | , clientPending = map_var | 271 | , clientPending = map_var |
273 | , clientAddress = getOnionAlias crypto $ R.thisNode <$> readTVar (DHT.routing4 r) | 272 | , clientAddress = getOnionAlias crypto $ R.thisNode <$> readTVar (DHT.routing4 r) |
274 | , clientResponseId = genNonce24 map_var | 273 | , clientResponseId = genNonce24 map_var |
275 | , clientEnterQuery = \_ -> return () | ||
276 | , clientLeaveQuery = \_ _ -> return () | ||
277 | } | 274 | } |
278 | 275 | ||
279 | newTox :: TVar Onion.AnnouncedKeys -- ^ Store of announced keys we are a rendezvous for. | 276 | newTox :: TVar Onion.AnnouncedKeys -- ^ Store of announced keys we are a rendezvous for. |
@@ -359,10 +356,9 @@ newToxOverTransport keydb addr onNewSession suppliedDHTKey udp tcp = do | |||
359 | let onionnet = layerTransportM (Onion.decrypt crypto) (Onion.encrypt crypto) onioncrypt | 356 | let onionnet = layerTransportM (Onion.decrypt crypto) (Onion.encrypt crypto) onioncrypt |
360 | let onionclient = newOnionClient crypto onionnet (mkrouting dhtclient) toks keydb orouter' otbl | 357 | let onionclient = newOnionClient crypto onionnet (mkrouting dhtclient) toks keydb orouter' otbl |
361 | Right $ \case | 358 | Right $ \case |
362 | Right v -> tryPutMVar v | 359 | Right v -> v |
363 | Left v -> \_ -> do | 360 | Left v -> \_ -> |
364 | dput XUnexpected "TCP-sent onion query got response over UDP?" | 361 | dput XUnexpected "TCP-sent onion query got response over UDP?" |
365 | return False | ||
366 | 362 | ||
367 | return Tox | 363 | return Tox |
368 | { toxDHT = dhtclient | 364 | { toxDHT = dhtclient |
diff --git a/src/Network/Tox/TCP.hs b/src/Network/Tox/TCP.hs index adb42514..9c1ffe48 100644 --- a/src/Network/Tox/TCP.hs +++ b/src/Network/Tox/TCP.hs | |||
@@ -34,6 +34,7 @@ import System.Timeout | |||
34 | import ControlMaybe | 34 | import ControlMaybe |
35 | import Crypto.Tox | 35 | import Crypto.Tox |
36 | import Data.ByteString (hPut,hGet,ByteString,length) | 36 | import Data.ByteString (hPut,hGet,ByteString,length) |
37 | import Data.TableMethods | ||
37 | import Data.Tox.Relay | 38 | import Data.Tox.Relay |
38 | import qualified Data.Word64Map | 39 | import qualified Data.Word64Map |
39 | import DebugTag | 40 | import DebugTag |
@@ -269,7 +270,7 @@ type RelayClient = Client String PacketNumber Nonce8 NodeInfo (Bool,RelayPacket) | |||
269 | -- defaults are 'id' and 'tryPutMVar'. The resulting customized table state | 270 | -- defaults are 'id' and 'tryPutMVar'. The resulting customized table state |
270 | -- will be returned to the caller along with the new client. | 271 | -- will be returned to the caller along with the new client. |
271 | newClient :: TransportCrypto | 272 | newClient :: TransportCrypto |
272 | -> (MVar (Bool,RelayPacket) -> a) -- ^ store mvar for query | 273 | -> ((Maybe (Bool,RelayPacket) -> IO ()) -> a) -- ^ store mvar for query |
273 | -> (a -> RelayPacket -> IO void) -- ^ load mvar for query | 274 | -> (a -> RelayPacket -> IO void) -- ^ load mvar for query |
274 | -> IO ( ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) | 275 | -> IO ( ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) |
275 | , TCPCache (SessionProtocol RelayPacket RelayPacket) ) | 276 | , TCPCache (SessionProtocol RelayPacket RelayPacket) ) |
@@ -299,16 +300,14 @@ newClient crypto store load = do | |||
299 | { methodParse = \x -> Left "tcp-lookuphandler?" -- :: x -> Either err a | 300 | { methodParse = \x -> Left "tcp-lookuphandler?" -- :: x -> Either err a |
300 | , noreplyAction = \addr a -> dput XTCP $ "tcp-lookupHandler: "++show w | 301 | , noreplyAction = \addr a -> dput XTCP $ "tcp-lookupHandler: "++show w |
301 | } | 302 | } |
302 | , tableMethods = transactionMethods' store (\x -> load x . snd) (contramap (\(Nonce8 w64) -> w64) w64MapMethods) | 303 | , tableMethods = transactionMethods' store (\x -> mapM_ (load x . snd)) (contramap (\(Nonce8 w64) -> w64) w64MapMethods) |
303 | $ first (either error Nonce8 . decode) . randomBytesGenerate 8 | 304 | $ first (either error Nonce8 . decode) . randomBytesGenerate 8 |
304 | } | 305 | } |
305 | , clientErrorReporter = logErrors { reportTimeout = reportTimeout ignoreErrors } | 306 | , clientErrorReporter = logErrors |
306 | , clientPending = map_var | 307 | , clientPending = map_var |
307 | , clientAddress = \_ -> return $ NodeInfo | 308 | , clientAddress = \_ -> return $ NodeInfo |
308 | { udpNodeInfo = either error id $ UDP.nodeInfo (UDP.key2id $ transportPublic crypto) (SockAddrInet 0 0) | 309 | { udpNodeInfo = either error id $ UDP.nodeInfo (UDP.key2id $ transportPublic crypto) (SockAddrInet 0 0) |
309 | , tcpPort = 0 | 310 | , tcpPort = 0 |
310 | } | 311 | } |
311 | , clientResponseId = return | 312 | , clientResponseId = return |
312 | , clientEnterQuery = \_ -> return () | ||
313 | , clientLeaveQuery = \_ _ -> return () | ||
314 | } | 313 | } |