summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2019-12-14 01:54:31 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-01 23:26:05 -0500
commitb3dedb534768756c74448ed4066184e28a539c52 (patch)
treea7e62e029e372334534a8d40ef9eeb5089171aa7
parentb5a3c7b92e7effcd234037241b00f9f29773d870 (diff)
QueryReponse: mergeTransports
-rw-r--r--dht/dht-client.cabal1
-rw-r--r--dht/src/Network/QueryResponse.hs47
2 files changed, 39 insertions, 9 deletions
diff --git a/dht/dht-client.cabal b/dht/dht-client.cabal
index 24739767..37126d0a 100644
--- a/dht/dht-client.cabal
+++ b/dht/dht-client.cabal
@@ -166,6 +166,7 @@ library
166 , containers 166 , containers
167 -- TODO: Use GShow,Has 'Show, when dependent-sum>0.6 167 -- TODO: Use GShow,Has 'Show, when dependent-sum>0.6
168 , dependent-sum < 0.6 168 , dependent-sum < 0.6
169 , dependent-map
169 , array 170 , array
170 , hashable 171 , hashable
171 , iproute 172 , iproute
diff --git a/dht/src/Network/QueryResponse.hs b/dht/src/Network/QueryResponse.hs
index ba686929..d8dc8bfa 100644
--- a/dht/src/Network/QueryResponse.hs
+++ b/dht/src/Network/QueryResponse.hs
@@ -21,8 +21,12 @@ import Control.Exception
21import Control.Monad 21import Control.Monad
22import qualified Data.ByteString as B 22import qualified Data.ByteString as B
23 ;import Data.ByteString (ByteString) 23 ;import Data.ByteString (ByteString)
24import Data.Dependent.Map as DMap
25import Data.Dependent.Sum
24import Data.Function 26import Data.Function
25import Data.Functor.Contravariant 27import Data.Functor.Contravariant
28import Data.Functor.Identity
29import Data.GADT.Show
26import qualified Data.IntMap.Strict as IntMap 30import qualified Data.IntMap.Strict as IntMap
27 ;import Data.IntMap.Strict (IntMap) 31 ;import Data.IntMap.Strict (IntMap)
28import qualified Data.Map.Strict as Map 32import qualified Data.Map.Strict as Map
@@ -40,6 +44,7 @@ import System.Endian
40import System.IO 44import System.IO
41import System.IO.Error 45import System.IO.Error
42import System.Timeout 46import System.Timeout
47
43import DPut 48import DPut
44import DebugTag 49import DebugTag
45import Data.TableMethods 50import Data.TableMethods
@@ -122,22 +127,22 @@ partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a)))
122 -> IO (Transport err xaddr x, Transport err a b) 127 -> IO (Transport err xaddr x, Transport err a b)
123partitionTransportM parse encodex tr = do 128partitionTransportM parse encodex tr = do
124 tchan <- atomically newTChan 129 tchan <- atomically newTChan
125 let xtr = tr { awaitMessage = \kont -> fix $ \again -> do 130 let ytr = tr { awaitMessage = \kont -> fix $ \again -> do
126 awaitMessage tr $ \m -> case m of 131 awaitMessage tr $ \m -> case m of
127 Arrival adr msg -> parse (msg,adr) >>= \case 132 Arrival adr msg -> parse (msg,adr) >>= \case
128 Left (x,xaddr) -> kont $ Arrival xaddr x 133 Left x -> atomically (writeTChan tchan (Just x)) >> join (atomically again)
129 Right y -> atomically (writeTChan tchan (Just y)) >> join (atomically again) 134 Right (y,yaddr) -> kont $ Arrival yaddr y
130 ParseError e -> kont $ ParseError e 135 ParseError e -> kont $ ParseError e
131 Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated 136 Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated
132 , sendMessage = \addr' msg' -> do 137 , sendMessage = sendMessage tr
133 msg_addr <- encodex (msg',addr')
134 mapM_ (uncurry . flip $ sendMessage tr) msg_addr
135 } 138 }
136 ytr = Transport 139 xtr = Transport
137 { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case 140 { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case
138 Nothing -> Terminated 141 Nothing -> Terminated
139 Just (y,yaddr) -> Arrival yaddr y 142 Just (x,xaddr) -> Arrival xaddr x
140 , sendMessage = sendMessage tr 143 , sendMessage = \addr' msg' -> do
144 msg_addr <- encodex (msg',addr')
145 mapM_ (uncurry . flip $ sendMessage tr) msg_addr
141 , setActive = const $ return () 146 , setActive = const $ return ()
142 } 147 }
143 return (xtr, ytr) 148 return (xtr, ytr)
@@ -687,3 +692,27 @@ testPairTransport = do
687 b = SockAddrInet 2 2 692 b = SockAddrInet 2 2
688 return ( chanTransport (const bchan) a achan aclosed 693 return ( chanTransport (const bchan) a achan aclosed
689 , chanTransport (const achan) b bchan bclosed ) 694 , chanTransport (const achan) b bchan bclosed )
695
696newtype ByAddress err x addr = ByAddress (Transport err addr x)
697
698newtype Tagged x addr = Tagged x
699
700decorateAddr :: tag addr -> Arrival e addr x -> Arrival e (DSum tag Identity) x
701decorateAddr tag Terminated = Terminated
702decorateAddr tag (ParseError e) = ParseError e
703decorateAddr tag (Arrival addr x) = Arrival (tag ==> addr) x
704
705mergeTransports :: GCompare tag => DMap tag (ByAddress err x) -> IO (Transport err (DSum tag Identity) x)
706mergeTransports tmap = do
707 -- vmap <- traverseWithKey (\k v -> Tagged <$> newEmptyMVar) tmap
708 -- foldrWithKey (\k v n -> forkMergeBranch k v >> n) (return ()) vmap
709 return Transport
710 { awaitMessage = \kont ->
711 foldrWithKey (\k (ByAddress tr) n -> awaitMessage tr (kont . decorateAddr k) `orElse` n)
712 retry
713 tmap
714 , sendMessage = \(tag :=> Identity addr) x -> case DMap.lookup tag tmap of
715 Just (ByAddress tr) -> sendMessage tr addr x
716 Nothing -> return ()
717 , setActive = \toggle -> foldrWithKey (\_ (ByAddress tr) next -> setActive tr toggle >> next) (return ()) tmap
718 }