diff options
-rw-r--r-- | Announcer.hs | 66 | ||||
-rw-r--r-- | examples/dhtd.hs | 38 |
2 files changed, 83 insertions, 21 deletions
diff --git a/Announcer.hs b/Announcer.hs index 668e00c2..12119ec1 100644 --- a/Announcer.hs +++ b/Announcer.hs | |||
@@ -46,6 +46,7 @@ data ScheduledItem | |||
46 | | NewAnnouncement (STM (IO ())) (IO ()) (IO ()) POSIXTime | 46 | | NewAnnouncement (STM (IO ())) (IO ()) (IO ()) POSIXTime |
47 | | SearchFinished (IO ()) (IO ()) POSIXTime | 47 | | SearchFinished (IO ()) (IO ()) POSIXTime |
48 | | Announce (STM (IO ())) (IO ()) POSIXTime | 48 | | Announce (STM (IO ())) (IO ()) POSIXTime |
49 | | SearchResult (STM (IO ())) | ||
49 | | DeleteAnnouncement | 50 | | DeleteAnnouncement |
50 | 51 | ||
51 | data Announcer = Announcer | 52 | data Announcer = Announcer |
@@ -66,12 +67,15 @@ scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM () | |||
66 | scheduleImmediately announcer k item | 67 | scheduleImmediately announcer k item |
67 | = modifyTVar' (scheduled announcer) (PSQ.insert' k item 0) | 68 | = modifyTVar' (scheduled announcer) (PSQ.insert' k item 0) |
68 | 69 | ||
70 | -- | Terminate the 'Announcer' thread. Don't use the Announcer after this. | ||
69 | stopAnnouncer :: Announcer -> IO () | 71 | stopAnnouncer :: Announcer -> IO () |
70 | stopAnnouncer announcer = do | 72 | stopAnnouncer announcer = do |
71 | atomically $ scheduleImmediately announcer (AnnounceKey "*stop*") StopAnnouncer | 73 | atomically $ scheduleImmediately announcer (AnnounceKey "*stop*") StopAnnouncer |
72 | interruptDelay (interrutible announcer) | 74 | interruptDelay (interrutible announcer) |
73 | atomically $ readTVar (announcerActive announcer) >>= check . not | 75 | atomically $ readTVar (announcerActive announcer) >>= check . not |
74 | 76 | ||
77 | -- | This type specifies an item that can be announced on appropriate nodes in | ||
78 | -- a Kademlia network. | ||
75 | data AnnounceMethod r = forall nid ni sr addr tok a. | 79 | data AnnounceMethod r = forall nid ni sr addr tok a. |
76 | ( Show nid | 80 | ( Show nid |
77 | , Hashable nid | 81 | , Hashable nid |
@@ -80,22 +84,54 @@ data AnnounceMethod r = forall nid ni sr addr tok a. | |||
80 | , Ord nid | 84 | , Ord nid |
81 | , Ord ni | 85 | , Ord ni |
82 | ) => AnnounceMethod | 86 | ) => AnnounceMethod |
83 | { aSearch :: Search nid addr tok ni sr | 87 | { aSearch :: Search nid addr tok ni sr |
84 | , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) | 88 | -- ^ This is the Kademlia search to run repeatedly to find the |
85 | , aBuckets :: TVar (R.BucketList ni) | 89 | -- nearby nodes. A new search is started whenever one is not |
86 | , aTarget :: nid | 90 | -- already in progress at announce time. Repeated searches are |
91 | -- likely to finish faster than the first since nearby nodes | ||
92 | -- are not discarded. | ||
93 | , aPublish :: Either (r -> sr -> IO (Maybe a)) | ||
94 | (r -> tok -> Maybe ni -> IO (Maybe a)) | ||
95 | -- ^ The action to perform when we find nearby nodes. The | ||
96 | -- destination node is given as a Maybe so that methods that | ||
97 | -- treat 'Nothing' as loop-back address can be passed here, | ||
98 | -- however 'Nothing' will not be passed by the announcer | ||
99 | -- thread. | ||
100 | -- | ||
101 | -- There are two cases: | ||
102 | -- | ||
103 | -- [Left] The action to perform requires a search result. | ||
104 | -- This was implemented to support Tox's DHTKey and | ||
105 | -- Friend-Request messages. | ||
106 | -- | ||
107 | -- [Right] The action requires a "token" from the destination | ||
108 | -- node. This is the more typical "announce" semantics for | ||
109 | -- Kademlia. | ||
110 | , aBuckets :: TVar (R.BucketList ni) | ||
111 | -- ^ Set this to the current Kademlia routing table buckets. | ||
112 | , aTarget :: nid | ||
113 | -- ^ This is the Kademlia node-id of the item being announced. | ||
87 | , aInterval :: POSIXTime | 114 | , aInterval :: POSIXTime |
115 | -- ^ Assuming we have nearby nodes from the search, the item | ||
116 | -- will be announced at this interval. | ||
117 | -- | ||
118 | -- Current implementation is to make the scheduled | ||
119 | -- announcements even if the search hasn't finished. It will | ||
120 | -- use the closest nodes found so far. | ||
88 | } | 121 | } |
89 | 122 | ||
123 | -- | Schedule a recurring Search + Announce sequence. | ||
90 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | 124 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () |
91 | schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do | 125 | schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do |
92 | st <- atomically $ newSearch aSearch aTarget [] | 126 | st <- atomically $ newSearch aSearch aTarget [] |
93 | ns <- atomically $ newTVar MM.empty | 127 | ns <- atomically $ newTVar MM.empty |
94 | let astate = AnnounceState st ns | 128 | let astate = AnnounceState st ns |
95 | publishToNodes is = do | 129 | publishToNodes is |
130 | | Left _ <- aPublish = return () | ||
131 | | Right publish <- aPublish = do | ||
96 | forM_ is $ \(Binding ni mtok _) -> do | 132 | forM_ is $ \(Binding ni mtok _) -> do |
97 | forM_ mtok $ \tok -> do | 133 | forM_ mtok $ \tok -> do |
98 | got <- aPublish r tok (Just ni) | 134 | got <- publish r tok (Just ni) |
99 | now <- getPOSIXTime | 135 | now <- getPOSIXTime |
100 | forM_ got $ \_ -> do | 136 | forM_ got $ \_ -> do |
101 | atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) | 137 | atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) |
@@ -104,7 +140,20 @@ schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} | |||
104 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) | 140 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) |
105 | return $ MM.toList bs | 141 | return $ MM.toList bs |
106 | publishToNodes is | 142 | publishToNodes is |
107 | onResult _ = return True -- action for each search-hit (True = keep searching) | 143 | onResult sr |
144 | | Right _ <- aPublish = return True | ||
145 | | Left sendit <- aPublish = do | ||
146 | scheduleImmediately announcer k $ SearchResult $ return $ do | ||
147 | got <- sendit r sr | ||
148 | -- If we had a way to get the source of a search result, we might want to | ||
149 | -- treat it similarly to an announcing node and remember it in the 'aStoringNodes' | ||
150 | -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent | ||
151 | -- a message be forgotten. | ||
152 | -- | ||
153 | -- forM_ got $ \_ -> do | ||
154 | -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) | ||
155 | return () | ||
156 | return True -- True to keep searching. | ||
108 | searchAgain = searchIsFinished st >>= \isfin -> return $ when isfin $ void $ fork search | 157 | searchAgain = searchIsFinished st >>= \isfin -> return $ when isfin $ void $ fork search |
109 | search = do -- thread to fork | 158 | search = do -- thread to fork |
110 | atomically $ reset aBuckets aSearch aTarget st | 159 | atomically $ reset aBuckets aSearch aTarget st |
@@ -130,6 +179,8 @@ cancel announcer k _ _ = do | |||
130 | atomically $ scheduleImmediately announcer k $ DeleteAnnouncement | 179 | atomically $ scheduleImmediately announcer k $ DeleteAnnouncement |
131 | interruptDelay (interrutible announcer) | 180 | interruptDelay (interrutible announcer) |
132 | 181 | ||
182 | -- | Construct an 'Announcer' object and fork a thread in which to perform the | ||
183 | -- Kademlia searches and announces. | ||
133 | forkAnnouncer :: IO Announcer | 184 | forkAnnouncer :: IO Announcer |
134 | forkAnnouncer = do | 185 | forkAnnouncer = do |
135 | delay <- interruptibleDelay | 186 | delay <- interruptibleDelay |
@@ -194,5 +245,6 @@ performScheduledItem announcer now = \case | |||
194 | -- announced to, announce to them. | 245 | -- announced to, announce to them. |
195 | (Binding _ (SearchFinished {- st -} search announce interval) _) -> return $ Just $ return () | 246 | (Binding _ (SearchFinished {- st -} search announce interval) _) -> return $ Just $ return () |
196 | 247 | ||
248 | (Binding _ (SearchResult action) _) -> Just <$> action | ||
197 | 249 | ||
198 | 250 | ||
diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 1aa36b77..8ceafd00 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs | |||
@@ -158,7 +158,8 @@ data DHTAnnouncable nid = forall dta tok ni r. | |||
158 | { announceParseData :: String -> Either String dta | 158 | { announceParseData :: String -> Either String dta |
159 | , announceParseToken :: dta -> String -> Either String tok | 159 | , announceParseToken :: dta -> String -> Either String tok |
160 | , announceParseAddress :: String -> Either String ni | 160 | , announceParseAddress :: String -> Either String ni |
161 | , announceSendData :: dta -> tok -> Maybe ni -> IO (Maybe r) | 161 | , announceSendData :: Either (dta -> r -> IO (Maybe r)) |
162 | (dta -> tok -> Maybe ni -> IO (Maybe r)) | ||
162 | , announceInterval :: POSIXTime | 163 | , announceInterval :: POSIXTime |
163 | , qresultAddr :: dta -> nid | 164 | , qresultAddr :: dta -> nid |
164 | } | 165 | } |
@@ -703,12 +704,16 @@ clientSession s@Session{..} sock cnum h = do | |||
703 | Left e -> hPutClient h e | 704 | Left e -> hPutClient h e |
704 | Right nid -> go nid >>= either (hPutClient h) (hPutClient h . show) | 705 | Right nid -> go nid >>= either (hPutClient h) (hPutClient h . show) |
705 | where | 706 | where |
706 | go | null destination = fmap (maybe (Left "Timeout.") Right) | 707 | go | Right asend <- announceSendData |
707 | . flip (uncurry announceSendData) Nothing | 708 | , null destination = fmap (maybe (Left "Timeout.") Right) |
708 | | otherwise = case announceParseAddress destination of | 709 | . flip (uncurry asend) Nothing |
710 | | Right asend <- announceSendData | ||
711 | = case announceParseAddress destination of | ||
709 | Right ni -> fmap (maybe (Left "Timeout.") Right) | 712 | Right ni -> fmap (maybe (Left "Timeout.") Right) |
710 | . flip (uncurry announceSendData) (Just ni) | 713 | . flip (uncurry asend) (Just ni) |
711 | Left e -> const $ return $ Left ("Bad destination: "++e) | 714 | Left e -> const $ return $ Left ("Bad destination: "++e) |
715 | | Left asend <- announceSendData | ||
716 | = const $ return $ Left "TODO" | ||
712 | maybe (hPutClient h ("Unsupported method: "++method)) | 717 | maybe (hPutClient h ("Unsupported method: "++method)) |
713 | goTarget | 718 | goTarget |
714 | $ Map.lookup method dhtAnnouncables | 719 | $ Map.lookup method dhtAnnouncables |
@@ -780,7 +785,8 @@ clientSession s@Session{..} sock cnum h = do | |||
780 | , announceInterval | 785 | , announceInterval |
781 | , qresultAddr } <- a | 786 | , qresultAddr } <- a |
782 | DHTQuery { qsearch } <- q | 787 | DHTQuery { qsearch } <- q |
783 | Refl <- matchingTok qsearch announceSendData | 788 | asend <- either (const Nothing) Just announceSendData |
789 | Refl <- matchingTok qsearch asend | ||
784 | return () | 790 | return () |
785 | chkni :: Maybe () | 791 | chkni :: Maybe () |
786 | chkni = do | 792 | chkni = do |
@@ -789,7 +795,8 @@ clientSession s@Session{..} sock cnum h = do | |||
789 | , announceInterval | 795 | , announceInterval |
790 | , qresultAddr } <- a | 796 | , qresultAddr } <- a |
791 | DHTQuery { qsearch } <- q | 797 | DHTQuery { qsearch } <- q |
792 | Refl <- matchingNI qsearch announceSendData | 798 | asend <- either (const Nothing) Just announceSendData |
799 | Refl <- matchingNI qsearch asend | ||
793 | return () | 800 | return () |
794 | mameth = do | 801 | mameth = do |
795 | DHTAnnouncable { announceSendData | 802 | DHTAnnouncable { announceSendData |
@@ -797,14 +804,15 @@ clientSession s@Session{..} sock cnum h = do | |||
797 | , announceInterval | 804 | , announceInterval |
798 | , qresultAddr } <- a | 805 | , qresultAddr } <- a |
799 | DHTQuery { qsearch } <- q | 806 | DHTQuery { qsearch } <- q |
800 | (Refl, Refl) <- matchingResult qsearch announceSendData | 807 | asend <- either (const Nothing) Just announceSendData |
808 | (Refl, Refl) <- matchingResult qsearch asend | ||
801 | -- return $ hPutClient h "Type matches." | 809 | -- return $ hPutClient h "Type matches." |
802 | dta <- either (const Nothing) Just $ announceParseData dtastr | 810 | dta <- either (const Nothing) Just $ announceParseData dtastr |
803 | return $ do | 811 | return $ do |
804 | akey <- atomically $ packAnnounceKey announcer (method ++ ":" ++ dtastr) | 812 | akey <- atomically $ packAnnounceKey announcer (method ++ ":" ++ dtastr) |
805 | doit op announcer | 813 | doit op announcer |
806 | akey | 814 | akey |
807 | (AnnounceMethod qsearch announceSendData dhtBuckets | 815 | (AnnounceMethod qsearch (Right asend) dhtBuckets |
808 | (qresultAddr dta) | 816 | (qresultAddr dta) |
809 | announceInterval) | 817 | announceInterval) |
810 | dta | 818 | dta |
@@ -1118,7 +1126,7 @@ main = runResourceT $ liftBaseWith $ \resT -> do | |||
1118 | -- pr = Announced | 1126 | -- pr = Announced |
1119 | -- ptok = Token | 1127 | -- ptok = Token |
1120 | -- pni = NodeInfo | 1128 | -- pni = NodeInfo |
1121 | [ ("peer", DHTAnnouncable { announceSendData = \ih tok -> \case | 1129 | [ ("peer", DHTAnnouncable { announceSendData = Right $ \ih tok -> \case |
1122 | Just ni -> do | 1130 | Just ni -> do |
1123 | port <- atomically $ readTVar peerPort | 1131 | port <- atomically $ readTVar peerPort |
1124 | let dta = Mainline.mkAnnounce port ih tok | 1132 | let dta = Mainline.mkAnnounce port ih tok |
@@ -1133,7 +1141,7 @@ main = runResourceT $ liftBaseWith $ \resT -> do | |||
1133 | , ("port", DHTAnnouncable { announceParseData = readEither | 1141 | , ("port", DHTAnnouncable { announceParseData = readEither |
1134 | , announceParseToken = \_ _ -> return () | 1142 | , announceParseToken = \_ _ -> return () |
1135 | , announceParseAddress = const $ Right () | 1143 | , announceParseAddress = const $ Right () |
1136 | , announceSendData = \dta () -> \case | 1144 | , announceSendData = Right $ \dta () -> \case |
1137 | Nothing -> do atomically $ writeTVar peerPort (dta :: PortNumber) | 1145 | Nothing -> do atomically $ writeTVar peerPort (dta :: PortNumber) |
1138 | return $ Just dta | 1146 | return $ Just dta |
1139 | Just _ -> return Nothing | 1147 | Just _ -> return Nothing |
@@ -1222,7 +1230,7 @@ main = runResourceT $ liftBaseWith $ \resT -> do | |||
1222 | , dhtSearches = toxSearches | 1230 | , dhtSearches = toxSearches |
1223 | , dhtFallbackNodes = return [] | 1231 | , dhtFallbackNodes = return [] |
1224 | , dhtAnnouncables = Map.fromList | 1232 | , dhtAnnouncables = Map.fromList |
1225 | [ ("toxid", DHTAnnouncable { announceSendData = \pubkey token -> \case | 1233 | [ ("toxid", DHTAnnouncable { announceSendData = Right $ \pubkey token -> \case |
1226 | Just ni -> | 1234 | Just ni -> |
1227 | Tox.putRendezvous | 1235 | Tox.putRendezvous |
1228 | (Tox.onionTimeout tox) | 1236 | (Tox.onionTimeout tox) |
@@ -1252,7 +1260,8 @@ main = runResourceT $ liftBaseWith $ \resT -> do | |||
1252 | , announceInterval = 15 | 1260 | , announceInterval = 15 |
1253 | 1261 | ||
1254 | }) | 1262 | }) |
1255 | , ("dhtkey", DHTAnnouncable { announceSendData = \pubkey () -> \case | 1263 | -- FIXME: Should use announceSendData = Left ... |
1264 | , ("dhtkey", DHTAnnouncable { announceSendData = Right $ \pubkey () -> \case | ||
1256 | Just addr -> do | 1265 | Just addr -> do |
1257 | dkey <- Tox.getContactInfo tox | 1266 | dkey <- Tox.getContactInfo tox |
1258 | sendMessage | 1267 | sendMessage |
@@ -1283,7 +1292,8 @@ main = runResourceT $ liftBaseWith $ \resT -> do | |||
1283 | , announceInterval = 30 | 1292 | , announceInterval = 30 |
1284 | 1293 | ||
1285 | }) | 1294 | }) |
1286 | , ("friend", DHTAnnouncable { announceSendData = \pubkey nospam -> \case | 1295 | -- FIXME: Should use announceSendData = Left ... |
1296 | , ("friend", DHTAnnouncable { announceSendData = Right $ \pubkey nospam -> \case | ||
1287 | Just addr -> do | 1297 | Just addr -> do |
1288 | let fr = Tox.FriendRequest nospam txt | 1298 | let fr = Tox.FriendRequest nospam txt |
1289 | -- nospam = 0xD64A8B00 | 1299 | -- nospam = 0xD64A8B00 |