{-# LANGUAGE CPP #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE OverloadedStrings #-} module SendMessage ( sendMessage , CommandCache(..) , ThreadChannelCommand(..) , newOutgoingConnections , OutgoingConnections ) where import Control.Monad import Control.Monad.Fix import Control.Monad.IO.Class import Control.Monad.Trans.Maybe import Control.Concurrent.STM import Control.Concurrent (forkIO) import Control.Concurrent.Async (async,waitSTM) import Control.Exception as E ( bracketOnError , finally ) import GHC.Conc ( threadStatus , ThreadStatus(..) , ThreadId ) import Data.IORef import qualified Data.Map as Map import Data.Map as Map (Map) import Data.XML.Types as XML (Event) import System.IO ( BufferMode(..) , IOMode(..) , hSetBuffering ) import Network.BSD ( getProtocolNumber ) import Network.Socket ( connect , socketToHandle , sClose , Socket(..) , socket , SocketType(..) ) import XMPPTypes (Peer(..),discardPort,peerAddr,withPort,socketFamily,showPeer) import SocketLike import ServerC (packetSink) import ControlMaybe import Data.Conduit (Sink,Source) import qualified Data.ByteString as S (ByteString) import XMLToByteStrings import Logging import ByteStringOperators type ByteStringSink = Sink S.ByteString IO () -- |Strict version of 'modifyIORef' modifyIORef' :: IORef a -> (a -> a) -> IO () modifyIORef' ref f = do x <- readIORef ref let x' = f x x' `seq` writeIORef ref x' class CommandCache cache where type CacheableCommand cache emptyCache :: cache updateCache :: CacheableCommand cache -> cache -> cache class ThreadChannelCommand cmd where isQuitCommand :: cmd -> Bool data OutgoingConnections cache = OutgoingConnections (TVar (Map Peer (TChan (CacheableCommand cache), ThreadId))) (OutBoundXML RestrictedSocket cache (CacheableCommand cache)) newOutgoingConnections :: ( RestrictedSocket -> cache -> TChan (CacheableCommand cache) -> (Maybe (CacheableCommand cache) -> IO ()) -> Source IO [XML.Event] ) -> STM (OutgoingConnections cache) newOutgoingConnections interpretCommands = do remotes <- newTVar (Map.empty) return (OutgoingConnections remotes interpretCommands) sendMessage :: (Show (CacheableCommand a), CommandCache a, ThreadChannelCommand (CacheableCommand a)) => OutgoingConnections a -> CacheableCommand a -> Peer -> IO () sendMessage (OutgoingConnections cons interpretCommands) msg peer0 = do let peer = discardPort peer0 found <- atomically $ do consmap <- readTVar cons return (Map.lookup peer consmap) let newEntry = do chan <- atomically newTChan t <- forkIO $ connect_to_server chan peer interpretCommands -- (newCache,updateCache,toPeer) -- L.putStrLn $ "remote-map new: " <++> showPeer peer return (True,(chan,t)) (is_new,entry) <- maybe newEntry ( \(chan,t) -> do st <- threadStatus t let running = do -- L.putStrLn $ "remote-map, thread running: " <++> showPeer peer return (False,(chan,t)) died = do -- L.putStrLn $ "remote-map, thread died("<++>bshow st<++>"): " <++> showPeer peer newEntry case st of ThreadRunning -> running ThreadBlocked _ -> running ThreadDied -> died ThreadFinished -> died ) found -- L.putStrLn $ "sendMessage ->"<++>showPeer peer<++>": "<++>bshow msg atomically $ writeTChan (fst entry) msg when is_new . atomically $ readTVar cons >>= writeTVar cons . Map.insert peer entry connect_to_server chan peer toPeer = (>> return ()) . runMaybeT $ do let port = 5269 :: Int -- We'll cache Presence notifications until the socket -- is ready. cached <- liftIO $ newIORef emptyCache let cacheCmd msg cached = modifyIORef' cached (updateCache msg) fix $ \sendmsgs -> do connected <- liftIO . async $ connect' (peerAddr peer) port msock <- MaybeT . fix $ \loop -> do e <- atomically $ orElse (fmap Right $ waitSTM connected) (fmap Left $ readTChan chan) case e of Left cmd | isQuitCommand cmd -> return Nothing Left cmd -> cacheCmd cmd cached >> loop Right sock -> return (Just sock) withJust msock $ \sock -> do retry <- do (cache,snk) <- liftIO $ do h <- socketToHandle sock ReadWriteMode hSetBuffering h NoBuffering cache <- readIORef $ cached -- hint garbage collector: we're done with this... writeIORef cached emptyCache return (cache,packetSink h) MaybeT $ handleOutgoingToPeer toPeer (restrictSocket sock) cache chan snk liftIO $ cacheCmd retry cached -- liftIO $ putStrLn $ "retrying " ++ show retry sendmsgs connect' :: SockAddr -> Int -> IO (Maybe Socket) connect' addr port = do proto <- getProtocolNumber "tcp" let getport (SockAddrInet port _) = port getport (SockAddrInet6 port _ _ _) = port let doException e = do -- L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e return Nothing handleIO doException $ tryToConnect proto (addr `withPort` port) where tryToConnect proto addr = bracketOnError (socket (socketFamily addr) Stream proto) (sClose ) -- only done if there's an error (\sock -> do connect sock addr return (Just sock) -- socketToHandle sock ReadWriteMode ) type OutBoundXML sock cache msg = sock -> cache -> TChan msg -> (Maybe msg -> IO ()) -> Source IO [XML.Event] handleOutgoingToPeer :: (SocketLike sock, Show msg) => OutBoundXML sock cache msg -> sock -> cache -> TChan msg -> ByteStringSink -> IO (Maybe msg) handleOutgoingToPeer toPeer sock cache chan snk = do p <- getPeerName sock debugL $ "(>P) connected " <++> showPeer (RemotePeer p) failed <- newIORef Nothing let failure cmd = do writeIORef failed cmd debugStr $ "Failed: " ++ show cmd finally ( handleIO_ (return ()) $ toPeer sock cache chan failure `xmlToByteStrings` snk ) $ return () -- logging L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p) readIORef failed