diff options
Diffstat (limited to 'Announcer.hs')
-rw-r--r-- | Announcer.hs | 50 |
1 files changed, 34 insertions, 16 deletions
diff --git a/Announcer.hs b/Announcer.hs index 1f539d5d..f19f8d46 100644 --- a/Announcer.hs +++ b/Announcer.hs | |||
@@ -15,9 +15,11 @@ module Announcer | |||
15 | , cancel | 15 | , cancel |
16 | ) where | 16 | ) where |
17 | 17 | ||
18 | import Data.Wrapper.PSQ as PSQ | 18 | import qualified Data.MinMaxPSQ as MM |
19 | import Network.Kademlia.Search | 19 | import Data.Wrapper.PSQ as PSQ |
20 | import InterruptibleDelay | 20 | import InterruptibleDelay |
21 | import Network.Kademlia.Routing as R | ||
22 | import Network.Kademlia.Search | ||
21 | 23 | ||
22 | import Control.Concurrent.Lifted.Instrument | 24 | import Control.Concurrent.Lifted.Instrument |
23 | import Control.Concurrent.STM | 25 | import Control.Concurrent.STM |
@@ -42,9 +44,9 @@ data ScheduledItem | |||
42 | = forall r. ScheduledItem (AnnounceMethod r) | 44 | = forall r. ScheduledItem (AnnounceMethod r) |
43 | | StopAnnouncer | 45 | | StopAnnouncer |
44 | | NewAnnouncement (IO ()) (IO ()) POSIXTime | 46 | | NewAnnouncement (IO ()) (IO ()) POSIXTime |
47 | | SearchFinished (IO ()) (IO ()) POSIXTime | ||
45 | | Announce (IO ()) POSIXTime | 48 | | Announce (IO ()) POSIXTime |
46 | | DeleteAnnouncement | 49 | | DeleteAnnouncement |
47 | | SearchFinished | ||
48 | 50 | ||
49 | data Announcer = Announcer | 51 | data Announcer = Announcer |
50 | { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) | 52 | { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) |
@@ -62,22 +64,39 @@ stopAnnouncer announcer = do | |||
62 | interruptDelay (interrutible announcer) | 64 | interruptDelay (interrutible announcer) |
63 | atomically $ readTVar (announcerActive announcer) >>= check . not | 65 | atomically $ readTVar (announcerActive announcer) >>= check . not |
64 | 66 | ||
65 | data AnnounceMethod r = forall nid ni addr r tok a. AnnounceMethod | 67 | data AnnounceMethod r = forall nid ni addr tok a. |
68 | ( Show nid | ||
69 | , Hashable nid | ||
70 | , Hashable ni | ||
71 | , Ord addr | ||
72 | , Ord nid | ||
73 | , Ord ni | ||
74 | ) => AnnounceMethod | ||
66 | { aSearch :: Search nid addr tok ni r | 75 | { aSearch :: Search nid addr tok ni r |
67 | , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) | 76 | , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) |
77 | , aBuckets :: TVar (R.BucketList ni) | ||
78 | , aTarget :: nid | ||
79 | , aInterval :: POSIXTime | ||
68 | } | 80 | } |
69 | 81 | ||
70 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | 82 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () |
71 | schedule announcer k AnnounceMethod{aSearch,aPublish} r = do | 83 | schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do |
72 | let announce = _todo :: IO () -- publish to current search results | 84 | st <- atomically $ newSearch aSearch aTarget [] |
73 | onResult _ = return True | 85 | let announce = do -- publish to current search results |
74 | search = _todo :: IO () -- thread to fork | 86 | is <- atomically $ do |
75 | -- ns <- R.kclosest (searchSpace qsearch) searchK nid <$> readTVar dhtBuckets | 87 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) |
76 | -- st <- newSearch qsearch nid ns | 88 | return $ MM.toList bs |
77 | -- searchLoop :: Search nid addr tok ni r -> nid -> (r -> STM Bool) -> SearchState nid addr tok ni r -> IO () | 89 | forM_ is $ \(Binding ni tok _) -> do |
78 | -- searchLoop aSearch nid onResult st | 90 | aPublish r tok (Just ni) |
79 | interval = _todo :: POSIXTime -- publish interval | 91 | return () |
80 | atomically $ scheduleImmediately announcer k $ NewAnnouncement search announce interval | 92 | onResult _ = return True -- action for each search-hit (True = keep searching) |
93 | search = do -- thread to fork | ||
94 | atomically $ reset aBuckets aSearch aTarget st | ||
95 | searchLoop aSearch aTarget onResult st | ||
96 | atomically $ scheduleImmediately announcer k | ||
97 | $ SearchFinished {- st -} search announce aInterval | ||
98 | interruptDelay (interrutible announcer) | ||
99 | atomically $ scheduleImmediately announcer k $ NewAnnouncement search announce aInterval | ||
81 | interruptDelay (interrutible announcer) | 100 | interruptDelay (interrutible announcer) |
82 | 101 | ||
83 | cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | 102 | cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () |
@@ -129,7 +148,6 @@ performScheduledItem announcer now = \case | |||
129 | modifyTVar (scheduled announcer) | 148 | modifyTVar (scheduled announcer) |
130 | (PSQ.insert' k (Announce announce interval) (now + interval)) | 149 | (PSQ.insert' k (Announce announce interval) (now + interval)) |
131 | return $ Just $ do | 150 | return $ Just $ do |
132 | interruptDelay (interrutible announcer) | ||
133 | fork search | 151 | fork search |
134 | return () | 152 | return () |
135 | 153 | ||
@@ -147,7 +165,7 @@ performScheduledItem announcer now = \case | |||
147 | -- search finished: | 165 | -- search finished: |
148 | -- if any of the current storing-nodes set have not been | 166 | -- if any of the current storing-nodes set have not been |
149 | -- announced to, announce to them. | 167 | -- announced to, announce to them. |
150 | (Binding _ SearchFinished _) -> return $ Just $ return () | 168 | (Binding _ (SearchFinished {- st -} search announce interval) _) -> return $ Just $ return () |
151 | 169 | ||
152 | 170 | ||
153 | 171 | ||