summaryrefslogtreecommitdiff
path: root/Announcer/Tox.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Announcer/Tox.hs')
-rw-r--r--Announcer/Tox.hs176
1 files changed, 176 insertions, 0 deletions
diff --git a/Announcer/Tox.hs b/Announcer/Tox.hs
new file mode 100644
index 00000000..eab974bc
--- /dev/null
+++ b/Announcer/Tox.hs
@@ -0,0 +1,176 @@
1{-# LANGUAGE DeriveDataTypeable #-}
2{-# LANGUAGE DeriveGeneric #-}
3{-# LANGUAGE ExistentialQuantification #-}
4{-# LANGUAGE FlexibleContexts #-}
5{-# LANGUAGE GeneralizedNewtypeDeriving #-}
6{-# LANGUAGE LambdaCase #-}
7{-# LANGUAGE NamedFieldPuns #-}
8{-# LANGUAGE NondecreasingIndentation #-}
9module Announcer.Tox where
10 -- , AnnounceMethod(..)
11 -- , schedule
12
13import Announcer
14import qualified Data.MinMaxPSQ as MM
15import Data.Wrapper.PSQ as PSQ
16import InterruptibleDelay
17import Network.Kademlia.Routing as R
18import Network.Kademlia.Search
19
20import Control.Concurrent.Lifted.Instrument
21import Control.Concurrent.STM
22import Control.Monad
23import Data.Hashable
24import Data.Maybe
25import Data.Ord
26import Data.Time.Clock.POSIX
27import System.IO
28
29
30announceK :: Int
31announceK = 8
32
33data AnnounceState = forall nid addr tok ni r. AnnounceState
34 { aState :: SearchState nid addr tok ni r
35 , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime))
36 }
37
38-- | This type specifies an item that can be announced on appropriate nodes in
39-- a Kademlia network.
40data AnnounceMethod r = forall nid ni sr addr tok a.
41 ( Show nid
42 , Hashable nid
43 , Hashable ni
44 , Ord addr
45 , Ord nid
46 , Ord ni
47 ) => AnnounceMethod
48 { aSearch :: Search nid addr tok ni sr
49 -- ^ This is the Kademlia search to run repeatedly to find the
50 -- nearby nodes. A new search is started whenever one is not
51 -- already in progress at announce time. Repeated searches are
52 -- likely to finish faster than the first since nearby nodes
53 -- are not discarded.
54 , aPublish :: Either (r -> sr -> IO ())
55 (r -> tok -> Maybe ni -> IO (Maybe a))
56 -- ^ The action to perform when we find nearby nodes. The
57 -- destination node is given as a Maybe so that methods that
58 -- treat 'Nothing' as loop-back address can be passed here,
59 -- however 'Nothing' will not be passed by the announcer
60 -- thread.
61 --
62 -- There are two cases:
63 --
64 -- [Left] The action to perform requires a search result.
65 -- This was implemented to support Tox's DHTKey and
66 -- Friend-Request messages.
67 --
68 -- [Right] The action requires a "token" from the destination
69 -- node. This is the more typical "announce" semantics for
70 -- Kademlia.
71 , aBuckets :: TVar (R.BucketList ni)
72 -- ^ Set this to the current Kademlia routing table buckets.
73 -- TODO: List of TVars to have separate routing tables for IPv6 and IPv4?
74 , aTarget :: nid
75 -- ^ This is the Kademlia node-id of the item being announced.
76 , aInterval :: POSIXTime
77 -- ^ Assuming we have nearby nodes from the search, the item
78 -- will be announced at this interval.
79 --
80 -- Current implementation is to make the scheduled
81 -- announcements even if the search hasn't finished. It will
82 -- use the closest nodes found so far.
83 }
84
85
86-- announcement started:
87newAnnouncement :: STM (IO a)
88 -> IO ()
89 -> IO ()
90 -> POSIXTime
91 -> Announcer
92 -> AnnounceKey
93 -> POSIXTime
94 -> STM (IO ())
95newAnnouncement checkFin search announce interval = \announcer k now -> do
96 modifyTVar (scheduled announcer)
97 (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval))
98 return $ void $ fork search
99
100-- time for periodic announce:
101-- (re-)announce to the current known set of storing-nodes.
102-- TODO: If the search is finished, restart the search.
103reAnnounce :: STM (IO a)
104 -> IO ()
105 -> POSIXTime
106 -> Announcer
107 -> AnnounceKey
108 -> POSIXTime
109 -> STM (IO ())
110reAnnounce checkFin announce interval = \announcer k now -> do
111 isfin <- checkFin
112 modifyTVar (scheduled announcer)
113 (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval))
114 return $ do
115 isfin
116 hPutStrLn stderr $ "This print avoids negative-time future scheduling. Weird bug. TODO: fix it. "++show now
117 announce
118
119-- | Schedule a recurring Search + Announce sequence.
120schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
121schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do
122 st <- atomically $ newSearch aSearch aTarget []
123 ns <- atomically $ newTVar MM.empty
124 let astate = AnnounceState st ns
125 publishToNodes is
126 | Left _ <- aPublish = return ()
127 | Right publish <- aPublish = do
128 forM_ is $ \(Binding ni mtok _) -> do
129 forM_ mtok $ \tok -> do
130 got <- publish r tok (Just ni)
131 now <- getPOSIXTime
132 forM_ got $ \_ -> do
133 atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
134 announce = do -- publish to current search results
135 is <- atomically $ do
136 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
137 return $ MM.toList bs
138 publishToNodes is
139 onResult sr
140 | Right _ <- aPublish = return True
141 | Left sendit <- aPublish = do
142 scheduleImmediately announcer k $ ScheduledItem $ \_ _ _ -> return $ do
143 got <- sendit r sr
144 -- If we had a way to get the source of a search result, we might want to
145 -- treat it similarly to an announcing node and remember it in the 'aStoringNodes'
146 -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent
147 -- a message be forgotten.
148 --
149 -- forM_ got $ \_ -> do
150 -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
151 return ()
152 return True -- True to keep searching.
153 searchAgain = do
154 -- Canceling a pending search here seems to make announcements more reliable.
155 searchCancel st
156 isfin <- searchIsFinished st -- Always True, since we canceled.
157 return $ when isfin $ void $ fork search
158 search = do -- thread to fork
159 atomically $ reset aBuckets aSearch aTarget st
160 searchLoop aSearch aTarget onResult st
161 fork $ do -- Announce to any nodes we haven't already announced to.
162 is <- atomically $ do
163 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
164 nq <- readTVar ns
165 return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq)
166 $ MM.toList bs
167 publishToNodes is
168 return ()
169 {-
170 atomically $ scheduleImmediately announcer k
171 $ SearchFinished {- st -} search announce aInterval
172 interruptDelay (interrutible announcer)
173 -}
174 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval)
175 interruptDelay (interrutible announcer)
176