summaryrefslogtreecommitdiff
path: root/dht/Announcer
diff options
context:
space:
mode:
authorJames Crayne <jim.crayne@gmail.com>2019-09-28 13:43:29 -0400
committerJoe Crayne <joe@jerkface.net>2020-01-01 19:27:53 -0500
commit11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch)
tree5716463275c2d3e902889db619908ded2a73971c /dht/Announcer
parentadd2c76bced51fde5e9917e7449ef52be70faf87 (diff)
Factor out some new libraries
word64-map: Data.Word64Map network-addr: Network.Address tox-crypto: Crypto.Tox lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ minmax-psq: Data.MinMaxPSQ tasks: Control.Concurrent.Tasks kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search
Diffstat (limited to 'dht/Announcer')
-rw-r--r--dht/Announcer/Tox.hs206
1 files changed, 206 insertions, 0 deletions
diff --git a/dht/Announcer/Tox.hs b/dht/Announcer/Tox.hs
new file mode 100644
index 00000000..f8343f8d
--- /dev/null
+++ b/dht/Announcer/Tox.hs
@@ -0,0 +1,206 @@
1{-# LANGUAGE DeriveDataTypeable #-}
2{-# LANGUAGE DeriveGeneric #-}
3{-# LANGUAGE ExistentialQuantification #-}
4{-# LANGUAGE FlexibleContexts #-}
5{-# LANGUAGE GeneralizedNewtypeDeriving #-}
6{-# LANGUAGE LambdaCase #-}
7{-# LANGUAGE NamedFieldPuns #-}
8{-# LANGUAGE NondecreasingIndentation #-}
9module Announcer.Tox where
10 -- , AnnounceMethod(..)
11 -- , schedule
12
13import Announcer
14import qualified Data.MinMaxPSQ as MM
15import Data.Wrapper.PSQ as PSQ
16import Network.Kademlia.Search
17
18import Control.Concurrent.Lifted.Instrument
19import Control.Concurrent.STM
20import Control.Monad
21import Data.Hashable
22import Data.Maybe
23import Data.Ord
24import Data.Time.Clock.POSIX
25
26
27announceK :: Int
28announceK = 8
29
30data AnnounceState = forall nid addr tok ni r. AnnounceState
31 { aState :: SearchState nid addr tok ni r
32 , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime))
33 }
34
35-- | This type specifies an item that can be announced on appropriate nodes in
36-- a Kademlia network.
37data AnnounceMethod r = forall nid ni sr addr tok a.
38 ( Show nid
39 , Hashable nid
40 , Hashable ni
41 , Ord addr
42 , Ord nid
43 , Ord ni
44 ) => AnnounceMethod
45 { aSearch :: Search nid addr tok ni sr
46 -- ^ This is the Kademlia search to run repeatedly to find the
47 -- nearby nodes. A new search is started whenever one is not
48 -- already in progress at announce time. Repeated searches are
49 -- likely to finish faster than the first since nearby nodes
50 -- are not discarded.
51 , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a)
52 -- ^ The action to perform when we find nearby nodes. The
53 -- destination node is given as a Maybe so that methods that
54 -- treat 'Nothing' as loop-back address can be passed here,
55 -- however 'Nothing' will not be passed by the announcer
56 -- thread.
57 --
58 -- The action requires a "token" from the destination
59 -- node. This is the more typical "announce" semantics for
60 -- Kademlia.
61 , aNearestNodes :: nid -> STM [ni]
62 -- ^ Method to obtain starting nodes from an iterative Kademlia search.
63 , aTarget :: nid
64 -- ^ This is the Kademlia node-id of the item being announced.
65 , aInterval :: POSIXTime
66 -- ^ Assuming we have nearby nodes from the search, the item
67 -- will be announced at this interval.
68 --
69 -- Current implementation is to make the scheduled
70 -- announcements even if the search hasn't finished. It will
71 -- use the closest nodes found so far.
72 }
73
74-- | This type specifies a Kademlia search and an action to perform upon the result.
75data SearchMethod r = forall nid ni sr addr tok a.
76 ( Show nid
77 , Hashable nid
78 , Hashable ni
79 , Ord addr
80 , Ord nid
81 , Ord ni
82 ) => SearchMethod
83 { sSearch :: Search nid addr tok ni sr
84 -- ^ This is the Kademlia search to run repeatedly to find the
85 -- nearby nodes. A new search is started whenever one is not
86 -- already in progress at announce time. Repeated searches are
87 -- likely to finish faster than the first since nearby nodes
88 -- are not discarded.
89 --
90 -- XXX: Currently, "repeatedly" is wrong.
91 , sWithResult :: r -> sr -> IO ()
92 -- ^
93 -- The action to perform upon a search result. This was
94 -- implemented to support Tox's DHTKey and Friend-Request
95 -- messages.
96 , sNearestNodes :: nid -> STM [ni]
97 -- ^ Method to obtain starting nodes from an iterative Kademlia search.
98 , sTarget :: nid
99 -- ^ This is the Kademlia node-id of the item being announced.
100 , sInterval :: POSIXTime
101 -- ^ The time between searches.
102 --
103 -- XXX: Currently, search results will stop any repetition.
104 }
105
106
107-- announcement started:
108newAnnouncement :: STM (IO a)
109 -> IO ()
110 -> IO ()
111 -> POSIXTime
112 -> Announcer
113 -> AnnounceKey
114 -> POSIXTime
115 -> STM (IO ())
116newAnnouncement checkFin search announce interval = \announcer k now -> do
117 scheduleAbs announcer k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)
118 return $ void $ fork search
119
120-- time for periodic announce:
121-- (re-)announce to the current known set of storing-nodes.
122-- TODO: If the search is finished, restart the search.
123reAnnounce :: STM (IO a)
124 -> IO ()
125 -> POSIXTime
126 -> Announcer
127 -> AnnounceKey
128 -> POSIXTime
129 -> STM (IO ())
130reAnnounce checkFin announce interval = \announcer k now -> do
131 isfin <- checkFin
132 scheduleAbs announcer k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)
133 return $ do
134 isfin
135 announce
136
137-- | Schedule a recurring Search + Announce sequence.
138scheduleAnnounce :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
139scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInterval} r = do
140 st <- atomically $ newSearch aSearch aTarget []
141 ns <- atomically $ newTVar MM.empty
142 let astate = AnnounceState st ns
143 publishToNodes is = do
144 forM_ is $ \(Binding ni mtok _) -> do
145 forM_ mtok $ \tok -> do
146 got <- aPublish r tok (Just ni)
147 now <- getPOSIXTime
148 forM_ got $ \_ -> do
149 atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
150 announce = do -- publish to current search results
151 is <- atomically $ do
152 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
153 return $ MM.toList bs
154 publishToNodes is
155 onResult sr = return True
156 searchAgain = do
157 -- Canceling a pending search here seems to make announcements more reliable.
158 searchCancel st
159 isfin <- searchIsFinished st -- Always True, since we canceled.
160 return $ when isfin $ void $ fork search
161 search = do -- thread to fork
162 atomically $ reset aNearestNodes aSearch aTarget st
163 searchLoop aSearch aTarget onResult st
164 fork $ do -- Announce to any nodes we haven't already announced to.
165 is <- atomically $ do
166 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
167 nq <- readTVar ns
168 return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq)
169 $ MM.toList bs
170 publishToNodes is
171 return ()
172 {-
173 atomically $ scheduleImmediately announcer k
174 $ SearchFinished {- st -} search announce aInterval
175 interruptDelay (interrutible announcer)
176 -}
177 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval)
178
179-- | Schedule a recurring Search + Publish sequence.
180scheduleSearch :: Announcer -> AnnounceKey -> SearchMethod r -> r -> IO ()
181scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarget,sInterval} r = do
182 st <- atomically $ newSearch sSearch sTarget []
183 ns <- atomically $ newTVar MM.empty
184 let astate = AnnounceState st ns
185 onResult sr = do
186 runAction announcer $ do
187 got <- sWithResult r sr
188 -- If we had a way to get the source of a search result, we might want to
189 -- treat it similarly to an announcing node and remember it in the 'aStoringNodes'
190 -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent
191 -- a message be forgotten.
192 --
193 -- forM_ got $ \_ -> do
194 -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
195 return ()
196 return True -- True to keep searching.
197 searchAgain = do
198 -- Canceling a pending search here seems to make announcements more reliable.
199 searchCancel st
200 isfin <- searchIsFinished st -- Always True, since we canceled.
201 return $ when isfin $ void $ fork search
202 search = do -- thread to fork
203 atomically $ reset sNearestNodes sSearch sTarget st
204 searchLoop sSearch sTarget onResult st
205 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search (return ()) sInterval)
206