diff options
author | joe <joe@jerkface.net> | 2013-06-20 18:50:18 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2013-06-20 18:50:18 -0400 |
commit | fa6a523704984bd98762a4e639b739e73320068f (patch) | |
tree | ca7d30d7b37e2c97368fa2ff14e2c5116b03c4ca | |
parent | 2e72fc27f26fc75cd236701b220f4e2bfaf686c1 (diff) |
Work toward sending outgoing messages to remote peers
-rw-r--r-- | Presence/ConfigFiles.hs | 10 | ||||
-rw-r--r-- | Presence/XMPPServer.hs | 78 | ||||
-rw-r--r-- | Presence/main.hs | 14 |
3 files changed, 95 insertions, 7 deletions
diff --git a/Presence/ConfigFiles.hs b/Presence/ConfigFiles.hs index ee0d5b85..f0e18f70 100644 --- a/Presence/ConfigFiles.hs +++ b/Presence/ConfigFiles.hs | |||
@@ -54,10 +54,12 @@ addSubscriber :: User -> ByteString -> IO () | |||
54 | addSubscriber user subscriber = | 54 | addSubscriber user subscriber = |
55 | subscriberPath user >>= addItem subscriber "<? subscribers ?>" | 55 | subscriberPath user >>= addItem subscriber "<? subscribers ?>" |
56 | 56 | ||
57 | getConfigList path = withFile path ReadMode $ | 57 | getConfigList path = |
58 | L.hGetContents | 58 | handle (\e -> if isDoesNotExistError e then (return []) else throw e) |
59 | >=> return . Prelude.tail . L.lines | 59 | $ withFile path ReadMode $ |
60 | >=> (\a -> seq (rnf a) (return a)) | 60 | L.hGetContents |
61 | >=> return . Prelude.tail . L.lines | ||
62 | >=> (\a -> seq (rnf a) (return a)) | ||
61 | 63 | ||
62 | getBuddies :: User -> IO [ByteString] | 64 | getBuddies :: User -> IO [ByteString] |
63 | getBuddies user = buddyPath user >>= getConfigList | 65 | getBuddies user = buddyPath user >>= getConfigList |
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs index 7e42c7ae..062fcacb 100644 --- a/Presence/XMPPServer.hs +++ b/Presence/XMPPServer.hs | |||
@@ -6,6 +6,7 @@ | |||
6 | -- {-# LANGUAGE GADTs #-} | 6 | -- {-# LANGUAGE GADTs #-} |
7 | module XMPPServer where -- ( listenForXmppClients ) where | 7 | module XMPPServer where -- ( listenForXmppClients ) where |
8 | 8 | ||
9 | import Todo | ||
9 | import Data.HList.TypeEqGeneric1() | 10 | import Data.HList.TypeEqGeneric1() |
10 | import Data.HList.TypeCastGeneric1() | 11 | import Data.HList.TypeCastGeneric1() |
11 | import ByteStringOperators | 12 | import ByteStringOperators |
@@ -14,6 +15,7 @@ import Server | |||
14 | import Data.ByteString.Lazy.Char8 as L | 15 | import Data.ByteString.Lazy.Char8 as L |
15 | ( hPutStrLn | 16 | ( hPutStrLn |
16 | , unlines | 17 | , unlines |
18 | , splitWith | ||
17 | , ByteString | 19 | , ByteString |
18 | , pack | 20 | , pack |
19 | , unpack ) | 21 | , unpack ) |
@@ -47,6 +49,8 @@ import Control.Exception | |||
47 | import Text.Show.ByteString as L | 49 | import Text.Show.ByteString as L |
48 | import Data.Binary.Builder as B | 50 | import Data.Binary.Builder as B |
49 | import Data.Binary.Put | 51 | import Data.Binary.Put |
52 | import qualified Data.Map as Map | ||
53 | import GHC.Conc | ||
50 | 54 | ||
51 | -- | Jabber ID (JID) datatype | 55 | -- | Jabber ID (JID) datatype |
52 | data JID = JID { name :: Maybe ByteString | 56 | data JID = JID { name :: Maybe ByteString |
@@ -98,6 +102,9 @@ class XMPPSession session where | |||
98 | closeSession :: session -> IO () | 102 | closeSession :: session -> IO () |
99 | subscribe :: session -> Maybe JID -> IO (TChan Presence) | 103 | subscribe :: session -> Maybe JID -> IO (TChan Presence) |
100 | 104 | ||
105 | class XMPPConfig config where | ||
106 | getBuddies :: config -> ByteString -> IO [ByteString] | ||
107 | getSubscribers :: config -> ByteString -> IO [ByteString] | ||
101 | 108 | ||
102 | greet host = L.unlines | 109 | greet host = L.unlines |
103 | [ "<?xml version='1.0'?>" | 110 | [ "<?xml version='1.0'?>" |
@@ -353,5 +360,72 @@ listenForRemotePeers session_factory port st = do | |||
353 | dopkt | 360 | dopkt |
354 | start | 361 | start |
355 | 362 | ||
356 | seekRemotePeers session_factory st = do | 363 | newServerConnections = atomically $ newTVar Map.empty |
357 | return () | 364 | {- |
365 | sendMessage cons msg peer = do | ||
366 | (is_new,entry) <- atomically $ do | ||
367 | consmap <- readTVar cons | ||
368 | let found = Map.lookup peer consmap | ||
369 | newEntry = () | ||
370 | entry = maybe newEntry id found | ||
371 | is_new = isNothing found | ||
372 | when is_new | ||
373 | $ writeTVar cons (Map.insert peer entry consmap) | ||
374 | return (is_new,entry) | ||
375 | L.putStrLn $ "sendMessage ->"<++>peer<++>": "<++>bshow msg | ||
376 | when is_new $ connect_to_server entry peer | ||
377 | |||
378 | -} | ||
379 | |||
380 | sendMessage cons msg peer = do | ||
381 | found <- atomically $ do | ||
382 | consmap <- readTVar cons | ||
383 | return (Map.lookup peer consmap) | ||
384 | let newEntry = do | ||
385 | chan <- atomically newTChan | ||
386 | t <- forkIO $ connect_to_server chan peer | ||
387 | return (chan,t) | ||
388 | entry <- maybe newEntry | ||
389 | ( \(chan,t) -> do | ||
390 | st <- threadStatus t | ||
391 | case st of | ||
392 | ThreadRunning -> return (chan,t) | ||
393 | _ -> newEntry | ||
394 | ) | ||
395 | found | ||
396 | L.putStrLn $ "sendMessage ->"<++>peer<++>": "<++>bshow msg | ||
397 | |||
398 | connect_to_server chan peer = return () | ||
399 | |||
400 | parseJID :: ByteString -> JID | ||
401 | parseJID bjid = | ||
402 | let xs = L.splitWith (=='@') bjid | ||
403 | ys = L.splitWith (=='/') (last xs) | ||
404 | (name,server) | ||
405 | = case xs of | ||
406 | (n:s:_) -> (Just n,s) | ||
407 | (s:_) -> (Nothing,s) | ||
408 | rsrc = case ys of | ||
409 | (s:_:_) -> Just $ last ys | ||
410 | _ -> Nothing | ||
411 | in JID name server rsrc | ||
412 | |||
413 | seekRemotePeers :: XMPPConfig config => | ||
414 | (ByteString -> Bool) -> config -> TChan Presence -> IO b0 | ||
415 | seekRemotePeers is_peer config chan = do | ||
416 | server_connections <- newServerConnections | ||
417 | fix $ \loop -> do | ||
418 | event <- atomically $ readTChan chan | ||
419 | case event of | ||
420 | p@(Presence jid stat) -> do | ||
421 | L.putStrLn $ "seekRemotePeers: " <++> L.show jid <++> " " <++> bshow stat | ||
422 | runMaybeT $ do | ||
423 | u <- MaybeT . return $ name jid | ||
424 | subscribers <- liftIO $ getSubscribers config u | ||
425 | liftIO . L.putStrLn $ "subscribers: " <++> bshow subscribers | ||
426 | forM_ subscribers $ \bjid -> do | ||
427 | let jid = parseJID bjid | ||
428 | peer = server jid | ||
429 | when (is_peer peer) $ | ||
430 | liftIO $ sendMessage server_connections p peer | ||
431 | loop | ||
diff --git a/Presence/main.hs b/Presence/main.hs index e416d7cc..b0721292 100644 --- a/Presence/main.hs +++ b/Presence/main.hs | |||
@@ -3,6 +3,7 @@ | |||
3 | {-# LANGUAGE TypeFamilies #-} | 3 | {-# LANGUAGE TypeFamilies #-} |
4 | module Main where | 4 | module Main where |
5 | 5 | ||
6 | import Debug.Trace | ||
6 | import System.Directory | 7 | import System.Directory |
7 | import Control.Monad | 8 | import Control.Monad |
8 | import System.Posix.Signals | 9 | import System.Posix.Signals |
@@ -10,6 +11,7 @@ import System.Posix.Types | |||
10 | import System.Posix.Process | 11 | import System.Posix.Process |
11 | import Data.Maybe | 12 | import Data.Maybe |
12 | import Data.Char | 13 | import Data.Char |
14 | import ConfigFiles | ||
13 | 15 | ||
14 | import System.INotify | 16 | import System.INotify |
15 | #ifndef NOUTMP | 17 | #ifndef NOUTMP |
@@ -36,7 +38,7 @@ import qualified Data.Map as Map | |||
36 | import Data.Map as Map (Map) | 38 | import Data.Map as Map (Map) |
37 | 39 | ||
38 | import Control.Concurrent.STM | 40 | import Control.Concurrent.STM |
39 | import Control.Concurrent (threadDelay) | 41 | import Control.Concurrent |
40 | import Control.Monad.Trans.Maybe | 42 | import Control.Monad.Trans.Maybe |
41 | import Control.Monad.IO.Class | 43 | import Control.Monad.IO.Class |
42 | 44 | ||
@@ -187,12 +189,21 @@ on_chvt state vtnum = do | |||
187 | return (us,fmap snd subs,fmap snd greedy) | 189 | return (us,fmap snd subs,fmap snd greedy) |
188 | update_presence greedy subs users $ matchResource tty | 190 | update_presence greedy subs users $ matchResource tty |
189 | 191 | ||
192 | data UnixConfig = UnixConfig | ||
193 | |||
194 | instance XMPPConfig UnixConfig where | ||
195 | getBuddies _ user = ConfigFiles.getBuddies user | ||
196 | getSubscribers _ user = ConfigFiles.getSubscribers user | ||
190 | 197 | ||
191 | start :: ByteString -> IO () | 198 | start :: ByteString -> IO () |
192 | start host = do | 199 | start host = do |
193 | tracked <- newPresenceState host | 200 | tracked <- newPresenceState host |
194 | let dologin e = track_login host tracked e | 201 | let dologin e = track_login host tracked e |
195 | dologin :: t -> IO () | 202 | dologin :: t -> IO () |
203 | |||
204 | chan <- atomically $ subscribeToChan (greedySubscriber tracked) | ||
205 | remotes <- forkIO $ seekRemotePeers (/=host) UnixConfig chan | ||
206 | |||
196 | installHandler sigUSR1 (Catch (dologin (userError "signaled"))) Nothing | 207 | installHandler sigUSR1 (Catch (dologin (userError "signaled"))) Nothing |
197 | -- installHandler sigTERM (CatchOnce (dologin (userError "term signaled"))) Nothing | 208 | -- installHandler sigTERM (CatchOnce (dologin (userError "term signaled"))) Nothing |
198 | mtty <- monitorTTY (on_chvt tracked) | 209 | mtty <- monitorTTY (on_chvt tracked) |
@@ -211,6 +222,7 @@ start host = do | |||
211 | dologin () | 222 | dologin () |
212 | putStrLn "\nHit enter to terminate...\n" | 223 | putStrLn "\nHit enter to terminate...\n" |
213 | getLine | 224 | getLine |
225 | killThread remotes | ||
214 | sClose sockLocals | 226 | sClose sockLocals |
215 | sClose sockRemotes | 227 | sClose sockRemotes |
216 | -- threadDelay 1000 | 228 | -- threadDelay 1000 |