summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Announcer.hs66
-rw-r--r--examples/dhtd.hs38
2 files changed, 83 insertions, 21 deletions
diff --git a/Announcer.hs b/Announcer.hs
index 668e00c2..12119ec1 100644
--- a/Announcer.hs
+++ b/Announcer.hs
@@ -46,6 +46,7 @@ data ScheduledItem
46 | NewAnnouncement (STM (IO ())) (IO ()) (IO ()) POSIXTime 46 | NewAnnouncement (STM (IO ())) (IO ()) (IO ()) POSIXTime
47 | SearchFinished (IO ()) (IO ()) POSIXTime 47 | SearchFinished (IO ()) (IO ()) POSIXTime
48 | Announce (STM (IO ())) (IO ()) POSIXTime 48 | Announce (STM (IO ())) (IO ()) POSIXTime
49 | SearchResult (STM (IO ()))
49 | DeleteAnnouncement 50 | DeleteAnnouncement
50 51
51data Announcer = Announcer 52data Announcer = Announcer
@@ -66,12 +67,15 @@ scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM ()
66scheduleImmediately announcer k item 67scheduleImmediately announcer k item
67 = modifyTVar' (scheduled announcer) (PSQ.insert' k item 0) 68 = modifyTVar' (scheduled announcer) (PSQ.insert' k item 0)
68 69
70-- | Terminate the 'Announcer' thread. Don't use the Announcer after this.
69stopAnnouncer :: Announcer -> IO () 71stopAnnouncer :: Announcer -> IO ()
70stopAnnouncer announcer = do 72stopAnnouncer announcer = do
71 atomically $ scheduleImmediately announcer (AnnounceKey "*stop*") StopAnnouncer 73 atomically $ scheduleImmediately announcer (AnnounceKey "*stop*") StopAnnouncer
72 interruptDelay (interrutible announcer) 74 interruptDelay (interrutible announcer)
73 atomically $ readTVar (announcerActive announcer) >>= check . not 75 atomically $ readTVar (announcerActive announcer) >>= check . not
74 76
77-- | This type specifies an item that can be announced on appropriate nodes in
78-- a Kademlia network.
75data AnnounceMethod r = forall nid ni sr addr tok a. 79data AnnounceMethod r = forall nid ni sr addr tok a.
76 ( Show nid 80 ( Show nid
77 , Hashable nid 81 , Hashable nid
@@ -80,22 +84,54 @@ data AnnounceMethod r = forall nid ni sr addr tok a.
80 , Ord nid 84 , Ord nid
81 , Ord ni 85 , Ord ni
82 ) => AnnounceMethod 86 ) => AnnounceMethod
83 { aSearch :: Search nid addr tok ni sr 87 { aSearch :: Search nid addr tok ni sr
84 , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) 88 -- ^ This is the Kademlia search to run repeatedly to find the
85 , aBuckets :: TVar (R.BucketList ni) 89 -- nearby nodes. A new search is started whenever one is not
86 , aTarget :: nid 90 -- already in progress at announce time. Repeated searches are
91 -- likely to finish faster than the first since nearby nodes
92 -- are not discarded.
93 , aPublish :: Either (r -> sr -> IO (Maybe a))
94 (r -> tok -> Maybe ni -> IO (Maybe a))
95 -- ^ The action to perform when we find nearby nodes. The
96 -- destination node is given as a Maybe so that methods that
97 -- treat 'Nothing' as loop-back address can be passed here,
98 -- however 'Nothing' will not be passed by the announcer
99 -- thread.
100 --
101 -- There are two cases:
102 --
103 -- [Left] The action to perform requires a search result.
104 -- This was implemented to support Tox's DHTKey and
105 -- Friend-Request messages.
106 --
107 -- [Right] The action requires a "token" from the destination
108 -- node. This is the more typical "announce" semantics for
109 -- Kademlia.
110 , aBuckets :: TVar (R.BucketList ni)
111 -- ^ Set this to the current Kademlia routing table buckets.
112 , aTarget :: nid
113 -- ^ This is the Kademlia node-id of the item being announced.
87 , aInterval :: POSIXTime 114 , aInterval :: POSIXTime
115 -- ^ Assuming we have nearby nodes from the search, the item
116 -- will be announced at this interval.
117 --
118 -- Current implementation is to make the scheduled
119 -- announcements even if the search hasn't finished. It will
120 -- use the closest nodes found so far.
88 } 121 }
89 122
123-- | Schedule a recurring Search + Announce sequence.
90schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () 124schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
91schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do 125schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do
92 st <- atomically $ newSearch aSearch aTarget [] 126 st <- atomically $ newSearch aSearch aTarget []
93 ns <- atomically $ newTVar MM.empty 127 ns <- atomically $ newTVar MM.empty
94 let astate = AnnounceState st ns 128 let astate = AnnounceState st ns
95 publishToNodes is = do 129 publishToNodes is
130 | Left _ <- aPublish = return ()
131 | Right publish <- aPublish = do
96 forM_ is $ \(Binding ni mtok _) -> do 132 forM_ is $ \(Binding ni mtok _) -> do
97 forM_ mtok $ \tok -> do 133 forM_ mtok $ \tok -> do
98 got <- aPublish r tok (Just ni) 134 got <- publish r tok (Just ni)
99 now <- getPOSIXTime 135 now <- getPOSIXTime
100 forM_ got $ \_ -> do 136 forM_ got $ \_ -> do
101 atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) 137 atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
@@ -104,7 +140,20 @@ schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval}
104 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) 140 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
105 return $ MM.toList bs 141 return $ MM.toList bs
106 publishToNodes is 142 publishToNodes is
107 onResult _ = return True -- action for each search-hit (True = keep searching) 143 onResult sr
144 | Right _ <- aPublish = return True
145 | Left sendit <- aPublish = do
146 scheduleImmediately announcer k $ SearchResult $ return $ do
147 got <- sendit r sr
148 -- If we had a way to get the source of a search result, we might want to
149 -- treat it similarly to an announcing node and remember it in the 'aStoringNodes'
150 -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent
151 -- a message be forgotten.
152 --
153 -- forM_ got $ \_ -> do
154 -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
155 return ()
156 return True -- True to keep searching.
108 searchAgain = searchIsFinished st >>= \isfin -> return $ when isfin $ void $ fork search 157 searchAgain = searchIsFinished st >>= \isfin -> return $ when isfin $ void $ fork search
109 search = do -- thread to fork 158 search = do -- thread to fork
110 atomically $ reset aBuckets aSearch aTarget st 159 atomically $ reset aBuckets aSearch aTarget st
@@ -130,6 +179,8 @@ cancel announcer k _ _ = do
130 atomically $ scheduleImmediately announcer k $ DeleteAnnouncement 179 atomically $ scheduleImmediately announcer k $ DeleteAnnouncement
131 interruptDelay (interrutible announcer) 180 interruptDelay (interrutible announcer)
132 181
182-- | Construct an 'Announcer' object and fork a thread in which to perform the
183-- Kademlia searches and announces.
133forkAnnouncer :: IO Announcer 184forkAnnouncer :: IO Announcer
134forkAnnouncer = do 185forkAnnouncer = do
135 delay <- interruptibleDelay 186 delay <- interruptibleDelay
@@ -194,5 +245,6 @@ performScheduledItem announcer now = \case
194 -- announced to, announce to them. 245 -- announced to, announce to them.
195 (Binding _ (SearchFinished {- st -} search announce interval) _) -> return $ Just $ return () 246 (Binding _ (SearchFinished {- st -} search announce interval) _) -> return $ Just $ return ()
196 247
248 (Binding _ (SearchResult action) _) -> Just <$> action
197 249
198 250
diff --git a/examples/dhtd.hs b/examples/dhtd.hs
index 1aa36b77..8ceafd00 100644
--- a/examples/dhtd.hs
+++ b/examples/dhtd.hs
@@ -158,7 +158,8 @@ data DHTAnnouncable nid = forall dta tok ni r.
158 { announceParseData :: String -> Either String dta 158 { announceParseData :: String -> Either String dta
159 , announceParseToken :: dta -> String -> Either String tok 159 , announceParseToken :: dta -> String -> Either String tok
160 , announceParseAddress :: String -> Either String ni 160 , announceParseAddress :: String -> Either String ni
161 , announceSendData :: dta -> tok -> Maybe ni -> IO (Maybe r) 161 , announceSendData :: Either (dta -> r -> IO (Maybe r))
162 (dta -> tok -> Maybe ni -> IO (Maybe r))
162 , announceInterval :: POSIXTime 163 , announceInterval :: POSIXTime
163 , qresultAddr :: dta -> nid 164 , qresultAddr :: dta -> nid
164 } 165 }
@@ -703,12 +704,16 @@ clientSession s@Session{..} sock cnum h = do
703 Left e -> hPutClient h e 704 Left e -> hPutClient h e
704 Right nid -> go nid >>= either (hPutClient h) (hPutClient h . show) 705 Right nid -> go nid >>= either (hPutClient h) (hPutClient h . show)
705 where 706 where
706 go | null destination = fmap (maybe (Left "Timeout.") Right) 707 go | Right asend <- announceSendData
707 . flip (uncurry announceSendData) Nothing 708 , null destination = fmap (maybe (Left "Timeout.") Right)
708 | otherwise = case announceParseAddress destination of 709 . flip (uncurry asend) Nothing
710 | Right asend <- announceSendData
711 = case announceParseAddress destination of
709 Right ni -> fmap (maybe (Left "Timeout.") Right) 712 Right ni -> fmap (maybe (Left "Timeout.") Right)
710 . flip (uncurry announceSendData) (Just ni) 713 . flip (uncurry asend) (Just ni)
711 Left e -> const $ return $ Left ("Bad destination: "++e) 714 Left e -> const $ return $ Left ("Bad destination: "++e)
715 | Left asend <- announceSendData
716 = const $ return $ Left "TODO"
712 maybe (hPutClient h ("Unsupported method: "++method)) 717 maybe (hPutClient h ("Unsupported method: "++method))
713 goTarget 718 goTarget
714 $ Map.lookup method dhtAnnouncables 719 $ Map.lookup method dhtAnnouncables
@@ -780,7 +785,8 @@ clientSession s@Session{..} sock cnum h = do
780 , announceInterval 785 , announceInterval
781 , qresultAddr } <- a 786 , qresultAddr } <- a
782 DHTQuery { qsearch } <- q 787 DHTQuery { qsearch } <- q
783 Refl <- matchingTok qsearch announceSendData 788 asend <- either (const Nothing) Just announceSendData
789 Refl <- matchingTok qsearch asend
784 return () 790 return ()
785 chkni :: Maybe () 791 chkni :: Maybe ()
786 chkni = do 792 chkni = do
@@ -789,7 +795,8 @@ clientSession s@Session{..} sock cnum h = do
789 , announceInterval 795 , announceInterval
790 , qresultAddr } <- a 796 , qresultAddr } <- a
791 DHTQuery { qsearch } <- q 797 DHTQuery { qsearch } <- q
792 Refl <- matchingNI qsearch announceSendData 798 asend <- either (const Nothing) Just announceSendData
799 Refl <- matchingNI qsearch asend
793 return () 800 return ()
794 mameth = do 801 mameth = do
795 DHTAnnouncable { announceSendData 802 DHTAnnouncable { announceSendData
@@ -797,14 +804,15 @@ clientSession s@Session{..} sock cnum h = do
797 , announceInterval 804 , announceInterval
798 , qresultAddr } <- a 805 , qresultAddr } <- a
799 DHTQuery { qsearch } <- q 806 DHTQuery { qsearch } <- q
800 (Refl, Refl) <- matchingResult qsearch announceSendData 807 asend <- either (const Nothing) Just announceSendData
808 (Refl, Refl) <- matchingResult qsearch asend
801 -- return $ hPutClient h "Type matches." 809 -- return $ hPutClient h "Type matches."
802 dta <- either (const Nothing) Just $ announceParseData dtastr 810 dta <- either (const Nothing) Just $ announceParseData dtastr
803 return $ do 811 return $ do
804 akey <- atomically $ packAnnounceKey announcer (method ++ ":" ++ dtastr) 812 akey <- atomically $ packAnnounceKey announcer (method ++ ":" ++ dtastr)
805 doit op announcer 813 doit op announcer
806 akey 814 akey
807 (AnnounceMethod qsearch announceSendData dhtBuckets 815 (AnnounceMethod qsearch (Right asend) dhtBuckets
808 (qresultAddr dta) 816 (qresultAddr dta)
809 announceInterval) 817 announceInterval)
810 dta 818 dta
@@ -1118,7 +1126,7 @@ main = runResourceT $ liftBaseWith $ \resT -> do
1118 -- pr = Announced 1126 -- pr = Announced
1119 -- ptok = Token 1127 -- ptok = Token
1120 -- pni = NodeInfo 1128 -- pni = NodeInfo
1121 [ ("peer", DHTAnnouncable { announceSendData = \ih tok -> \case 1129 [ ("peer", DHTAnnouncable { announceSendData = Right $ \ih tok -> \case
1122 Just ni -> do 1130 Just ni -> do
1123 port <- atomically $ readTVar peerPort 1131 port <- atomically $ readTVar peerPort
1124 let dta = Mainline.mkAnnounce port ih tok 1132 let dta = Mainline.mkAnnounce port ih tok
@@ -1133,7 +1141,7 @@ main = runResourceT $ liftBaseWith $ \resT -> do
1133 , ("port", DHTAnnouncable { announceParseData = readEither 1141 , ("port", DHTAnnouncable { announceParseData = readEither
1134 , announceParseToken = \_ _ -> return () 1142 , announceParseToken = \_ _ -> return ()
1135 , announceParseAddress = const $ Right () 1143 , announceParseAddress = const $ Right ()
1136 , announceSendData = \dta () -> \case 1144 , announceSendData = Right $ \dta () -> \case
1137 Nothing -> do atomically $ writeTVar peerPort (dta :: PortNumber) 1145 Nothing -> do atomically $ writeTVar peerPort (dta :: PortNumber)
1138 return $ Just dta 1146 return $ Just dta
1139 Just _ -> return Nothing 1147 Just _ -> return Nothing
@@ -1222,7 +1230,7 @@ main = runResourceT $ liftBaseWith $ \resT -> do
1222 , dhtSearches = toxSearches 1230 , dhtSearches = toxSearches
1223 , dhtFallbackNodes = return [] 1231 , dhtFallbackNodes = return []
1224 , dhtAnnouncables = Map.fromList 1232 , dhtAnnouncables = Map.fromList
1225 [ ("toxid", DHTAnnouncable { announceSendData = \pubkey token -> \case 1233 [ ("toxid", DHTAnnouncable { announceSendData = Right $ \pubkey token -> \case
1226 Just ni -> 1234 Just ni ->
1227 Tox.putRendezvous 1235 Tox.putRendezvous
1228 (Tox.onionTimeout tox) 1236 (Tox.onionTimeout tox)
@@ -1252,7 +1260,8 @@ main = runResourceT $ liftBaseWith $ \resT -> do
1252 , announceInterval = 15 1260 , announceInterval = 15
1253 1261
1254 }) 1262 })
1255 , ("dhtkey", DHTAnnouncable { announceSendData = \pubkey () -> \case 1263 -- FIXME: Should use announceSendData = Left ...
1264 , ("dhtkey", DHTAnnouncable { announceSendData = Right $ \pubkey () -> \case
1256 Just addr -> do 1265 Just addr -> do
1257 dkey <- Tox.getContactInfo tox 1266 dkey <- Tox.getContactInfo tox
1258 sendMessage 1267 sendMessage
@@ -1283,7 +1292,8 @@ main = runResourceT $ liftBaseWith $ \resT -> do
1283 , announceInterval = 30 1292 , announceInterval = 30
1284 1293
1285 }) 1294 })
1286 , ("friend", DHTAnnouncable { announceSendData = \pubkey nospam -> \case 1295 -- FIXME: Should use announceSendData = Left ...
1296 , ("friend", DHTAnnouncable { announceSendData = Right $ \pubkey nospam -> \case
1287 Just addr -> do 1297 Just addr -> do
1288 let fr = Tox.FriendRequest nospam txt 1298 let fr = Tox.FriendRequest nospam txt
1289 -- nospam = 0xD64A8B00 1299 -- nospam = 0xD64A8B00