diff options
Diffstat (limited to 'dht/src/Network/QueryResponse.hs')
-rw-r--r-- | dht/src/Network/QueryResponse.hs | 32 |
1 files changed, 1 insertions, 31 deletions
diff --git a/dht/src/Network/QueryResponse.hs b/dht/src/Network/QueryResponse.hs index 1bfa995f..89723da2 100644 --- a/dht/src/Network/QueryResponse.hs +++ b/dht/src/Network/QueryResponse.hs | |||
@@ -52,7 +52,7 @@ import Data.TableMethods | |||
52 | -- | An inbound packet or condition raised while monitoring a connection. | 52 | -- | An inbound packet or condition raised while monitoring a connection. |
53 | data Arrival err addr x | 53 | data Arrival err addr x |
54 | = Terminated -- ^ Virtual message that signals EOF. | 54 | = Terminated -- ^ Virtual message that signals EOF. |
55 | | ParseError !err -- ^ A badly-formed message was recieved. | 55 | | ParseError !err -- ^ A badly-formed message was received. |
56 | | Arrival { arrivedFrom :: !addr , arrivedMsg :: !x } -- ^ Inbound message. | 56 | | Arrival { arrivedFrom :: !addr , arrivedMsg :: !x } -- ^ Inbound message. |
57 | 57 | ||
58 | -- | Three methods are required to implement a datagram based query\/response protocol. | 58 | -- | Three methods are required to implement a datagram based query\/response protocol. |
@@ -157,36 +157,6 @@ partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) | |||
157 | partitionTransport parse encodex tr = | 157 | partitionTransport parse encodex tr = |
158 | partitionTransportM (return . parse) (return . encodex) tr | 158 | partitionTransportM (return . parse) (return . encodex) tr |
159 | 159 | ||
160 | |||
161 | partitionAndForkTransport :: | ||
162 | (dst -> msg -> IO ()) | ||
163 | -> ((b,a) -> IO (Either (x,xaddr) (b,a))) | ||
164 | -> ((x,xaddr) -> IO (Maybe (Either (msg,dst) (b,a)))) | ||
165 | -> Transport err a b | ||
166 | -> IO (Transport err xaddr x, Transport err a b) | ||
167 | partitionAndForkTransport forkedSend parse encodex tr = do | ||
168 | tchan <- atomically newTChan | ||
169 | let xtr = tr { awaitMessage = \kont -> fix $ \again -> do | ||
170 | awaitMessage tr $ \case | ||
171 | Arrival a b -> parse (b,a) >>= \case | ||
172 | Left (x,xaddr) -> kont $ Arrival xaddr x | ||
173 | Right (b,a) -> atomically (writeTChan tchan (Arrival a b)) >> join (atomically again) | ||
174 | ParseError e -> kont $ ParseError e | ||
175 | Terminated -> atomically (writeTChan tchan Terminated) >> kont Terminated | ||
176 | , sendMessage = \addr' msg' -> do | ||
177 | msg_addr <- encodex (msg',addr') | ||
178 | case msg_addr of | ||
179 | Just (Right (b,a)) -> sendMessage tr a b | ||
180 | Just (Left (msg,dst)) -> forkedSend dst msg | ||
181 | Nothing -> return () | ||
182 | } | ||
183 | ytr = Transport | ||
184 | { awaitMessage = \kont -> readTChan tchan >>= pure . kont | ||
185 | , sendMessage = sendMessage tr | ||
186 | , setActive = \_ -> return () | ||
187 | } | ||
188 | return (xtr, ytr) | ||
189 | |||
190 | -- | | 160 | -- | |
191 | -- * f add x --> Nothing, consume x | 161 | -- * f add x --> Nothing, consume x |
192 | -- --> Just id, leave x to a different handler | 162 | -- --> Just id, leave x to a different handler |