From 36cd21f0b42c09cbcf3a215afbcd754cc37d1c4e Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Sat, 8 Sep 2018 06:37:10 -0400 Subject: Switched to new session tracker. --- ToxManager.hs | 8 +- dht-client.cabal | 4 + examples/dhtd.hs | 816 +++++++----------------------------- src/Network/Tox.hs | 165 ++------ src/Network/Tox/AggregateSession.hs | 127 ++++-- 5 files changed, 274 insertions(+), 846 deletions(-) diff --git a/ToxManager.hs b/ToxManager.hs index 44b7a5ef..4ea6736d 100644 --- a/ToxManager.hs +++ b/ToxManager.hs @@ -39,12 +39,10 @@ import qualified Network.Tox as Tox ;import Network.Tox import Network.Tox.AggregateSession import Network.Tox.ContactInfo as Tox -import qualified Network.Tox.Crypto.Handlers as Tox import Network.Tox.DHT.Handlers (nodeSearch, nodesOfInterest) import Network.Tox.DHT.Handlers import qualified Network.Tox.DHT.Transport as Tox ;import Network.Tox.DHT.Transport (FriendRequest (..), dhtpk) -import Network.Tox.Handshake (HandshakeParams (..)) import Network.Tox.NodeId import qualified Network.Tox.Onion.Handlers as Tox import qualified Network.Tox.Onion.Transport as Tox @@ -414,14 +412,10 @@ gotAddr' ni@(nodeAddr -> addr) tx theirKey theirDhtKey = atomically blee return $ when (not active) getCookieIO callRealShakeHands cookie = do - {- forM_ (nodeInfo (key2id $ dhtpk theirDhtKey) (nodeAddr ni)) $ \ni' -> do hs <- cacheHandshake (toxHandshakeCache tox) (userSecret (txAccount tx)) theirKey ni' cookie dput XNetCrypto $ show addr ++ "<-- handshake " ++ show (key2id theirKey) sendMessage (toxHandshakes tox) (nodeAddr ni) hs - -} - realShakeHands (userSecret (txAccount tx)) theirKey (dhtpk theirDhtKey) (toxCryptoSessions tox) (nodeAddr ni) cookie - reschedule n f = scheduleRel ann akey f n reschedule' n f = reschedule n (ScheduledItem $ \_ _ now -> f now) @@ -457,6 +451,7 @@ gotAddr' ni@(nodeAddr -> addr) tx theirKey theirDhtKey = atomically blee atomically $ reschedule' 5 shaker +{- realShakeHands :: SecretKey -> PublicKey -> PublicKey -> Tox.NetCryptoSessions -> SockAddr -> Tox.Cookie Encrypted -> IO Bool realShakeHands myseckey theirpubkey theirDhtKey allsessions saddr cookie = do dput XUnused "realShakeHands" @@ -477,6 +472,7 @@ realShakeHands myseckey theirpubkey theirDhtKey allsessions saddr cookie = do ioAction -- send handshake isJust <$> forM myhandshake (Tox.sendHandshake allsessions saddr) +-} diff --git a/dht-client.cabal b/dht-client.cabal index 55ca6a04..29cf7a39 100644 --- a/dht-client.cabal +++ b/dht-client.cabal @@ -150,9 +150,13 @@ library ToxManager XMPPToTox DebugUtil + Data.IntervalSet Data.Tox.Message HandshakeCache + Network.Lossless + Network.SessionTransports Network.Tox.AggregateSession + Network.Tox.Session build-depends: base , containers diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 5bc2b87a..7c66fd73 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs @@ -1,5 +1,4 @@ {-# LANGUAGE BangPatterns #-} -{-# LANGUAGE ViewPatterns #-} {-# LANGUAGE CPP #-} {-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE FlexibleContexts #-} @@ -13,10 +12,12 @@ {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE RecursiveDo #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TypeOperators #-} +{-# LANGUAGE ViewPatterns #-} module Main where @@ -29,20 +30,18 @@ import Control.Monad import Control.Monad.IO.Class (liftIO) import Data.Array.MArray (getAssocs) import Data.Bool +import Data.Bits (xor) import Data.Char import Data.Conduit as C import qualified Data.Conduit.List as C import Data.Function +import Data.Functor.Identity import Data.Hashable import Data.List -import Data.Word -import Data.InOrOut import qualified Data.IntMap.Strict as IntMap import qualified Data.Map.Strict as Map import Data.Maybe import qualified Data.Set as Set -import Data.Tuple -import Data.Time.Clock import qualified Data.XML.Types as XML import GHC.Conc (threadStatus,ThreadStatus(..)) import GHC.Stats @@ -64,8 +63,6 @@ import qualified Data.HashMap.Strict as HashMap import qualified Data.Text as T import qualified Data.Text.Encoding as T import System.Posix.Signals -import qualified Data.Array.Unboxed as U -import qualified Data.Conduit as Conduit import Announcer import Announcer.Tox @@ -84,8 +81,6 @@ import qualified Network.BitTorrent.MainlineDHT as Mainline import qualified Network.Tox as Tox import qualified Data.ByteString.Lazy as L import qualified Data.ByteString.Char8 as B -import qualified Data.Text.Encoding as E -import qualified Data.Text.Encoding.Error as E import Control.Concurrent.Tasks import System.IO.Error import qualified Data.Serialize as S @@ -99,23 +94,17 @@ import qualified Network.Tox.DHT.Transport as Tox import qualified Network.Tox.DHT.Handlers as Tox import qualified Network.Tox.Onion.Transport as Tox import qualified Network.Tox.Onion.Handlers as Tox -import qualified Network.Tox.Crypto.Transport as Tox (CryptoMessage(..),putCryptoMessage,getCryptoMessage) -import qualified Network.Tox.Crypto.Handlers as Tox +import qualified Network.Tox.Crypto.Transport as Tox (CryptoMessage(..),handshakeCookie, pattern PacketRequest, pattern PING) import Data.Typeable import Network.Tox.ContactInfo as Tox import OnionRouter -import Data.PacketQueue import qualified Data.Word64Map as W64 import Network.Tox.AggregateSession -import System.FilePath -import System.Process -import System.Posix.IO -import Data.Word64RangeMap -import Network.Tox.Crypto.Transport -import Data.Conduit.Cereal -import qualified Data.Conduit.Binary as Conduit +import qualified Network.Tox.Session as Tox (Session) + ;import Network.Tox.Session hiding (Session) -- Presence imports. +import Connection.Tcp (TCPStatus) import ConsoleWriter import Presence import XMPPServer @@ -123,8 +112,6 @@ import Connection import ToxToXMPP import XMPPToTox import qualified Connection.Tcp as Tcp (ConnectionEvent(..),noCleanUp,TCPStatus) -import Control.Concurrent.Supply -import qualified Data.CyclicBuffer as CB import DPut @@ -309,7 +296,6 @@ data Session = Session , dhts :: Map.Map String DHT , externalAddresses :: IO [SockAddr] , swarms :: Mainline.SwarmsDatabase - , cryptosessions :: Tox.NetCryptoSessions , toxkeys :: TVar Tox.AnnouncedKeys , roster :: Tox.ContactInfo JabberClients , announceToLan :: IO () @@ -383,17 +369,6 @@ clientSession s@Session{..} sock cnum h = do hPutClient h $ showReport (zip (map (drop 1 . show) allDebugTags) (map f vs)) let readHex :: (Read n, Integral n) => String -> Maybe n readHex s = readMaybe ("0x" ++ s) - strToSession :: String -> IO (Either String Tox.NetCryptoSession) - strToSession idStr - = case readHex idStr of - Nothing -> return (Left "Unable to parse session id") - Just id -> do - sessions <- filter ((==id) . Tox.ncSessionId) - . concat - . Map.elems <$> (atomically $ readTVar (Tox.netCryptoSessionsByKey cryptosessions)) - case sessions of - [] -> return (Left "Session not found") - (x:xs) -> return (Right x) let mkrow :: (SecretKey, PublicKey) -> (String,String) mkrow (a,b) | Just x <- encodeSecret a= (B.unpack x, show (Tox.key2id b)) mkrow _ = error (concat ["Assertion fail in 'mkrow' function at ", __FILE__, ":", show __LINE__]) @@ -690,247 +665,6 @@ clientSession s@Session{..} sock cnum h = do setVerbose tag hPutClient h $ "Showing " ++ show tag ++ " messages." - -- list information about current netcrypto sesssions - ("sessions", s) | "" <- strp s - -> cmd0 $ do - sessions <- concat . Map.elems <$> (atomically $ readTVar (Tox.netCryptoSessionsByKey cryptosessions)) - let sessionsReport = mapM showPerSession sessions - headers = ["SessionID", "YourKey", "TheirKey", "Address", "NextMsg", "Dropped" {-,"Handled","Unhandled" -} - ,"Progress" ] - showPerSession (Tox.NCrypto - { ncState = progressVar - , ncSessionId = id - , ncMyPublicKey = yourkey - , ncTheirPublicKey = theirkey - , ncLastNMsgs = lastN - , ncSockAddr = sockAddr - }) = do - progress <- atomically $ readTVar progressVar - (num,dropped) <- atomically $ liftA2 (,) (CB.getTotal lastN) (CB.getDropped lastN) - as <- atomically (CB.cyclicBufferViewList lastN) - let (h,u) = partition (fst . snd) as - countHandled = length h - countUnhandled = length u - return [ printf "%x" id -- "SessionID" - , take 8 $ show (Tox.key2id yourkey) -- "YourKey" - , show (Tox.key2id theirkey)-- "TheirKey" - , drop 11 $ show sockAddr -- "Address" - , show num -- "NextMsg" - , show dropped -- "Dropped" - -- , show countHandled -- "Handled" - -- , show countUnhandled -- "Unhandled" - , show progress - ] - if null sessions - then hPutClient h "No sessions." - else do - rows <- sessionsReport - hPutClient h (showColumns (headers:rows)) - -- session set key val - ("session",s) | (idStr,"set",unstripped) <- twoWords s - , (key,val,unstripped2) <- twoWords unstripped - , let setmap = [("ncRequestInterval", \s x -> writeTVar (Tox.ncRequestInterval s) x) - ,("ncAliveInterval", \s x -> writeTVar (Tox.ncAliveInterval s) x) - ,("ncIdleEvent", \s x -> writeTVar (Tox.ncIdleEvent s) x) - ,("ncTimeOut", \s x -> writeTVar (Tox.ncTimeOut s) x) - ] - , Just stmFunc <- Data.List.lookup key setmap - -> cmd0 $ do - lrSession <- strToSession idStr - case lrSession of - Left s -> hPutClient h s - Right session -> do - case readMaybe val of - Just (x::Int) -> do - atomically (stmFunc session x) - hPutClient h $ "Session " ++ idStr ++ ": " ++ key ++ " = " ++ val - _ -> - hPutClient h $ "Invalid " ++ key ++ " value: " ++ val - - -- session interval factor - ("session",s) | (idStr,"interval",unstripped) <- twoWords s - , val <- strp unstripped - -> cmd0 $ do - lrSession <- strToSession idStr - let displayIntervals session = atomically $ do - intervals <- forM [Tox.ncRequestInterval,Tox.ncAliveInterval, Tox.ncIdleEvent, Tox.ncTimeOut] $ \i -> readTVar (i session) - let keys = ["ncRequestInterval","ncAliveInterval","ncIdleEvent","ncTimeOut"] - return (intercalate "\n" $ - map (\(key,val) -> "Session " ++ idStr ++ ": " ++ key ++ " = " ++ val) - (zip keys (map show intervals))) - case lrSession of - Left s -> hPutClient h s - Right session -> do - case readMaybe val of - Just (factor::Double) -> do - atomically $ do - modifyTVar (Tox.ncRequestInterval session) (round . (*factor) . fromIntegral) - modifyTVar (Tox.ncAliveInterval session) (round . (*factor) . fromIntegral) - modifyTVar (Tox.ncIdleEvent session) (round . (*factor) . fromIntegral) - modifyTVar (Tox.ncTimeOut session) (round . (*factor) . fromIntegral) - displayIntervals session >>= hPutClient h - _ -> displayIntervals session >>= hPutClient h . (("No parse (" ++ show val ++ ").\n") ++) - - -- report error when setting invalid keys - ("session",s) | (idStr,"set",unstripped) <- twoWords s - , (key,val,unstripped2) <- twoWords unstripped - -> cmd0 $ do - lrSession <- strToSession idStr - case lrSession of - Left s -> hPutClient h s - Right session -> hPutClient h $ "What is " ++ key ++ "?" - -- session tail - -- show context (latest lossless messages) - ("session", s) | (idStr,tailcmd,unstripped) <- twoWords s - , "" <- strp unstripped - , tailcmd `elem` ["tail","context"] - -> cmd0 $ do - lrSession <- strToSession idStr - case lrSession of - Left s -> hPutClient h s - Right session -> do - msgs <- atomically $ CB.cyclicBufferViewList (Tox.ncLastNMsgs session) - hPutClientB h (B.unlines (map showMsg msgs)) - -- session me - -- display information about how you look to that session - ("session", s) | (idStr,"me",unstripped) <- twoWords s - , "" <- strp unstripped - -> cmd0 $ do - lrSession <- strToSession idStr - case lrSession of - Left s -> hPutClient h s - Right session -> do - view <- atomically (readTVar (Tox.ncView session) >>= Tox.viewSnapshot) - hPutClientB h (vShowMe view 0) - -- session them - -- display information about the person on the other end of the session - ("session", s) | (idStr,them,unstripped) <- twoWords s - , "" <- strp unstripped - , them `elem` ["them","you"] - -> cmd0 $ do - lrSession <- strToSession idStr - case lrSession of - Left s -> hPutClient h s - Right session -> do - view <- atomically (readTVar (Tox.ncView session) >>= Tox.viewSnapshot) - hPutClientB h (vShowThem view 0) - -- session online - -- send ONLINE packet to session N - ("session", s) | (idStr,"online",unstripped) <- twoWords s - , stripped <- strp unstripped - -> cmd0 $ do - lrSession <- strToSession idStr - case lrSession of - Left s -> hPutClient h s - Right session -> do - case mbTox of - Nothing -> hPutClient h "Requires Tox enabled." - Just tox-> do - Tox.sendOnline (Tox.toxCryptoKeys tox) session - hPutClient h "sent ONLINE" - -- session online - -- send OFFLINE packet to session N - ("session", s) | (idStr,"offline",unstripped) <- twoWords s - , stripped <- strp unstripped - -> cmd0 $ do - lrSession <- strToSession idStr - case lrSession of - Left s -> hPutClient h s - Right session -> do - case mbTox of - Nothing -> hPutClient h "Requires Tox enabled." - Just tox-> do - Tox.sendOffline (Tox.toxCryptoKeys tox) session - hPutClient h "sent OFFLINE" - -- session kill - -- send KILL packet to session N - ("session", s) | (idStr,"kill",unstripped) <- twoWords s - , stripped <- strp unstripped - -> cmd0 $ do - lrSession <- strToSession idStr - case lrSession of - Left s -> hPutClient h s - Right session -> do - case mbTox of - Nothing -> hPutClient h "Requires Tox enabled." - Just tox-> do - Tox.sendKill (Tox.toxCryptoKeys tox) session - hPutClient h "sent KillPacket" - -- session nick - -- send NICK packet to session N, setting nick to NICKNAME - ("session", s) | (idStr,"nick",unstripped) <- twoWords s - , nick <- strp unstripped - -> cmd0 $ do - lrSession <- strToSession idStr - case lrSession of - Left s -> hPutClient h s - Right session -> do - case mbTox of - Nothing -> hPutClient h "Requires Tox enabled." - Just tox-> do - Tox.setNick (Tox.toxCryptoKeys tox) session (B.pack nick) - hPutClient h "sent NICKNAME" - -- session status - -- send USERSTATUS packet to session N, set status to STATUS - ("session", s) | (idStr,"status",unstripped) <- twoWords s - , statusStr <- strp unstripped - -> cmd0 $ do - lrSession <- strToSession idStr - case lrSession of - Left s -> hPutClient h s - Right session -> do - case mbTox of - Nothing -> hPutClient h "Requires Tox enabled." - Just tox-> do - case readMaybe statusStr of - Nothing -> hPutClient h "Unable to parse status" - Just status -> do - Tox.setStatus (Tox.toxCryptoKeys tox) session status - hPutClient h "sent USERSTATUS" - -- session typing - -- send TYPING packet to session N, set typing to TYPINGSTATUS - ("session", s) | (idStr,"typing",unstripped) <- twoWords s - , typingstatus <- strp unstripped - -> cmd0 $ do - lrSession <- strToSession idStr - case lrSession of - Left s -> hPutClient h s - Right session -> do - case mbTox of - Nothing -> hPutClient h "Requires Tox enabled." - Just tox-> do - case readMaybe typingstatus of - Nothing -> hPutClient h "Unable to parse status" - Just status -> do - Tox.setTyping (Tox.toxCryptoKeys tox) session status - hPutClient h "sent TYPINGSTATUS" - -- session statusmsg - -- send STATUSMESSAGE packet to session N, setting status message to MSG - ("session", s) | (idStr,"statusmsg",statusmsg) <- twoWords s - -> cmd0 $ do - lrSession <- strToSession idStr - case lrSession of - Left s -> hPutClient h s - Right session -> do - case mbTox of - Nothing -> hPutClient h "Requires Tox enabled." - Just tox-> do - Tox.setStatusMsg (Tox.toxCryptoKeys tox) session (B.pack statusmsg) - hPutClient h "sent STATUSMESSAGE" - -- session c - -- send MESSAGE packet to session N (send chat message MSG) - ("session", s) | (idStr,msgcmd,msg) <- twoWords s - , msgcmd `elem` ["c","msg","send"] - -> cmd0 $ do - lrSession <- strToSession idStr - case lrSession of - Left s -> hPutClient h s - Right session -> do - case mbTox of - Nothing -> hPutClient h "Requires Tox enabled." - Just tox-> do - Tox.sendChatMsg (Tox.toxCryptoKeys tox) session (B.pack msg) - hPutClient h "sent MESSAGE" ("onion", s) -> cmd0 $ do now <- getPOSIXTime @@ -955,11 +689,6 @@ clientSession s@Session{..} sock cnum h = do , "pending: " ++ show (W64.size pqs) ] hPutClient h $ showColumns $ ["","responses","timeouts", "age", "version"]:r - -- necrypto - -- establish a netcrypto session with specified person - ("netcrypto", s) -> cmd0 $ do - let exes = Map.fromList [("atox",("/usr/bin/tmux -c","atox"))] - netcrypto (Map.lookup netname dhts) selectedKey h roster mbTox exes (strp s) ("g", s) | Just DHT{..} <- Map.lookup netname dhts -> cmd0 $ do -- arguments: method @@ -1309,153 +1038,6 @@ clientSession s@Session{..} sock cnum h = do _ -> cmd0 $ hPutClient h "error." -netcrypto - :: Maybe DHT - -> Maybe PublicKey - -> ClientHandle - -> ContactInfo extra1 - -> Maybe (Tox.Tox extra2) - -> Map.Map String (String,String) -- profile name to (multiplexer,exe name) for supported child executables - -> String - -> IO () -netcrypto _ _ h _ Nothing _ _ = hPutClient h "Requires Tox enabled." -netcrypto _ Nothing h _ _ _ _ = hPutClient h "No key is selected, see k command." -netcrypto (Just (DHT {..})) (Just mypubkey) h roster (Just tox) exes paramStr = - either - (const $ - either - (hPutClient h . ("Bad netcrypto target: " ++)) - goNodeInfo - (readEither keystr) -- attempt read as NodeInfo type - ) - (goPubkey . Tox.id2key) $ - readEither keystr -- attempt read as NodeId type - where - params = words paramStr - keystr = bool (head params) "" (null params) - -- TODO: - -- execProfiles: - -- atox@24-25,48-51,64-45 gnome-tox-notifier@24-25,49,50 - -- ^-- word64 type values that should be forwarded to this process - execParams=drop 1 params - parseExecParam :: String -> (String,[(Maybe Word64,Maybe Word64)]) - parseExecParam param = let (name,drop 1 -> rangesCombined) = span (/='@') param - wordsBy x str = groupBy (const (/=x)) str - rangesUnparsed = wordsBy ',' rangesCombined - parseRange :: String -> (Maybe Word64,Maybe Word64) - parseRange "all" = (Nothing,Nothing) - parseRange x = let (low,drop 1 -> high) = break (==',') x - in (readMaybe low,readMaybe high) - in (name,map parseRange rangesUnparsed) - execs = map parseExecParam execParams - - - goNodeInfo userkey_nodeinfo = do - msec <- - atomically $ do - fmap userSecret . HashMap.lookup (Tox.key2id mypubkey) <$> - readTVar (accounts roster) - case msec of - Nothing -> hPutClient h "Error getting secret key" - Just sec -> do - let their_pub = Tox.id2key $ Tox.nodeId userkey_nodeinfo - their_addr = Tox.nodeAddr userkey_nodeinfo - let acsVar = accounts (Tox.toxContactInfo tox) - acsmap <- atomically $ readTVar acsVar - case HashMap.lookup (Tox.key2id mypubkey) acsmap of - Nothing -> hPutClient h "Unable to find account for selected key" - Just account -> do - now <- getPOSIXTime - atomically $ do - mcontact <- HashMap.lookup (Tox.nodeId userkey_nodeinfo) <$> readTVar (contacts account) - forM_ mcontact $ \contact -> do - mnid <- fmap (Tox.key2id . Tox.dhtpk . snd) <$> readTVar (contactKeyPacket contact) - forM_ mnid $ \nid -> do - forM_ (Tox.nodeInfo nid their_addr) $ \their_ni -> do - setContactAddr now their_pub their_ni account - sessions <- Tox.netCrypto tox sec their_pub - exeDir <- takeDirectory <$> getExecutablePath - forM_ sessions $ \session -> do - forM_ execs $ \(exekey,ranges) -> do - case Map.lookup exekey exes of - Nothing -> return () - Just (multiplexer,exename) -> do - let exepath = exeDir exename - (myReadFd,myWriteFd) <- System.Posix.IO.createPipe - myRead <- fdToHandle myReadFd - myWrite <- fdToHandle myWriteFd - whoAmI <- atomically $ newTVar mypubkey - whoAreThey <- atomically $ newTVar their_pub - let fdArgs = [show myWriteFd,show myReadFd] - if null multiplexer - then callProcess exepath fdArgs - else do - let (multiplexer_exe,multiplexer_args) = splitAt 1 (words multiplexer) - callProcess multiplexer (multiplexer_args ++ [intercalate " " (exepath:fdArgs)]) - -- tell subprocess who is talking to who - B.hPutStr myWrite ("\NUL\NUL" `B.append` S.encode (Tox.key2id mypubkey)) - B.hPutStr myWrite ("\NUL\SOH" `B.append` S.encode (Tox.key2id their_pub)) - -- add hooks so subprocess is updated on incoming - let makeHook session typ - = \session msg - -> do -- if (getMessageType msg == typ) - me <- atomically $ readTVar whoAmI - when (me /= mypubkey) $ do - atomically $ writeTVar whoAmI mypubkey - B.hPutStr myWrite ("\NUL\NUL" `B.append` S.encode (Tox.key2id mypubkey)) - them <- atomically $ readTVar whoAreThey - when (them /= their_pub) $ do - atomically $ writeTVar whoAreThey their_pub - B.hPutStr myWrite ("\NUL\SOH" `B.append` S.encode (Tox.key2id their_pub)) - B.hPutStr myWrite (S.runPut $ Tox.putCryptoMessage 0 msg) - return (Just id) - addHooks currentHooks typs = forM_ typs $ \typ -> modifyTVar (Tox.ncHooks session) (Map.insert typ (currentHooks typ ++ [makeHook session typ])) - case ranges of - [(Nothing,Nothing)] -> atomically $ do - typs <- map fromWord64 . filter (/=0) . U.elems <$> readTVar (Tox.ncIncomingTypeArray session) - addHooks (const []) typs - _ -> atomically . forM_ ranges $ \range -> do - case range of - (Just first,Just last) -> do - let typs = map fromWord64 [first .. last] - hooks <- readTVar (Tox.ncHooks session) - let currentHooks typ = fromMaybe [] (Map.lookup typ hooks) - addHooks currentHooks typs - -- forward messages from subprocess - forwardThread <- forkIO $ do - tid <- myThreadId - let sidStr = printf "(%x)" (Tox.ncSessionId session) - labelThread tid (exekey ++ ".forward" ++ sidStr) - let myconduit = Conduit.sourceHandle myRead .| conduitGet2 (Tox.getCryptoMessage 0) -- :: ConduitT i CryptoMessage IO () - Conduit.runConduit (myconduit .| awaitForever (\msg -> do - let typ = toWord64 (getMessageType msg) - mbSendIt <- liftIO $ atomically (lookupInRangeMap typ (Tox.ncOutHooks session)) - case mbSendIt of - Just sendit -> liftIO . void $ sendit (Tox.toxCryptoKeys tox) session msg - Nothing -> return () -- do - -- uncomment to let unhooked pass thru: - -- if lossyness (msgId msg) == Lossless - -- then sendLossless (Tox.toxCryptoKeys tox) session msg - -- else sendLossy (Tox.toxCryptoKeys tox) session msg - )) - -- add hook to killThread on kill packet - atomically $ do - hooks <- readTVar (Tox.ncHooks session) - let currentHooks = fromMaybe [] $ Map.lookup (Msg KillPacket) hooks - let myhook :: Tox.NetCryptoSession -> CryptoMessage -> IO (Maybe (CryptoMessage -> CryptoMessage)) - myhook _ _ = killThread forwardThread >> return (Just id) - modifyTVar' (Tox.ncHooks session) (Map.insert (Msg KillPacket) (currentHooks ++ [myhook])) - hPutClient h "Handshake sent" - goPubkey their_pub = do - msec <- - atomically $ do - ks <- map swap <$> myKeyPairs roster - return $ Data.List.lookup mypubkey ks - case msec of - Nothing -> hPutClient h "Error getting secret key" - Just sec -> do - Tox.netCrypto tox sec their_pub - hPutClient h "Handshake sent" readExternals :: (ni -> SockAddr) -> [TVar (BucketList ni)] -> IO [SockAddr] readExternals nodeAddr vars = do @@ -1533,12 +1115,15 @@ noArgPing f [] x = f x noArgPing _ _ _ = return Nothing -- | Create a Conduit Source by repeatedly calling an IO action. -ioToSource :: IO (Maybe x) -> IO () -> C.Source IO x +ioToSource :: IO (Maybe x) -> IO () -> ConduitT () x IO () ioToSource !action !onEOF = liftIO action >>= \case - Nothing -> liftIO onEOF + Nothing -> do + dput XNetCrypto "ioToSource terminated." + liftIO onEOF Just item -> do C.yield item ioToSource action onEOF +{- newXmmpSink :: Tox.NetCryptoSession -> C.Sink (Flush Tox.CryptoMessage) IO () newXmmpSink session@(Tox.NCrypto { ncOutgoingQueue = outGoingQVar }) = C.awaitForever $ \flush_cyptomessage -> do let sendit :: Tox.NetCryptoSession -> Flush Tox.CryptoMessage -> IO () @@ -1563,80 +1148,23 @@ newXmmpSink session@(Tox.NCrypto { ncOutgoingQueue = outGoingQVar }) = C.awaitFo _ -> return () sendit session Flush = return () liftIO $ sendit session flush_cyptomessage +-} --- | Called upon a new Tox friend-connection session with a remote peer in --- order to set up translating conduits that simulate a remote XMPP server. -announceToxJabberPeer :: PublicKey -- ^ This node's long-term user key. - -> PublicKey -- ^ Remote tox node's long-term user key. - -> TChan ((SockAddr,ConnectionData), Tcp.ConnectionEvent XML.Event) - -> SockAddr -- ^ Local bind address for incoming Tox packets. - -> SockAddr -- ^ Remote address for this connection. - -> STM Bool - -> C.Source IO Tox.CryptoMessage - -> C.Sink (Flush Tox.CryptoMessage) IO () - -> IO (Maybe (Tox.NetCryptoSession -> Tox.NetCryptoSession)) -announceToxJabberPeer me them echan laddr saddr pingflag tsrc tsnk - = do - atomically $ do - v <- newTVar Nothing - writeTChan echan - ( (saddr, ConnectionData (Left (Local laddr)) XMPPServer.Tox (xmppHostname me) v) - , Tcp.Connection pingflag xsrc xsnk ) - return Nothing - where - xsrc = tsrc =$= toxToXmpp laddr me (xmppHostname them) - xsnk = flushPassThrough xmppToTox =$= tsnk - -vShowMe :: Tox.ViewSnapshot -> Int -> B.ByteString -vShowMe (Tox.ViewSnapshot { vNick, vStatus, vStatusMsg, vTyping }) indent - = B.unlines - . map doRow $ [ ["Nick: ", vNick ] - , ["Status: ", "(" <> pshow vStatus <> ") " <> vStatusMsg ] - , ["Typing: ", pshow vTyping ] - ] - where (<>) = B.append - space = B.replicate indent ' ' - doRow = B.append space . B.concat - -vShowThem :: Tox.ViewSnapshot -> Int -> B.ByteString -vShowThem (Tox.ViewSnapshot { vTheirNick, vTheirStatus, vTheirStatusMsg, vTheirTyping }) indent - = B.unlines - . map doRow $ [ ["Nick: ", vTheirNick ] - , ["Status: ", "(" <> pshow vTheirStatus <> ") " <> vTheirStatusMsg ] - , ["Typing: ", pshow vTheirTyping ] - ] - where (<>) = B.append - space = B.replicate indent ' ' - doRow = B.append space . B.concat - -showMsg ::(Word32, (Bool,(Tox.ViewSnapshot, InOrOut Tox.CryptoMessage))) -> B.ByteString -showMsg (n,(flg,(snapshot,iocm))) = B.concat [bool " " "h " flg, showmsg' (snapshot,iocm)] - where - showmsg' (snapshot,In cm) = B.concat [Tox.vTheirNick snapshot,"> ", pshow cm] - showmsg' (snapshot,Out cm) = B.concat [{-utf8boldify-} (Tox.vNick snapshot),": ",pshow cm] - utf8boldify s = boldify (T.decodeUtf8With E.lenientDecode s) - where - boldify :: T.Text -> B.ByteString - boldify j = E.encodeUtf8 $ T.map addIt j - where addIt x = let o = ord x in case o of - _ | o <= 90 && o >= 65 -> chr (o + 119743) - _ | o <= 122 && o >= 97 -> chr (o + 119737) - _ -> x - onNewToxSession :: XMPPServer -> TVar (Map.Map Uniq24 AggregateSession) -> ContactInfo extra -> SockAddr - -> Tox.NetCryptoSession + -> Tox.Session -> IO () onNewToxSession sv ssvar ContactInfo{accounts} addrTox netcrypto = do - let them s = Tox.ncTheirPublicKey s + let them s = Tox.longTermKey $ runIdentity cookie -- remote tox key + where Tox.Cookie _ cookie = Tox.handshakeCookie (sReceivedHandshake s) - me s = Tox.ncMyPublicKey s + me s = toPublic $ sOurKey s - onStatusChange :: (Tox.NetCryptoSession -> Tcp.ConnectionEvent XML.Event -> STM ()) - -> AggregateSession -> Tox.NetCryptoSession -> Status Tox.ToxProgress -> STM () + onStatusChange :: (Tox.Session -> Tcp.ConnectionEvent XML.Event -> STM ()) + -> AggregateSession -> Tox.Session -> Status Tox.ToxProgress -> STM () onStatusChange announce c s Established = onConnect announce c s onStatusChange announce _ s _ = onEOF announce s @@ -1686,7 +1214,6 @@ onNewToxSession sv ssvar ContactInfo{accounts} addrTox netcrypto = do return c Just c -> return c - atomically $ Tox.addDestroySessionHook netcrypto (Just 0) $ void . delSession c . fromIntegral . Tox.ncSessionId addSession c netcrypto return () @@ -1755,126 +1282,22 @@ selectManager mtman tcp profile = case T.splitAt 43 profile of } -main :: IO () -main = do - args <- getArgs - let opts = parseArgs args sensibleDefaults - print opts - - swarms <- Mainline.newSwarmsDatabase - -- Restore peer database before forking the listener thread. - peerdb <- left show <$> tryIOError (L.readFile "bt-peers.dat") - either (dput XMisc . ("bt-peers.dat: "++)) - (atomically . writeTVar (Mainline.contactInfo swarms)) - (peerdb >>= S.decodeLazy) - - announcer <- forkAnnouncer - - -- Default: quiet all tags (except XMisc). - forM [minBound .. maxBound] setQuiet - forM (verboseTags opts) setVerbose - - (quitBt,btdhts,btips,baddrs) <- case portbt opts of - "" -> return (return (), Map.empty,return [],[]) - p -> do - addr <- getBindAddress p (ip6bt opts) - (bt,btR,btBootstrap4, btBootstrap6) <- Mainline.newClient swarms addr - quitBt <- forkListener "bt" (clientNet bt) - mainlineSearches <- atomically $ newTVar Map.empty - peerPort <- atomically $ newTVar 6881 -- BitTorrent client TCP port. - let mainlineDHT bkts wantip = DHT - { dhtBuckets = bkts btR - , dhtPing = Map.singleton "ping" $ DHTPing - { pingQuery = noArgPing $ fmap (bool Nothing (Just ())) . Mainline.ping bt - , pingShowResult = show - } - , dhtQuery = Map.fromList - [ ("node", DHTQuery - { qsearch = (Mainline.nodeSearch bt) - , qhandler = (\ni -> fmap Mainline.unwrapNodes - . Mainline.findNodeH btR ni - . flip Mainline.FindNode (Just Want_Both)) - , qshowR = show - , qshowTok = (const Nothing) - }) - -- Maybe (sr :~: pr, stok :~: ptok, sni :~: pni ) - -- sr = InfoHash - -- stok = Token - -- sni = NodeInfo - , ("peer", DHTQuery - { qsearch = (Mainline.peerSearch bt) - , qhandler = (\ni -> fmap Mainline.unwrapPeers - . Mainline.getPeersH btR swarms ni - . flip Mainline.GetPeers (Just Want_Both) - . (read . show)) -- TODO: InfoHash -> NodeId - , qshowR = (show . pPrint) - , qshowTok = (Just . show) - }) - ] - , dhtParseId = readEither :: String -> Either String Mainline.NodeId - , dhtSearches = mainlineSearches - , dhtFallbackNodes = Mainline.bootstrapNodes wantip - , dhtAnnouncables = Map.fromList - -- Maybe (sr :~: pr, stok :~: ptok, sni :~: pni ) - -- dta = Announce - -- pr = Announced - -- ptok = Token - -- pni = NodeInfo - [ ("peer", DHTAnnouncable { announceSendData = Right $ \ih tok -> \case - Just ni -> do - port <- atomically $ readTVar peerPort - let dta = Mainline.mkAnnounce port ih tok - Mainline.announce bt dta ni - Nothing -> return Nothing - , announceParseAddress = readEither - , announceParseData = readEither - , announceParseToken = const $ readEither - , announceInterval = 60 -- TODO: Is one minute good? - , announceTarget = (read . show) -- TODO: InfoHash -> NodeId -- peer - }) - , ("port", DHTAnnouncable { announceParseData = readEither - , announceParseToken = \_ _ -> return () - , announceParseAddress = const $ Right () - , announceSendData = Right $ \dta () -> \case - Nothing -> do atomically $ writeTVar peerPort (dta :: PortNumber) - return $ Just dta - Just _ -> return Nothing - , announceInterval = 0 -- TODO: The "port" setting should probably - -- be a command rather than an announcement. - , announceTarget = const $ Mainline.zeroID - })] - - , dhtSecretKey = return Nothing - , dhtBootstrap = case wantip of - Want_IP4 -> btBootstrap4 - Want_IP6 -> btBootstrap6 - } - dhts = Map.fromList $ - ("bt4", mainlineDHT Mainline.routing4 Want_IP4) - : if ip6bt opts - then [ ("bt6", mainlineDHT Mainline.routing6 Want_IP6) ] - else [] - ips :: IO [SockAddr] - ips = readExternals Mainline.nodeAddr - [ Mainline.routing4 btR - , Mainline.routing6 btR - ] - return (quitBt,dhts,ips, [addr]) - - keysdb <- Tox.newKeysDatabase - - _crypto <- Tox.newCrypto - let emptyDestroyHook :: Tox.NetCryptoSession -> IO () - emptyDestroyHook session = dput XNetCrypto $ "SESSION DESTROY HOOK NOT ADDED ! publkey= " ++ show (Tox.key2id (Tox.ncTheirPublicKey session)) - _netCryptoSessionsState <- Tox.newSessionsState _crypto emptyDestroyHook Tox.defaultUnRecHook Tox.defaultCryptoDataHooks - (mbtox,quitTox,toxdhts,toxips,(taddrs::[SockAddr])) <- case porttox opts of +initTox :: Options + -> TVar (Map.Map Uniq24 AggregateSession) + -> TVar Tox.AnnouncedKeys -> Maybe XMPPServer -> IO ( Maybe (Tox.Tox JabberClients) , IO () + , Map.Map String DHT + , IO [SockAddr] + , [SockAddr]) +initTox opts ssvar keysdb mbxmpp = case porttox opts of "" -> return (Nothing,return (), Map.empty, return [],[]) toxport -> do addrTox <- getBindAddress toxport (ip6tox opts) dput XMisc $ "Supplied key: " ++ show (fmap (Tox.key2id . toPublic) (dhtkey opts)) tox <- Tox.newTox keysdb addrTox - (Just _netCryptoSessionsState) + (case mbxmpp of + Nothing -> \_ _ _ -> return () + Just xmpp -> onNewToxSession xmpp ssvar) (dhtkey opts) (quitTox, toxStrap4, toxStrap6) <- Tox.forkTox tox True @@ -2060,12 +1483,16 @@ main = do , Tox.routing6 $ Tox.toxRouting tox ] return (Just tox, quitTox, dhts, ips, [addrTox]) - let netCryptoSessionsState = maybe _netCryptoSessionsState Tox.toxCryptoSessions mbtox - - _ <- UPNP.requestPorts "dht-client" $ map (Datagram,) $ baddrs ++ taddrs - - ssvar <- atomically $ newTVar Map.empty - (msv,mconns,mstate) <- case portxmpp opts of +initJabber :: Options + -> TVar (Map.Map Uniq24 AggregateSession) + -> Announcer + -> Maybe (Tox.Tox JabberClients) + -> Map.Map String DHT + -> IO ( Maybe XMPPServer + , Maybe (Manager TCPStatus T.Text) + , Maybe (PresenceState Pending) + ) +initJabber opts ssvar announcer mbtox toxdhts = case portxmpp opts of "" -> return (Nothing,Nothing,Nothing) p -> do cport <- getBindAddress p True{-IPv6 supported-} @@ -2092,50 +1519,124 @@ main = do conns <- xmppConnections sv return (Just sv, Just conns, Just state) - forM_ (take 1 taddrs) $ \addrTox -> do - atomically $ Tox.addNewSessionHook netCryptoSessionsState $ \mbNoSpam netcrypto -> do - {- - -- allsessionsMap <- atomically $ readTVar (netCryptoSessions netCryptoSessionsState) - let sockAddr = Tox.ncSockAddr netcrypto - pubKey = Tox.ncTheirPublicKey netcrypto - tmchan <- atomically newTMChan - let pingflag = return False -- XMPPServer should never send pings. - -- This is taken care of by the tox layer. - receiveCrypto = atomically $ readTMChan tmchan - onEOF = return () -- setTerminate is called elsewhere. - xmppSrc = ioToSource receiveCrypto onEOF - xmppSink = newXmmpSink netcrypto - -} - forM_ msv $ \sv -> do - let Tox.HaveDHTKey saddr = Tox.ncSockAddr netcrypto - {- - Tox.HaveDHTKey dkey = Tox.ncTheirDHTKey netcrypto - nid = Tox.key2id dkey - them = Tox.ncTheirPublicKey netcrypto - me = Tox.ncMyPublicKey netcrypto - - announceToxJabberPeer me them (xmppEventChannel sv) addrTox saddr pingflag xmppSrc xmppSink - -} - - forM_ mbtox $ \tox -> do - onNewToxSession sv ssvar (Tox.toxContactInfo tox) saddr netcrypto - {- - mbacc <- HashMap.lookup (Tox.key2id me) - <$> atomically (readTVar accounts) - -- TODO: Add account if it doesn't exist? - forM_ mbacc $ \acnt -> do - now <- getPOSIXTime - forM_ (either (const Nothing) Just $ Tox.nodeInfo nid saddr) - $ \ni -> do - atomically $ do setEstablished them acnt - setContactAddr now them ni acnt - atomically $ do - supply <- readTVar (Tox.listenerIDSupply netCryptoSessionsState) - let (listenerId,supply') = freshId supply - writeTVar (Tox.listenerIDSupply netCryptoSessionsState) supply' - modifyTVar' (Tox.ncListeners netcrypto) (IntMap.insert listenerId (0,tmchan)) - -} - return Nothing +main :: IO () +main = do + args <- getArgs + let opts = parseArgs args sensibleDefaults + print opts + + swarms <- Mainline.newSwarmsDatabase + -- Restore peer database before forking the listener thread. + peerdb <- left show <$> tryIOError (L.readFile "bt-peers.dat") + either (dput XMisc . ("bt-peers.dat: "++)) + (atomically . writeTVar (Mainline.contactInfo swarms)) + (peerdb >>= S.decodeLazy) + + announcer <- forkAnnouncer + + -- Default: quiet all tags (except XMisc). + forM [minBound .. maxBound] setQuiet + forM (verboseTags opts) setVerbose + + (quitBt,btdhts,btips,baddrs) <- case portbt opts of + "" -> return (return (), Map.empty,return [],[]) + p -> do + addr <- getBindAddress p (ip6bt opts) + (bt,btR,btBootstrap4, btBootstrap6) <- Mainline.newClient swarms addr + quitBt <- forkListener "bt" (clientNet bt) + mainlineSearches <- atomically $ newTVar Map.empty + peerPort <- atomically $ newTVar 6881 -- BitTorrent client TCP port. + let mainlineDHT bkts wantip = DHT + { dhtBuckets = bkts btR + , dhtPing = Map.singleton "ping" $ DHTPing + { pingQuery = noArgPing $ fmap (bool Nothing (Just ())) . Mainline.ping bt + , pingShowResult = show + } + , dhtQuery = Map.fromList + [ ("node", DHTQuery + { qsearch = (Mainline.nodeSearch bt) + , qhandler = (\ni -> fmap Mainline.unwrapNodes + . Mainline.findNodeH btR ni + . flip Mainline.FindNode (Just Want_Both)) + , qshowR = show + , qshowTok = (const Nothing) + }) + -- Maybe (sr :~: pr, stok :~: ptok, sni :~: pni ) + -- sr = InfoHash + -- stok = Token + -- sni = NodeInfo + , ("peer", DHTQuery + { qsearch = (Mainline.peerSearch bt) + , qhandler = (\ni -> fmap Mainline.unwrapPeers + . Mainline.getPeersH btR swarms ni + . flip Mainline.GetPeers (Just Want_Both) + . (read . show)) -- TODO: InfoHash -> NodeId + , qshowR = (show . pPrint) + , qshowTok = (Just . show) + }) + ] + , dhtParseId = readEither :: String -> Either String Mainline.NodeId + , dhtSearches = mainlineSearches + , dhtFallbackNodes = Mainline.bootstrapNodes wantip + , dhtAnnouncables = Map.fromList + -- Maybe (sr :~: pr, stok :~: ptok, sni :~: pni ) + -- dta = Announce + -- pr = Announced + -- ptok = Token + -- pni = NodeInfo + [ ("peer", DHTAnnouncable { announceSendData = Right $ \ih tok -> \case + Just ni -> do + port <- atomically $ readTVar peerPort + let dta = Mainline.mkAnnounce port ih tok + Mainline.announce bt dta ni + Nothing -> return Nothing + , announceParseAddress = readEither + , announceParseData = readEither + , announceParseToken = const $ readEither + , announceInterval = 60 -- TODO: Is one minute good? + , announceTarget = (read . show) -- TODO: InfoHash -> NodeId -- peer + }) + , ("port", DHTAnnouncable { announceParseData = readEither + , announceParseToken = \_ _ -> return () + , announceParseAddress = const $ Right () + , announceSendData = Right $ \dta () -> \case + Nothing -> do atomically $ writeTVar peerPort (dta :: PortNumber) + return $ Just dta + Just _ -> return Nothing + , announceInterval = 0 -- TODO: The "port" setting should probably + -- be a command rather than an announcement. + , announceTarget = const $ Mainline.zeroID + })] + + , dhtSecretKey = return Nothing + , dhtBootstrap = case wantip of + Want_IP4 -> btBootstrap4 + Want_IP6 -> btBootstrap6 + } + dhts = Map.fromList $ + ("bt4", mainlineDHT Mainline.routing4 Want_IP4) + : if ip6bt opts + then [ ("bt6", mainlineDHT Mainline.routing6 Want_IP6) ] + else [] + ips :: IO [SockAddr] + ips = readExternals Mainline.nodeAddr + [ Mainline.routing4 btR + , Mainline.routing6 btR + ] + return (quitBt,dhts,ips, [addr]) + + keysdb <- Tox.newKeysDatabase + + ssvar <- atomically $ newTVar Map.empty + rec (mbtox,quitTox,toxdhts,toxips,(taddrs::[SockAddr]),msv,mconns,mstate) <- do + + (mbtox,quitTox,toxdhts,toxips,(taddrs::[SockAddr])) <- initTox opts ssvar keysdb msv + + (msv,mconns,mstate) <- initJabber opts ssvar announcer mbtox toxdhts + + return (mbtox,quitTox,toxdhts,toxips,(taddrs::[SockAddr]),msv,mconns,mstate) + + _ <- UPNP.requestPorts "dht-client" $ map (Datagram,) $ baddrs ++ taddrs let dhts = Map.union btdhts toxdhts @@ -2157,7 +1658,6 @@ main = do , dhts = dhts -- all DHTs , signalQuit = quitCommand , swarms = swarms - , cryptosessions = netCryptoSessionsState , toxkeys = keysdb , roster = rstr , announceToLan = fromMaybe (return ()) $ Tox.toxAnnounceToLan <$> mbtox diff --git a/src/Network/Tox.hs b/src/Network/Tox.hs index 861d71d3..88228c50 100644 --- a/src/Network/Tox.hs +++ b/src/Network/Tox.hs @@ -32,31 +32,25 @@ import qualified Data.ByteString as B import qualified Data.ByteString.Char8 as C8 import Data.Data import Data.Functor.Contravariant -import Data.IP import Data.Maybe import qualified Data.MinMaxPSQ as MinMaxPSQ import qualified Data.Serialize as S import Data.Time.Clock.POSIX (getPOSIXTime) import Data.Word +import Network.Socket +import System.Endian + +import Network.BitTorrent.DHT.Token as Token import qualified Data.Wrapper.PSQ as PSQ import System.Global6 -import Network.Address (WantIP (..)) +import Network.Address (WantIP (..),IP) import qualified Network.Kademlia.Routing as R import Network.QueryResponse -import Network.Socket -import System.Endian -import Network.BitTorrent.DHT.Token as Token - -import Connection import Crypto.Tox import Data.Word64Map (fitsInInt) import qualified Data.Word64Map (empty) -import HandshakeCache import Network.Kademlia.Bootstrap (forkPollForRefresh, bootstrap) -import Network.Kademlia.Search import Network.Tox.Crypto.Transport (Handshake(..),CryptoPacket) -import Network.Tox.Handshake -import Network.Tox.Crypto.Handlers import qualified Network.Tox.DHT.Handlers as DHT import qualified Network.Tox.DHT.Transport as DHT import Network.Tox.NodeId @@ -66,12 +60,12 @@ import Network.Tox.Transport import OnionRouter import Network.Tox.ContactInfo import Text.XXD -import qualified Data.HashMap.Strict as HashMap -import qualified Data.Map.Strict as Map import DPut import Network.Tox.Avahi -import Text.Printf -import Data.List +import Network.Tox.Session +import Network.SessionTransports +import Network.Kademlia.Search +import HandshakeCache newCrypto :: IO TransportCrypto newCrypto = do @@ -207,7 +201,6 @@ data Tox extra = Tox , toxCrypto :: Transport String SockAddr (CryptoPacket Encrypted) , toxHandshakes :: Transport String SockAddr (Handshake Encrypted) , toxHandshakeCache :: HandshakeCache - , toxCryptoSessions :: NetCryptoSessions , toxCryptoKeys :: TransportCrypto , toxRouting :: DHT.Routing , toxTokens :: TVar SessionTokens @@ -217,97 +210,7 @@ data Tox extra = Tox , toxAnnounceToLan :: IO () } --- | initiate a netcrypto session, blocking -netCrypto :: Tox extra -> SecretKey -> PublicKey{-UserKey -} -> IO [NetCryptoSession] -netCrypto tox myseckey theirpubkey = netCryptoWithBackoff 1000000 tox myseckey theirpubkey - --- | helper for 'netCrypto', initiate a netcrypto session, retry after specified millisecs -netCryptoWithBackoff :: Int -> Tox extra -> SecretKey -> PublicKey -> IO [NetCryptoSession] -netCryptoWithBackoff millisecs tox myseckey theirpubkey = do - let mykeyAsId = key2id (toPublic myseckey) - -- TODO: check status of connection here: - mbContactsVar <- fmap contacts . HashMap.lookup mykeyAsId <$> atomically (readTVar (accounts (toxContactInfo tox))) - case mbContactsVar of - Nothing -> do - dput XNetCrypto ("netCrypto: (" ++ show mykeyAsId ++") accounts lookup failed.") - return [] - - Just contactsVar -> do - let theirkeyAsId = key2id theirpubkey - mbContact <- HashMap.lookup theirkeyAsId <$> atomically (readTVar contactsVar) - tup <- atomically $ do - mc <- HashMap.lookup theirkeyAsId <$> readTVar contactsVar - kp <- fmap join $ forM mc $ \c -> readTVar (contactKeyPacket c) - sa <- fmap join $ forM mc $ \c -> readTVar (contactLastSeenAddr c) - fr <- fmap join $ forM mc $ \c -> readTVar (contactFriendRequest c) - cp <- fmap join $ forM mc $ \c -> readTVar (contactPolicy c) - return (kp,sa,fr,cp) - case tup of - (Nothing,Nothing,Nothing,Nothing) -> do - dput XNetCrypto ("netCrypto: (" ++ show mykeyAsId ++") friend not found (" ++ show theirkeyAsId ++ ").") - return [] - (mbKeyPkt,Nothing,mbFR,mbPolicy) -> do - dput XNetCrypto ("netCrypto: (" ++ show mykeyAsId ++") no SockAddr for friend (" ++ show theirkeyAsId ++ "). TODO: search their node?") - return [] - (Nothing,_,_,_) -> do - dput XNetCrypto ("netCrypto: (" ++ show mykeyAsId ++") no DHT-key for friend (" ++ show theirkeyAsId ++ "). TODO: what?") - return [] - (Just (stamp_theirDhtKey,keyPkt),Just (stamp_saddr,saddr),mbFR,mbPolicy) - | theirDhtKey <- DHT.dhtpk keyPkt -> do - -- Do we already have an active session with this user? - sessionsMap <- atomically $ readTVar (netCryptoSessionsByKey (toxCryptoSessions tox) ) - let sessionUsesIdentity key session = key == ncMyPublicKey session - case Map.lookup theirpubkey sessionsMap of - -- if sessions found, is it using this private key? - Just sessions | matchedSessions <- filter (sessionUsesIdentity (toPublic myseckey)) sessions - , not (null matchedSessions) - -> do - dput XNetCrypto ("netCrypto: Already have a session for " ++ show mykeyAsId ++ "<-->" ++ show theirkeyAsId) - return matchedSessions - -- if not, send handshake, this is separate session - _ -> do - -- if no session: - -- Convert to NodeInfo, so we can send cookieRequest - let crypto = toxCryptoKeys tox - client = toxDHT tox - case nodeInfo (key2id theirDhtKey) (nodeAddr saddr) of - Left e -> dput XNetCrypto ("netCrypto: nodeInfo fail... " ++ e) >> return [] - Right ni -> do - mbCookie <- DHT.cookieRequest crypto client (toPublic myseckey) ni - case mbCookie of - Nothing -> do - dput XNetCrypto ("netCrypto: (" ++ show mykeyAsId ++") <--> (" ++ show theirkeyAsId ++ ").") - dput XNetCrypto ("netCrypto: CookieRequest failed. TODO: dhtpkNodes thingy") - return [] - Just cookie -> do - dput XNetCrypto "Have cookie, creating handshake packet..." - let hp = HParam { hpOtherCookie = cookie - , hpMySecretKey = myseckey - , hpCookieRemotePubkey = theirpubkey - , hpCookieRemoteDhtkey = theirDhtKey - , hpTheirBaseNonce = Nothing - , hpTheirSessionKeyPublic = Nothing - } - newsession <- generateSecretKey - timestamp <- getPOSIXTime - (myhandshake,ioAction) - <- atomically $ freshCryptoSession (toxCryptoSessions tox) (nodeAddr saddr) newsession timestamp hp - ioAction - -- send handshake - forM myhandshake $ \response_handshake -> do - sendHandshake (toxCryptoSessions tox) (nodeAddr saddr) response_handshake - let secnum :: Double - secnum = fromIntegral millisecs / 1000000 - delay = (millisecs * 5 `div` 4) - if secnum < 20000000 - then do - dput XNetCrypto $ "sent handshake, now delaying " ++ show (secnum * 1.25) ++ " second(s).." - -- threadDelay delay - -- Commenting loop for simpler debugging - return [] -- netCryptoWithBackoff delay tox myseckey theirpubkey -- hopefully it will find an active session this time. - else do - dput XNetCrypto "Unable to establish session..." - return [] + -- | Create a DHTPublicKey packet to send to a remote contact. getContactInfo :: Tox extra -> IO DHT.DHTPublicKey @@ -365,30 +268,24 @@ getOnionAlias crypto dhtself remoteNode = atomically $ do newTox :: TVar Onion.AnnouncedKeys -- ^ Store of announced keys we are a rendezvous for. -> SockAddr -- ^ Bind-address to listen on. - -> Maybe NetCryptoSessions -- ^ State of all one-on-one Tox links. + -> ( ContactInfo extra -> SockAddr -> Session -> IO () ) -> Maybe SecretKey -- ^ Optional DHT secret key to use. -> IO (Tox extra) -newTox keydb addr mbSessionsState suppliedDHTKey = do +newTox keydb addr onsess suppliedDHTKey = do (udp,sock) <- {- addVerbosity <$> -} udpTransport' addr - tox <- newToxOverTransport keydb addr mbSessionsState suppliedDHTKey udp + tox <- newToxOverTransport keydb addr onsess suppliedDHTKey udp return tox { toxAnnounceToLan = announceToLan sock (key2id $ transportPublic $ toxCryptoKeys tox) } -- | This version of 'newTox' is useful for automated tests using 'testPairTransport'. newToxOverTransport :: TVar Onion.AnnouncedKeys -> SockAddr - -> Maybe NetCryptoSessions + -> ( ContactInfo extra -> SockAddr -> Session -> IO () ) -> Maybe SecretKey -> Onion.UDPTransport -> IO (Tox extra) -newToxOverTransport keydb addr mbSessionsState suppliedDHTKey udp = do +newToxOverTransport keydb addr onNewSession suppliedDHTKey udp = do roster <- newContactInfo - (crypto0,sessionsState0) <- case mbSessionsState of - Nothing -> do - crypto <- newCrypto - sessionsState <- newSessionsState crypto (const $ dput XUnexpected "Missing destroy hook!") defaultUnRecHook defaultCryptoDataHooks - return (crypto,sessionsState) - Just s -> return (transportCrypto s, s) - + crypto0 <- newCrypto let -- patch in supplied DHT key crypto1 = fromMaybe crypto0 $do k <- suppliedDHTKey @@ -409,6 +306,7 @@ newToxOverTransport keydb addr mbSessionsState suppliedDHTKey udp = do mkrouting <- DHT.newRouting addr crypto updateIP updateIP orouter <- newOnionRouter $ dput XRoutes (cryptonet,dhtcrypt,onioncrypt,dtacrypt,handshakes) <- toxTransport crypto orouter lookupClose udp + sessions <- initSessions (sendMessage cryptonet) let dhtnet0 = layerTransportM (DHT.decrypt crypto) (DHT.encrypt crypto) dhtcrypt tbl4 = DHT.routing4 $ mkrouting (error "missing client") @@ -417,22 +315,12 @@ newToxOverTransport keydb addr mbSessionsState suppliedDHTKey udp = do $ \client net -> onInbound (DHT.updateRouting client (mkrouting client) orouter) net hscache <- newHandshakeCache crypto (sendMessage handshakes) - - let sessionsState = sessionsState0 { sendHandshake = sendMessage handshakes - , sendSessionPacket = sendMessage cryptonet - , transportCrypto = crypto - -- ToxContact -> STM Policy - , netCryptoPolicyByKey = policylookup - } - policylookup (ToxContact me them) = do - macnt <- HashMap.lookup me <$> readTVar (accounts roster) - case macnt of - Nothing -> return RefusingToConnect - Just acnt -> do - mc <- HashMap.lookup them <$> readTVar (contacts acnt) - case mc of - Nothing -> return RefusingToConnect - Just c -> fromMaybe RefusingToConnect <$> readTVar (contactPolicy c) + let sparams = SessionParams + { spCrypto = crypto + , spSessions = sessions + , spGetSentHandshake = getSentHandshake hscache + , spOnNewSession = onNewSession roster addr + } orouter' <- forkRouteBuilder orouter $ \nid ni -> fmap (\(_,ns,_)->ns) @@ -453,10 +341,9 @@ newToxOverTransport keydb addr mbSessionsState suppliedDHTKey udp = do { toxDHT = dhtclient , toxOnion = onionclient , toxToRoute = onInbound (updateContactInfo roster) dtacrypt - , toxCrypto = addHandler (dput XMisc) (sessionPacketH sessionsState) cryptonet - , toxHandshakes = addHandler (dput XMisc) (handshakeH sessionsState) handshakes + , toxCrypto = addHandler (dput XMisc) (sessionHandler sessions) cryptonet + , toxHandshakes = addHandler (dput XMisc) (handshakeH sparams) handshakes , toxHandshakeCache = hscache - , toxCryptoSessions = sessionsState , toxCryptoKeys = crypto , toxRouting = mkrouting dhtclient , toxTokens = toks @@ -526,8 +413,10 @@ announceToLan sock nid = do (Just "33445") let broadcast = addrAddress broadcast_info bs = S.runPut $ DHT.putMessage (DHT.DHTLanDiscovery nid) + dput XLan $ show broadcast ++ " <-- LanAnnounce " ++ show nid saferSendTo sock bs broadcast + toxQSearch :: Tox extra -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Onion.Rendezvous toxQSearch tox = Onion.toxidSearch (onionTimeout tox) (toxCryptoKeys tox) (toxOnion tox) diff --git a/src/Network/Tox/AggregateSession.hs b/src/Network/Tox/AggregateSession.hs index edb897e0..1dd10eef 100644 --- a/src/Network/Tox/AggregateSession.hs +++ b/src/Network/Tox/AggregateSession.hs @@ -22,7 +22,6 @@ module Network.Tox.AggregateSession import Control.Concurrent.STM import Control.Concurrent.STM.TMChan -import Control.Concurrent.Supply import Control.Monad import Data.Function import qualified Data.IntMap.Strict as IntMap @@ -47,9 +46,7 @@ import Network.Tox.Crypto.Transport (CryptoMessage (..), pattern KillPacket, pattern PacketRequest) import Network.Tox.DHT.Transport (key2id) import Network.Tox.NodeId (ToxProgress (..)) -import Network.Tox.Crypto.Handlers - -type Session = NetCryptoSession +import Network.Tox.Session -- | For each component session, we track the current status. data SingleCon = SingleCon @@ -113,47 +110,94 @@ data KeepAliveEvents = DoTimeout -- ^ A session timed-out, close it. | DoRequestMissing -- ^ Detect and request lost packets. deriving Enum --- | This function forks a thread to read all packets from the provided --- 'Session' and forward them to 'contactChannel' for a containing --- 'AggregateSession' +-- | This call loops until the provided sesison is closed or times out. It +-- monitors the provided (non-empty) priority queue for scheduled tasks (see +-- 'KeepAliveEvents') to perform for the connection. +keepAlive :: Session -> TVar (PSQ POSIXTime) -> IO () +keepAlive s q = do + myThreadId >>= flip labelThread + (intercalate "." ["beacon" + , take 8 $ show $ key2id $ sTheirUserKey s + , show $ sSessionID s]) + + let outPrint e = dput XNetCrypto $ shows (sSessionID s,sTheirAddr s) $ " <-- " ++ e + + doAlive = do + -- outPrint $ "Beacon" + sendMessage (sTransport s) () (OneByte PING) + + doRequestMissing = do + (ns,nmin) <- sMissingInbound s + -- outPrint $ "PacketRequest " ++ show (nmin,ns) + sendMessage (sTransport s) () (RequestResend PacketRequest ns) + + re tm again e io = do + io + atomically $ modifyTVar' q $ PSQ.insert (fromEnum e) tm + again + + doEvent again now e = case e of + DoTimeout -> do dput XNetCrypto $ "TIMEOUT: " ++ show (sSessionID s) + sClose s + DoAlive -> re (now + 10) again e doAlive + DoRequestMissing -> re (now + 5) again e doRequestMissing -- tox-core does this at 1 second intervals + + fix $ \again -> do + + now <- getPOSIXTime + join $ atomically $ do + Just ( k :-> tm ) <- PSQ.findMin <$> readTVar q + return $ if now < tm then threadDelay (toMicroseconds $ tm - now) >> again + else doEvent again now (toEnum k) + +-- | This function forks two threads: the 'keepAlive' beacon-sending thread and +-- a thread to read all packets from the provided 'Session' and forward them to +-- 'contactChannel' for a containing 'AggregateSession' forkSession :: AggregateSession -> Session -> (Status ToxProgress -> STM ()) -> IO ThreadId forkSession c s setStatus = forkIO $ do myThreadId >>= flip labelThread (intercalate "." ["s" - , take 8 $ show $ key2id $ ncTheirPublicKey s + , take 8 $ show $ key2id $ sTheirUserKey s , show $ sSessionID s]) - tmchan <- atomically $ do - tmchan <- newTMChan - supply <- readTVar (listenerIDSupply $ ncAllSessions s) - let (listenerId,supply') = freshId supply - writeTVar (listenerIDSupply $ ncAllSessions s) supply' - modifyTVar' (ncListeners s) (IntMap.insert listenerId (0,tmchan)) - return tmchan + + q <- atomically $ newTVar $ fromList + [ fromEnum DoAlive :-> 0 + , fromEnum DoRequestMissing :-> 0 + ] let sendPacket :: CryptoMessage -> STM () sendPacket msg = writeTMChan (contactChannel c) (sSessionID s, msg) - inPrint e = dput XNetCrypto $ shows (sSessionID s,ncSockAddr s) $ " --> " ++ e + inPrint e = dput XNetCrypto $ shows (sSessionID s,sTheirAddr s) $ " --> " ++ e + + bump = do + -- inPrint $ "BUMP: " ++ show (sSessionID s) + now <- getPOSIXTime + atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) (now + 15) onPacket body loop Nothing = return () onPacket body loop (Just (Left e)) = inPrint e >> loop onPacket body loop (Just (Right x)) = body loop x - awaitPacket body = fix $ (.) (fmap Right <$> atomically (readTMChan tmchan) >>=) - $ onPacket body + awaitPacket body = fix $ awaitMessage (sTransport s) . onPacket body atomically $ setStatus $ InProgress AwaitingSessionPacket - atomically $ setStatus Established - awaitPacket $ \loop x -> do - case msgID x of - KillPacket -> return () - _ -> atomically (sendPacket x) >> loop - - atomically $ setStatus Dormant - - -sSessionID :: Session -> Int -sSessionID s = fromIntegral $ ncSessionId s + awaitPacket $ \_ (online,()) -> do + when (msgID online /= ONLINE) $ do + inPrint $ "Unexpected initial packet: " ++ show (msgID online) + atomically $ do setStatus Established + sendPacket online + bump + beacon <- forkIO $ keepAlive s q + awaitPacket $ \awaitNext (x,()) -> do + bump + case msgID x of + PING -> return () + KillPacket -> sClose s + _ -> atomically $ sendPacket x + awaitNext + atomically $ setStatus Dormant + killThread beacon -- | Add a new session (in 'AwaitingSessionPacket' state) to the -- 'AggregateSession'. If the supplied session is not compatible because it is @@ -166,8 +210,8 @@ sSessionID s = fromIntegral $ ncSessionId s addSession :: AggregateSession -> Session -> IO AddResult addSession c s = do (result,mcon,replaced) <- atomically $ do - let them = ncTheirPublicKey s - me = ncMyPublicKey s + let them = sTheirUserKey s + me = toPublic $ sOurKey s compat <- checkCompatible me them c let result = case compat of Nothing -> FirstSession @@ -184,7 +228,7 @@ addSession c s = do writeTVar (contactSession c) imap' return (result,Just con,s0) - mapM_ (destroySession . singleSession) replaced + mapM_ (sClose . singleSession) replaced forM_ mcon $ \con -> forkSession c s $ \progress -> do writeTVar (singleStatus con) progress @@ -203,6 +247,7 @@ addSession c s = do return emap' writeTVar (contactEstablished c) emap' return result + -- | Information returned from 'delSession'. data DelResult = NoSession -- ^ Contact is completely disconnected. | DeletedSession -- ^ Connection removed but session remains active. @@ -230,11 +275,10 @@ delSession c sid = do writeTVar (contactSession c) imap' writeTVar (contactEstablished c) emap' return ( IntMap.lookup sid imap, IntMap.null imap') - mapM_ (destroySession . singleSession) con + mapM_ (sClose . singleSession) con return $ if r then NoSession else DeletedSession - -- | Send a packet to one or all of the component sessions in the aggregate. dispatchMessage :: AggregateSession -> Maybe Int -- ^ 'Nothing' to broadcast, otherwise SessionID. -> CryptoMessage -> IO () @@ -242,11 +286,7 @@ dispatchMessage c msid msg = join $ atomically $ do imap <- readTVar (contactSession c) let go = case msid of Nothing -> forM_ imap Just sid -> forM_ (IntMap.lookup sid imap) - return $ go $ \con -> do - eResult <- sendLossless (transportCrypto (ncAllSessions (singleSession con))) (singleSession con) msg - case eResult of - Left msg -> dput XJabber msg - Right pkt -> dput XJabber ("sendLossLess SUCCESS: " ++ show pkt) + return $ go $ \con -> sendMessage (sTransport $ singleSession con) () msg -- | Retry until: -- @@ -287,7 +327,6 @@ aggregateStatus c = do | not (IntMap.null imap) -> InProgress AwaitingSessionPacket | otherwise -> Dormant - -- | Query whether the supplied ToxID keys are compatible with this aggregate. -- -- [ Nothing ] Any keys would be compatible because there is not yet any @@ -304,8 +343,8 @@ checkCompatible me them c = do imap <- readTVar (contactSession c) return $ case IntMap.elems imap of _ | isclosed -> Just False -- All keys are incompatible (closed). - con:_ -> Just $ ncTheirPublicKey (singleSession con) == them - && (ncMyPublicKey $ singleSession con) == me + con:_ -> Just $ sTheirUserKey (singleSession con) == them + && toPublic (sOurKey $ singleSession con) == me [] -> Nothing -- | Returns the local and remote keys that are compatible with this aggregate. @@ -317,6 +356,6 @@ compatibleKeys c = do imap <- readTVar (contactSession c) return $ case IntMap.elems imap of _ | isclosed -> Nothing -- none. - con:_ -> Just ( ncMyPublicKey $ singleSession con - , ncTheirPublicKey (singleSession con)) + con:_ -> Just ( toPublic (sOurKey $ singleSession con) + , sTheirUserKey (singleSession con)) [] -> Nothing -- any. -- cgit v1.2.3