diff options
Diffstat (limited to 'Connection/Tox')
-rw-r--r-- | Connection/Tox/Threads.hs | 239 |
1 files changed, 0 insertions, 239 deletions
diff --git a/Connection/Tox/Threads.hs b/Connection/Tox/Threads.hs deleted file mode 100644 index de719655..00000000 --- a/Connection/Tox/Threads.hs +++ /dev/null | |||
@@ -1,239 +0,0 @@ | |||
1 | -- | | ||
2 | -- | ||
3 | -- This module defines three tasks intended to be run in separate threads: | ||
4 | -- | ||
5 | -- * 'acceptContact' | ||
6 | -- | ||
7 | -- * 'pursueContact' | ||
8 | -- | ||
9 | -- * 'freshenContact' | ||
10 | -- | ||
11 | {-# LANGUAGE CPP #-} | ||
12 | {-# LANGUAGE LambdaCase #-} | ||
13 | module Connection.Tox.Threads | ||
14 | ( PursueContactMethods(..) | ||
15 | , FreshenContactMethods(..) | ||
16 | , pursueContact | ||
17 | ) where | ||
18 | |||
19 | import Connection | ||
20 | -- import Connection.Tox | ||
21 | import Crypto.Tox | ||
22 | import Data.IP (IP) | ||
23 | import Network.Tox.Crypto.Transport | ||
24 | import Network.Tox.Crypto.Handlers | ||
25 | import Network.Tox.NodeId | ||
26 | import Network.Tox.ContactInfo | ||
27 | import Network.Tox.Handshake | ||
28 | import Network.Tox.DHT.Handlers {- (nodeSearch) -} as DHT | ||
29 | import Network.Tox.DHT.Transport as DHT (dhtpk) | ||
30 | import Network.Socket | ||
31 | import Network.Kademlia.Search | ||
32 | import Network.Kademlia.Routing (BucketList) | ||
33 | #ifdef THREAD_DEBUG | ||
34 | import Control.Concurrent.Lifted.Instrument | ||
35 | #else | ||
36 | import Control.Concurrent.Lifted | ||
37 | import GHC.Conc (labelThread) | ||
38 | #endif | ||
39 | |||
40 | import Control.Arrow | ||
41 | import Control.Concurrent.STM | ||
42 | import Control.Monad | ||
43 | import Data.Function | ||
44 | import Data.Functor.Identity | ||
45 | import Data.Time.Clock.POSIX | ||
46 | import System.IO | ||
47 | import System.Timeout | ||
48 | import DPut | ||
49 | |||
50 | |||
51 | |||
52 | type NodeSearch = Search NodeId (IP,PortNumber) () NodeInfo NodeInfo | ||
53 | |||
54 | data AcceptContactMethods = AcceptContactMethods | ||
55 | { getHandshake :: STM (Handshake Identity) | ||
56 | , handshakeIsSuitable :: Handshake Identity -> STM Bool | ||
57 | , transitionToState :: Status ToxProgress -> STM () | ||
58 | } | ||
59 | |||
60 | -- | Invokes an STM action on each incoming handshake. | ||
61 | -- | ||
62 | -- Does not return until getPolicy yields RefusingToConnect. | ||
63 | acceptContact :: STM Policy -> AcceptContactMethods -> (Status ToxProgress -> STM ()) -> IO () | ||
64 | acceptContact getPolicy AcceptContactMethods{..} writeState = fix $ \loop -> do | ||
65 | join $ atomically $ do | ||
66 | orElse | ||
67 | (getPolicy >>= \case | ||
68 | RefusingToConnect -> do writeState Dormant | ||
69 | return $ return () -- QUIT Dormant/Established | ||
70 | _ -> retry) | ||
71 | (do hs <- getHandshake | ||
72 | handshakeIsSuitable hs >>= \case | ||
73 | True -> do | ||
74 | -- Here we allocate a NetCrypto session for handling CryptoPacket. | ||
75 | writeState (InProgress AwaitingSessionPacket) | ||
76 | transitionToState (InProgress AwaitingSessionPacket) | ||
77 | return loop | ||
78 | False -> return loop) | ||
79 | |||
80 | whileTryingAndNotEstablished :: STM Policy | ||
81 | -> STM (Status t) | ||
82 | -> TVar (Status ToxProgress) | ||
83 | -> ((Int -> IO ()) -> STM (IO ())) | ||
84 | -> IO () | ||
85 | whileTryingAndNotEstablished getPolicy getStatus statusVar body = fix $ \loop -> do | ||
86 | let retryWhileTrying k = getPolicy >>= \case | ||
87 | TryingToConnect -> retry | ||
88 | _ -> do writeTVar statusVar Dormant | ||
89 | return k | ||
90 | ifEstablished t e = getStatus >>= \case | ||
91 | Established -> t | ||
92 | _ -> e | ||
93 | retryAfterTimeout interval = do | ||
94 | timeout interval $ atomically | ||
95 | $ orElse | ||
96 | (retryWhileTrying ()) | ||
97 | (ifEstablished (return ()) retry) | ||
98 | loop | ||
99 | join $ atomically $ orElse | ||
100 | (retryWhileTrying (return ())) -- QUIT Dormant/Established | ||
101 | (ifEstablished retry | ||
102 | (body retryAfterTimeout)) | ||
103 | |||
104 | data PursueContactMethods = PursueContactMethods | ||
105 | { allsessions :: NetCryptoSessions | ||
106 | , myseckey :: SecretKey | ||
107 | , theirpubkey :: PublicKey | ||
108 | , client :: DHT.Client | ||
109 | , shortRetryInterval :: Int -- successful cookie, try again soon. | ||
110 | , longRetryInterval :: Int -- no cookie, he's offline, give it some time. | ||
111 | , contact :: Contact | ||
112 | } | ||
113 | |||
114 | retryUntilJust :: TVar (Maybe a) -> STM a | ||
115 | retryUntilJust tvar = maybe retry return =<< readTVar tvar | ||
116 | |||
117 | -- | Continuously attempt to send handshake packets until a connection is | ||
118 | -- established. | ||
119 | -- | ||
120 | -- As long as getPolicy is TryingToConnect and there is no established | ||
121 | -- connection, this function will continue. | ||
122 | pursueContact :: STM Policy | ||
123 | -> STM (Status t) | ||
124 | -> PursueContactMethods | ||
125 | -> TVar (Status ToxProgress) | ||
126 | -> IO () | ||
127 | pursueContact getPolicy getStatus PursueContactMethods{..} statusVar = do | ||
128 | -- AwaitingDHTKey | ||
129 | atomically $ writeTVar statusVar (InProgress AwaitingDHTKey) | ||
130 | whileTryingAndNotEstablished getPolicy getStatus statusVar | ||
131 | $ \retryAfterTimeout -> | ||
132 | orElse (do | ||
133 | readTVar statusVar >>= check . (/= InProgress AcquiringIPAddress) | ||
134 | (stamp_theirDhtKey,theirDhtKey) <- second DHT.dhtpk <$> retryUntilJust (contactKeyPacket contact) | ||
135 | -- We don't have an IP address yet. | ||
136 | maybe (return ()) (const retry) =<< readTVar (contactLastSeenAddr contact) | ||
137 | return $ do -- AcquiringIPAddress | ||
138 | atomically $ writeTVar statusVar (InProgress AcquiringIPAddress) | ||
139 | retryAfterTimeout 0) | ||
140 | (do | ||
141 | (stamp_theirDhtKey,theirDhtKey) <- second DHT.dhtpk <$> retryUntilJust (contactKeyPacket contact) | ||
142 | (stamp_saddr,saddr) <- retryUntilJust (contactLastSeenAddr contact) | ||
143 | ni <- either (const retry) return $ nodeInfo (key2id theirDhtKey) (_fixme saddr) | ||
144 | return $ do | ||
145 | -- AcquiringCookie | ||
146 | atomically $ writeTVar statusVar (InProgress AcquiringCookie) | ||
147 | let mykeyAsId = key2id (toPublic myseckey) | ||
148 | theirkeyAsId = key2id theirpubkey | ||
149 | crypto = transportCrypto allsessions | ||
150 | mbCookie <- -- TODO: Check for recent cached cookie. | ||
151 | DHT.cookieRequest crypto client (toPublic myseckey) ni | ||
152 | interval <- case mbCookie of | ||
153 | Nothing -> do | ||
154 | dput XMan ("pursueContact: (" ++ show mykeyAsId ++") <--> (" ++ show theirkeyAsId ++ ").") | ||
155 | dput XMan ("pursueContact: CookieRequest failed. TODO: dhtpkNodes thingy") | ||
156 | return longRetryInterval | ||
157 | Just cookie -> do | ||
158 | dput XMan "Have cookie, creating handshake packet..." | ||
159 | let hp = HParam { hpOtherCookie = cookie | ||
160 | , hpMySecretKey = myseckey | ||
161 | , hpCookieRemotePubkey = theirpubkey | ||
162 | , hpCookieRemoteDhtkey = theirDhtKey | ||
163 | , hpTheirBaseNonce = Nothing | ||
164 | , hpTheirSessionKeyPublic = Nothing | ||
165 | } | ||
166 | newsession <- generateSecretKey | ||
167 | timestamp <- getPOSIXTime | ||
168 | (myhandshake,ioAction) | ||
169 | <- atomically $ freshCryptoSession allsessions (_fixme saddr) newsession timestamp hp | ||
170 | ioAction | ||
171 | -- send handshake | ||
172 | forM myhandshake $ \response_handshake -> do | ||
173 | sendHandshake allsessions (_fixme saddr) response_handshake | ||
174 | atomically $ writeTVar statusVar $ InProgress AwaitingHandshake | ||
175 | return shortRetryInterval | ||
176 | -- AwaitingHandshake | ||
177 | -- AwaitingSessionPacket | ||
178 | retryAfterTimeout interval) | ||
179 | |||
180 | data FreshenContactMethods = FreshenContactMethods | ||
181 | { dhtkeyInterval :: Int | ||
182 | , sockAddrInterval :: Int | ||
183 | , nodeSch :: NodeSearch | ||
184 | , getDHTKey :: STM (Maybe NodeId) | ||
185 | , getSockAddr :: STM (Maybe SockAddr) | ||
186 | , nearestNodes :: NodeId -> STM [NodeInfo] | ||
187 | } | ||
188 | |||
189 | -- send my dht key | ||
190 | -- search for their sockaddr | ||
191 | -- monitor new dht key | ||
192 | -- monitor new sockaddr | ||
193 | -- | ||
194 | -- Keep going while TryingToConnect | ||
195 | -- pause while Established | ||
196 | |||
197 | -- Useful: | ||
198 | -- toxidSearch onionTimeout | ||
199 | -- newSearch | ||
200 | -- searchLoop | ||
201 | -- searchCancel | ||
202 | -- -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. | ||
203 | |||
204 | -- | Continuously search the DHT to obtain ip addresses and to send your dht | ||
205 | -- key to contacts. | ||
206 | -- | ||
207 | -- As long as getPolicy is TryingToConnect and there is no established | ||
208 | -- connection, this function will continue. | ||
209 | freshenContact :: STM Policy -> STM (Status t) -> FreshenContactMethods | ||
210 | -> TVar (Status ToxProgress) | ||
211 | -> IO () | ||
212 | freshenContact getPolicy getStatus FreshenContactMethods{..} statusVar | ||
213 | = whileTryingAndNotEstablished getPolicy getStatus statusVar | ||
214 | -- retryAfterTimeout :: Int -> IO () | ||
215 | $ \retryAfterTimeout -> | ||
216 | getDHTKey >>= \case | ||
217 | Nothing -> -- AwaitingDHTKey | ||
218 | retry | ||
219 | Just dk -> getSockAddr >>= \case | ||
220 | Nothing -> do -- AcquiringIPAddress | ||
221 | writeTVar statusVar (InProgress AcquiringIPAddress) | ||
222 | return $ | ||
223 | do st <- atomically $ do | ||
224 | ns <- nearestNodes dk | ||
225 | newSearch nodeSch dk ns | ||
226 | -- forked simply to avoid relabeling this thread. | ||
227 | forkIO $ searchLoop nodeSch dk (const $ return True) st | ||
228 | -- TODO: searchCancel on stop condition | ||
229 | atomically $ searchIsFinished st >>= check | ||
230 | retryAfterTimeout sockAddrInterval | ||
231 | Just a -> do | ||
232 | writeTVar statusVar (InProgress AcquiringCookie) | ||
233 | return $ | ||
234 | -- AcquiringCookie | ||
235 | -- AwaitingHandshake | ||
236 | -- AwaitingSessionPacket | ||
237 | do _todo_search_toxid_send_dhtkey -- 123 _todo_search_toxid_send_dhtkey :: IO a0 | ||
238 | retryAfterTimeout dhtkeyInterval | ||
239 | |||