diff options
author | Joe Crayne <joe@jerkface.net> | 2019-12-14 01:54:31 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 23:26:05 -0500 |
commit | b3dedb534768756c74448ed4066184e28a539c52 (patch) | |
tree | a7e62e029e372334534a8d40ef9eeb5089171aa7 /dht/src/Network/QueryResponse.hs | |
parent | b5a3c7b92e7effcd234037241b00f9f29773d870 (diff) |
QueryReponse: mergeTransports
Diffstat (limited to 'dht/src/Network/QueryResponse.hs')
-rw-r--r-- | dht/src/Network/QueryResponse.hs | 47 |
1 files changed, 38 insertions, 9 deletions
diff --git a/dht/src/Network/QueryResponse.hs b/dht/src/Network/QueryResponse.hs index ba686929..d8dc8bfa 100644 --- a/dht/src/Network/QueryResponse.hs +++ b/dht/src/Network/QueryResponse.hs | |||
@@ -21,8 +21,12 @@ import Control.Exception | |||
21 | import Control.Monad | 21 | import Control.Monad |
22 | import qualified Data.ByteString as B | 22 | import qualified Data.ByteString as B |
23 | ;import Data.ByteString (ByteString) | 23 | ;import Data.ByteString (ByteString) |
24 | import Data.Dependent.Map as DMap | ||
25 | import Data.Dependent.Sum | ||
24 | import Data.Function | 26 | import Data.Function |
25 | import Data.Functor.Contravariant | 27 | import Data.Functor.Contravariant |
28 | import Data.Functor.Identity | ||
29 | import Data.GADT.Show | ||
26 | import qualified Data.IntMap.Strict as IntMap | 30 | import qualified Data.IntMap.Strict as IntMap |
27 | ;import Data.IntMap.Strict (IntMap) | 31 | ;import Data.IntMap.Strict (IntMap) |
28 | import qualified Data.Map.Strict as Map | 32 | import qualified Data.Map.Strict as Map |
@@ -40,6 +44,7 @@ import System.Endian | |||
40 | import System.IO | 44 | import System.IO |
41 | import System.IO.Error | 45 | import System.IO.Error |
42 | import System.Timeout | 46 | import System.Timeout |
47 | |||
43 | import DPut | 48 | import DPut |
44 | import DebugTag | 49 | import DebugTag |
45 | import Data.TableMethods | 50 | import Data.TableMethods |
@@ -122,22 +127,22 @@ partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) | |||
122 | -> IO (Transport err xaddr x, Transport err a b) | 127 | -> IO (Transport err xaddr x, Transport err a b) |
123 | partitionTransportM parse encodex tr = do | 128 | partitionTransportM parse encodex tr = do |
124 | tchan <- atomically newTChan | 129 | tchan <- atomically newTChan |
125 | let xtr = tr { awaitMessage = \kont -> fix $ \again -> do | 130 | let ytr = tr { awaitMessage = \kont -> fix $ \again -> do |
126 | awaitMessage tr $ \m -> case m of | 131 | awaitMessage tr $ \m -> case m of |
127 | Arrival adr msg -> parse (msg,adr) >>= \case | 132 | Arrival adr msg -> parse (msg,adr) >>= \case |
128 | Left (x,xaddr) -> kont $ Arrival xaddr x | 133 | Left x -> atomically (writeTChan tchan (Just x)) >> join (atomically again) |
129 | Right y -> atomically (writeTChan tchan (Just y)) >> join (atomically again) | 134 | Right (y,yaddr) -> kont $ Arrival yaddr y |
130 | ParseError e -> kont $ ParseError e | 135 | ParseError e -> kont $ ParseError e |
131 | Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated | 136 | Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated |
132 | , sendMessage = \addr' msg' -> do | 137 | , sendMessage = sendMessage tr |
133 | msg_addr <- encodex (msg',addr') | ||
134 | mapM_ (uncurry . flip $ sendMessage tr) msg_addr | ||
135 | } | 138 | } |
136 | ytr = Transport | 139 | xtr = Transport |
137 | { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case | 140 | { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case |
138 | Nothing -> Terminated | 141 | Nothing -> Terminated |
139 | Just (y,yaddr) -> Arrival yaddr y | 142 | Just (x,xaddr) -> Arrival xaddr x |
140 | , sendMessage = sendMessage tr | 143 | , sendMessage = \addr' msg' -> do |
144 | msg_addr <- encodex (msg',addr') | ||
145 | mapM_ (uncurry . flip $ sendMessage tr) msg_addr | ||
141 | , setActive = const $ return () | 146 | , setActive = const $ return () |
142 | } | 147 | } |
143 | return (xtr, ytr) | 148 | return (xtr, ytr) |
@@ -687,3 +692,27 @@ testPairTransport = do | |||
687 | b = SockAddrInet 2 2 | 692 | b = SockAddrInet 2 2 |
688 | return ( chanTransport (const bchan) a achan aclosed | 693 | return ( chanTransport (const bchan) a achan aclosed |
689 | , chanTransport (const achan) b bchan bclosed ) | 694 | , chanTransport (const achan) b bchan bclosed ) |
695 | |||
696 | newtype ByAddress err x addr = ByAddress (Transport err addr x) | ||
697 | |||
698 | newtype Tagged x addr = Tagged x | ||
699 | |||
700 | decorateAddr :: tag addr -> Arrival e addr x -> Arrival e (DSum tag Identity) x | ||
701 | decorateAddr tag Terminated = Terminated | ||
702 | decorateAddr tag (ParseError e) = ParseError e | ||
703 | decorateAddr tag (Arrival addr x) = Arrival (tag ==> addr) x | ||
704 | |||
705 | mergeTransports :: GCompare tag => DMap tag (ByAddress err x) -> IO (Transport err (DSum tag Identity) x) | ||
706 | mergeTransports tmap = do | ||
707 | -- vmap <- traverseWithKey (\k v -> Tagged <$> newEmptyMVar) tmap | ||
708 | -- foldrWithKey (\k v n -> forkMergeBranch k v >> n) (return ()) vmap | ||
709 | return Transport | ||
710 | { awaitMessage = \kont -> | ||
711 | foldrWithKey (\k (ByAddress tr) n -> awaitMessage tr (kont . decorateAddr k) `orElse` n) | ||
712 | retry | ||
713 | tmap | ||
714 | , sendMessage = \(tag :=> Identity addr) x -> case DMap.lookup tag tmap of | ||
715 | Just (ByAddress tr) -> sendMessage tr addr x | ||
716 | Nothing -> return () | ||
717 | , setActive = \toggle -> foldrWithKey (\_ (ByAddress tr) next -> setActive tr toggle >> next) (return ()) tmap | ||
718 | } | ||