diff options
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/Tox.hs | 165 | ||||
-rw-r--r-- | src/Network/Tox/AggregateSession.hs | 127 |
2 files changed, 110 insertions, 182 deletions
diff --git a/src/Network/Tox.hs b/src/Network/Tox.hs index 861d71d3..88228c50 100644 --- a/src/Network/Tox.hs +++ b/src/Network/Tox.hs | |||
@@ -32,31 +32,25 @@ import qualified Data.ByteString as B | |||
32 | import qualified Data.ByteString.Char8 as C8 | 32 | import qualified Data.ByteString.Char8 as C8 |
33 | import Data.Data | 33 | import Data.Data |
34 | import Data.Functor.Contravariant | 34 | import Data.Functor.Contravariant |
35 | import Data.IP | ||
36 | import Data.Maybe | 35 | import Data.Maybe |
37 | import qualified Data.MinMaxPSQ as MinMaxPSQ | 36 | import qualified Data.MinMaxPSQ as MinMaxPSQ |
38 | import qualified Data.Serialize as S | 37 | import qualified Data.Serialize as S |
39 | import Data.Time.Clock.POSIX (getPOSIXTime) | 38 | import Data.Time.Clock.POSIX (getPOSIXTime) |
40 | import Data.Word | 39 | import Data.Word |
40 | import Network.Socket | ||
41 | import System.Endian | ||
42 | |||
43 | import Network.BitTorrent.DHT.Token as Token | ||
41 | import qualified Data.Wrapper.PSQ as PSQ | 44 | import qualified Data.Wrapper.PSQ as PSQ |
42 | import System.Global6 | 45 | import System.Global6 |
43 | import Network.Address (WantIP (..)) | 46 | import Network.Address (WantIP (..),IP) |
44 | import qualified Network.Kademlia.Routing as R | 47 | import qualified Network.Kademlia.Routing as R |
45 | import Network.QueryResponse | 48 | import Network.QueryResponse |
46 | import Network.Socket | ||
47 | import System.Endian | ||
48 | import Network.BitTorrent.DHT.Token as Token | ||
49 | |||
50 | import Connection | ||
51 | import Crypto.Tox | 49 | import Crypto.Tox |
52 | import Data.Word64Map (fitsInInt) | 50 | import Data.Word64Map (fitsInInt) |
53 | import qualified Data.Word64Map (empty) | 51 | import qualified Data.Word64Map (empty) |
54 | import HandshakeCache | ||
55 | import Network.Kademlia.Bootstrap (forkPollForRefresh, bootstrap) | 52 | import Network.Kademlia.Bootstrap (forkPollForRefresh, bootstrap) |
56 | import Network.Kademlia.Search | ||
57 | import Network.Tox.Crypto.Transport (Handshake(..),CryptoPacket) | 53 | import Network.Tox.Crypto.Transport (Handshake(..),CryptoPacket) |
58 | import Network.Tox.Handshake | ||
59 | import Network.Tox.Crypto.Handlers | ||
60 | import qualified Network.Tox.DHT.Handlers as DHT | 54 | import qualified Network.Tox.DHT.Handlers as DHT |
61 | import qualified Network.Tox.DHT.Transport as DHT | 55 | import qualified Network.Tox.DHT.Transport as DHT |
62 | import Network.Tox.NodeId | 56 | import Network.Tox.NodeId |
@@ -66,12 +60,12 @@ import Network.Tox.Transport | |||
66 | import OnionRouter | 60 | import OnionRouter |
67 | import Network.Tox.ContactInfo | 61 | import Network.Tox.ContactInfo |
68 | import Text.XXD | 62 | import Text.XXD |
69 | import qualified Data.HashMap.Strict as HashMap | ||
70 | import qualified Data.Map.Strict as Map | ||
71 | import DPut | 63 | import DPut |
72 | import Network.Tox.Avahi | 64 | import Network.Tox.Avahi |
73 | import Text.Printf | 65 | import Network.Tox.Session |
74 | import Data.List | 66 | import Network.SessionTransports |
67 | import Network.Kademlia.Search | ||
68 | import HandshakeCache | ||
75 | 69 | ||
76 | newCrypto :: IO TransportCrypto | 70 | newCrypto :: IO TransportCrypto |
77 | newCrypto = do | 71 | newCrypto = do |
@@ -207,7 +201,6 @@ data Tox extra = Tox | |||
207 | , toxCrypto :: Transport String SockAddr (CryptoPacket Encrypted) | 201 | , toxCrypto :: Transport String SockAddr (CryptoPacket Encrypted) |
208 | , toxHandshakes :: Transport String SockAddr (Handshake Encrypted) | 202 | , toxHandshakes :: Transport String SockAddr (Handshake Encrypted) |
209 | , toxHandshakeCache :: HandshakeCache | 203 | , toxHandshakeCache :: HandshakeCache |
210 | , toxCryptoSessions :: NetCryptoSessions | ||
211 | , toxCryptoKeys :: TransportCrypto | 204 | , toxCryptoKeys :: TransportCrypto |
212 | , toxRouting :: DHT.Routing | 205 | , toxRouting :: DHT.Routing |
213 | , toxTokens :: TVar SessionTokens | 206 | , toxTokens :: TVar SessionTokens |
@@ -217,97 +210,7 @@ data Tox extra = Tox | |||
217 | , toxAnnounceToLan :: IO () | 210 | , toxAnnounceToLan :: IO () |
218 | } | 211 | } |
219 | 212 | ||
220 | -- | initiate a netcrypto session, blocking | 213 | |
221 | netCrypto :: Tox extra -> SecretKey -> PublicKey{-UserKey -} -> IO [NetCryptoSession] | ||
222 | netCrypto tox myseckey theirpubkey = netCryptoWithBackoff 1000000 tox myseckey theirpubkey | ||
223 | |||
224 | -- | helper for 'netCrypto', initiate a netcrypto session, retry after specified millisecs | ||
225 | netCryptoWithBackoff :: Int -> Tox extra -> SecretKey -> PublicKey -> IO [NetCryptoSession] | ||
226 | netCryptoWithBackoff millisecs tox myseckey theirpubkey = do | ||
227 | let mykeyAsId = key2id (toPublic myseckey) | ||
228 | -- TODO: check status of connection here: | ||
229 | mbContactsVar <- fmap contacts . HashMap.lookup mykeyAsId <$> atomically (readTVar (accounts (toxContactInfo tox))) | ||
230 | case mbContactsVar of | ||
231 | Nothing -> do | ||
232 | dput XNetCrypto ("netCrypto: (" ++ show mykeyAsId ++") accounts lookup failed.") | ||
233 | return [] | ||
234 | |||
235 | Just contactsVar -> do | ||
236 | let theirkeyAsId = key2id theirpubkey | ||
237 | mbContact <- HashMap.lookup theirkeyAsId <$> atomically (readTVar contactsVar) | ||
238 | tup <- atomically $ do | ||
239 | mc <- HashMap.lookup theirkeyAsId <$> readTVar contactsVar | ||
240 | kp <- fmap join $ forM mc $ \c -> readTVar (contactKeyPacket c) | ||
241 | sa <- fmap join $ forM mc $ \c -> readTVar (contactLastSeenAddr c) | ||
242 | fr <- fmap join $ forM mc $ \c -> readTVar (contactFriendRequest c) | ||
243 | cp <- fmap join $ forM mc $ \c -> readTVar (contactPolicy c) | ||
244 | return (kp,sa,fr,cp) | ||
245 | case tup of | ||
246 | (Nothing,Nothing,Nothing,Nothing) -> do | ||
247 | dput XNetCrypto ("netCrypto: (" ++ show mykeyAsId ++") friend not found (" ++ show theirkeyAsId ++ ").") | ||
248 | return [] | ||
249 | (mbKeyPkt,Nothing,mbFR,mbPolicy) -> do | ||
250 | dput XNetCrypto ("netCrypto: (" ++ show mykeyAsId ++") no SockAddr for friend (" ++ show theirkeyAsId ++ "). TODO: search their node?") | ||
251 | return [] | ||
252 | (Nothing,_,_,_) -> do | ||
253 | dput XNetCrypto ("netCrypto: (" ++ show mykeyAsId ++") no DHT-key for friend (" ++ show theirkeyAsId ++ "). TODO: what?") | ||
254 | return [] | ||
255 | (Just (stamp_theirDhtKey,keyPkt),Just (stamp_saddr,saddr),mbFR,mbPolicy) | ||
256 | | theirDhtKey <- DHT.dhtpk keyPkt -> do | ||
257 | -- Do we already have an active session with this user? | ||
258 | sessionsMap <- atomically $ readTVar (netCryptoSessionsByKey (toxCryptoSessions tox) ) | ||
259 | let sessionUsesIdentity key session = key == ncMyPublicKey session | ||
260 | case Map.lookup theirpubkey sessionsMap of | ||
261 | -- if sessions found, is it using this private key? | ||
262 | Just sessions | matchedSessions <- filter (sessionUsesIdentity (toPublic myseckey)) sessions | ||
263 | , not (null matchedSessions) | ||
264 | -> do | ||
265 | dput XNetCrypto ("netCrypto: Already have a session for " ++ show mykeyAsId ++ "<-->" ++ show theirkeyAsId) | ||
266 | return matchedSessions | ||
267 | -- if not, send handshake, this is separate session | ||
268 | _ -> do | ||
269 | -- if no session: | ||
270 | -- Convert to NodeInfo, so we can send cookieRequest | ||
271 | let crypto = toxCryptoKeys tox | ||
272 | client = toxDHT tox | ||
273 | case nodeInfo (key2id theirDhtKey) (nodeAddr saddr) of | ||
274 | Left e -> dput XNetCrypto ("netCrypto: nodeInfo fail... " ++ e) >> return [] | ||
275 | Right ni -> do | ||
276 | mbCookie <- DHT.cookieRequest crypto client (toPublic myseckey) ni | ||
277 | case mbCookie of | ||
278 | Nothing -> do | ||
279 | dput XNetCrypto ("netCrypto: (" ++ show mykeyAsId ++") <--> (" ++ show theirkeyAsId ++ ").") | ||
280 | dput XNetCrypto ("netCrypto: CookieRequest failed. TODO: dhtpkNodes thingy") | ||
281 | return [] | ||
282 | Just cookie -> do | ||
283 | dput XNetCrypto "Have cookie, creating handshake packet..." | ||
284 | let hp = HParam { hpOtherCookie = cookie | ||
285 | , hpMySecretKey = myseckey | ||
286 | , hpCookieRemotePubkey = theirpubkey | ||
287 | , hpCookieRemoteDhtkey = theirDhtKey | ||
288 | , hpTheirBaseNonce = Nothing | ||
289 | , hpTheirSessionKeyPublic = Nothing | ||
290 | } | ||
291 | newsession <- generateSecretKey | ||
292 | timestamp <- getPOSIXTime | ||
293 | (myhandshake,ioAction) | ||
294 | <- atomically $ freshCryptoSession (toxCryptoSessions tox) (nodeAddr saddr) newsession timestamp hp | ||
295 | ioAction | ||
296 | -- send handshake | ||
297 | forM myhandshake $ \response_handshake -> do | ||
298 | sendHandshake (toxCryptoSessions tox) (nodeAddr saddr) response_handshake | ||
299 | let secnum :: Double | ||
300 | secnum = fromIntegral millisecs / 1000000 | ||
301 | delay = (millisecs * 5 `div` 4) | ||
302 | if secnum < 20000000 | ||
303 | then do | ||
304 | dput XNetCrypto $ "sent handshake, now delaying " ++ show (secnum * 1.25) ++ " second(s).." | ||
305 | -- threadDelay delay | ||
306 | -- Commenting loop for simpler debugging | ||
307 | return [] -- netCryptoWithBackoff delay tox myseckey theirpubkey -- hopefully it will find an active session this time. | ||
308 | else do | ||
309 | dput XNetCrypto "Unable to establish session..." | ||
310 | return [] | ||
311 | 214 | ||
312 | -- | Create a DHTPublicKey packet to send to a remote contact. | 215 | -- | Create a DHTPublicKey packet to send to a remote contact. |
313 | getContactInfo :: Tox extra -> IO DHT.DHTPublicKey | 216 | getContactInfo :: Tox extra -> IO DHT.DHTPublicKey |
@@ -365,30 +268,24 @@ getOnionAlias crypto dhtself remoteNode = atomically $ do | |||
365 | 268 | ||
366 | newTox :: TVar Onion.AnnouncedKeys -- ^ Store of announced keys we are a rendezvous for. | 269 | newTox :: TVar Onion.AnnouncedKeys -- ^ Store of announced keys we are a rendezvous for. |
367 | -> SockAddr -- ^ Bind-address to listen on. | 270 | -> SockAddr -- ^ Bind-address to listen on. |
368 | -> Maybe NetCryptoSessions -- ^ State of all one-on-one Tox links. | 271 | -> ( ContactInfo extra -> SockAddr -> Session -> IO () ) |
369 | -> Maybe SecretKey -- ^ Optional DHT secret key to use. | 272 | -> Maybe SecretKey -- ^ Optional DHT secret key to use. |
370 | -> IO (Tox extra) | 273 | -> IO (Tox extra) |
371 | newTox keydb addr mbSessionsState suppliedDHTKey = do | 274 | newTox keydb addr onsess suppliedDHTKey = do |
372 | (udp,sock) <- {- addVerbosity <$> -} udpTransport' addr | 275 | (udp,sock) <- {- addVerbosity <$> -} udpTransport' addr |
373 | tox <- newToxOverTransport keydb addr mbSessionsState suppliedDHTKey udp | 276 | tox <- newToxOverTransport keydb addr onsess suppliedDHTKey udp |
374 | return tox { toxAnnounceToLan = announceToLan sock (key2id $ transportPublic $ toxCryptoKeys tox) } | 277 | return tox { toxAnnounceToLan = announceToLan sock (key2id $ transportPublic $ toxCryptoKeys tox) } |
375 | 278 | ||
376 | -- | This version of 'newTox' is useful for automated tests using 'testPairTransport'. | 279 | -- | This version of 'newTox' is useful for automated tests using 'testPairTransport'. |
377 | newToxOverTransport :: TVar Onion.AnnouncedKeys | 280 | newToxOverTransport :: TVar Onion.AnnouncedKeys |
378 | -> SockAddr | 281 | -> SockAddr |
379 | -> Maybe NetCryptoSessions | 282 | -> ( ContactInfo extra -> SockAddr -> Session -> IO () ) |
380 | -> Maybe SecretKey | 283 | -> Maybe SecretKey |
381 | -> Onion.UDPTransport | 284 | -> Onion.UDPTransport |
382 | -> IO (Tox extra) | 285 | -> IO (Tox extra) |
383 | newToxOverTransport keydb addr mbSessionsState suppliedDHTKey udp = do | 286 | newToxOverTransport keydb addr onNewSession suppliedDHTKey udp = do |
384 | roster <- newContactInfo | 287 | roster <- newContactInfo |
385 | (crypto0,sessionsState0) <- case mbSessionsState of | 288 | crypto0 <- newCrypto |
386 | Nothing -> do | ||
387 | crypto <- newCrypto | ||
388 | sessionsState <- newSessionsState crypto (const $ dput XUnexpected "Missing destroy hook!") defaultUnRecHook defaultCryptoDataHooks | ||
389 | return (crypto,sessionsState) | ||
390 | Just s -> return (transportCrypto s, s) | ||
391 | |||
392 | let -- patch in supplied DHT key | 289 | let -- patch in supplied DHT key |
393 | crypto1 = fromMaybe crypto0 $do | 290 | crypto1 = fromMaybe crypto0 $do |
394 | k <- suppliedDHTKey | 291 | k <- suppliedDHTKey |
@@ -409,6 +306,7 @@ newToxOverTransport keydb addr mbSessionsState suppliedDHTKey udp = do | |||
409 | mkrouting <- DHT.newRouting addr crypto updateIP updateIP | 306 | mkrouting <- DHT.newRouting addr crypto updateIP updateIP |
410 | orouter <- newOnionRouter $ dput XRoutes | 307 | orouter <- newOnionRouter $ dput XRoutes |
411 | (cryptonet,dhtcrypt,onioncrypt,dtacrypt,handshakes) <- toxTransport crypto orouter lookupClose udp | 308 | (cryptonet,dhtcrypt,onioncrypt,dtacrypt,handshakes) <- toxTransport crypto orouter lookupClose udp |
309 | sessions <- initSessions (sendMessage cryptonet) | ||
412 | 310 | ||
413 | let dhtnet0 = layerTransportM (DHT.decrypt crypto) (DHT.encrypt crypto) dhtcrypt | 311 | let dhtnet0 = layerTransportM (DHT.decrypt crypto) (DHT.encrypt crypto) dhtcrypt |
414 | tbl4 = DHT.routing4 $ mkrouting (error "missing client") | 312 | tbl4 = DHT.routing4 $ mkrouting (error "missing client") |
@@ -417,22 +315,12 @@ newToxOverTransport keydb addr mbSessionsState suppliedDHTKey udp = do | |||
417 | $ \client net -> onInbound (DHT.updateRouting client (mkrouting client) orouter) net | 315 | $ \client net -> onInbound (DHT.updateRouting client (mkrouting client) orouter) net |
418 | 316 | ||
419 | hscache <- newHandshakeCache crypto (sendMessage handshakes) | 317 | hscache <- newHandshakeCache crypto (sendMessage handshakes) |
420 | 318 | let sparams = SessionParams | |
421 | let sessionsState = sessionsState0 { sendHandshake = sendMessage handshakes | 319 | { spCrypto = crypto |
422 | , sendSessionPacket = sendMessage cryptonet | 320 | , spSessions = sessions |
423 | , transportCrypto = crypto | 321 | , spGetSentHandshake = getSentHandshake hscache |
424 | -- ToxContact -> STM Policy | 322 | , spOnNewSession = onNewSession roster addr |
425 | , netCryptoPolicyByKey = policylookup | 323 | } |
426 | } | ||
427 | policylookup (ToxContact me them) = do | ||
428 | macnt <- HashMap.lookup me <$> readTVar (accounts roster) | ||
429 | case macnt of | ||
430 | Nothing -> return RefusingToConnect | ||
431 | Just acnt -> do | ||
432 | mc <- HashMap.lookup them <$> readTVar (contacts acnt) | ||
433 | case mc of | ||
434 | Nothing -> return RefusingToConnect | ||
435 | Just c -> fromMaybe RefusingToConnect <$> readTVar (contactPolicy c) | ||
436 | 324 | ||
437 | orouter' <- forkRouteBuilder orouter | 325 | orouter' <- forkRouteBuilder orouter |
438 | $ \nid ni -> fmap (\(_,ns,_)->ns) | 326 | $ \nid ni -> fmap (\(_,ns,_)->ns) |
@@ -453,10 +341,9 @@ newToxOverTransport keydb addr mbSessionsState suppliedDHTKey udp = do | |||
453 | { toxDHT = dhtclient | 341 | { toxDHT = dhtclient |
454 | , toxOnion = onionclient | 342 | , toxOnion = onionclient |
455 | , toxToRoute = onInbound (updateContactInfo roster) dtacrypt | 343 | , toxToRoute = onInbound (updateContactInfo roster) dtacrypt |
456 | , toxCrypto = addHandler (dput XMisc) (sessionPacketH sessionsState) cryptonet | 344 | , toxCrypto = addHandler (dput XMisc) (sessionHandler sessions) cryptonet |
457 | , toxHandshakes = addHandler (dput XMisc) (handshakeH sessionsState) handshakes | 345 | , toxHandshakes = addHandler (dput XMisc) (handshakeH sparams) handshakes |
458 | , toxHandshakeCache = hscache | 346 | , toxHandshakeCache = hscache |
459 | , toxCryptoSessions = sessionsState | ||
460 | , toxCryptoKeys = crypto | 347 | , toxCryptoKeys = crypto |
461 | , toxRouting = mkrouting dhtclient | 348 | , toxRouting = mkrouting dhtclient |
462 | , toxTokens = toks | 349 | , toxTokens = toks |
@@ -526,8 +413,10 @@ announceToLan sock nid = do | |||
526 | (Just "33445") | 413 | (Just "33445") |
527 | let broadcast = addrAddress broadcast_info | 414 | let broadcast = addrAddress broadcast_info |
528 | bs = S.runPut $ DHT.putMessage (DHT.DHTLanDiscovery nid) | 415 | bs = S.runPut $ DHT.putMessage (DHT.DHTLanDiscovery nid) |
416 | dput XLan $ show broadcast ++ " <-- LanAnnounce " ++ show nid | ||
529 | saferSendTo sock bs broadcast | 417 | saferSendTo sock bs broadcast |
530 | 418 | ||
419 | |||
531 | toxQSearch :: Tox extra -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Onion.Rendezvous | 420 | toxQSearch :: Tox extra -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Onion.Rendezvous |
532 | toxQSearch tox = Onion.toxidSearch (onionTimeout tox) (toxCryptoKeys tox) (toxOnion tox) | 421 | toxQSearch tox = Onion.toxidSearch (onionTimeout tox) (toxCryptoKeys tox) (toxOnion tox) |
533 | 422 | ||
diff --git a/src/Network/Tox/AggregateSession.hs b/src/Network/Tox/AggregateSession.hs index edb897e0..1dd10eef 100644 --- a/src/Network/Tox/AggregateSession.hs +++ b/src/Network/Tox/AggregateSession.hs | |||
@@ -22,7 +22,6 @@ module Network.Tox.AggregateSession | |||
22 | 22 | ||
23 | import Control.Concurrent.STM | 23 | import Control.Concurrent.STM |
24 | import Control.Concurrent.STM.TMChan | 24 | import Control.Concurrent.STM.TMChan |
25 | import Control.Concurrent.Supply | ||
26 | import Control.Monad | 25 | import Control.Monad |
27 | import Data.Function | 26 | import Data.Function |
28 | import qualified Data.IntMap.Strict as IntMap | 27 | import qualified Data.IntMap.Strict as IntMap |
@@ -47,9 +46,7 @@ import Network.Tox.Crypto.Transport (CryptoMessage (..), pattern KillPacket, | |||
47 | pattern PacketRequest) | 46 | pattern PacketRequest) |
48 | import Network.Tox.DHT.Transport (key2id) | 47 | import Network.Tox.DHT.Transport (key2id) |
49 | import Network.Tox.NodeId (ToxProgress (..)) | 48 | import Network.Tox.NodeId (ToxProgress (..)) |
50 | import Network.Tox.Crypto.Handlers | 49 | import Network.Tox.Session |
51 | |||
52 | type Session = NetCryptoSession | ||
53 | 50 | ||
54 | -- | For each component session, we track the current status. | 51 | -- | For each component session, we track the current status. |
55 | data SingleCon = SingleCon | 52 | data SingleCon = SingleCon |
@@ -113,47 +110,94 @@ data KeepAliveEvents = DoTimeout -- ^ A session timed-out, close it. | |||
113 | | DoRequestMissing -- ^ Detect and request lost packets. | 110 | | DoRequestMissing -- ^ Detect and request lost packets. |
114 | deriving Enum | 111 | deriving Enum |
115 | 112 | ||
116 | -- | This function forks a thread to read all packets from the provided | 113 | -- | This call loops until the provided sesison is closed or times out. It |
117 | -- 'Session' and forward them to 'contactChannel' for a containing | 114 | -- monitors the provided (non-empty) priority queue for scheduled tasks (see |
118 | -- 'AggregateSession' | 115 | -- 'KeepAliveEvents') to perform for the connection. |
116 | keepAlive :: Session -> TVar (PSQ POSIXTime) -> IO () | ||
117 | keepAlive s q = do | ||
118 | myThreadId >>= flip labelThread | ||
119 | (intercalate "." ["beacon" | ||
120 | , take 8 $ show $ key2id $ sTheirUserKey s | ||
121 | , show $ sSessionID s]) | ||
122 | |||
123 | let outPrint e = dput XNetCrypto $ shows (sSessionID s,sTheirAddr s) $ " <-- " ++ e | ||
124 | |||
125 | doAlive = do | ||
126 | -- outPrint $ "Beacon" | ||
127 | sendMessage (sTransport s) () (OneByte PING) | ||
128 | |||
129 | doRequestMissing = do | ||
130 | (ns,nmin) <- sMissingInbound s | ||
131 | -- outPrint $ "PacketRequest " ++ show (nmin,ns) | ||
132 | sendMessage (sTransport s) () (RequestResend PacketRequest ns) | ||
133 | |||
134 | re tm again e io = do | ||
135 | io | ||
136 | atomically $ modifyTVar' q $ PSQ.insert (fromEnum e) tm | ||
137 | again | ||
138 | |||
139 | doEvent again now e = case e of | ||
140 | DoTimeout -> do dput XNetCrypto $ "TIMEOUT: " ++ show (sSessionID s) | ||
141 | sClose s | ||
142 | DoAlive -> re (now + 10) again e doAlive | ||
143 | DoRequestMissing -> re (now + 5) again e doRequestMissing -- tox-core does this at 1 second intervals | ||
144 | |||
145 | fix $ \again -> do | ||
146 | |||
147 | now <- getPOSIXTime | ||
148 | join $ atomically $ do | ||
149 | Just ( k :-> tm ) <- PSQ.findMin <$> readTVar q | ||
150 | return $ if now < tm then threadDelay (toMicroseconds $ tm - now) >> again | ||
151 | else doEvent again now (toEnum k) | ||
152 | |||
153 | -- | This function forks two threads: the 'keepAlive' beacon-sending thread and | ||
154 | -- a thread to read all packets from the provided 'Session' and forward them to | ||
155 | -- 'contactChannel' for a containing 'AggregateSession' | ||
119 | forkSession :: AggregateSession -> Session -> (Status ToxProgress -> STM ()) -> IO ThreadId | 156 | forkSession :: AggregateSession -> Session -> (Status ToxProgress -> STM ()) -> IO ThreadId |
120 | forkSession c s setStatus = forkIO $ do | 157 | forkSession c s setStatus = forkIO $ do |
121 | myThreadId >>= flip labelThread | 158 | myThreadId >>= flip labelThread |
122 | (intercalate "." ["s" | 159 | (intercalate "." ["s" |
123 | , take 8 $ show $ key2id $ ncTheirPublicKey s | 160 | , take 8 $ show $ key2id $ sTheirUserKey s |
124 | , show $ sSessionID s]) | 161 | , show $ sSessionID s]) |
125 | tmchan <- atomically $ do | 162 | |
126 | tmchan <- newTMChan | 163 | q <- atomically $ newTVar $ fromList |
127 | supply <- readTVar (listenerIDSupply $ ncAllSessions s) | 164 | [ fromEnum DoAlive :-> 0 |
128 | let (listenerId,supply') = freshId supply | 165 | , fromEnum DoRequestMissing :-> 0 |
129 | writeTVar (listenerIDSupply $ ncAllSessions s) supply' | 166 | ] |
130 | modifyTVar' (ncListeners s) (IntMap.insert listenerId (0,tmchan)) | ||
131 | return tmchan | ||
132 | 167 | ||
133 | let sendPacket :: CryptoMessage -> STM () | 168 | let sendPacket :: CryptoMessage -> STM () |
134 | sendPacket msg = writeTMChan (contactChannel c) (sSessionID s, msg) | 169 | sendPacket msg = writeTMChan (contactChannel c) (sSessionID s, msg) |
135 | 170 | ||
136 | inPrint e = dput XNetCrypto $ shows (sSessionID s,ncSockAddr s) $ " --> " ++ e | 171 | inPrint e = dput XNetCrypto $ shows (sSessionID s,sTheirAddr s) $ " --> " ++ e |
172 | |||
173 | bump = do | ||
174 | -- inPrint $ "BUMP: " ++ show (sSessionID s) | ||
175 | now <- getPOSIXTime | ||
176 | atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) (now + 15) | ||
137 | 177 | ||
138 | onPacket body loop Nothing = return () | 178 | onPacket body loop Nothing = return () |
139 | onPacket body loop (Just (Left e)) = inPrint e >> loop | 179 | onPacket body loop (Just (Left e)) = inPrint e >> loop |
140 | onPacket body loop (Just (Right x)) = body loop x | 180 | onPacket body loop (Just (Right x)) = body loop x |
141 | 181 | ||
142 | awaitPacket body = fix $ (.) (fmap Right <$> atomically (readTMChan tmchan) >>=) | 182 | awaitPacket body = fix $ awaitMessage (sTransport s) . onPacket body |
143 | $ onPacket body | ||
144 | 183 | ||
145 | atomically $ setStatus $ InProgress AwaitingSessionPacket | 184 | atomically $ setStatus $ InProgress AwaitingSessionPacket |
146 | atomically $ setStatus Established | 185 | awaitPacket $ \_ (online,()) -> do |
147 | awaitPacket $ \loop x -> do | 186 | when (msgID online /= ONLINE) $ do |
148 | case msgID x of | 187 | inPrint $ "Unexpected initial packet: " ++ show (msgID online) |
149 | KillPacket -> return () | 188 | atomically $ do setStatus Established |
150 | _ -> atomically (sendPacket x) >> loop | 189 | sendPacket online |
151 | 190 | bump | |
152 | atomically $ setStatus Dormant | 191 | beacon <- forkIO $ keepAlive s q |
153 | 192 | awaitPacket $ \awaitNext (x,()) -> do | |
154 | 193 | bump | |
155 | sSessionID :: Session -> Int | 194 | case msgID x of |
156 | sSessionID s = fromIntegral $ ncSessionId s | 195 | PING -> return () |
196 | KillPacket -> sClose s | ||
197 | _ -> atomically $ sendPacket x | ||
198 | awaitNext | ||
199 | atomically $ setStatus Dormant | ||
200 | killThread beacon | ||
157 | 201 | ||
158 | -- | Add a new session (in 'AwaitingSessionPacket' state) to the | 202 | -- | Add a new session (in 'AwaitingSessionPacket' state) to the |
159 | -- 'AggregateSession'. If the supplied session is not compatible because it is | 203 | -- 'AggregateSession'. If the supplied session is not compatible because it is |
@@ -166,8 +210,8 @@ sSessionID s = fromIntegral $ ncSessionId s | |||
166 | addSession :: AggregateSession -> Session -> IO AddResult | 210 | addSession :: AggregateSession -> Session -> IO AddResult |
167 | addSession c s = do | 211 | addSession c s = do |
168 | (result,mcon,replaced) <- atomically $ do | 212 | (result,mcon,replaced) <- atomically $ do |
169 | let them = ncTheirPublicKey s | 213 | let them = sTheirUserKey s |
170 | me = ncMyPublicKey s | 214 | me = toPublic $ sOurKey s |
171 | compat <- checkCompatible me them c | 215 | compat <- checkCompatible me them c |
172 | let result = case compat of | 216 | let result = case compat of |
173 | Nothing -> FirstSession | 217 | Nothing -> FirstSession |
@@ -184,7 +228,7 @@ addSession c s = do | |||
184 | writeTVar (contactSession c) imap' | 228 | writeTVar (contactSession c) imap' |
185 | return (result,Just con,s0) | 229 | return (result,Just con,s0) |
186 | 230 | ||
187 | mapM_ (destroySession . singleSession) replaced | 231 | mapM_ (sClose . singleSession) replaced |
188 | forM_ mcon $ \con -> | 232 | forM_ mcon $ \con -> |
189 | forkSession c s $ \progress -> do | 233 | forkSession c s $ \progress -> do |
190 | writeTVar (singleStatus con) progress | 234 | writeTVar (singleStatus con) progress |
@@ -203,6 +247,7 @@ addSession c s = do | |||
203 | return emap' | 247 | return emap' |
204 | writeTVar (contactEstablished c) emap' | 248 | writeTVar (contactEstablished c) emap' |
205 | return result | 249 | return result |
250 | |||
206 | -- | Information returned from 'delSession'. | 251 | -- | Information returned from 'delSession'. |
207 | data DelResult = NoSession -- ^ Contact is completely disconnected. | 252 | data DelResult = NoSession -- ^ Contact is completely disconnected. |
208 | | DeletedSession -- ^ Connection removed but session remains active. | 253 | | DeletedSession -- ^ Connection removed but session remains active. |
@@ -230,11 +275,10 @@ delSession c sid = do | |||
230 | writeTVar (contactSession c) imap' | 275 | writeTVar (contactSession c) imap' |
231 | writeTVar (contactEstablished c) emap' | 276 | writeTVar (contactEstablished c) emap' |
232 | return ( IntMap.lookup sid imap, IntMap.null imap') | 277 | return ( IntMap.lookup sid imap, IntMap.null imap') |
233 | mapM_ (destroySession . singleSession) con | 278 | mapM_ (sClose . singleSession) con |
234 | return $ if r then NoSession | 279 | return $ if r then NoSession |
235 | else DeletedSession | 280 | else DeletedSession |
236 | 281 | ||
237 | |||
238 | -- | Send a packet to one or all of the component sessions in the aggregate. | 282 | -- | Send a packet to one or all of the component sessions in the aggregate. |
239 | dispatchMessage :: AggregateSession -> Maybe Int -- ^ 'Nothing' to broadcast, otherwise SessionID. | 283 | dispatchMessage :: AggregateSession -> Maybe Int -- ^ 'Nothing' to broadcast, otherwise SessionID. |
240 | -> CryptoMessage -> IO () | 284 | -> CryptoMessage -> IO () |
@@ -242,11 +286,7 @@ dispatchMessage c msid msg = join $ atomically $ do | |||
242 | imap <- readTVar (contactSession c) | 286 | imap <- readTVar (contactSession c) |
243 | let go = case msid of Nothing -> forM_ imap | 287 | let go = case msid of Nothing -> forM_ imap |
244 | Just sid -> forM_ (IntMap.lookup sid imap) | 288 | Just sid -> forM_ (IntMap.lookup sid imap) |
245 | return $ go $ \con -> do | 289 | return $ go $ \con -> sendMessage (sTransport $ singleSession con) () msg |
246 | eResult <- sendLossless (transportCrypto (ncAllSessions (singleSession con))) (singleSession con) msg | ||
247 | case eResult of | ||
248 | Left msg -> dput XJabber msg | ||
249 | Right pkt -> dput XJabber ("sendLossLess SUCCESS: " ++ show pkt) | ||
250 | 290 | ||
251 | -- | Retry until: | 291 | -- | Retry until: |
252 | -- | 292 | -- |
@@ -287,7 +327,6 @@ aggregateStatus c = do | |||
287 | | not (IntMap.null imap) -> InProgress AwaitingSessionPacket | 327 | | not (IntMap.null imap) -> InProgress AwaitingSessionPacket |
288 | | otherwise -> Dormant | 328 | | otherwise -> Dormant |
289 | 329 | ||
290 | |||
291 | -- | Query whether the supplied ToxID keys are compatible with this aggregate. | 330 | -- | Query whether the supplied ToxID keys are compatible with this aggregate. |
292 | -- | 331 | -- |
293 | -- [ Nothing ] Any keys would be compatible because there is not yet any | 332 | -- [ Nothing ] Any keys would be compatible because there is not yet any |
@@ -304,8 +343,8 @@ checkCompatible me them c = do | |||
304 | imap <- readTVar (contactSession c) | 343 | imap <- readTVar (contactSession c) |
305 | return $ case IntMap.elems imap of | 344 | return $ case IntMap.elems imap of |
306 | _ | isclosed -> Just False -- All keys are incompatible (closed). | 345 | _ | isclosed -> Just False -- All keys are incompatible (closed). |
307 | con:_ -> Just $ ncTheirPublicKey (singleSession con) == them | 346 | con:_ -> Just $ sTheirUserKey (singleSession con) == them |
308 | && (ncMyPublicKey $ singleSession con) == me | 347 | && toPublic (sOurKey $ singleSession con) == me |
309 | [] -> Nothing | 348 | [] -> Nothing |
310 | 349 | ||
311 | -- | Returns the local and remote keys that are compatible with this aggregate. | 350 | -- | Returns the local and remote keys that are compatible with this aggregate. |
@@ -317,6 +356,6 @@ compatibleKeys c = do | |||
317 | imap <- readTVar (contactSession c) | 356 | imap <- readTVar (contactSession c) |
318 | return $ case IntMap.elems imap of | 357 | return $ case IntMap.elems imap of |
319 | _ | isclosed -> Nothing -- none. | 358 | _ | isclosed -> Nothing -- none. |
320 | con:_ -> Just ( ncMyPublicKey $ singleSession con | 359 | con:_ -> Just ( toPublic (sOurKey $ singleSession con) |
321 | , ncTheirPublicKey (singleSession con)) | 360 | , sTheirUserKey (singleSession con)) |
322 | [] -> Nothing -- any. | 361 | [] -> Nothing -- any. |