1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
|
-- |
--
-- This module defines three tasks intended to be run in separate threads:
--
-- * 'acceptContact'
--
-- * 'pursueContact'
--
-- * 'freshenContact'
--
{-# LANGUAGE CPP #-}
{-# LANGUAGE LambdaCase #-}
module Connection.Tox.Threads
( PursueContactMethods(..)
, FreshenContactMethods(..)
, pursueContact
) where
import Connection
-- import Connection.Tox
import Crypto.Tox
import Data.IP (IP)
import Network.Tox.Crypto.Transport
import Network.Tox.Crypto.Handlers
import Network.Tox.NodeId
import Network.Tox.ContactInfo
import Network.Tox.Handshake
import Network.Tox.DHT.Handlers {- (nodeSearch) -} as DHT
import Network.Tox.DHT.Transport as DHT (dhtpk)
import Network.Socket
import Network.Kademlia.Search
import Network.Kademlia.Routing (BucketList)
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc (labelThread)
#endif
import Control.Arrow
import Control.Concurrent.STM
import Control.Monad
import Data.Function
import Data.Functor.Identity
import Data.Time.Clock.POSIX
import System.IO
import System.Timeout
import DPut
-- | The 'Search' instantiation stored in the 'nodeSch' field of
-- 'FreshenContactMethods' and driven by 'freshenContact'.  The meaning of
-- each type argument is defined by "Network.Kademlia.Search".
type NodeSearch =
    Search NodeId (IP, PortNumber) () NodeInfo NodeInfo
-- | Callbacks consumed by 'acceptContact'.
data AcceptContactMethods = AcceptContactMethods
    { getHandshake        :: STM (Handshake Identity)
      -- ^ Produces the next handshake to examine.  Presumably retries until
      -- one is available (to be confirmed against the provider).
    , handshakeIsSuitable :: Handshake Identity -> STM Bool
      -- ^ Decides whether a handshake obtained from 'getHandshake' should be
      -- acted upon.
    , transitionToState   :: Status ToxProgress -> STM ()
      -- ^ Invoked with @InProgress AwaitingSessionPacket@ whenever a
      -- suitable handshake has been seen.
    }
-- | Process incoming handshakes until the policy says to stop.
--
-- Each iteration runs a single transaction that does one of two things:
--
-- * If @getPolicy@ yields 'RefusingToConnect', 'Dormant' is passed to
--   @writeState@ and the function returns.
--
-- * Otherwise the next handshake is taken from 'getHandshake'.  When
--   'handshakeIsSuitable' accepts it, @InProgress AwaitingSessionPacket@ is
--   handed first to @writeState@ and then to 'transitionToState'.  In either
--   case the loop continues with the next handshake.
--
-- Does not return until @getPolicy@ yields 'RefusingToConnect'.
acceptContact :: STM Policy -> AcceptContactMethods -> (Status ToxProgress -> STM ()) -> IO ()
acceptContact getPolicy AcceptContactMethods{..} writeState = listen
  where
    -- One iteration: the transaction selects the IO continuation to run.
    listen :: IO ()
    listen = join . atomically $ quitWhenRefusing `orElse` (listen <$ admitNext)

    -- Succeeds (with a no-op continuation) only once the policy refuses
    -- connections; otherwise retries so that the handshake branch is tried.
    quitWhenRefusing :: STM (IO ())
    quitWhenRefusing = do
        policy <- getPolicy
        case policy of
            RefusingToConnect -> do
                writeState Dormant
                return (return ()) -- QUIT Dormant/Established
            _ -> retry

    -- Consume one handshake and record the state change if it is suitable.
    admitNext :: STM ()
    admitNext = do
        hs <- getHandshake
        suitable <- handshakeIsSuitable hs
        when suitable $ do
            -- Here we allocate a NetCrypto session for handling CryptoPacket.
            writeState awaitingSession
            transitionToState awaitingSession

    awaitingSession = InProgress AwaitingSessionPacket
-- | Shared driver loop for the connection tasks in this module.
--
-- Every iteration runs one transaction with the following outcomes:
--
-- * @getPolicy@ is anything other than 'TryingToConnect': 'Dormant' is
--   written to @statusVar@ and the loop ends.
--
-- * @getStatus@ is 'Established': the transaction retries, i.e. the thread
--   blocks until something it read changes.
--
-- * Otherwise @body@ is run inside the transaction and the 'IO' action it
--   returns is executed afterwards.
--
-- @body@ receives a continuation taking an interval that is passed to
-- 'timeout' (so it is in microseconds).  The continuation waits until the
-- interval elapses, the policy stops being 'TryingToConnect' (in which case
-- 'Dormant' is written), or the status becomes 'Established', and then
-- starts the next iteration.  The loop only continues if the action
-- returned by @body@ invokes this continuation.
whileTryingAndNotEstablished :: STM Policy
                             -> STM (Status t)
                             -> TVar (Status ToxProgress)
                             -> ((Int -> IO ()) -> STM (IO ()))
                             -> IO ()
whileTryingAndNotEstablished getPolicy getStatus statusVar body = cycleOnce
  where
    cycleOnce :: IO ()
    cycleOnce = join . atomically $
        goDormantUnlessTrying (return ()) -- QUIT Dormant/Established
            `orElse` whenEstablished retry (body sleepThenCycle)

    -- Retry while we are still trying to connect; otherwise mark the
    -- connection dormant and yield the supplied result.
    goDormantUnlessTrying :: a -> STM a
    goDormantUnlessTrying result = do
        policy <- getPolicy
        case policy of
            TryingToConnect -> retry
            _               -> result <$ writeTVar statusVar Dormant

    -- Choose between two transactions depending on the current status.
    whenEstablished :: STM a -> STM a -> STM a
    whenEstablished onEstablished fallback = do
        status <- getStatus
        case status of
            Established -> onEstablished
            _           -> fallback

    -- The continuation handed to 'body': wait (bounded by the interval) for
    -- a policy change or an established connection, then loop again.  The
    -- outcome of 'timeout' is deliberately ignored.
    sleepThenCycle :: Int -> IO ()
    sleepThenCycle interval = do
        _ <- timeout interval . atomically $
                 goDormantUnlessTrying () `orElse` whenEstablished (return ()) retry
        cycleOnce
-- | Everything 'pursueContact' needs in order to request a cookie from a
-- contact and send it a handshake.
data PursueContactMethods = PursueContactMethods
    { allsessions        :: NetCryptoSessions
      -- ^ Session table; supplies the transport crypto and is where fresh
      -- sessions are created and handshakes are sent from.
    , myseckey           :: SecretKey
      -- ^ Our secret key; its public half is sent with the cookie request.
    , theirpubkey        :: PublicKey
      -- ^ The contact's public key, placed in the handshake parameters.
    , client             :: DHT.Client
      -- ^ DHT client used to issue the cookie request.
    , shortRetryInterval :: Int
      -- ^ Delay (passed to 'timeout') after a successful cookie: try again
      -- soon.
    , longRetryInterval  :: Int
      -- ^ Delay (passed to 'timeout') when no cookie was obtained: the
      -- contact is probably offline, so give it some time.
    , contact            :: Contact
      -- ^ Contact record whose DHT key packet and last seen address are
      -- read.
    }
-- | Read a 'TVar' holding an optional value, retrying the enclosing
-- transaction while it is 'Nothing' and yielding the payload once it is
-- 'Just'.
retryUntilJust :: TVar (Maybe a) -> STM a
retryUntilJust tvar = do
    current <- readTVar tvar
    case current of
        Just payload -> return payload
        Nothing      -> retry
-- | Repeatedly try to obtain a cookie from the contact and send it a
-- handshake.
--
-- The status is first set to @InProgress AwaitingDHTKey@; the rest runs
-- under 'whileTryingAndNotEstablished', so it continues only while
-- @getPolicy@ is 'TryingToConnect' and @getStatus@ is not 'Established'.
--
-- Each round takes one of two paths:
--
-- * The contact's DHT key is known but no address has been seen (and the
--   status is not already @InProgress AcquiringIPAddress@): the status
--   becomes @InProgress AcquiringIPAddress@ and the loop restarts at once.
--
-- * Both DHT key and address are known: a cookie is requested.  Without a
--   cookie the round ends and the next starts after 'longRetryInterval'.
--   With a cookie a fresh crypto session is created, the handshake (if one
--   was produced) is sent, the status becomes @InProgress AwaitingHandshake@
--   and the next round starts after 'shortRetryInterval'.
--
-- NOTE(review): @_fixme@ is an unresolved placeholder for converting the
-- last seen address into the form the callees expect; it is kept as found.
pursueContact :: STM Policy
              -> STM (Status t)
              -> PursueContactMethods
              -> TVar (Status ToxProgress)
              -> IO ()
pursueContact getPolicy getStatus PursueContactMethods{..} statusVar = do
    -- AwaitingDHTKey
    enter AwaitingDHTKey
    whileTryingAndNotEstablished getPolicy getStatus statusVar $ \retryAfterTimeout ->
        noteMissingAddress retryAfterTimeout `orElse` attemptHandshake retryAfterTimeout
  where
    -- Publish a new in-progress stage.
    enter progress = atomically $ writeTVar statusVar (InProgress progress)

    -- First alternative: we hold their DHT key but have no address yet.
    noteMissingAddress again = do
        status <- readTVar statusVar
        check (status /= InProgress AcquiringIPAddress)
        _ <- retryUntilJust (contactKeyPacket contact)
        -- We don't have an IP address yet.
        lastSeen <- readTVar (contactLastSeenAddr contact)
        case lastSeen of
            Just _  -> retry
            Nothing -> return $ do
                -- AcquiringIPAddress
                enter AcquiringIPAddress
                again 0

    -- Second alternative: key and address are both available.
    attemptHandshake again = do
        (_, theirDhtKey) <- second DHT.dhtpk <$> retryUntilJust (contactKeyPacket contact)
        (_, saddr)       <- retryUntilJust (contactLastSeenAddr contact)
        ni <- either (const retry) return $ nodeInfo (key2id theirDhtKey) (_fixme saddr)
        return $ do
            -- AcquiringCookie
            enter AcquiringCookie
            let crypto      = transportCrypto allsessions
                myPublicKey = toPublic myseckey
            -- TODO: Check for recent cached cookie.
            mbCookie <- DHT.cookieRequest crypto client myPublicKey ni
            interval <- case mbCookie of
                Just cookie -> do
                    dput XMan "Have cookie, creating handshake packet..."
                    let hp = HParam
                            { hpOtherCookie           = cookie
                            , hpMySecretKey           = myseckey
                            , hpCookieRemotePubkey    = theirpubkey
                            , hpCookieRemoteDhtkey    = theirDhtKey
                            , hpTheirBaseNonce        = Nothing
                            , hpTheirSessionKeyPublic = Nothing
                            }
                    newsession <- generateSecretKey
                    timestamp  <- getPOSIXTime
                    (myhandshake, ioAction) <- atomically $
                        freshCryptoSession allsessions (_fixme saddr) newsession timestamp hp
                    ioAction
                    -- send handshake
                    forM_ myhandshake $ \response_handshake -> do
                        sendHandshake allsessions (_fixme saddr) response_handshake
                        enter AwaitingHandshake
                    return shortRetryInterval
                Nothing -> do
                    let mykeyAsId    = key2id myPublicKey
                        theirkeyAsId = key2id theirpubkey
                    dput XMan ("pursueContact: (" ++ show mykeyAsId ++") <--> (" ++ show theirkeyAsId ++ ").")
                    dput XMan ("pursueContact: CookieRequest failed. TODO: dhtpkNodes thingy")
                    return longRetryInterval
            -- AwaitingHandshake
            -- AwaitingSessionPacket
            again interval
-- | Everything 'freshenContact' needs in order to look up a contact's
-- address on the DHT.
data FreshenContactMethods = FreshenContactMethods
    { dhtkeyInterval   :: Int
      -- ^ Delay (passed to 'timeout') before the next round once a socket
      -- address is already known.
    , sockAddrInterval :: Int
      -- ^ Delay (passed to 'timeout') before the next round after an
      -- address search has finished.
    , nodeSch          :: NodeSearch
      -- ^ Search used to look for the contact's DHT key.
    , getDHTKey        :: STM (Maybe NodeId)
      -- ^ The contact's DHT key, if it is known yet.
    , getSockAddr      :: STM (Maybe SockAddr)
      -- ^ The contact's socket address, if it is known yet.
    , nearestNodes     :: NodeId -> STM [NodeInfo]
      -- ^ Nodes used to seed a new search for the given key.
    }
-- send my dht key
-- search for their sockaddr
-- monitor new dht key
-- monitor new sockaddr
--
-- Keep going while TryingToConnect
-- pause while Established
-- Useful:
-- toxidSearch onionTimeout
-- newSearch
-- searchLoop
-- searchCancel
-- -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching.
-- | Keep a contact's address information fresh by searching the DHT.
--
-- Runs under 'whileTryingAndNotEstablished', so it continues only while
-- @getPolicy@ is 'TryingToConnect' and @getStatus@ is not 'Established'.
-- Each round:
--
-- * With no DHT key ('getDHTKey' is 'Nothing') the transaction retries.
--
-- * With a DHT key but no socket address, the status becomes
--   @InProgress AcquiringIPAddress@, a search for the key is started in a
--   separate thread, and once it has finished the next round begins after
--   'sockAddrInterval'.
--
-- * With both, the status becomes @InProgress AcquiringCookie@ and the next
--   round begins after 'dhtkeyInterval'.
--
-- NOTE(review): @_todo_search_toxid_send_dhtkey@ is an unresolved
-- placeholder, presumably for sending our DHT key to the contact; it is
-- kept as found.
freshenContact :: STM Policy -> STM (Status t) -> FreshenContactMethods
               -> TVar (Status ToxProgress)
               -> IO ()
freshenContact getPolicy getStatus FreshenContactMethods{..} statusVar =
    -- retryAfterTimeout :: Int -> IO ()
    whileTryingAndNotEstablished getPolicy getStatus statusVar $ \retryAfterTimeout -> do
        mbKey <- getDHTKey
        case mbKey of
            -- AwaitingDHTKey
            Nothing -> retry
            Just dk -> do
                mbAddr <- getSockAddr
                case mbAddr of
                    Nothing -> do
                        -- AcquiringIPAddress
                        writeTVar statusVar (InProgress AcquiringIPAddress)
                        return $ do
                            st <- atomically $ nearestNodes dk >>= newSearch nodeSch dk
                            -- forked simply to avoid relabeling this thread.
                            _ <- forkIO $ searchLoop nodeSch dk (const $ return True) st
                            -- TODO: searchCancel on stop condition
                            atomically $ searchIsFinished st >>= check
                            retryAfterTimeout sockAddrInterval
                    Just _ -> do
                        -- AcquiringCookie
                        -- AwaitingHandshake
                        -- AwaitingSessionPacket
                        writeTVar statusVar (InProgress AcquiringCookie)
                        return $ do
                            -- Placeholder (expected type: IO a0).
                            _todo_search_toxid_send_dhtkey
                            retryAfterTimeout dhtkeyInterval
|