diff options
author | joe <joe@jerkface.net> | 2018-06-17 17:13:53 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2018-06-17 17:15:20 -0400 |
commit | eda92679a6d7d27ebb5757ba7056fc452384faac (patch) | |
tree | 571f07cb584561750d1fa234f15959ab99872b60 /Announcer | |
parent | 75a18e4cb814044a714aa3f487d2e6475de6127a (diff) |
Factored out Tox-specific scheduling to Announcer.Tox.
Diffstat (limited to 'Announcer')
-rw-r--r-- | Announcer/Tox.hs | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/Announcer/Tox.hs b/Announcer/Tox.hs new file mode 100644 index 00000000..eab974bc --- /dev/null +++ b/Announcer/Tox.hs | |||
@@ -0,0 +1,176 @@ | |||
1 | {-# LANGUAGE DeriveDataTypeable #-} | ||
2 | {-# LANGUAGE DeriveGeneric #-} | ||
3 | {-# LANGUAGE ExistentialQuantification #-} | ||
4 | {-# LANGUAGE FlexibleContexts #-} | ||
5 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
6 | {-# LANGUAGE LambdaCase #-} | ||
7 | {-# LANGUAGE NamedFieldPuns #-} | ||
8 | {-# LANGUAGE NondecreasingIndentation #-} | ||
9 | module Announcer.Tox where | ||
10 | -- , AnnounceMethod(..) | ||
11 | -- , schedule | ||
12 | |||
13 | import Announcer | ||
14 | import qualified Data.MinMaxPSQ as MM | ||
15 | import Data.Wrapper.PSQ as PSQ | ||
16 | import InterruptibleDelay | ||
17 | import Network.Kademlia.Routing as R | ||
18 | import Network.Kademlia.Search | ||
19 | |||
20 | import Control.Concurrent.Lifted.Instrument | ||
21 | import Control.Concurrent.STM | ||
22 | import Control.Monad | ||
23 | import Data.Hashable | ||
24 | import Data.Maybe | ||
25 | import Data.Ord | ||
26 | import Data.Time.Clock.POSIX | ||
27 | import System.IO | ||
28 | |||
29 | |||
30 | announceK :: Int | ||
31 | announceK = 8 | ||
32 | |||
33 | data AnnounceState = forall nid addr tok ni r. AnnounceState | ||
34 | { aState :: SearchState nid addr tok ni r | ||
35 | , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime)) | ||
36 | } | ||
37 | |||
38 | -- | This type specifies an item that can be announced on appropriate nodes in | ||
39 | -- a Kademlia network. | ||
40 | data AnnounceMethod r = forall nid ni sr addr tok a. | ||
41 | ( Show nid | ||
42 | , Hashable nid | ||
43 | , Hashable ni | ||
44 | , Ord addr | ||
45 | , Ord nid | ||
46 | , Ord ni | ||
47 | ) => AnnounceMethod | ||
48 | { aSearch :: Search nid addr tok ni sr | ||
49 | -- ^ This is the Kademlia search to run repeatedly to find the | ||
50 | -- nearby nodes. A new search is started whenever one is not | ||
51 | -- already in progress at announce time. Repeated searches are | ||
52 | -- likely to finish faster than the first since nearby nodes | ||
53 | -- are not discarded. | ||
54 | , aPublish :: Either (r -> sr -> IO ()) | ||
55 | (r -> tok -> Maybe ni -> IO (Maybe a)) | ||
56 | -- ^ The action to perform when we find nearby nodes. The | ||
57 | -- destination node is given as a Maybe so that methods that | ||
58 | -- treat 'Nothing' as loop-back address can be passed here, | ||
59 | -- however 'Nothing' will not be passed by the announcer | ||
60 | -- thread. | ||
61 | -- | ||
62 | -- There are two cases: | ||
63 | -- | ||
64 | -- [Left] The action to perform requires a search result. | ||
65 | -- This was implemented to support Tox's DHTKey and | ||
66 | -- Friend-Request messages. | ||
67 | -- | ||
68 | -- [Right] The action requires a "token" from the destination | ||
69 | -- node. This is the more typical "announce" semantics for | ||
70 | -- Kademlia. | ||
71 | , aBuckets :: TVar (R.BucketList ni) | ||
72 | -- ^ Set this to the current Kademlia routing table buckets. | ||
73 | -- TODO: List of TVars to have separate routing tables for IPv6 and IPv4? | ||
74 | , aTarget :: nid | ||
75 | -- ^ This is the Kademlia node-id of the item being announced. | ||
76 | , aInterval :: POSIXTime | ||
77 | -- ^ Assuming we have nearby nodes from the search, the item | ||
78 | -- will be announced at this interval. | ||
79 | -- | ||
80 | -- Current implementation is to make the scheduled | ||
81 | -- announcements even if the search hasn't finished. It will | ||
82 | -- use the closest nodes found so far. | ||
83 | } | ||
84 | |||
85 | |||
86 | -- announcement started: | ||
87 | newAnnouncement :: STM (IO a) | ||
88 | -> IO () | ||
89 | -> IO () | ||
90 | -> POSIXTime | ||
91 | -> Announcer | ||
92 | -> AnnounceKey | ||
93 | -> POSIXTime | ||
94 | -> STM (IO ()) | ||
95 | newAnnouncement checkFin search announce interval = \announcer k now -> do | ||
96 | modifyTVar (scheduled announcer) | ||
97 | (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)) | ||
98 | return $ void $ fork search | ||
99 | |||
100 | -- time for periodic announce: | ||
101 | -- (re-)announce to the current known set of storing-nodes. | ||
102 | -- TODO: If the search is finished, restart the search. | ||
103 | reAnnounce :: STM (IO a) | ||
104 | -> IO () | ||
105 | -> POSIXTime | ||
106 | -> Announcer | ||
107 | -> AnnounceKey | ||
108 | -> POSIXTime | ||
109 | -> STM (IO ()) | ||
110 | reAnnounce checkFin announce interval = \announcer k now -> do | ||
111 | isfin <- checkFin | ||
112 | modifyTVar (scheduled announcer) | ||
113 | (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)) | ||
114 | return $ do | ||
115 | isfin | ||
116 | hPutStrLn stderr $ "This print avoids negative-time future scheduling. Weird bug. TODO: fix it. "++show now | ||
117 | announce | ||
118 | |||
119 | -- | Schedule a recurring Search + Announce sequence. | ||
120 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | ||
121 | schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do | ||
122 | st <- atomically $ newSearch aSearch aTarget [] | ||
123 | ns <- atomically $ newTVar MM.empty | ||
124 | let astate = AnnounceState st ns | ||
125 | publishToNodes is | ||
126 | | Left _ <- aPublish = return () | ||
127 | | Right publish <- aPublish = do | ||
128 | forM_ is $ \(Binding ni mtok _) -> do | ||
129 | forM_ mtok $ \tok -> do | ||
130 | got <- publish r tok (Just ni) | ||
131 | now <- getPOSIXTime | ||
132 | forM_ got $ \_ -> do | ||
133 | atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) | ||
134 | announce = do -- publish to current search results | ||
135 | is <- atomically $ do | ||
136 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) | ||
137 | return $ MM.toList bs | ||
138 | publishToNodes is | ||
139 | onResult sr | ||
140 | | Right _ <- aPublish = return True | ||
141 | | Left sendit <- aPublish = do | ||
142 | scheduleImmediately announcer k $ ScheduledItem $ \_ _ _ -> return $ do | ||
143 | got <- sendit r sr | ||
144 | -- If we had a way to get the source of a search result, we might want to | ||
145 | -- treat it similarly to an announcing node and remember it in the 'aStoringNodes' | ||
146 | -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent | ||
147 | -- a message be forgotten. | ||
148 | -- | ||
149 | -- forM_ got $ \_ -> do | ||
150 | -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) | ||
151 | return () | ||
152 | return True -- True to keep searching. | ||
153 | searchAgain = do | ||
154 | -- Canceling a pending search here seems to make announcements more reliable. | ||
155 | searchCancel st | ||
156 | isfin <- searchIsFinished st -- Always True, since we canceled. | ||
157 | return $ when isfin $ void $ fork search | ||
158 | search = do -- thread to fork | ||
159 | atomically $ reset aBuckets aSearch aTarget st | ||
160 | searchLoop aSearch aTarget onResult st | ||
161 | fork $ do -- Announce to any nodes we haven't already announced to. | ||
162 | is <- atomically $ do | ||
163 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) | ||
164 | nq <- readTVar ns | ||
165 | return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq) | ||
166 | $ MM.toList bs | ||
167 | publishToNodes is | ||
168 | return () | ||
169 | {- | ||
170 | atomically $ scheduleImmediately announcer k | ||
171 | $ SearchFinished {- st -} search announce aInterval | ||
172 | interruptDelay (interrutible announcer) | ||
173 | -} | ||
174 | atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval) | ||
175 | interruptDelay (interrutible announcer) | ||
176 | |||