diff options
-rw-r--r-- | dht/Connection/Tcp.hs | 16 | ||||
-rw-r--r-- | dht/Presence/XMPPServer.hs | 22 | ||||
-rw-r--r-- | dht/src/Network/StreamServer.hs | 23 |
3 files changed, 32 insertions, 29 deletions
diff --git a/dht/Connection/Tcp.hs b/dht/Connection/Tcp.hs index 2572eba6..06290a1c 100644 --- a/dht/Connection/Tcp.hs +++ b/dht/Connection/Tcp.hs | |||
@@ -37,6 +37,7 @@ import Data.Map (Map) | |||
37 | import Data.Monoid ( (<>) ) | 37 | import Data.Monoid ( (<>) ) |
38 | import Control.Concurrent.ThreadUtil | 38 | import Control.Concurrent.ThreadUtil |
39 | 39 | ||
40 | import Control.Arrow | ||
40 | import Control.Concurrent.STM | 41 | import Control.Concurrent.STM |
41 | -- import Control.Concurrent.STM.TMVar | 42 | -- import Control.Concurrent.STM.TMVar |
42 | -- import Control.Concurrent.STM.TChan | 43 | -- import Control.Concurrent.STM.TChan |
@@ -76,6 +77,7 @@ import Network.StreamServer | |||
76 | import Network.SocketLike hiding (sClose) | 77 | import Network.SocketLike hiding (sClose) |
77 | import qualified Connection as G | 78 | import qualified Connection as G |
78 | ;import Connection (Manager (..), PeerAddress (..), Policy (..)) | 79 | ;import Connection (Manager (..), PeerAddress (..), Policy (..)) |
80 | import Network.Address (localhost4) | ||
79 | import DPut | 81 | import DPut |
80 | import DebugTag | 82 | import DebugTag |
81 | 83 | ||
@@ -96,7 +98,7 @@ data ConnectionParameters conkey u = | |||
96 | -- ^ The miliseconds of idle after 'RequiresPing' is signaled | 98 | -- ^ The miliseconds of idle after 'RequiresPing' is signaled |
97 | -- that are necessary for the connection to be considered | 99 | -- that are necessary for the connection to be considered |
98 | -- lost and signalling 'EOF'. | 100 | -- lost and signalling 'EOF'. |
99 | , makeConnKey :: RestrictedSocket -> IO (conkey,u) | 101 | , makeConnKey :: (RestrictedSocket,(Local SockAddr, Remote SockAddr)) -> IO (conkey,u) |
100 | -- ^ This action creates a lookup key for a new connection. If 'duplex' | 102 | -- ^ This action creates a lookup key for a new connection. If 'duplex' |
101 | -- is 'True' and the result is already assocatied with an established | 103 | -- is 'True' and the result is already assocatied with an established |
102 | -- connection, then an 'EOF' will be forced before the the new | 104 | -- connection, then an 'EOF' will be forced before the the new |
@@ -122,7 +124,7 @@ data ConnectionParameters conkey u = | |||
122 | -- * 'duplex' = True | 124 | -- * 'duplex' = True |
123 | -- | 125 | -- |
124 | connectionDefaults | 126 | connectionDefaults |
125 | :: (RestrictedSocket -> IO (conkey,u)) -> ConnectionParameters conkey u | 127 | :: ((RestrictedSocket,(Local SockAddr,Remote SockAddr)) -> IO (conkey,u)) -> ConnectionParameters conkey u |
126 | connectionDefaults f = ConnectionParameters | 128 | connectionDefaults f = ConnectionParameters |
127 | { pingInterval = 28000 | 129 | { pingInterval = 28000 |
128 | , timeout = 2000 | 130 | , timeout = 2000 |
@@ -337,14 +339,15 @@ server allocate sessionConduits = do | |||
337 | handle (\e -> do -- let t = ioeGetErrorType e | 339 | handle (\e -> do -- let t = ioeGetErrorType e |
338 | when (isDoesNotExistError e) $ return () -- warn "GOTCHA" | 340 | when (isDoesNotExistError e) $ return () -- warn "GOTCHA" |
339 | -- warn $ "connect-error: " <> bshow e | 341 | -- warn $ "connect-error: " <> bshow e |
340 | (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ? | 342 | (conkey,u) <- makeConnKey params (restrictSocket sock,(Local localhost4, Remote addr)) -- XXX: ? |
341 | Socket.close sock | 343 | Socket.close sock |
342 | atomically | 344 | atomically |
343 | $ writeTChan (serverEvent server) | 345 | $ writeTChan (serverEvent server) |
344 | $ ((conkey,u),ConnectFailure addr)) | 346 | $ ((conkey,u),ConnectFailure addr)) |
345 | $ do | 347 | $ do |
346 | connect sock addr | 348 | connect sock addr |
347 | (conkey,u) <- makeConnKey params (restrictSocket sock) | 349 | laddr <- Socket.getSocketName sock |
350 | (conkey,u) <- makeConnKey params (restrictSocket sock, (Local laddr, Remote addr)) | ||
348 | h <- socketToHandle sock ReadWriteMode | 351 | h <- socketToHandle sock ReadWriteMode |
349 | newConnection server sessionConduits params conkey u h Out | 352 | newConnection server sessionConduits params conkey u h Out |
350 | return () | 353 | return () |
@@ -375,13 +378,14 @@ server allocate sessionConduits = do | |||
375 | sock <- socket (socketFamily addr) Stream proto | 378 | sock <- socket (socketFamily addr) Stream proto |
376 | handle (\(SomeException e) -> do | 379 | handle (\(SomeException e) -> do |
377 | -- Any thing else goes wrong and we broadcast ConnectFailure. | 380 | -- Any thing else goes wrong and we broadcast ConnectFailure. |
378 | do (conkey,u) <- makeConnKey params (restrictSocket sock) | 381 | do (conkey,u) <- makeConnKey params (restrictSocket sock,(Local localhost4, Remote addr)) |
379 | Socket.close sock | 382 | Socket.close sock |
380 | atomically $ writeTChan (serverEvent server) ((conkey,u),ConnectFailure addr) | 383 | atomically $ writeTChan (serverEvent server) ((conkey,u),ConnectFailure addr) |
381 | `onException` return () | 384 | `onException` return () |
382 | atomically $ readTVar retryVar) $ do | 385 | atomically $ readTVar retryVar) $ do |
383 | connect sock addr | 386 | connect sock addr |
384 | (conkey,u) <- makeConnKey params (restrictSocket sock) | 387 | laddr <- Socket.getSocketName sock |
388 | (conkey,u) <- makeConnKey params (restrictSocket sock, (Local laddr, Remote addr)) | ||
385 | h <- socketToHandle sock ReadWriteMode | 389 | h <- socketToHandle sock ReadWriteMode |
386 | threads <- newConnection server sessionConduits params conkey u h Out | 390 | threads <- newConnection server sessionConduits params conkey u h Out |
387 | atomically $ do threadsWait threads | 391 | atomically $ do threadsWait threads |
diff --git a/dht/Presence/XMPPServer.hs b/dht/Presence/XMPPServer.hs index 272f6efe..3bafd33c 100644 --- a/dht/Presence/XMPPServer.hs +++ b/dht/Presence/XMPPServer.hs | |||
@@ -65,7 +65,7 @@ import Control.Monad.Trans (lift) | |||
65 | import Control.Monad.IO.Class (MonadIO, liftIO) | 65 | import Control.Monad.IO.Class (MonadIO, liftIO) |
66 | import Control.Monad.Fix (fix) | 66 | import Control.Monad.Fix (fix) |
67 | import Control.Monad | 67 | import Control.Monad |
68 | import Control.Concurrent.ThreadUtil (forkIO,myThreadId,forkLabeled,labelThread,ThreadId,MVar,putMVar,takeMVar,newMVar) | 68 | import Control.Concurrent.ThreadUtil (forkIO,myThreadId,forkLabeled,labelThread,ThreadId,MVar,putMVar,takeMVar,newMVar,threadDelay) |
69 | import Control.Concurrent.STM | 69 | import Control.Concurrent.STM |
70 | import Data.List hiding ((\\)) | 70 | import Data.List hiding ((\\)) |
71 | -- import Control.Concurrent.STM.TChan | 71 | -- import Control.Concurrent.STM.TChan |
@@ -107,6 +107,7 @@ import Stanza.Parse | |||
107 | import Stanza.Types | 107 | import Stanza.Types |
108 | import MUC | 108 | import MUC |
109 | import Chat | 109 | import Chat |
110 | import Network.StreamServer (Local(..), Remote(..)) | ||
110 | 111 | ||
111 | -- peerport :: PortNumber | 112 | -- peerport :: PortNumber |
112 | -- peerport = 5269 | 113 | -- peerport = 5269 |
@@ -117,9 +118,6 @@ my_uuid :: Text | |||
117 | my_uuid = "154ae29f-98f2-4af4-826d-a40c8a188574" | 118 | my_uuid = "154ae29f-98f2-4af4-826d-a40c8a188574" |
118 | 119 | ||
119 | 120 | ||
120 | newtype Local a = Local a deriving (Eq,Ord,Show) | ||
121 | newtype Remote a = Remote a deriving (Eq,Ord,Show) | ||
122 | |||
123 | data XMPPServerParameters = | 121 | data XMPPServerParameters = |
124 | XMPPServerParameters | 122 | XMPPServerParameters |
125 | { -- | Called when a client requests a resource id. The first Maybe indicates | 123 | { -- | Called when a client requests a resource id. The first Maybe indicates |
@@ -1090,13 +1088,15 @@ data PeerState | |||
1090 | | PeerConnected (TChan Stanza) | 1088 | | PeerConnected (TChan Stanza) |
1091 | -} | 1089 | -} |
1092 | 1090 | ||
1093 | peerKey :: SocketLike sock => Maybe SockAddr -> sock -> IO (PeerAddress,ConnectionData) | 1091 | peerKey :: SocketLike sock => Maybe SockAddr -> (sock, (Local SockAddr, Remote SockAddr)) -> IO (PeerAddress,ConnectionData) |
1094 | peerKey bind_addr sock = do | 1092 | peerKey bind_addr (sock,(laddr,Remote raddr)) = do |
1093 | {- | ||
1095 | laddr <- getSocketName sock | 1094 | laddr <- getSocketName sock |
1096 | raddr <- | 1095 | raddr <- |
1097 | isValidSocket sock >>= \(sock,c) -> | 1096 | isValidSocket sock >>= \(sock,c) -> |
1098 | if c then getPeerName sock -- addr is normally socketName | 1097 | if c then getPeerName sock -- addr is normally socketName |
1099 | else return laddr -- Weird hack: addr is would-be peer name | 1098 | else return laddr -- Weird hack: addr is would-be peer name |
1099 | -} | ||
1100 | -- Assume remote peers are listening on the same port that we do. | 1100 | -- Assume remote peers are listening on the same port that we do. |
1101 | let peerport = fromIntegral $ fromMaybe 5269 $ do | 1101 | let peerport = fromIntegral $ fromMaybe 5269 $ do |
1102 | p <- bind_addr >>= sockAddrPort | 1102 | p <- bind_addr >>= sockAddrPort |
@@ -1106,15 +1106,15 @@ peerKey bind_addr sock = do | |||
1106 | rname <- atomically $ newTVar Nothing | 1106 | rname <- atomically $ newTVar Nothing |
1107 | -- dput XMan $ "peerKey " ++ show (PeerAddress $ raddr `withPort` peerport,laddr) | 1107 | -- dput XMan $ "peerKey " ++ show (PeerAddress $ raddr `withPort` peerport,laddr) |
1108 | return $ ( PeerAddress $ raddr `withPort` peerport | 1108 | return $ ( PeerAddress $ raddr `withPort` peerport |
1109 | , ConnectionData { cdAddr = Left (Local laddr) | 1109 | , ConnectionData { cdAddr = Left laddr |
1110 | , cdType = XMPP | 1110 | , cdType = XMPP |
1111 | , cdProfile = "." | 1111 | , cdProfile = "." |
1112 | , cdRemoteName = rname } ) | 1112 | , cdRemoteName = rname } ) |
1113 | 1113 | ||
1114 | clientKey :: SocketLike sock => sock -> IO (PeerAddress,ConnectionData) | 1114 | clientKey :: SocketLike sock => (sock, (Local SockAddr,Remote SockAddr)) -> IO (PeerAddress,ConnectionData) |
1115 | clientKey sock = do | 1115 | clientKey (sock,(laddr,Remote raddr)) = do |
1116 | laddr <- getSocketName sock -- [::1]:5222 bind address, same for all clients | 1116 | -- laddr <- getSocketName sock -- [::1]:5222 bind address, same for all clients |
1117 | raddr <- getPeerName sock -- [::1]:????? unique key | 1117 | -- raddr <- getPeerName sock -- [::1]:????? unique key |
1118 | when (Just 0 == sockAddrPort raddr) $ do | 1118 | when (Just 0 == sockAddrPort raddr) $ do |
1119 | dput XMan $ unwords [ "BUG: XMPP Client" | 1119 | dput XMan $ unwords [ "BUG: XMPP Client" |
1120 | , show (laddr,raddr) | 1120 | , show (laddr,raddr) |
diff --git a/dht/src/Network/StreamServer.hs b/dht/src/Network/StreamServer.hs index 37f8812a..1da612ce 100644 --- a/dht/src/Network/StreamServer.hs +++ b/dht/src/Network/StreamServer.hs | |||
@@ -13,6 +13,8 @@ module Network.StreamServer | |||
13 | , quitListening | 13 | , quitListening |
14 | --, dummyServerHandle | 14 | --, dummyServerHandle |
15 | , listenSocket | 15 | , listenSocket |
16 | , Local(..) | ||
17 | , Remote(..) | ||
16 | ) where | 18 | ) where |
17 | 19 | ||
18 | import Data.Monoid | 20 | import Data.Monoid |
@@ -89,15 +91,18 @@ bshow e = show e | |||
89 | warnStderr :: String -> IO () | 91 | warnStderr :: String -> IO () |
90 | warnStderr str = dput XMisc str >> hFlush stderr | 92 | warnStderr str = dput XMisc str >> hFlush stderr |
91 | 93 | ||
94 | newtype Local a = Local a deriving (Eq,Ord,Show) | ||
95 | newtype Remote a = Remote a deriving (Eq,Ord,Show) | ||
96 | |||
92 | data ServerConfig = ServerConfig | 97 | data ServerConfig = ServerConfig |
93 | { serverWarn :: String -> IO () | 98 | { serverWarn :: String -> IO () |
94 | -- ^ Action to report warnings and errors. | 99 | -- ^ Action to report warnings and errors. |
95 | , serverSession :: RestrictedSocket -> Int -> Handle -> IO () | 100 | , serverSession :: ( RestrictedSocket, (Local SockAddr, Remote SockAddr)) -> Int -> Handle -> IO () |
96 | -- ^ Action to handle interaction with a client | 101 | -- ^ Action to handle interaction with a client |
97 | } | 102 | } |
98 | 103 | ||
99 | -- | Initialize a 'ServerConfig' using the provided session handler. | 104 | -- | Initialize a 'ServerConfig' using the provided session handler. |
100 | withSession :: (RestrictedSocket -> Int -> Handle -> IO ()) -> ServerConfig | 105 | withSession :: ((RestrictedSocket,(Local SockAddr,Remote SockAddr)) -> Int -> Handle -> IO ()) -> ServerConfig |
101 | withSession session = ServerConfig warnStderr session | 106 | withSession session = ServerConfig warnStderr session |
102 | 107 | ||
103 | -- | Launch a thread to listen at the given bind address and dispatch | 108 | -- | Launch a thread to listen at the given bind address and dispatch |
@@ -138,19 +143,13 @@ streamServer cfg addrs = do | |||
138 | -- socket must be closed by 'quitListening'. | 143 | -- socket must be closed by 'quitListening'. |
139 | acceptLoop :: ServerConfig -> Socket -> Int -> IO () | 144 | acceptLoop :: ServerConfig -> Socket -> Int -> IO () |
140 | acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do | 145 | acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do |
141 | con <- accept sock -- TODO: We need to remember the (snd con) peer address here!!! | 146 | (con,raddr) <- accept sock |
142 | -- See also: "-- Weird hack: addr is would-be peer name" in XMPPServer.hs | ||
143 | -- If we remember the peer address here, we won't need that weird | ||
144 | -- hack or to call sIsConnected. | ||
145 | -- Probably we should move the | ||
146 | -- newtype Local a = Local a deriving (Eq,Ord,Show) | ||
147 | -- newtype Remote a = Remote a deriving (Eq,Ord,Show) | ||
148 | -- defines so that they are accessible to this module. | ||
149 | let conkey = n + 1 | 147 | let conkey = n + 1 |
150 | h <- socketToHandle (fst con) ReadWriteMode | 148 | laddr <- Socket.getSocketName con |
149 | h <- socketToHandle con ReadWriteMode | ||
151 | forkIO $ do | 150 | forkIO $ do |
152 | myThreadId >>= flip labelThread "StreamServer.session" | 151 | myThreadId >>= flip labelThread "StreamServer.session" |
153 | serverSession cfg (restrictHandleSocket h (fst con)) conkey h | 152 | serverSession cfg (restrictHandleSocket h con, (Local laddr, Remote raddr)) conkey h |
154 | acceptLoop cfg sock (n + 1) | 153 | acceptLoop cfg sock (n + 1) |
155 | 154 | ||
156 | acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO () | 155 | acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO () |