From fa1b497fe08a85b79061ec663e78a869fcad3003 Mon Sep 17 00:00:00 2001 From: joe Date: Sun, 14 Jul 2013 02:12:52 -0400 Subject: experimental module SendMessage --- SendMessage.hs | 208 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 208 insertions(+) create mode 100644 SendMessage.hs (limited to 'SendMessage.hs') diff --git a/SendMessage.hs b/SendMessage.hs new file mode 100644 index 00000000..c6eec01e --- /dev/null +++ b/SendMessage.hs @@ -0,0 +1,208 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE TypeFamilies #-} +module SendMessage where + +import Control.Monad +import Control.Monad.Fix +import Control.Monad.IO.Class +import Control.Monad.Trans.Class +import Control.Monad.Trans.Maybe +import Control.Concurrent.STM +import Control.Concurrent (forkIO) +import Control.Concurrent.Async (async,waitSTM) +import Control.Exception as E + ( bracketOnError + , finally + ) +import GHC.Conc + ( threadStatus + , ThreadStatus(..) + , ThreadId + ) +import Data.IORef +import qualified Data.Map as Map +import Data.Map as Map (Map) +import qualified Data.Set as Set +import Data.Set as Set (Set,(\\)) +import Data.XML.Types as XML (Event) +import System.IO + ( BufferMode(..) + , IOMode(..) + , hSetBuffering + ) +import Network.BSD + ( getProtocolNumber + ) +import Network.Socket + ( Family + , connect + , socketToHandle + , sClose + , Socket(..) + , socket + , SocketType(..) + ) + +import Todo +import XMPPTypes (Peer(..),discardPort,peerAddr,withPort,socketFamily) +import SocketLike +import ServerC (packetSink) +import ControlMaybe (handleIO,handleIO_) +import Data.Conduit (Sink,Source) +import qualified Data.ByteString as S (ByteString) +import XMLToByteStrings + +type ByteStringSink = Data.Conduit.Sink S.ByteString IO () + +type OutgoingToPeer sock cache msg = + sock + -> cache + -> TChan msg + -> ByteStringSink + -> IO (Maybe msg) + +class CacheableCommand cmd where + type CacheRef cmd :: * + type Cache cmd :: * + + emptyCache :: CacheRef cmd + + cache :: cmd -> CacheRef cmd -> IO () + + interpretCommands :: SocketLike sock => + sock -> Cache cmd -> TChan cmd -> ByteStringSink -> IO (Maybe cmd) + + +sendMessage :: + (cache, msg -> IORef cache -> IO (), OutBoundXML RestrictedSocket cache msg) -> TVar (Map Peer (TChan msg, ThreadId)) -> msg -> Peer -> IO () +sendMessage (newCache,cacheCmd,toPeer) cons msg peer0 = do + let peer = discardPort peer0 + found <- atomically $ do + consmap <- readTVar cons + return (Map.lookup peer consmap) + let newEntry = do + chan <- atomically newTChan + t <- forkIO $ connect_to_server chan peer (newCache,cacheCmd,toPeer) + -- L.putStrLn $ "remote-map new: " <++> showPeer peer + return (True,(chan,t)) + (is_new,entry) <- maybe newEntry + ( \(chan,t) -> do + st <- threadStatus t + let running = do + -- L.putStrLn $ "remote-map, thread running: " <++> showPeer peer + return (False,(chan,t)) + died = do + -- L.putStrLn $ "remote-map, thread died("<++>bshow st<++>"): " <++> showPeer peer + newEntry + case st of + ThreadRunning -> running + ThreadBlocked _ -> running + ThreadDied -> died + ThreadFinished -> died + ) + found + -- L.putStrLn $ "sendMessage ->"<++>showPeer peer<++>": "<++>bshow msg + atomically $ writeTChan (fst entry) msg + when is_new . atomically $ + readTVar cons >>= writeTVar cons . Map.insert peer entry + + +connect_to_server chan peer (newCache,cacheCmd,toPeer) = (>> return ()) . runMaybeT $ do + let port = 5269 :: Int + -- We'll cache Presence notifications until the socket + -- is ready. + cached <- liftIO $ newIORef newCache + + fix $ \sendmsgs -> do + connected <- liftIO . async $ connect' (peerAddr peer) port + + sock <- MaybeT . fix $ \loop -> do + e <- atomically $ orElse + (fmap Right $ waitSTM connected) + (fmap Left $ readTChan chan) + case e of + Left cmd -> cacheCmd cmd cached >> loop + Right sock -> return sock + + retry <- do + (cache,snk) <- liftIO $ do + h <- socketToHandle sock ReadWriteMode + hSetBuffering h NoBuffering + cache <- readIORef $ cached + -- hint garbage collector: we're done with this... + writeIORef cached newCache + return (cache,packetSink h) + MaybeT $ handleOutgoingToPeer toPeer (restrictSocket sock) cache chan snk + + liftIO $ cacheCmd retry cached + -- liftIO $ putStrLn $ "retrying " ++ show retry + sendmsgs + +connect' :: SockAddr -> Int -> IO (Maybe Socket) +connect' addr port = do + proto <- getProtocolNumber "tcp" + {- + -- Given (host :: HostName) ... + let hints = defaultHints { addrFlags = [AI_ADDRCONFIG] + , addrProtocol = proto + , addrSocketType = Stream } + addrs <- getAddrInfo (Just hints) (Just host) (Just serv) + firstSuccessful $ map tryToConnect addrs + -} + let getport (SockAddrInet port _) = port + getport (SockAddrInet6 port _ _ _) = port + let doException e = do + -- L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e + return Nothing + handleIO doException + $ tryToConnect proto (addr `withPort` port) + where + tryToConnect proto addr = + bracketOnError + (socket (socketFamily addr) Stream proto) + (sClose ) -- only done if there's an error + (\sock -> do + connect sock addr + return (Just sock) -- socketToHandle sock ReadWriteMode + ) + + +{- +mmInsert val key mm = Map.alter f key mm + where + f Nothing = Just $ Set.singleton val + f (Just set) = Just $ Set.insert val set +-} + +-- newCache = todo +cacheCmd :: msg -> cache -> IO () +cacheCmd _ cached = todo +-- toPeer = todo + +type OutBoundXML sock cache msg = + sock + -> cache + -> TChan msg + -> (Maybe msg -> IO ()) + -> Source IO [XML.Event] + +handleOutgoingToPeer + :: SocketLike sock => + OutBoundXML sock cache msg + -> sock + -> cache + -> TChan msg + -> ByteStringSink + -> IO (Maybe msg) +handleOutgoingToPeer toPeer sock cache chan snk = do + p <- getPeerName sock + -- L.putStrLn $ "(>P) connected " <++> showPeer (RemotePeer p) + failed <- newIORef Nothing + let failure cmd = do + writeIORef failed cmd + -- putStrLn $ "Failed: " ++ show cmd + finally ( + handleIO_ (return ()) $ toPeer sock cache chan failure `xmlToByteStrings` snk + ) $ return () -- logging L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p) + readIORef failed + -- cgit v1.2.3