summaryrefslogtreecommitdiff
path: root/Connection
diff options
context:
space:
mode:
Diffstat (limited to 'Connection')
-rw-r--r--Connection/Tox.hs269
-rw-r--r--Connection/Tox/Threads.hs239
2 files changed, 0 insertions, 508 deletions
diff --git a/Connection/Tox.hs b/Connection/Tox.hs
deleted file mode 100644
index 9612f1e5..00000000
--- a/Connection/Tox.hs
+++ /dev/null
@@ -1,269 +0,0 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE GADTs #-}
3{-# LANGUAGE NamedFieldPuns #-}
4module Connection.Tox
5 ( module Connection.Tox
6 , ToxProgress(..)
7 ) where
8
9import qualified Connection as G
10 ;import Connection (Manager (..), Policy (..))
11import Connection.Tox.Threads
12import Control.Concurrent.STM
13import Control.Monad
14import Crypto.Tox
15import qualified Data.HashMap.Strict as HashMap
16import qualified Data.Map as Map
17import Data.Maybe
18import Network.Kademlia.Routing as R
19import Network.Kademlia.Search
20import Network.Tox.ContactInfo
21import Network.Tox.Crypto.Handlers
22import Network.Tox.DHT.Handlers as DHT
23import Network.Tox.DHT.Transport as DHT
24import PingMachine
25import Text.Read
26#ifdef THREAD_DEBUG
27import Control.Concurrent.Lifted.Instrument
28#else
29import Control.Concurrent.Lifted
30import GHC.Conc (labelThread)
31#endif
32import GHC.Conc (ThreadStatus (..), threadStatus)
33import System.IO
34import DPut
35
36
37
38
39data Parameters extra = Parameters
40 { -- | Various Tox transports and clients.
41 dhtRouting :: Routing
42 , roster :: ContactInfo extra
43 , sessions :: NetCryptoSessions
44 , dhtClient :: DHT.Client
45 -- | Thread to be forked when a connection is established.
46 -- TODO: this function should accept relevant parameters.
47 , onToxSession :: IO ()
48 }
49
50
51{-
52-- | A conneciton status that is tagged with a state type that is specific to
53-- the status.
54data Transient a where
55 IsDormant :: Transient ()
56 IsAwaitingDHTKey :: Transient ()
57 IsAcquiringIPAddress :: Transient ()
58 IsAcquiringCookie :: Transient ()
59 IsAwaitingHandshake :: Transient ()
60 IsAwaitingSessionPacket :: Transient ()
61 IsEstablished :: Transient ()
62
63
64untag :: DSum Transient Identity -> G.Status ToxProgress
65untag (IsDormant :=> _) = G.Dormant
66untag (IsAwaitingDHTKey :=> _) = G.InProgress AwaitingDHTKey
67untag (IsAcquiringIPAddress :=> _) = G.InProgress AcquiringIPAddress
68untag (IsAcquiringCookie :=> _) = G.InProgress AcquiringCookie
69untag (IsAwaitingHandshake :=> _) = G.InProgress AwaitingHandshake
70untag (IsAwaitingSessionPacket :=> _) = G.InProgress AwaitingSessionPacket
71untag (IsEstablished :=> _) = G.Established
72-}
73
74data StatefulTask st = StatefulTask
75 { taskThread :: ThreadId
76 , taskState :: TVar st
77 }
78
79launch :: String -> st -> (TVar st -> IO ()) -> IO (StatefulTask st)
80launch lbl st f = do
81 stvar <- newTVarIO st
82 tid <- forkIO (f stvar)
83 labelThread tid lbl
84 stat <- threadStatus tid
85 dput XMan $ "launch "++lbl++" "++show stat
86 return $ StatefulTask tid stvar
87
88
89data SessionState = SessionState
90 { connPolicy :: TVar Policy
91 , connPingLogic :: PingMachine
92 , handshakeTask :: TVar (StatefulTask (G.Status ToxProgress))
93 }
94
95newSessionState :: IO SessionState
96newSessionState = do
97 pings <- forkPingMachine "SessionState"
98 25000 -- 25 ms send ping
99 30000 -- 30 ms timed out
100 a <- fork $ return ()
101 atomically $ do
102 av <- newTVar G.Dormant
103 let tasks = StatefulTask a av
104 SessionState <$> newTVar G.RefusingToConnect <*> pure pings <*> newTVar tasks
105
106
107sessionStatus :: SessionState -> G.Connection ToxProgress
108sessionStatus st = G.Connection
109 { G.connStatus = readTVar . taskState =<< readTVar (handshakeTask st)
110 , G.connPolicy = readTVar (connPolicy st)
111 , G.connPingLogic = connPingLogic st
112 }
113
114lookupForPolicyChange :: TVar (Map.Map Key SessionState)
115 -> Key -> Policy -> IO (Maybe SessionState)
116lookupForPolicyChange conmap k policy = do
117 cons <- atomically $ readTVar conmap
118 st <- case Map.lookup k cons of
119 Nothing -> newSessionState
120 Just st -> return st
121 atomically $ do
122 p <- readTVar (connPolicy st)
123 writeTVar (connPolicy st) policy
124 return $ do
125 guard $ p /= policy
126 return st
127
128callbackId :: Int
129callbackId = 1
130
131lookupContact :: Key -> ContactInfo extra -> STM (Maybe (SecretKey,Contact))
132lookupContact (Key me them) ContactInfo{accounts} = do
133 acnts <- readTVar accounts
134 fmap join $ forM (HashMap.lookup me acnts) $ \Account{userSecret,contacts} -> do
135 cs <- readTVar contacts
136 forM (HashMap.lookup them cs) $ \c -> do
137 return (userSecret,c)
138
139-- | This function will fork threads as necessary.
140setToxPolicy :: Parameters extra
141 -> TVar (Map.Map Key SessionState)
142 -> Key
143 -> Policy
144 -> IO ()
145setToxPolicy params conmap k@(Key me them) policy = do
146 dput XMan $ "C.setToxPolicy "++show (them,policy)
147 case policy of
148 TryingToConnect -> do
149 mst <- lookupForPolicyChange conmap k policy
150 r <- atomically $ lookupContact k (roster params)
151 dput XMan $ "C.r="++show (fmap (const ()) r)
152 forM_ r $ \(sec,c) -> do
153 let pursue_methods = PursueContactMethods
154 { allsessions = sessions params
155 , myseckey = sec
156 , theirpubkey = id2key them
157 , client = dhtClient params
158 , shortRetryInterval = _todo
159 , longRetryInterval = _todo
160 , contact = c
161 }
162 sch = nodeSearch (dhtClient params) (nodesOfInterest $ dhtRouting params)
163 freshen_methods = FreshenContactMethods
164 { dhtkeyInterval = _todo :: Int
165 , sockAddrInterval = _todo :: Int
166 , nodeSch = sch
167 , getDHTKey = retry :: STM (Maybe NodeId)
168 , getSockAddr = retry -- :: STM (Maybe SockAddr)
169 , nearestNodes = \nid -> do
170 bkts4 <- readTVar $ routing4 $ dhtRouting params
171 bkts6 <- readTVar $ routing6 $ dhtRouting params
172 let interweave [] ys = ys
173 interweave (x:xs) ys = x : interweave ys xs
174 return $ interweave (R.kclosest (searchSpace sch) searchK nid bkts4)
175 (R.kclosest (searchSpace sch) searchK nid bkts6)
176 }
177 get_status = do
178 sbk <- readTVar $ netCryptoSessionsByKey (sessions params)
179 fmap (fromMaybe G.Dormant) $ forM (Map.lookup (id2key them) sbk) $ \ss -> do
180 stats <- mapM (readTVar . ncState) ss
181 return $ maximum stats
182 dput XMan $ "C.mst="++show (fmap (const ()) mst)
183 forM_ mst $ \st -> do
184 let getPolicy = readTVar $ connPolicy st
185 tasks <- atomically $ readTVar (handshakeTask st)
186 persuing <- launch ("pursue:"++show k)
187 (G.InProgress $ toEnum 0)
188 $ pursueContact getPolicy get_status pursue_methods
189 atomically $ do
190 writeTVar (handshakeTask st) $ persuing
191 let routing = dhtRouting params
192 Key _ nid = k
193 registerNodeCallback routing $ NodeInfoCallback
194 { interestingNodeId = nid
195 , listenerId = callbackId
196 , observedAddress = \now ni -> writeTVar (contactLastSeenAddr c) (Just (now, ni))
197 , rumoredAddress = \now saddr ni -> do
198 m <- readTVar (contactLastSeenAddr c)
199 -- TODO remember information source and handle multiple rumors.
200 case m of Just _ -> return ()
201 Nothing -> writeTVar (contactLastSeenAddr c) (Just (now, ni))
202 }
203 return ()
204 RefusingToConnect -> do -- disconnect or cancel any pending connection
205 mst <- lookupForPolicyChange conmap k policy
206 -- Since the connection threads poll the current policy, they should
207 -- all terminate on their own.
208 --
209 -- Here we block until they finish.
210 forM_ mst $ \st -> do
211 atomically $ do
212 let routing = dhtRouting params
213 Key _ nid = k
214 unregisterNodeCallback callbackId routing nid
215 atomically $ do
216 tasks <- readTVar (handshakeTask st)
217 p <- readTVar $ taskState tasks
218 case p of
219 G.Dormant -> return ()
220 _ -> retry
221 OpenToConnect -> do -- passively accept connections if they initiate.
222 mst <- lookupForPolicyChange conmap k policy
223 r <- atomically $ lookupContact k (roster params)
224 forM_ r $ \(sec,c) -> do
225 forM_ mst $ \st -> do
226 {-
227 let getPolicy = readTVar $ connPolicy st
228 accept_thread <- launch ("accept:"++show k)
229 (G.InProgress $ toEnum 0)
230 $ acceptContact getPolicy _accept_methods
231 -}
232 atomically $ do
233 let routing = dhtRouting params
234 Key _ nid = k
235 registerNodeCallback routing $ NodeInfoCallback
236 { interestingNodeId = nid
237 , listenerId = callbackId
238 , observedAddress = \now ni -> writeTVar (contactLastSeenAddr c) (Just (now, ni))
239 , rumoredAddress = \now saddr ni -> do
240 m <- readTVar (contactLastSeenAddr c)
241 -- TODO remember information source and handle multiple rumors.
242 case m of Just _ -> return ()
243 Nothing -> writeTVar (contactLastSeenAddr c) (Just (now, ni))
244 }
245
246stringToKey_ :: String -> Maybe Key
247stringToKey_ s = let (xs,ys) = break (==':') s
248 in if null ys then Nothing
249 else do me <- readMaybe xs
250 them <- readMaybe (drop 1 ys)
251 return $ Key me them
252setNoToxPolicy :: Parameters extra
253 -> TVar (Map.Map Key SessionState)
254 -> Key
255 -> Policy
256 -> IO ()
257setNoToxPolicy _ _ _ _ = return ()
258
259toxManager :: Parameters extra -> IO (Manager ToxProgress Key)
260toxManager params = do
261 conmap <- newTVarIO Map.empty
262 return Manager
263 { setPolicy = setNoToxPolicy params conmap
264 , connections = fmap sessionStatus <$> readTVar conmap -- STM (Map k (Connection status))
265 , stringToKey = stringToKey_ -- String -> Maybe k
266 , showProgress = show -- status -> String
267 , showKey = showKey_ -- k -> String
268 }
269
diff --git a/Connection/Tox/Threads.hs b/Connection/Tox/Threads.hs
deleted file mode 100644
index de719655..00000000
--- a/Connection/Tox/Threads.hs
+++ /dev/null
@@ -1,239 +0,0 @@
1-- |
2--
3-- This module defines three tasks intended to be run in separate threads:
4--
5-- * 'acceptContact'
6--
7-- * 'pursueContact'
8--
9-- * 'freshenContact'
10--
11{-# LANGUAGE CPP #-}
12{-# LANGUAGE LambdaCase #-}
13module Connection.Tox.Threads
14 ( PursueContactMethods(..)
15 , FreshenContactMethods(..)
16 , pursueContact
17 ) where
18
19import Connection
20-- import Connection.Tox
21import Crypto.Tox
22import Data.IP (IP)
23import Network.Tox.Crypto.Transport
24import Network.Tox.Crypto.Handlers
25import Network.Tox.NodeId
26import Network.Tox.ContactInfo
27import Network.Tox.Handshake
28import Network.Tox.DHT.Handlers {- (nodeSearch) -} as DHT
29import Network.Tox.DHT.Transport as DHT (dhtpk)
30import Network.Socket
31import Network.Kademlia.Search
32import Network.Kademlia.Routing (BucketList)
33#ifdef THREAD_DEBUG
34import Control.Concurrent.Lifted.Instrument
35#else
36import Control.Concurrent.Lifted
37import GHC.Conc (labelThread)
38#endif
39
40import Control.Arrow
41import Control.Concurrent.STM
42import Control.Monad
43import Data.Function
44import Data.Functor.Identity
45import Data.Time.Clock.POSIX
46import System.IO
47import System.Timeout
48import DPut
49
50
51
52type NodeSearch = Search NodeId (IP,PortNumber) () NodeInfo NodeInfo
53
54data AcceptContactMethods = AcceptContactMethods
55 { getHandshake :: STM (Handshake Identity)
56 , handshakeIsSuitable :: Handshake Identity -> STM Bool
57 , transitionToState :: Status ToxProgress -> STM ()
58 }
59
60-- | Invokes an STM action on each incoming handshake.
61--
62-- Does not return until getPolicy yields RefusingToConnect.
63acceptContact :: STM Policy -> AcceptContactMethods -> (Status ToxProgress -> STM ()) -> IO ()
64acceptContact getPolicy AcceptContactMethods{..} writeState = fix $ \loop -> do
65 join $ atomically $ do
66 orElse
67 (getPolicy >>= \case
68 RefusingToConnect -> do writeState Dormant
69 return $ return () -- QUIT Dormant/Established
70 _ -> retry)
71 (do hs <- getHandshake
72 handshakeIsSuitable hs >>= \case
73 True -> do
74 -- Here we allocate a NetCrypto session for handling CryptoPacket.
75 writeState (InProgress AwaitingSessionPacket)
76 transitionToState (InProgress AwaitingSessionPacket)
77 return loop
78 False -> return loop)
79
80whileTryingAndNotEstablished :: STM Policy
81 -> STM (Status t)
82 -> TVar (Status ToxProgress)
83 -> ((Int -> IO ()) -> STM (IO ()))
84 -> IO ()
85whileTryingAndNotEstablished getPolicy getStatus statusVar body = fix $ \loop -> do
86 let retryWhileTrying k = getPolicy >>= \case
87 TryingToConnect -> retry
88 _ -> do writeTVar statusVar Dormant
89 return k
90 ifEstablished t e = getStatus >>= \case
91 Established -> t
92 _ -> e
93 retryAfterTimeout interval = do
94 timeout interval $ atomically
95 $ orElse
96 (retryWhileTrying ())
97 (ifEstablished (return ()) retry)
98 loop
99 join $ atomically $ orElse
100 (retryWhileTrying (return ())) -- QUIT Dormant/Established
101 (ifEstablished retry
102 (body retryAfterTimeout))
103
104data PursueContactMethods = PursueContactMethods
105 { allsessions :: NetCryptoSessions
106 , myseckey :: SecretKey
107 , theirpubkey :: PublicKey
108 , client :: DHT.Client
109 , shortRetryInterval :: Int -- successful cookie, try again soon.
110 , longRetryInterval :: Int -- no cookie, he's offline, give it some time.
111 , contact :: Contact
112 }
113
114retryUntilJust :: TVar (Maybe a) -> STM a
115retryUntilJust tvar = maybe retry return =<< readTVar tvar
116
117-- | Continuously attempt to send handshake packets until a connection is
118-- established.
119--
120-- As long as getPolicy is TryingToConnect and there is no established
121-- connection, this function will continue.
122pursueContact :: STM Policy
123 -> STM (Status t)
124 -> PursueContactMethods
125 -> TVar (Status ToxProgress)
126 -> IO ()
127pursueContact getPolicy getStatus PursueContactMethods{..} statusVar = do
128 -- AwaitingDHTKey
129 atomically $ writeTVar statusVar (InProgress AwaitingDHTKey)
130 whileTryingAndNotEstablished getPolicy getStatus statusVar
131 $ \retryAfterTimeout ->
132 orElse (do
133 readTVar statusVar >>= check . (/= InProgress AcquiringIPAddress)
134 (stamp_theirDhtKey,theirDhtKey) <- second DHT.dhtpk <$> retryUntilJust (contactKeyPacket contact)
135 -- We don't have an IP address yet.
136 maybe (return ()) (const retry) =<< readTVar (contactLastSeenAddr contact)
137 return $ do -- AcquiringIPAddress
138 atomically $ writeTVar statusVar (InProgress AcquiringIPAddress)
139 retryAfterTimeout 0)
140 (do
141 (stamp_theirDhtKey,theirDhtKey) <- second DHT.dhtpk <$> retryUntilJust (contactKeyPacket contact)
142 (stamp_saddr,saddr) <- retryUntilJust (contactLastSeenAddr contact)
143 ni <- either (const retry) return $ nodeInfo (key2id theirDhtKey) (_fixme saddr)
144 return $ do
145 -- AcquiringCookie
146 atomically $ writeTVar statusVar (InProgress AcquiringCookie)
147 let mykeyAsId = key2id (toPublic myseckey)
148 theirkeyAsId = key2id theirpubkey
149 crypto = transportCrypto allsessions
150 mbCookie <- -- TODO: Check for recent cached cookie.
151 DHT.cookieRequest crypto client (toPublic myseckey) ni
152 interval <- case mbCookie of
153 Nothing -> do
154 dput XMan ("pursueContact: (" ++ show mykeyAsId ++") <--> (" ++ show theirkeyAsId ++ ").")
155 dput XMan ("pursueContact: CookieRequest failed. TODO: dhtpkNodes thingy")
156 return longRetryInterval
157 Just cookie -> do
158 dput XMan "Have cookie, creating handshake packet..."
159 let hp = HParam { hpOtherCookie = cookie
160 , hpMySecretKey = myseckey
161 , hpCookieRemotePubkey = theirpubkey
162 , hpCookieRemoteDhtkey = theirDhtKey
163 , hpTheirBaseNonce = Nothing
164 , hpTheirSessionKeyPublic = Nothing
165 }
166 newsession <- generateSecretKey
167 timestamp <- getPOSIXTime
168 (myhandshake,ioAction)
169 <- atomically $ freshCryptoSession allsessions (_fixme saddr) newsession timestamp hp
170 ioAction
171 -- send handshake
172 forM myhandshake $ \response_handshake -> do
173 sendHandshake allsessions (_fixme saddr) response_handshake
174 atomically $ writeTVar statusVar $ InProgress AwaitingHandshake
175 return shortRetryInterval
176 -- AwaitingHandshake
177 -- AwaitingSessionPacket
178 retryAfterTimeout interval)
179
180data FreshenContactMethods = FreshenContactMethods
181 { dhtkeyInterval :: Int
182 , sockAddrInterval :: Int
183 , nodeSch :: NodeSearch
184 , getDHTKey :: STM (Maybe NodeId)
185 , getSockAddr :: STM (Maybe SockAddr)
186 , nearestNodes :: NodeId -> STM [NodeInfo]
187 }
188
189-- send my dht key
190-- search for their sockaddr
191-- monitor new dht key
192-- monitor new sockaddr
193--
194-- Keep going while TryingToConnect
195-- pause while Established
196
197-- Useful:
198-- toxidSearch onionTimeout
199-- newSearch
200-- searchLoop
201-- searchCancel
202-- -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching.
203
204-- | Continuously search the DHT to obtain ip addresses and to send your dht
205-- key to contacts.
206--
207-- As long as getPolicy is TryingToConnect and there is no established
208-- connection, this function will continue.
209freshenContact :: STM Policy -> STM (Status t) -> FreshenContactMethods
210 -> TVar (Status ToxProgress)
211 -> IO ()
212freshenContact getPolicy getStatus FreshenContactMethods{..} statusVar
213 = whileTryingAndNotEstablished getPolicy getStatus statusVar
214 -- retryAfterTimeout :: Int -> IO ()
215 $ \retryAfterTimeout ->
216 getDHTKey >>= \case
217 Nothing -> -- AwaitingDHTKey
218 retry
219 Just dk -> getSockAddr >>= \case
220 Nothing -> do -- AcquiringIPAddress
221 writeTVar statusVar (InProgress AcquiringIPAddress)
222 return $
223 do st <- atomically $ do
224 ns <- nearestNodes dk
225 newSearch nodeSch dk ns
226 -- forked simply to avoid relabeling this thread.
227 forkIO $ searchLoop nodeSch dk (const $ return True) st
228 -- TODO: searchCancel on stop condition
229 atomically $ searchIsFinished st >>= check
230 retryAfterTimeout sockAddrInterval
231 Just a -> do
232 writeTVar statusVar (InProgress AcquiringCookie)
233 return $
234 -- AcquiringCookie
235 -- AwaitingHandshake
236 -- AwaitingSessionPacket
237 do _todo_search_toxid_send_dhtkey -- 123 _todo_search_toxid_send_dhtkey :: IO a0
238 retryAfterTimeout dhtkeyInterval
239