-- SendMessage.hs (path: root/SendMessage.hs, blob ff779e118d4f0b1621ce9ba7d467663acc7732c2)
{-# LANGUAGE CPP #-}
{-# LANGUAGE TypeFamilies #-}
module SendMessage where

import Control.Monad
import Control.Monad.Fix
import Control.Monad.IO.Class
import Control.Monad.Trans.Class
import Control.Monad.Trans.Maybe
import Control.Concurrent.STM
import Control.Concurrent (forkIO)
import Control.Concurrent.Async (async,waitSTM)
import Control.Exception as E
    ( bracketOnError 
    , finally 
    )
import GHC.Conc 
    ( threadStatus
    , ThreadStatus(..)
    , ThreadId
    )
import Data.IORef
import qualified Data.Map as Map
import           Data.Map as Map (Map)
import qualified Data.Set as Set
import           Data.Set as Set (Set,(\\))
import Data.XML.Types as XML (Event)
import System.IO
    ( BufferMode(..)
    , IOMode(..)
    , hSetBuffering
    )
import Network.BSD 
    ( getProtocolNumber
    )
import Network.Socket 
    ( Family
    , connect
    , socketToHandle
    , sClose
    , Socket(..)
    , socket
    , SocketType(..)
    )

import Todo
import XMPPTypes (Peer(..),discardPort,peerAddr,withPort,socketFamily)
import SocketLike
import ServerC (packetSink)
import ControlMaybe (handleIO,handleIO_)
import Data.Conduit (Sink,Source)
import qualified Data.ByteString as S (ByteString)
import XMLToByteStrings

-- | A conduit 'Sink' that consumes strict 'S.ByteString' chunks in 'IO' and
-- produces no result value.  'handleOutgoingToPeer' takes one of these as the
-- destination for the bytes it emits.
type ByteStringSink = Data.Conduit.Sink S.ByteString IO ()

-- | Shape of an outgoing-connection handler: given a socket-like value, an
-- initial cache, the channel of outbound messages and a byte sink, run in
-- 'IO' and finish with an optional message.
--
-- This is the same argument/result shape that 'handleOutgoingToPeer' has once
-- its first argument has been supplied.  The alias is not referenced anywhere
-- else in the visible part of this module.
type OutgoingToPeer sock cache msg =
         sock
         -> cache
         -> TChan msg
         -> ByteStringSink
         -> IO (Maybe msg)

-- | Strict variant of 'modifyIORef'.
--
-- Applies the function to the current contents and forces the result to weak
-- head normal form /before/ storing it, so the reference never accumulates a
-- chain of unevaluated updates.  The read and the write are two separate
-- steps; this is not an atomic modification.
modifyIORef' :: IORef a -> (a -> a) -> IO ()
modifyIORef' ref f =
    readIORef ref >>= \current -> writeIORef ref $! f current


-- | Queue a message for delivery to a peer, starting a connection worker for
-- that peer if none is currently alive.
--
-- The connection map is keyed by the peer with its port discarded.  If the
-- map already holds an entry whose thread is running or blocked, that entry's
-- channel is reused.  If there is no entry, or the recorded thread has died
-- or finished, a fresh channel is created and a new 'connect_to_server'
-- thread is forked for it.
--
-- The message is written to the chosen channel first; only afterwards is a
-- freshly created entry inserted into the map.
--
-- NOTE(review): the lookup, the fork and the insert are separate steps (two
-- distinct STM transactions with IO in between), so two concurrent callers
-- targeting the same peer could each fork a worker, with the later insert
-- replacing the earlier one in the map.
sendMessage
    :: (cache, msg -> cache -> cache, OutBoundXML RestrictedSocket cache msg)
    -> TVar (Map Peer (TChan msg, ThreadId))
    -> msg
    -> Peer
    -> IO ()
sendMessage (newCache,updateCache,toPeer) cons msg peer0 = do
    existing <- atomically $ Map.lookup peer `fmap` readTVar cons
    (isNew, entry) <- case existing of
        Nothing -> spawn
        Just known@(_, tid) -> do
            status <- threadStatus tid
            case status of
                ThreadRunning   -> reuse known
                ThreadBlocked _ -> reuse known
                ThreadDied      -> spawn
                ThreadFinished  -> spawn
    -- Hand the message to the worker before publishing a new entry.
    atomically $ writeTChan (fst entry) msg
    when isNew . atomically $ do
        conns <- readTVar cons
        writeTVar cons (Map.insert peer entry conns)
  where
    -- Connections are tracked per address, regardless of port.
    peer = discardPort peer0

    -- Keep using a worker that is still alive.
    reuse known = return (False, known)

    -- Create a channel and fork a worker that will drain it.
    spawn = do
        chan <- atomically newTChan
        tid <- forkIO $ connect_to_server chan peer (newCache,updateCache,toPeer)
        return (True, (chan, tid))


-- | Body of the per-peer worker thread forked by 'sendMessage'.
--
-- Repeatedly: start an asynchronous connection attempt to the peer on port
-- 5269, and while that attempt is outstanding fold every message arriving on
-- @chan@ into a pending cache.  Once the attempt resolves:
--
-- * if it produced no socket, the worker stops;
-- * otherwise the socket is turned into an unbuffered handle, the pending
--   cache is taken (and reset to @newCache@), and 'handleOutgoingToPeer'
--   runs the session.
--
-- When the session ends with a message, that message is folded back into the
-- pending cache and the whole cycle starts again; when it ends without one,
-- the worker stops.
--
-- NOTE(review): there is no explicit type signature; the type is inferred
-- from the call in 'sendMessage'.
connect_to_server chan peer (newCache,updateCache,toPeer) = do
    -- Messages are cached here until a socket is ready to carry them.
    pending <- newIORef newCache
    let serverPort = 5269 :: Int

        -- Fold one message into the pending cache (strictly).
        stash msg = modifyIORef' pending (updateCache msg)

        -- Wait for the connection attempt to resolve, preferring its result
        -- over channel traffic; anything read from the channel meanwhile is
        -- stashed.
        awaitSocket attempt = do
            event <- atomically $
                (Right `fmap` waitSTM attempt) `orElse` (Left `fmap` readTChan chan)
            case event of
                Left msg      -> stash msg >> awaitSocket attempt
                Right outcome -> return outcome

        -- One connect-and-serve cycle, looping for as long as each session
        -- hands back a message to retry.
        session = do
            attempt <- liftIO $ async (connect' (peerAddr peer) serverPort)
            sock <- MaybeT $ awaitSocket attempt
            (backlog, snk) <- liftIO $ do
                h <- socketToHandle sock ReadWriteMode
                hSetBuffering h NoBuffering
                backlog <- readIORef pending
                -- Drop our reference to the old cache so it can be collected.
                writeIORef pending newCache
                return (backlog, packetSink h)
            unsent <- MaybeT $
                handleOutgoingToPeer toPeer (restrictSocket sock) backlog chan snk
            liftIO $ stash unsent
            session
    _ <- runMaybeT session
    return ()

-- | Try to open a TCP connection to the given address on the given port.
--
-- The port of @addr@ is replaced by @port@ (via 'withPort') before
-- connecting.  Returns @Just sock@ holding the connected socket on success.
-- Failures are routed through 'handleIO', whose handler here yields
-- 'Nothing' instead of propagating.
--
-- If connecting fails after the socket has been created, the socket is
-- closed before the exception reaches the handler, so no descriptor is
-- leaked on the failure path.
connect' :: SockAddr -> Int -> IO (Maybe Socket)
connect' addr port = do
    proto <- getProtocolNumber "tcp"
    -- Sketch of a hostname-based alternative, kept for reference:
    --
    --   -- Given (host :: HostName) ...
    --   let hints = defaultHints { addrFlags = [AI_ADDRCONFIG]
    --                            , addrProtocol = proto
    --                            , addrSocketType = Stream }
    --   addrs <- getAddrInfo (Just hints) (Just host) (Just serv)
    --   firstSuccessful $ map tryToConnect addrs
    --
    -- The failure itself is currently not logged:
    -- L.putStrLn $  "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e
    handleIO (\_ -> return Nothing)
         $ tryToConnect proto (addr `withPort` port)
  where
    tryToConnect proto target =
        bracketOnError
            (socket (socketFamily target) Stream proto)
            sClose  -- only runs if connecting raises an exception
            (\sock -> do
                connect sock target
                return (Just sock))


{-
mmInsert val key mm = Map.alter f key mm
            where 
                f Nothing    = Just $ Set.singleton val
                f (Just set) = Just $ Set.insert val set
-}

-- newCache = todo
-- cacheCmd :: msg -> cache -> IO ()
-- cacheCmd _ cached = todo
-- toPeer = todo

-- | Producer of outbound XML for one peer connection.
--
-- Given a socket-like value, an initial cache, the channel of outbound
-- messages and a callback accepting a @Maybe msg@, it yields a conduit
-- 'Source' of XML event lists in 'IO'.
--
-- 'handleOutgoingToPeer' supplies a callback that records its argument, and
-- returns the last recorded value once the source has been run.
type OutBoundXML sock cache msg =
         sock
         -> cache
         -> TChan msg
         -- callback through which the producer reports a @Maybe msg@
         -> (Maybe msg -> IO ())
         -> Source IO [XML.Event]

-- | Run one outbound session: render the XML produced by @toPeer@ to bytes
-- and feed them into @snk@.
--
-- The producer is given a callback that overwrites a local reference
-- (initially 'Nothing') with whatever @Maybe msg@ it is called with.  After
-- the stream has been run, the contents of that reference are returned, so
-- the result is the last value the producer reported, or 'Nothing' if it
-- never called the callback.
--
-- The stream is run under @handleIO_ (return ())@ — presumably so that an IO
-- failure ends the session quietly instead of propagating; confirm against
-- "ControlMaybe".
handleOutgoingToPeer
  :: SocketLike sock =>
     OutBoundXML sock cache msg
     -> sock
     -> cache
     -> TChan msg
     -> ByteStringSink
     -> IO (Maybe msg)
handleOutgoingToPeer toPeer sock cache chan snk = do
    -- The peer name is only needed by the (disabled) connect/disconnect
    -- logging, but the lookup is still performed.
    _ <- getPeerName sock
    -- L.putStrLn $ "(>P) connected " <++> showPeer (RemotePeer p)
    unsent <- newIORef Nothing
    let outgoing = toPeer sock cache chan (writeIORef unsent)
        pump     = handleIO_ (return ()) (outgoing `xmlToByteStrings` snk)
    -- The finaliser is a placeholder for disconnect logging:
    -- L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p)
    pump `finally` return ()
    readIORef unsent