summaryrefslogtreecommitdiff
path: root/Presence/SendMessage.hs
blob: d13cf1dedc18c94bc6f1b75661cbd0a57ad5780e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
{-# LANGUAGE CPP #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
module SendMessage
    ( sendMessage
    , CommandCache(..)
    , ThreadChannelCommand(..)
    , newOutgoingConnections
    , OutgoingConnections
    ) where

import Control.Monad
import Control.Monad.Fix
import Control.Monad.IO.Class
import Control.Monad.Trans.Maybe
import Control.Concurrent.STM
import Control.Concurrent (forkIO)
import Control.Concurrent.Async (async,waitSTM)
import Control.Exception as E
    ( bracketOnError 
    , finally 
    )
import GHC.Conc 
    ( threadStatus
    , ThreadStatus(..)
    , ThreadId
    )
import Data.IORef
import qualified Data.Map as Map
import           Data.Map as Map (Map)
import Data.XML.Types as XML (Event)
import System.IO
    ( BufferMode(..)
    , IOMode(..)
    , hSetBuffering
    )
import Network.BSD 
    ( getProtocolNumber
    )
import Network.Socket 
    ( connect
    , socketToHandle
    -- , sClose
    , Socket(..)
    , socket
    , SocketType(..)
    )

import XMPPTypes (Peer(..),discardPort,peerAddr,withPort,socketFamily,showPeer)
import SocketLike
import ServerC (packetSink)
import ControlMaybe 
import Data.Conduit (Sink,Source)
import qualified Data.ByteString as S (ByteString)
import qualified Data.ByteString.Lazy.Char8 as L
import XMLToByteStrings
import Logging
import ByteStringOperators

type ByteStringSink = Sink S.ByteString IO ()

#if MIN_VERSION_base(4,6,0)
#else
-- |Strict version of 'modifyIORef'
modifyIORef' :: IORef a -> (a -> a) -> IO ()
modifyIORef' ref f = do
    x <- readIORef ref
    let x' = f x
    x' `seq` writeIORef ref x'
#endif


class CommandCache cache where
    type CacheableCommand cache
    emptyCache :: cache
    updateCache :: CacheableCommand cache -> cache -> cache

class ThreadChannelCommand cmd where
    isQuitCommand :: cmd -> Bool


data OutgoingConnections cache = 
        OutgoingConnections (TVar (Map Peer (TChan (CacheableCommand cache), ThreadId)))
                            (OutBoundXML RestrictedSocket cache (CacheableCommand cache))                        


newOutgoingConnections ::
  ( RestrictedSocket
       -> cache
       -> TChan (CacheableCommand cache)
       -> (Maybe (CacheableCommand cache) -> IO ())
       -> Source IO [XML.Event] )

  -> STM (OutgoingConnections cache)

newOutgoingConnections interpretCommands = do
    remotes <- newTVar (Map.empty)
    return (OutgoingConnections remotes interpretCommands)
    


sendMessage
  :: (Show (CacheableCommand a), CommandCache a, ThreadChannelCommand (CacheableCommand a)) =>
     OutgoingConnections a -> CacheableCommand a -> Peer -> IO ()
sendMessage (OutgoingConnections cons interpretCommands) msg peer0 = do
    let peer = discardPort peer0
    found <- atomically $ do
        consmap <- readTVar cons
        return (Map.lookup peer consmap)
    let newEntry = do
          chan <- atomically newTChan
          t <- forkIO $ connect_to_server chan peer interpretCommands -- (newCache,updateCache,toPeer)
          -- L.putStrLn $ "remote-map new: " <++> showPeer peer
          return (True,(chan,t))
    (is_new,entry) <- maybe newEntry
          ( \(chan,t) -> do 
                        st <- threadStatus t
                        let running = do
                                -- L.putStrLn $ "remote-map, thread running: " <++> showPeer peer
                                return (False,(chan,t))
                            died = do
                                -- L.putStrLn $ "remote-map, thread died("<++>bshow st<++>"): " <++> showPeer peer
                                newEntry 
                        case st of 
                          ThreadRunning   -> running
                          ThreadBlocked _ -> running
                          ThreadDied      -> died
                          ThreadFinished  -> died
                        )
          found
    if is_new then L.putStrLn $ "sendMessage NEW ->"<++>showPeer peer<++>": "<++>bshow msg
              else L.putStrLn $ "sendMessage ->"<++>showPeer peer<++>": "<++>bshow msg
    atomically $ writeTChan (fst entry) msg
    when is_new . atomically $
        readTVar cons >>= writeTVar cons . Map.insert peer entry


connect_to_server chan peer toPeer = (>> return ()) . runMaybeT $ do
    let port = 5269 :: Int
    -- We'll cache Presence notifications until the socket
    -- is ready.
    cached <- liftIO $ newIORef emptyCache

    let cacheCmd msg cached = modifyIORef' cached (updateCache msg)

    fix $ \sendmsgs -> do
        connected <- liftIO . async $ connect' (peerAddr peer) port

        msock <- MaybeT . fix $ \loop -> do
            e <- atomically $ orElse
                    (fmap Right $ waitSTM connected)
                    (fmap Left  $ readTChan chan)
            case e of
                Left cmd | isQuitCommand cmd -> return Nothing
                Left cmd   -> cacheCmd cmd cached >> loop
                Right sock -> return (Just sock)

        withJust msock $ \sock -> do
        retry <- do
            (cache,snk) <- liftIO $ do
                h <- socketToHandle sock ReadWriteMode
                hSetBuffering h NoBuffering
                cache <- readIORef $ cached
                -- hint garbage collector: we're done with this...
                writeIORef cached emptyCache 
                return (cache,packetSink h)
            MaybeT $ handleOutgoingToPeer toPeer (restrictSocket sock) cache chan snk

        liftIO $ cacheCmd retry cached
        liftIO $ putStrLn $ "retrying " ++ show retry
        sendmsgs

connect' :: SockAddr -> Int -> IO (Maybe Socket)
connect' addr port = do
    proto <- getProtocolNumber "tcp"
    let getport (SockAddrInet port _) = port
        getport (SockAddrInet6 port _ _ _) = port
    let doException e = do
            -- L.putStrLn $  "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e
            return Nothing
    handleIO doException
         $ tryToConnect proto (addr `withPort` port)
  where
  tryToConnect proto addr =
    bracketOnError
        (socket (socketFamily addr) Stream proto)
        (sClose )  -- only done if there's an error
        (\sock -> do
          connect sock addr
          return (Just sock) -- socketToHandle sock ReadWriteMode
        )


type OutBoundXML sock cache msg =
         sock
         -> cache
         -> TChan msg
         -> (Maybe msg -> IO ())
         -> Source IO [XML.Event]

handleOutgoingToPeer
  :: (SocketLike sock, Show msg) =>
     OutBoundXML sock cache msg
     -> sock
     -> cache 
     -> TChan msg
     -> ByteStringSink
     -> IO (Maybe msg)
handleOutgoingToPeer toPeer sock cache chan snk = do
    p <- getPeerName sock
    debugL $ "(>P) connected " <++> showPeer (RemotePeer p)
    failed <- newIORef Nothing
    let failure cmd = do
            writeIORef failed cmd
            debugStr $ "Failed: " ++ show cmd
    finally (
        handleIO_ (return ()) $ toPeer sock cache chan failure `xmlToByteStrings` snk
        ) $ return () -- logging L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p)
    readIORef failed