summaryrefslogtreecommitdiff
path: root/Announcer.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Announcer.hs')
-rw-r--r--Announcer.hs50
1 files changed, 34 insertions, 16 deletions
diff --git a/Announcer.hs b/Announcer.hs
index 1f539d5d..f19f8d46 100644
--- a/Announcer.hs
+++ b/Announcer.hs
@@ -15,9 +15,11 @@ module Announcer
15 , cancel 15 , cancel
16 ) where 16 ) where
17 17
18import Data.Wrapper.PSQ as PSQ 18import qualified Data.MinMaxPSQ as MM
19import Network.Kademlia.Search 19import Data.Wrapper.PSQ as PSQ
20import InterruptibleDelay 20import InterruptibleDelay
21import Network.Kademlia.Routing as R
22import Network.Kademlia.Search
21 23
22import Control.Concurrent.Lifted.Instrument 24import Control.Concurrent.Lifted.Instrument
23import Control.Concurrent.STM 25import Control.Concurrent.STM
@@ -42,9 +44,9 @@ data ScheduledItem
42 = forall r. ScheduledItem (AnnounceMethod r) 44 = forall r. ScheduledItem (AnnounceMethod r)
43 | StopAnnouncer 45 | StopAnnouncer
44 | NewAnnouncement (IO ()) (IO ()) POSIXTime 46 | NewAnnouncement (IO ()) (IO ()) POSIXTime
47 | SearchFinished (IO ()) (IO ()) POSIXTime
45 | Announce (IO ()) POSIXTime 48 | Announce (IO ()) POSIXTime
46 | DeleteAnnouncement 49 | DeleteAnnouncement
47 | SearchFinished
48 50
49data Announcer = Announcer 51data Announcer = Announcer
50 { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) 52 { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem)
@@ -62,22 +64,39 @@ stopAnnouncer announcer = do
62 interruptDelay (interrutible announcer) 64 interruptDelay (interrutible announcer)
63 atomically $ readTVar (announcerActive announcer) >>= check . not 65 atomically $ readTVar (announcerActive announcer) >>= check . not
64 66
65data AnnounceMethod r = forall nid ni addr r tok a. AnnounceMethod 67data AnnounceMethod r = forall nid ni addr tok a.
68 ( Show nid
69 , Hashable nid
70 , Hashable ni
71 , Ord addr
72 , Ord nid
73 , Ord ni
74 ) => AnnounceMethod
66 { aSearch :: Search nid addr tok ni r 75 { aSearch :: Search nid addr tok ni r
67 , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) 76 , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a)
77 , aBuckets :: TVar (R.BucketList ni)
78 , aTarget :: nid
79 , aInterval :: POSIXTime
68 } 80 }
69 81
70schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () 82schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
71schedule announcer k AnnounceMethod{aSearch,aPublish} r = do 83schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do
72 let announce = _todo :: IO () -- publish to current search results 84 st <- atomically $ newSearch aSearch aTarget []
73 onResult _ = return True 85 let announce = do -- publish to current search results
74 search = _todo :: IO () -- thread to fork 86 is <- atomically $ do
75 -- ns <- R.kclosest (searchSpace qsearch) searchK nid <$> readTVar dhtBuckets 87 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
76 -- st <- newSearch qsearch nid ns 88 return $ MM.toList bs
77 -- searchLoop :: Search nid addr tok ni r -> nid -> (r -> STM Bool) -> SearchState nid addr tok ni r -> IO () 89 forM_ is $ \(Binding ni tok _) -> do
78 -- searchLoop aSearch nid onResult st 90 aPublish r tok (Just ni)
79 interval = _todo :: POSIXTime -- publish interval 91 return ()
80 atomically $ scheduleImmediately announcer k $ NewAnnouncement search announce interval 92 onResult _ = return True -- action for each search-hit (True = keep searching)
93 search = do -- thread to fork
94 atomically $ reset aBuckets aSearch aTarget st
95 searchLoop aSearch aTarget onResult st
96 atomically $ scheduleImmediately announcer k
97 $ SearchFinished {- st -} search announce aInterval
98 interruptDelay (interrutible announcer)
99 atomically $ scheduleImmediately announcer k $ NewAnnouncement search announce aInterval
81 interruptDelay (interrutible announcer) 100 interruptDelay (interrutible announcer)
82 101
83cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () 102cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
@@ -129,7 +148,6 @@ performScheduledItem announcer now = \case
129 modifyTVar (scheduled announcer) 148 modifyTVar (scheduled announcer)
130 (PSQ.insert' k (Announce announce interval) (now + interval)) 149 (PSQ.insert' k (Announce announce interval) (now + interval))
131 return $ Just $ do 150 return $ Just $ do
132 interruptDelay (interrutible announcer)
133 fork search 151 fork search
134 return () 152 return ()
135 153
@@ -147,7 +165,7 @@ performScheduledItem announcer now = \case
147 -- search finished: 165 -- search finished:
148 -- if any of the current storing-nodes set have not been 166 -- if any of the current storing-nodes set have not been
149 -- announced to, announce to them. 167 -- announced to, announce to them.
150 (Binding _ SearchFinished _) -> return $ Just $ return () 168 (Binding _ (SearchFinished {- st -} search announce interval) _) -> return $ Just $ return ()
151 169
152 170
153 171