diff options
author | joe <joe@jerkface.net> | 2018-06-12 00:11:17 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2018-06-12 00:11:17 -0400 |
commit | 66ee00b2b74eea4258314a66b7599da7606a6539 (patch) | |
tree | 2bcb092494b3cd48537cd175caf099d552377d55 /Connection | |
parent | 4cb899c4af5e2933c39e295633164321b3420795 (diff) |
Started Tox connection management helper threads.
Diffstat (limited to 'Connection')
-rw-r--r-- | Connection/Tox/Threads.hs | 150 |
1 files changed, 150 insertions, 0 deletions
diff --git a/Connection/Tox/Threads.hs b/Connection/Tox/Threads.hs new file mode 100644 index 00000000..8b19c7cf --- /dev/null +++ b/Connection/Tox/Threads.hs | |||
@@ -0,0 +1,150 @@ | |||
1 | -- | | ||
2 | -- | ||
3 | -- This module defines three tasks intended to be run in separate threads: | ||
4 | -- | ||
5 | -- * 'acceptContact' | ||
6 | -- | ||
7 | -- * 'persueContact' | ||
8 | -- | ||
9 | -- * 'freshenContact' | ||
10 | -- | ||
11 | {-# LANGUAGE LambdaCase #-} | ||
12 | module Connection.Tox.Threads where | ||
13 | |||
14 | import Connection | ||
15 | import Connection.Tox | ||
16 | import Data.IP (IP) | ||
17 | import Network.Tox.Crypto.Transport | ||
18 | import Network.Tox.NodeId | ||
19 | -- import Network.Tox.DHT.Handlers (nodeSearch) | ||
20 | import Network.Socket | ||
21 | import Network.Kademlia.Search | ||
22 | import Network.Kademlia.Routing (BucketList) | ||
23 | |||
24 | import Control.Concurrent.STM | ||
25 | import Control.Monad | ||
26 | import Data.Function | ||
27 | import Data.Functor.Identity | ||
28 | import System.Timeout | ||
29 | |||
30 | type NodeSearch = Search NodeId (IP,PortNumber) () NodeInfo NodeInfo | ||
31 | |||
32 | data AcceptContactMethods = AcceptContactMethods | ||
33 | { getHandshake :: STM (Handshake Identity) | ||
34 | , handshakeIsSuitable :: Handshake Identity -> STM Bool | ||
35 | , transitionToState :: Status ToxProgress -> STM () | ||
36 | } | ||
37 | |||
38 | -- | Invokes an STM action on each incoming handshake. | ||
39 | -- | ||
40 | -- Does not return until getPolicy yields RefusingToConnect. | ||
41 | acceptContact :: STM Policy -> AcceptContactMethods -> IO () | ||
42 | acceptContact getPolicy AcceptContactMethods{..} = fix $ \loop -> do | ||
43 | join $ atomically $ do | ||
44 | orElse | ||
45 | (getPolicy >>= \case | ||
46 | RefusingToConnect -> return $ return () -- QUIT Dormant/Established | ||
47 | _ -> retry) | ||
48 | (do hs <- getHandshake | ||
49 | handshakeIsSuitable hs >>= \case | ||
50 | True -> do | ||
51 | -- Here we allocate a NetCrypto session for handling CryptoPacket. | ||
52 | transitionToState (InProgress AwaitingSessionPacket) | ||
53 | return loop | ||
54 | False -> return loop) | ||
55 | |||
56 | whileTryingAndNotEstablished :: STM Policy -> STM (Status t) -> ((Int -> IO ()) -> STM (IO ())) -> IO () | ||
57 | whileTryingAndNotEstablished getPolicy getStatus body = fix $ \loop -> do | ||
58 | let retryWhileTrying k = getPolicy >>= \case | ||
59 | TryingToConnect -> retry | ||
60 | _ -> return k | ||
61 | ifEstablished t e = getStatus >>= \case | ||
62 | Established -> t | ||
63 | _ -> e | ||
64 | retryAfterTimeout interval = do | ||
65 | timeout interval $ atomically | ||
66 | $ orElse | ||
67 | (retryWhileTrying ()) | ||
68 | (ifEstablished (return ()) retry) | ||
69 | loop | ||
70 | join $ atomically $ orElse | ||
71 | (retryWhileTrying (return ())) -- QUIT Dormant/Established | ||
72 | (ifEstablished retry | ||
73 | (body retryAfterTimeout)) | ||
74 | |||
75 | data PersueContactMethods params = PersueContactMethods | ||
76 | { getHandshakeParams :: STM params | ||
77 | , sendHandshake :: params -> IO () | ||
78 | , retryInterval :: Int | ||
79 | } | ||
80 | |||
81 | -- | Continuously attempt to send handshake packets until a connection is | ||
82 | -- established. | ||
83 | -- | ||
84 | -- As long as getPolicy is TryingToConnect and there is no established | ||
85 | -- connection, this function will continue. | ||
86 | persueContact :: STM Policy -> STM (Status t) -> PersueContactMethods a -> IO () | ||
87 | persueContact getPolicy getStatus PersueContactMethods{..} | ||
88 | = whileTryingAndNotEstablished getPolicy getStatus | ||
89 | $ \retryAfterTimeout -> do | ||
90 | -- AwaitingDHTKey | ||
91 | -- AcquiringIPAddress | ||
92 | params <- getHandshakeParams | ||
93 | return $ do -- AcquiringCookie | ||
94 | -- AwaitingHandshake | ||
95 | -- AwaitingSessionPacket | ||
96 | sendHandshake params | ||
97 | retryAfterTimeout retryInterval | ||
98 | |||
99 | data FreshenContactMethods = FreshenContactMethods | ||
100 | { dhtkeyInterval :: Int | ||
101 | , sockAddrInterval :: Int | ||
102 | , nodeSch :: NodeSearch | ||
103 | , getDHTKey :: STM (Maybe NodeId) | ||
104 | , getSockAddr :: STM (Maybe SockAddr) | ||
105 | , getBuckets :: STM (BucketList NodeInfo) | ||
106 | } | ||
107 | |||
108 | -- send my dht key | ||
109 | -- search for their sockaddr | ||
110 | -- monitor new dht key | ||
111 | -- monitor new sockaddr | ||
112 | -- | ||
113 | -- Keep going while TryingToConnect | ||
114 | -- pause while Established | ||
115 | |||
116 | -- Useful: | ||
117 | -- toxidSearch onionTimeout | ||
118 | -- newSearch | ||
119 | -- searchLoop | ||
120 | -- searchCancel | ||
121 | -- -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. | ||
122 | |||
123 | -- | Continuously search the DHT to obtain ip addresses and to send your dht | ||
124 | -- key to contacts. | ||
125 | -- | ||
126 | -- As long as getPolicy is TryingToConnect and there is no established | ||
127 | -- connection, this function will continue. | ||
128 | freshenContact :: STM Policy -> STM (Status t) -> FreshenContactMethods -> IO () | ||
129 | freshenContact getPolicy getStatus FreshenContactMethods{..} | ||
130 | = whileTryingAndNotEstablished getPolicy getStatus | ||
131 | -- retryAfterTimeout :: Int -> IO () | ||
132 | $ \retryAfterTimeout -> | ||
133 | getDHTKey >>= \case | ||
134 | Nothing -> -- AwaitingDHTKey | ||
135 | retry | ||
136 | Just dk -> getSockAddr >>= return . \case | ||
137 | Nothing -> -- AcquiringIPAddress | ||
138 | do bkts <- atomically $ getBuckets | ||
139 | st <- search nodeSch bkts dk $ | ||
140 | \r -> do -- TODO: store saddr, check for finish | ||
141 | return True | ||
142 | atomically $ searchIsFinished st >>= check | ||
143 | -- TODO: searchCancel on stop condition | ||
144 | retryAfterTimeout sockAddrInterval | ||
145 | Just a -> -- AcquiringCookie | ||
146 | -- AwaitingHandshake | ||
147 | -- AwaitingSessionPacket | ||
148 | do _todo_search_toxid_send_dhtkey -- 123 _todo_search_toxid_send_dhtkey :: IO a0 | ||
149 | retryAfterTimeout dhtkeyInterval | ||
150 | |||