1-- |
3-- This module defines three tasks intended to be run in separate threads:
5-- * 'acceptContact'
7-- * 'pursueContact'
9-- * 'freshenContact'
11{-# LANGUAGE CPP #-}
12{-# LANGUAGE LambdaCase #-}
13module Connection.Tox.Threads
14 ( PursueContactMethods(..)
15 , FreshenContactMethods(..)
16 , pursueContact
17 ) where
19import Connection
20-- import Connection.Tox
21import Crypto.Tox
22import Data.IP (IP)
23import Network.Tox.Crypto.Transport
24import Network.Tox.Crypto.Handlers
25import Network.Tox.NodeId
26import Network.Tox.ContactInfo
27import Network.Tox.Handshake
28import Network.Tox.DHT.Handlers {- (nodeSearch) -} as DHT
29import Network.Tox.DHT.Transport as DHT (dhtpk)
30import Network.Socket
31import Network.Kademlia.Search
32import Network.Kademlia.Routing (BucketList)
34import Control.Concurrent.Lifted.Instrument
36import Control.Concurrent.Lifted
37import GHC.Conc (labelThread)
40import Control.Arrow
41import Control.Concurrent.STM
42import Control.Monad
43import Data.Function
44import Data.Functor.Identity
45import Data.Time.Clock.POSIX
46import System.IO
47import System.Timeout
48import DPut
52type NodeSearch = Search NodeId (IP,PortNumber) () NodeInfo NodeInfo
54data AcceptContactMethods = AcceptContactMethods
55 { getHandshake :: STM (Handshake Identity)
56 , handshakeIsSuitable :: Handshake Identity -> STM Bool
57 , transitionToState :: Status ToxProgress -> STM ()
58 }
60-- | Invokes an STM action on each incoming handshake.
62-- Does not return until getPolicy yields RefusingToConnect.
63acceptContact :: STM Policy -> AcceptContactMethods -> (Status ToxProgress -> STM ()) -> IO ()
64acceptContact getPolicy AcceptContactMethods{..} writeState = fix $ \loop -> do
65 join $ atomically $ do
66 orElse
67 (getPolicy >>= \case
68 RefusingToConnect -> do writeState Dormant
69 return $ return () -- QUIT Dormant/Established
70 _ -> retry)
71 (do hs <- getHandshake
72 handshakeIsSuitable hs >>= \case
73 True -> do
74 -- Here we allocate a NetCrypto session for handling CryptoPacket.
75 writeState (InProgress AwaitingSessionPacket)
76 transitionToState (InProgress AwaitingSessionPacket)
77 return loop
78 False -> return loop)
80whileTryingAndNotEstablished :: STM Policy
81 -> STM (Status t)
82 -> TVar (Status ToxProgress)
83 -> ((Int -> IO ()) -> STM (IO ()))
84 -> IO ()
85whileTryingAndNotEstablished getPolicy getStatus statusVar body = fix $ \loop -> do
86 let retryWhileTrying k = getPolicy >>= \case
87 TryingToConnect -> retry
88 _ -> do writeTVar statusVar Dormant
89 return k
90 ifEstablished t e = getStatus >>= \case
91 Established -> t
92 _ -> e
93 retryAfterTimeout interval = do
94 timeout interval $ atomically
95 $ orElse
96 (retryWhileTrying ())
97 (ifEstablished (return ()) retry)
98 loop
99 join $ atomically $ orElse
100 (retryWhileTrying (return ())) -- QUIT Dormant/Established
101 (ifEstablished retry
102 (body retryAfterTimeout))
104data PursueContactMethods = PursueContactMethods
105 { allsessions :: NetCryptoSessions
106 , myseckey :: SecretKey
107 , theirpubkey :: PublicKey
108 , client :: DHT.Client
109 , shortRetryInterval :: Int -- successful cookie, try again soon.
110 , longRetryInterval :: Int -- no cookie, he's offline, give it some time.
111 , contact :: Contact
112 }
114retryUntilJust :: TVar (Maybe a) -> STM a
115retryUntilJust tvar = maybe retry return =<< readTVar tvar
117-- | Continuously attempt to send handshake packets until a connection is
118-- established.
120-- As long as getPolicy is TryingToConnect and there is no established
121-- connection, this function will continue.
122pursueContact :: STM Policy
123 -> STM (Status t)
124 -> PursueContactMethods
125 -> TVar (Status ToxProgress)
126 -> IO ()
127pursueContact getPolicy getStatus PursueContactMethods{..} statusVar = do
128 -- AwaitingDHTKey
129 atomically $ writeTVar statusVar (InProgress AwaitingDHTKey)
130 whileTryingAndNotEstablished getPolicy getStatus statusVar
131 $ \retryAfterTimeout ->
132 orElse (do
133 readTVar statusVar >>= check . (/= InProgress AcquiringIPAddress)
134 (stamp_theirDhtKey,theirDhtKey) <- second DHT.dhtpk <$> retryUntilJust (contactKeyPacket contact)
135 -- We don't have an IP address yet.
136 maybe (return ()) (const retry) =<< readTVar (contactLastSeenAddr contact)
137 return $ do -- AcquiringIPAddress
138 atomically $ writeTVar statusVar (InProgress AcquiringIPAddress)
139 retryAfterTimeout 0)
140 (do
141 (stamp_theirDhtKey,theirDhtKey) <- second DHT.dhtpk <$> retryUntilJust (contactKeyPacket contact)
142 (stamp_saddr,saddr) <- retryUntilJust (contactLastSeenAddr contact)
143 ni <- either (const retry) return $ nodeInfo (key2id theirDhtKey) (_fixme saddr)
144 return $ do
145 -- AcquiringCookie
146 atomically $ writeTVar statusVar (InProgress AcquiringCookie)
147 let mykeyAsId = key2id (toPublic myseckey)
148 theirkeyAsId = key2id theirpubkey
149 crypto = transportCrypto allsessions
150 mbCookie <- -- TODO: Check for recent cached cookie.
151 DHT.cookieRequest crypto client (toPublic myseckey) ni
152 interval <- case mbCookie of
153 Nothing -> do
154 dput XMan ("pursueContact: (" ++ show mykeyAsId ++") <--> (" ++ show theirkeyAsId ++ ").")
155 dput XMan ("pursueContact: CookieRequest failed. TODO: dhtpkNodes thingy")
156 return longRetryInterval
157 Just cookie -> do
158 dput XMan "Have cookie, creating handshake packet..."
159 let hp = HParam { hpOtherCookie = cookie
160 , hpMySecretKey = myseckey
161 , hpCookieRemotePubkey = theirpubkey
162 , hpCookieRemoteDhtkey = theirDhtKey
163 , hpTheirBaseNonce = Nothing
164 , hpTheirSessionKeyPublic = Nothing
165 }
166 newsession <- generateSecretKey
167 timestamp <- getPOSIXTime
168 (myhandshake,ioAction)
169 <- atomically $ freshCryptoSession allsessions (_fixme saddr) newsession timestamp hp
170 ioAction
171 -- send handshake
172 forM myhandshake $ \response_handshake -> do
173 sendHandshake allsessions (_fixme saddr) response_handshake
174 atomically $ writeTVar statusVar $ InProgress AwaitingHandshake
175 return shortRetryInterval
176 -- AwaitingHandshake
177 -- AwaitingSessionPacket
178 retryAfterTimeout interval)
180data FreshenContactMethods = FreshenContactMethods
181 { dhtkeyInterval :: Int
182 , sockAddrInterval :: Int
183 , nodeSch :: NodeSearch
184 , getDHTKey :: STM (Maybe NodeId)
185 , getSockAddr :: STM (Maybe SockAddr)
186 , nearestNodes :: NodeId -> STM [NodeInfo]
187 }
189-- send my dht key
190-- search for their sockaddr
191-- monitor new dht key
192-- monitor new sockaddr
194-- Keep going while TryingToConnect
195-- pause while Established
197-- Useful:
198-- toxidSearch onionTimeout
199-- newSearch
200-- searchLoop
201-- searchCancel
202-- -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching.
204-- | Continuously search the DHT to obtain ip addresses and to send your dht
205-- key to contacts.
207-- As long as getPolicy is TryingToConnect and there is no established
208-- connection, this function will continue.
209freshenContact :: STM Policy -> STM (Status t) -> FreshenContactMethods
210 -> TVar (Status ToxProgress)
211 -> IO ()
212freshenContact getPolicy getStatus FreshenContactMethods{..} statusVar
213 = whileTryingAndNotEstablished getPolicy getStatus statusVar
214 -- retryAfterTimeout :: Int -> IO ()
215 $ \retryAfterTimeout ->
216 getDHTKey >>= \case
217 Nothing -> -- AwaitingDHTKey
218 retry
219 Just dk -> getSockAddr >>= \case
220 Nothing -> do -- AcquiringIPAddress
221 writeTVar statusVar (InProgress AcquiringIPAddress)
222 return $
223 do st <- atomically $ do
224 ns <- nearestNodes dk
225 newSearch nodeSch dk ns
226 -- forked simply to avoid relabeling this thread.
227 forkIO $ searchLoop nodeSch dk (const $ return True) st
228 -- TODO: searchCancel on stop condition
229 atomically $ searchIsFinished st >>= check
230 retryAfterTimeout sockAddrInterval
231 Just a -> do
232 writeTVar statusVar (InProgress AcquiringCookie)
233 return $
234 -- AcquiringCookie
235 -- AwaitingHandshake
236 -- AwaitingSessionPacket
237 do _todo_search_toxid_send_dhtkey -- 123 _todo_search_toxid_send_dhtkey :: IO a0
238 retryAfterTimeout dhtkeyInterval