diff options
-rw-r--r-- | Announcer.hs | 50 | ||||
-rw-r--r-- | examples/dhtd.hs | 118 | ||||
-rw-r--r-- | src/Network/Kademlia/Search.hs | 3 |
3 files changed, 131 insertions, 40 deletions
diff --git a/Announcer.hs b/Announcer.hs index 1f539d5d..f19f8d46 100644 --- a/Announcer.hs +++ b/Announcer.hs | |||
@@ -15,9 +15,11 @@ module Announcer | |||
15 | , cancel | 15 | , cancel |
16 | ) where | 16 | ) where |
17 | 17 | ||
18 | import Data.Wrapper.PSQ as PSQ | 18 | import qualified Data.MinMaxPSQ as MM |
19 | import Network.Kademlia.Search | 19 | import Data.Wrapper.PSQ as PSQ |
20 | import InterruptibleDelay | 20 | import InterruptibleDelay |
21 | import Network.Kademlia.Routing as R | ||
22 | import Network.Kademlia.Search | ||
21 | 23 | ||
22 | import Control.Concurrent.Lifted.Instrument | 24 | import Control.Concurrent.Lifted.Instrument |
23 | import Control.Concurrent.STM | 25 | import Control.Concurrent.STM |
@@ -42,9 +44,9 @@ data ScheduledItem | |||
42 | = forall r. ScheduledItem (AnnounceMethod r) | 44 | = forall r. ScheduledItem (AnnounceMethod r) |
43 | | StopAnnouncer | 45 | | StopAnnouncer |
44 | | NewAnnouncement (IO ()) (IO ()) POSIXTime | 46 | | NewAnnouncement (IO ()) (IO ()) POSIXTime |
47 | | SearchFinished (IO ()) (IO ()) POSIXTime | ||
45 | | Announce (IO ()) POSIXTime | 48 | | Announce (IO ()) POSIXTime |
46 | | DeleteAnnouncement | 49 | | DeleteAnnouncement |
47 | | SearchFinished | ||
48 | 50 | ||
49 | data Announcer = Announcer | 51 | data Announcer = Announcer |
50 | { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) | 52 | { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) |
@@ -62,22 +64,39 @@ stopAnnouncer announcer = do | |||
62 | interruptDelay (interrutible announcer) | 64 | interruptDelay (interrutible announcer) |
63 | atomically $ readTVar (announcerActive announcer) >>= check . not | 65 | atomically $ readTVar (announcerActive announcer) >>= check . not |
64 | 66 | ||
65 | data AnnounceMethod r = forall nid ni addr r tok a. AnnounceMethod | 67 | data AnnounceMethod r = forall nid ni addr tok a. |
68 | ( Show nid | ||
69 | , Hashable nid | ||
70 | , Hashable ni | ||
71 | , Ord addr | ||
72 | , Ord nid | ||
73 | , Ord ni | ||
74 | ) => AnnounceMethod | ||
66 | { aSearch :: Search nid addr tok ni r | 75 | { aSearch :: Search nid addr tok ni r |
67 | , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) | 76 | , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) |
77 | , aBuckets :: TVar (R.BucketList ni) | ||
78 | , aTarget :: nid | ||
79 | , aInterval :: POSIXTime | ||
68 | } | 80 | } |
69 | 81 | ||
70 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | 82 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () |
71 | schedule announcer k AnnounceMethod{aSearch,aPublish} r = do | 83 | schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do |
72 | let announce = _todo :: IO () -- publish to current search results | 84 | st <- atomically $ newSearch aSearch aTarget [] |
73 | onResult _ = return True | 85 | let announce = do -- publish to current search results |
74 | search = _todo :: IO () -- thread to fork | 86 | is <- atomically $ do |
75 | -- ns <- R.kclosest (searchSpace qsearch) searchK nid <$> readTVar dhtBuckets | 87 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) |
76 | -- st <- newSearch qsearch nid ns | 88 | return $ MM.toList bs |
77 | -- searchLoop :: Search nid addr tok ni r -> nid -> (r -> STM Bool) -> SearchState nid addr tok ni r -> IO () | 89 | forM_ is $ \(Binding ni tok _) -> do |
78 | -- searchLoop aSearch nid onResult st | 90 | aPublish r tok (Just ni) |
79 | interval = _todo :: POSIXTime -- publish interval | 91 | return () |
80 | atomically $ scheduleImmediately announcer k $ NewAnnouncement search announce interval | 92 | onResult _ = return True -- action for each search-hit (True = keep searching) |
93 | search = do -- thread to fork | ||
94 | atomically $ reset aBuckets aSearch aTarget st | ||
95 | searchLoop aSearch aTarget onResult st | ||
96 | atomically $ scheduleImmediately announcer k | ||
97 | $ SearchFinished {- st -} search announce aInterval | ||
98 | interruptDelay (interrutible announcer) | ||
99 | atomically $ scheduleImmediately announcer k $ NewAnnouncement search announce aInterval | ||
81 | interruptDelay (interrutible announcer) | 100 | interruptDelay (interrutible announcer) |
82 | 101 | ||
83 | cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | 102 | cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () |
@@ -129,7 +148,6 @@ performScheduledItem announcer now = \case | |||
129 | modifyTVar (scheduled announcer) | 148 | modifyTVar (scheduled announcer) |
130 | (PSQ.insert' k (Announce announce interval) (now + interval)) | 149 | (PSQ.insert' k (Announce announce interval) (now + interval)) |
131 | return $ Just $ do | 150 | return $ Just $ do |
132 | interruptDelay (interrutible announcer) | ||
133 | fork search | 151 | fork search |
134 | return () | 152 | return () |
135 | 153 | ||
@@ -147,7 +165,7 @@ performScheduledItem announcer now = \case | |||
147 | -- search finished: | 165 | -- search finished: |
148 | -- if any of the current storing-nodes set have not been | 166 | -- if any of the current storing-nodes set have not been |
149 | -- announced to, announce to them. | 167 | -- announced to, announce to them. |
150 | (Binding _ SearchFinished _) -> return $ Just $ return () | 168 | (Binding _ (SearchFinished {- st -} search announce interval) _) -> return $ Just $ return () |
151 | 169 | ||
152 | 170 | ||
153 | 171 | ||
diff --git a/examples/dhtd.hs b/examples/dhtd.hs index bd9b9e09..f9dce3bc 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs | |||
@@ -112,10 +112,11 @@ data DHTQuery nid ni = forall addr r tok. | |||
112 | , Typeable tok | 112 | , Typeable tok |
113 | , Typeable ni | 113 | , Typeable ni |
114 | ) => DHTQuery | 114 | ) => DHTQuery |
115 | { qsearch :: Search nid addr tok ni r | 115 | { qsearch :: Search nid addr tok ni r |
116 | , qhandler :: ni -> nid -> IO ([ni], [r], tok) -- ^ Invoked on local node, when there is no query destination. | 116 | , qhandler :: ni -> nid -> IO ([ni], [r], tok) -- ^ Invoked on local node, when there is no query destination. |
117 | , qshowR :: r -> String | 117 | , qshowR :: r -> String |
118 | , qshowTok :: tok -> Maybe String | 118 | , qshowTok :: tok -> Maybe String |
119 | , qresultAddr :: r -> nid | ||
119 | } | 120 | } |
120 | 121 | ||
121 | data DHTAnnouncable = forall dta tok ni r. | 122 | data DHTAnnouncable = forall dta tok ni r. |
@@ -129,6 +130,7 @@ data DHTAnnouncable = forall dta tok ni r. | |||
129 | , announceParseToken :: dta -> String -> Either String tok | 130 | , announceParseToken :: dta -> String -> Either String tok |
130 | , announceParseAddress :: String -> Either String ni | 131 | , announceParseAddress :: String -> Either String ni |
131 | , announceSendData :: dta -> tok -> Maybe ni -> IO (Maybe r) | 132 | , announceSendData :: dta -> tok -> Maybe ni -> IO (Maybe r) |
133 | , announceInterval :: POSIXTime | ||
132 | } | 134 | } |
133 | 135 | ||
134 | data DHTLink = forall status linkid params. | 136 | data DHTLink = forall status linkid params. |
@@ -616,15 +618,19 @@ clientSession s@Session{..} sock cnum h = do | |||
616 | matchingResult _ _ = liftA3 (\a b c -> (a,b,c)) eqT eqT eqT | 618 | matchingResult _ _ = liftA3 (\a b c -> (a,b,c)) eqT eqT eqT |
617 | mameth = do | 619 | mameth = do |
618 | DHTAnnouncable { announceSendData | 620 | DHTAnnouncable { announceSendData |
619 | , announceParseData } <- a | 621 | , announceParseData |
620 | DHTQuery { qsearch } <- q | 622 | , announceInterval } <- a |
623 | DHTQuery { qsearch | ||
624 | , qresultAddr } <- q | ||
621 | (Refl,Refl,nr@Refl) <- matchingResult qsearch announceSendData | 625 | (Refl,Refl,nr@Refl) <- matchingResult qsearch announceSendData |
622 | dta <- either (const Nothing) Just $ announceParseData dtastr | 626 | dta <- either (const Nothing) Just $ announceParseData dtastr |
623 | return $ do | 627 | return $ do |
624 | akey <- atomically $ packAnnounceKey announcer (method ++ ":" ++ dtastr) | 628 | akey <- atomically $ packAnnounceKey announcer (method ++ ":" ++ dtastr) |
625 | doit op nr announcer | 629 | doit op nr announcer |
626 | akey | 630 | akey |
627 | (AnnounceMethod qsearch announceSendData) | 631 | (AnnounceMethod qsearch announceSendData dhtBuckets |
632 | (qresultAddr dta) | ||
633 | announceInterval) | ||
628 | dta | 634 | dta |
629 | fromMaybe (hPutClient h "error.") mameth | 635 | fromMaybe (hPutClient h "error.") mameth |
630 | 636 | ||
@@ -824,19 +830,25 @@ main = do | |||
824 | , pingShowResult = show | 830 | , pingShowResult = show |
825 | } | 831 | } |
826 | , dhtQuery = Map.fromList | 832 | , dhtQuery = Map.fromList |
827 | [ ("node", DHTQuery (Mainline.nodeSearch bt) | 833 | [ ("node", DHTQuery |
828 | (\ni -> fmap Mainline.unwrapNodes | 834 | { qsearch = (Mainline.nodeSearch bt) |
835 | , qhandler = (\ni -> fmap Mainline.unwrapNodes | ||
829 | . Mainline.findNodeH btR ni | 836 | . Mainline.findNodeH btR ni |
830 | . flip Mainline.FindNode (Just Want_Both)) | 837 | . flip Mainline.FindNode (Just Want_Both)) |
831 | show | 838 | , qshowR = show |
832 | (const Nothing)) | 839 | , qshowTok = (const Nothing) |
833 | , ("peer", DHTQuery (Mainline.peerSearch bt) | 840 | , qresultAddr = Mainline.nodeId |
834 | (\ni -> fmap Mainline.unwrapPeers | 841 | }) |
842 | , ("peer", DHTQuery | ||
843 | { qsearch = (Mainline.peerSearch bt) | ||
844 | , qhandler = (\ni -> fmap Mainline.unwrapPeers | ||
835 | . Mainline.getPeersH btR swarms ni | 845 | . Mainline.getPeersH btR swarms ni |
836 | . flip Mainline.GetPeers (Just Want_Both) | 846 | . flip Mainline.GetPeers (Just Want_Both) |
837 | . (read . show)) -- TODO: InfoHash -> NodeId | 847 | . (read . show)) -- TODO: InfoHash -> NodeId |
838 | (show . pPrint) | 848 | , qshowR = (show . pPrint) |
839 | (Just . show)) | 849 | , qshowTok = (Just . show) |
850 | , qresultAddr = (read . show) -- TODO: InfoHash -> NodeId | ||
851 | }) | ||
840 | ] | 852 | ] |
841 | , dhtParseId = readEither :: String -> Either String Mainline.NodeId | 853 | , dhtParseId = readEither :: String -> Either String Mainline.NodeId |
842 | , dhtSearches = mainlineSearches | 854 | , dhtSearches = mainlineSearches |
@@ -851,6 +863,7 @@ main = do | |||
851 | , announceParseAddress = readEither | 863 | , announceParseAddress = readEither |
852 | , announceParseData = readEither | 864 | , announceParseData = readEither |
853 | , announceParseToken = const $ readEither | 865 | , announceParseToken = const $ readEither |
866 | , announceInterval = 60 -- TODO: Is one minute good? | ||
854 | }) | 867 | }) |
855 | , ("port", DHTAnnouncable { announceParseData = readEither | 868 | , ("port", DHTAnnouncable { announceParseData = readEither |
856 | , announceParseToken = \_ _ -> return () | 869 | , announceParseToken = \_ _ -> return () |
@@ -859,6 +872,8 @@ main = do | |||
859 | Nothing -> do atomically $ writeTVar peerPort (dta :: PortNumber) | 872 | Nothing -> do atomically $ writeTVar peerPort (dta :: PortNumber) |
860 | return $ Just dta | 873 | return $ Just dta |
861 | Just _ -> return Nothing | 874 | Just _ -> return Nothing |
875 | , announceInterval = 0 -- TODO: The "port" setting should probably | ||
876 | -- be a command rather than an announcement. | ||
862 | })] | 877 | })] |
863 | 878 | ||
864 | , dhtLinks = Map.fromList | 879 | , dhtLinks = Map.fromList |
@@ -908,16 +923,20 @@ main = do | |||
908 | , pingShowResult = show | 923 | , pingShowResult = show |
909 | })] | 924 | })] |
910 | , dhtQuery = Map.fromList | 925 | , dhtQuery = Map.fromList |
911 | [ ("node", DHTQuery (Tox.nodeSearch $ Tox.toxDHT tox) | 926 | [ ("node", DHTQuery |
912 | (\ni -> fmap Tox.unwrapNodes | 927 | { qsearch = (Tox.nodeSearch $ Tox.toxDHT tox) |
928 | , qhandler = (\ni -> fmap Tox.unwrapNodes | ||
913 | . Tox.getNodesH (Tox.toxRouting tox) ni | 929 | . Tox.getNodesH (Tox.toxRouting tox) ni |
914 | . Tox.GetNodes) | 930 | . Tox.GetNodes) |
915 | show -- NodeInfo | 931 | , qshowR = show -- NodeInfo |
916 | (const Nothing)) | 932 | , qshowTok = (const Nothing) |
917 | , ("toxid", DHTQuery (Tox.toxidSearch (Tox.onionTimeout tox) | 933 | , qresultAddr = Tox.nodeId |
934 | }) | ||
935 | , ("toxid", DHTQuery | ||
936 | { qsearch = (Tox.toxidSearch (Tox.onionTimeout tox) | ||
918 | (Tox.toxCryptoKeys tox) | 937 | (Tox.toxCryptoKeys tox) |
919 | (Tox.toxOnion tox)) | 938 | (Tox.toxOnion tox)) |
920 | -- qhandler :: ni -> nid -> IO ([ni], [r], tok) | 939 | , qhandler = -- qhandler :: ni -> nid -> IO ([ni], [r], tok) |
921 | (\ni nid -> | 940 | (\ni nid -> |
922 | Tox.unwrapAnnounceResponse Nothing | 941 | Tox.unwrapAnnounceResponse Nothing |
923 | <$> clientAddress (Tox.toxDHT tox) Nothing | 942 | <$> clientAddress (Tox.toxDHT tox) Nothing |
@@ -926,8 +945,10 @@ main = do | |||
926 | (Tox.toxAnnouncedKeys tox) | 945 | (Tox.toxAnnouncedKeys tox) |
927 | (Tox.OnionDestination Tox.SearchingAlias ni Nothing) | 946 | (Tox.OnionDestination Tox.SearchingAlias ni Nothing) |
928 | (Tox.AnnounceRequest zeros32 nid Tox.zeroID)) | 947 | (Tox.AnnounceRequest zeros32 nid Tox.zeroID)) |
929 | show -- PublicKey | 948 | , qshowR = show -- Rendezvous |
930 | (fmap show)) | 949 | , qshowTok = (fmap show) -- Nonce32 |
950 | , qresultAddr = Tox.key2id . Tox.rendezvousKey | ||
951 | }) | ||
931 | ] | 952 | ] |
932 | , dhtParseId = readEither :: String -> Either String Tox.NodeId | 953 | , dhtParseId = readEither :: String -> Either String Tox.NodeId |
933 | , dhtSearches = toxSearches | 954 | , dhtSearches = toxSearches |
@@ -946,6 +967,21 @@ main = do | |||
946 | , announceParseAddress = readEither | 967 | , announceParseAddress = readEither |
947 | , announceParseToken = const $ readEither | 968 | , announceParseToken = const $ readEither |
948 | , announceParseData = fmap Tox.id2key . readEither | 969 | , announceParseData = fmap Tox.id2key . readEither |
970 | |||
971 | -- For peers we are announcing ourselves to, if we are not | ||
972 | -- announced to them toxcore tries every 3 seconds to | ||
973 | -- announce ourselves to them until they return that we | ||
974 | -- have announced ourselves to, then toxcore sends an | ||
975 | -- announce request packet every 15 seconds to see if we | ||
976 | -- are still announced and re announce ourselves at the | ||
977 | -- same time. The timeout of 15 seconds means a `ping_id` | ||
978 | -- received in the last packet will not have had time to | ||
979 | -- expire (20 second minimum timeout) before it is resent | ||
980 | -- 15 seconds later. Toxcore sends every announce packet | ||
981 | -- with the `ping_id` previously received from that peer | ||
982 | -- with the same path (if possible). | ||
983 | , announceInterval = 15 | ||
984 | |||
949 | }) | 985 | }) |
950 | , ("dhtkey", DHTAnnouncable { announceSendData = \pubkey () -> \case | 986 | , ("dhtkey", DHTAnnouncable { announceSendData = \pubkey () -> \case |
951 | Just addr -> do | 987 | Just addr -> do |
@@ -959,6 +995,23 @@ main = do | |||
959 | , announceParseAddress = readEither | 995 | , announceParseAddress = readEither |
960 | , announceParseToken = \_ _ -> return () | 996 | , announceParseToken = \_ _ -> return () |
961 | , announceParseData = fmap Tox.id2key . readEither | 997 | , announceParseData = fmap Tox.id2key . readEither |
998 | |||
999 | -- We send this packet every 30 seconds if there is more | ||
1000 | -- than one peer (in the 8) that says they our friend is | ||
1001 | -- announced on them. This packet can also be sent through | ||
1002 | -- the DHT module as a DHT request packet (see DHT) if we | ||
1003 | -- know the DHT public key of the friend and are looking | ||
1004 | -- for them in the DHT but have not connected to them yet. | ||
1005 | -- 30 second is a reasonable timeout to not flood the | ||
1006 | -- network with too many packets while making sure the | ||
1007 | -- other will eventually receive the packet. Since packets | ||
1008 | -- are sent through every peer that knows the friend, | ||
1009 | -- resending it right away without waiting has a high | ||
1010 | -- likelihood of failure as the chances of packet loss | ||
1011 | -- happening to all (up to to 8) packets sent is low. | ||
1012 | -- | ||
1013 | , announceInterval = 30 | ||
1014 | |||
962 | }) | 1015 | }) |
963 | , ("friend", DHTAnnouncable { announceSendData = \pubkey nospam -> \case | 1016 | , ("friend", DHTAnnouncable { announceSendData = \pubkey nospam -> \case |
964 | Just addr -> do | 1017 | Just addr -> do |
@@ -979,6 +1032,25 @@ main = do | |||
979 | (Tox.verifyChecksum pubkey) | 1032 | (Tox.verifyChecksum pubkey) |
980 | chksum | 1033 | chksum |
981 | return nospam | 1034 | return nospam |
1035 | |||
1036 | -- Friend requests are sent with exponentially increasing | ||
1037 | -- interval of 2 seconds, 4 seconds, 8 seconds, etc... in | ||
1038 | -- toxcore. This is so friend requests get resent but | ||
1039 | -- eventually get resent in intervals that are so big that | ||
1040 | -- they essentially expire. The sender has no way of | ||
1041 | -- knowing if a peer refuses a friend requests which is why | ||
1042 | -- friend requests need to expire in some way. Note that | ||
1043 | -- the interval is the minimum timeout, if toxcore cannot | ||
1044 | -- send that friend request it will try again until it | ||
1045 | -- manages to send it. One reason for not being able to | ||
1046 | -- send the friend request would be that the onion has not | ||
1047 | -- found the friend in the onion and so cannot send an | ||
1048 | -- onion data packet to them. | ||
1049 | -- | ||
1050 | -- TODO: Support exponential backoff behavior. For now, setting | ||
1051 | -- interval to 8 seconds. | ||
1052 | |||
1053 | , announceInterval = 8 | ||
982 | })] | 1054 | })] |
983 | , dhtLinks = Map.fromList | 1055 | , dhtLinks = Map.fromList |
984 | [ {- TODO -} | 1056 | [ {- TODO -} |
diff --git a/src/Network/Kademlia/Search.hs b/src/Network/Kademlia/Search.hs index 5f024cd0..9d51e815 100644 --- a/src/Network/Kademlia/Search.hs +++ b/src/Network/Kademlia/Search.hs | |||
@@ -105,7 +105,7 @@ reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) => | |||
105 | -> Search nid addr1 tok1 ni r1 | 105 | -> Search nid addr1 tok1 ni r1 |
106 | -> nid | 106 | -> nid |
107 | -> SearchState nid addr tok ni r | 107 | -> SearchState nid addr tok ni r |
108 | -> STM () | 108 | -> STM (SearchState nid addr tok ni r) |
109 | reset bkts qsearch target st = do | 109 | reset bkts qsearch target st = do |
110 | searchIsFinished st >>= check -- Wait for a search to finish before resetting. | 110 | searchIsFinished st >>= check -- Wait for a search to finish before resetting. |
111 | bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) | 111 | bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) |
@@ -116,6 +116,7 @@ reset bkts qsearch target st = do | |||
116 | writeTVar (searchInformant st) MM.empty | 116 | writeTVar (searchInformant st) MM.empty |
117 | writeTVar (searchVisited st) Set.empty | 117 | writeTVar (searchVisited st) Set.empty |
118 | writeTVar (searchPendingCount st) 0 | 118 | writeTVar (searchPendingCount st) 0 |
119 | return st | ||
119 | 120 | ||
120 | searchAlpha :: Int | 121 | searchAlpha :: Int |
121 | searchAlpha = 8 | 122 | searchAlpha = 8 |