{-# LANGUAGE CPP #-}
{-# LANGUAGE TypeFamilies #-}
-- | Outgoing message delivery to remote peers.
--
-- Each remote peer gets one worker thread and one 'TChan'.  'sendMessage'
-- looks the peer up in a shared map, starts a worker if none is alive, and
-- queues the message on the peer's channel.  The worker
-- ('connect_to_server') connects, streams queued messages to the socket and
-- reconnects when a message is reported as failed.
module SendMessage where

import Control.Monad
import Control.Monad.Fix
import Control.Monad.IO.Class
import Control.Monad.Trans.Class
import Control.Monad.Trans.Maybe
import Control.Concurrent.STM
import Control.Concurrent (forkIO)
import Control.Concurrent.Async (async,waitSTM)
import Control.Exception as E
    ( bracketOnError
    , finally
    )
import GHC.Conc
    ( threadStatus
    , ThreadStatus(..)
    , ThreadId
    )
import Data.IORef
import qualified Data.Map as Map
import Data.Map as Map (Map)
import qualified Data.Set as Set
import Data.Set as Set (Set,(\\))
import Data.XML.Types as XML (Event)
import System.IO
    ( BufferMode(..)
    , IOMode(..)
    , hSetBuffering
    )
import Network.BSD
    ( getProtocolNumber
    )
import Network.Socket
    ( Family
    , connect
    , socketToHandle
    , sClose
    , Socket(..)
    , socket
    , SocketType(..)
    )

import Todo
import XMPPTypes (Peer(..),discardPort,peerAddr,withPort,socketFamily)
import SocketLike
import ServerC (packetSink)
import ControlMaybe (handleIO,handleIO_)
import Data.Conduit (Sink,Source)
import qualified Data.ByteString as S (ByteString)
import XMLToByteStrings

-- | A conduit sink that consumes strict 'S.ByteString' chunks in 'IO'.
type ByteStringSink = Data.Conduit.Sink S.ByteString IO ()

-- | Shape of a function that drives one connected peer: it is given the
-- socket, the cache accumulated while connecting, the channel of pending
-- messages and the byte sink, and yields an optional message.
type OutgoingToPeer sock cache msg =
    sock -> cache -> TChan msg -> ByteStringSink -> IO (Maybe msg)

-- | Commands that can be cached and later interpreted against a connected
-- socket.
class CacheableCommand cmd where
    -- | Reference type that 'cache' writes commands into.
    type CacheRef cmd :: *
    -- | Cache type consumed by 'interpretCommands'.
    type Cache cmd :: *

    -- | An empty cache reference.
    emptyCache :: CacheRef cmd

    -- | Record a command in the given cache reference.
    cache :: cmd -> CacheRef cmd -> IO ()

    -- | Process the cache and the channel of commands against a socket,
    -- writing bytes to the sink and yielding an optional command.
    interpretCommands
        :: SocketLike sock
        => sock
        -> Cache cmd
        -> TChan cmd
        -> ByteStringSink
        -> IO (Maybe cmd)

-- | Queue a message for delivery to a peer.
--
-- The first argument bundles an initial cache value, an action that stores
-- a message in an 'IORef' cache, and the XML producer used once connected.
-- The 'TVar' maps each peer (with its port discarded via 'discardPort') to
-- the channel and thread id of its worker.
--
-- If the peer has no entry, or its recorded thread has died or finished, a
-- fresh channel is created and a new 'connect_to_server' worker is forked.
-- The message is then written to the channel and, for a new worker, the map
-- is updated.
--
-- NOTE(review): the lookup and the insert are separate transactions, so two
-- concurrent callers for the same unknown peer may each fork a worker; the
-- later insert replaces the earlier entry.
sendMessage
    :: (cache, msg -> IORef cache -> IO (), OutBoundXML RestrictedSocket cache msg)
    -> TVar (Map Peer (TChan msg, ThreadId))
    -> msg
    -> Peer
    -> IO ()
sendMessage (newCache,cacheCmd,toPeer) cons msg peer0 = do
    let peer = discardPort peer0
    found <- Map.lookup peer <$> readTVarIO cons
    let newEntry = do
            chan <- atomically newTChan
            t <- forkIO $ connect_to_server chan peer (newCache,cacheCmd,toPeer)
            -- L.putStrLn $ "remote-map new: " <++> showPeer peer
            return (True,(chan,t))
    (is_new,entry) <- case found of
        Nothing -> newEntry
        Just (chan,t) -> do
            st <- threadStatus t
            let running = do
                    -- L.putStrLn $ "remote-map, thread running: " <++> showPeer peer
                    return (False,(chan,t))
                died = do
                    -- L.putStrLn $ "remote-map, thread died("<++>bshow st<++>"): " <++> showPeer peer
                    newEntry
            case st of
                ThreadRunning   -> running
                ThreadBlocked _ -> running
                ThreadDied      -> died
                ThreadFinished  -> died
    -- L.putStrLn $ "sendMessage ->"<++>showPeer peer<++>": "<++>bshow msg
    atomically $ writeTChan (fst entry) msg
    -- Single strict read-modify-write instead of a separate read and write.
    when is_new . atomically $ modifyTVar' cons (Map.insert peer entry)

-- | Worker loop for a single peer.
--
-- Starts an asynchronous connection attempt to port 5269 (presumably the
-- XMPP server-to-server port -- confirm).  While the attempt is pending,
-- messages arriving on the channel are stored with @cacheCmd@.  Once a
-- socket is available it is handed, together with the accumulated cache, to
-- 'handleOutgoingToPeer'.  If that reports a failed message, the message is
-- cached again and the whole cycle restarts.  The worker ends when the
-- connection attempt yields 'Nothing' or no failed message is reported.
--
-- NOTE(review): the 'Handle' created by 'socketToHandle' is not closed
-- here; confirm that 'packetSink' or another party releases it.
connect_to_server chan peer (newCache,cacheCmd,toPeer) = void . runMaybeT $ do
    let port = 5269 :: Int

    -- We'll cache Presence notifications until the socket
    -- is ready.
    cached <- liftIO $ newIORef newCache

    fix $ \sendmsgs -> do
        connected <- liftIO . async $ connect' (peerAddr peer) port

        -- Wait for the connection attempt to finish; until then, divert
        -- incoming messages into the cache.
        sock <- MaybeT . fix $ \loop -> do
            e <- atomically $ orElse
                    (fmap Right $ waitSTM connected)
                    (fmap Left  $ readTChan chan)
            case e of
                Left cmd     -> cacheCmd cmd cached >> loop
                Right result -> return result

        retry <- do
            (cache,snk) <- liftIO $ do
                h <- socketToHandle sock ReadWriteMode
                hSetBuffering h NoBuffering
                cache <- readIORef cached
                -- hint garbage collector: we're done with this...
                writeIORef cached newCache
                return (cache,packetSink h)
            MaybeT $ handleOutgoingToPeer toPeer (restrictSocket sock) cache chan snk

        liftIO $ cacheCmd retry cached
        -- liftIO $ putStrLn $ "retrying " ++ show retry
        sendmsgs

-- | Open a TCP stream socket to the given address, overriding its port.
--
-- Yields @Just sock@ when 'connect' succeeds.  If socket setup or 'connect'
-- raises, 'bracketOnError' closes the socket; 'handleIO' then presumably
-- turns the IO exception into 'Nothing' (its definition lives in
-- "ControlMaybe" -- confirm).
connect' :: SockAddr -> Int -> IO (Maybe Socket)
connect' addr port = do
    proto <- getProtocolNumber "tcp"
    {-
    -- Given (host :: HostName) ...
    let hints = defaultHints { addrFlags = [AI_ADDRCONFIG]
                             , addrProtocol = proto
                             , addrSocketType = Stream }
    addrs <- getAddrInfo (Just hints) (Just host) (Just serv)
    firstSuccessful $ map tryToConnect addrs
    -}
    let doException e = do
            -- L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e
            return Nothing
    handleIO doException
        $ tryToConnect proto (addr `withPort` port)
  where
    tryToConnect proto addr =
        bracketOnError
            (socket (socketFamily addr) Stream proto)
            sClose -- only done if there's an error
            (\sock -> do
                connect sock addr
                return (Just sock) -- socketToHandle sock ReadWriteMode
            )

{-
mmInsert val key mm = Map.alter f key mm
  where
    f Nothing = Just $ Set.singleton val
    f (Just set) = Just $ Set.insert val set
-}

-- newCache = todo

-- | Placeholder: currently defined as 'todo'.
cacheCmd :: msg -> cache -> IO ()
cacheCmd _ cached = todo

-- toPeer = todo

-- | Producer of outgoing XML for one peer.  It receives the socket, the
-- cache collected while connecting, the channel of pending messages and a
-- callback, and yields batches of XML events.  'handleOutgoingToPeer' uses
-- the callback to record the message it later returns.
type OutBoundXML sock cache msg =
    sock
    -> cache
    -> TChan msg
    -> (Maybe msg -> IO ())
    -> Source IO [XML.Event]

-- | Run an 'OutBoundXML' producer against a connected socket.
--
-- The producer's events are converted with 'xmlToByteStrings' and written
-- to the sink.  The value returned is whatever the producer last passed to
-- its failure callback, or 'Nothing' if the callback was never invoked.
-- The streaming action runs under 'handleIO_' with a @return ()@ fallback
-- (presumably swallowing IO exceptions -- confirm in "ControlMaybe").
handleOutgoingToPeer
    :: SocketLike sock
    => OutBoundXML sock cache msg
    -> sock
    -> cache
    -> TChan msg
    -> ByteStringSink
    -> IO (Maybe msg)
handleOutgoingToPeer toPeer sock cache chan snk = do
    -- Only the commented-out logging uses this; the call is kept so that
    -- any failure of getPeerName still surfaces here as before.
    p <- getPeerName sock
    -- L.putStrLn $ "(>P) connected " <++> showPeer (RemotePeer p)
    failed <- newIORef Nothing
    let failure cmd = do
            writeIORef failed cmd
            -- putStrLn $ "Failed: " ++ show cmd
    -- The finalizer is currently a no-op; it marks where the disconnect
    -- logging below would run.
    finally
        ( handleIO_ (return ()) $ toPeer sock cache chan failure `xmlToByteStrings` snk )
        $ return () -- logging L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p)
    readIORef failed