diff options
author | joe <joe@jerkface.net> | 2017-07-16 17:33:45 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-07-16 17:33:45 -0400 |
commit | b4349ffe0a8ceed841cfc3941b024afe7183c10d (patch) | |
tree | 6506b3bde102bd6c611ad12fb12d175e80a59eb6 /Kademlia.hs | |
parent | c554d48acb6861160480711d299f22f857daec97 (diff) |
Scratch work, Kademlia insert-node operation.
Diffstat (limited to 'Kademlia.hs')
-rw-r--r-- | Kademlia.hs | 182 |
1 files changed, 182 insertions, 0 deletions
diff --git a/Kademlia.hs b/Kademlia.hs new file mode 100644 index 00000000..d29a3240 --- /dev/null +++ b/Kademlia.hs | |||
@@ -0,0 +1,182 @@ | |||
1 | {-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-} | ||
2 | {-# LANGUAGE KindSignatures #-} | ||
3 | {-# LANGUAGE DeriveFunctor, DeriveTraversable #-} | ||
4 | -- {-# LANGUAGE TypeFamilies #-} | ||
5 | {-# LANGUAGE GADTs #-} | ||
6 | module Kademlia where | ||
7 | |||
8 | import Data.Maybe | ||
9 | import Network.DHT.Routing as R | ||
10 | import Data.Time.Clock (getCurrentTime) | ||
11 | import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) | ||
12 | #ifdef THREAD_DEBUG | ||
13 | import Control.Concurrent.Lifted.Instrument | ||
14 | #else | ||
15 | import GHC.Conc (labelThread) | ||
16 | import Control.Concurrent.Lifted | ||
17 | #endif | ||
18 | |||
19 | import Text.PrettyPrint as PP hiding ((<>), ($$)) | ||
20 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | ||
21 | import Data.IP | ||
22 | import Control.Concurrent.STM | ||
23 | import Control.Monad | ||
24 | import Data.Monoid | ||
25 | import Data.Time.Clock.POSIX (POSIXTime) | ||
26 | |||
27 | {- | ||
28 | insertNode1 :: forall raw dht u ip. | ||
29 | ( Address ip | ||
30 | , Default u | ||
31 | , Show u | ||
32 | , Ord (NodeId dht) | ||
33 | , FiniteBits (NodeId dht) | ||
34 | , Show (NodeId dht) | ||
35 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
36 | , DHT.Kademlia dht | ||
37 | , Ord (TransactionID dht) | ||
38 | , WireFormat raw dht | ||
39 | , Serialize (TransactionID dht) | ||
40 | , SerializableTo raw (Response dht (Ping dht)) | ||
41 | , SerializableTo raw (Query dht (Ping dht)) | ||
42 | , Ord (NodeId dht) | ||
43 | , Show (NodeId dht) | ||
44 | , Show (QueryMethod dht) | ||
45 | ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ()) | ||
46 | -} | ||
47 | {- | ||
48 | insertNode1 = do | ||
49 | params = DHT.TableParameters | ||
50 | { maxBuckets = R.defaultBucketCount :: Int | ||
51 | , fallbackID = myid | ||
52 | , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht | ||
53 | , logMessage = (\ _ _ -> return ()) {- TODO -} :: Char -> String -> IO () | ||
54 | , pingProbe = error "probe" :: ni -> NodeInfo dht ip u -> IO (Bool, Maybe ReflectedIP) | ||
55 | } | ||
56 | let state = DHT.TableKeeper | ||
57 | { routingInfo = tbl | ||
58 | , grokNode = DHT.insertNode params state | ||
59 | , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO () | ||
60 | } | ||
61 | return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0 | ||
62 | |||
63 | -} | ||
64 | |||
65 | {- | ||
66 | insertNode :: forall msg ip u. | ||
67 | ( Address ip | ||
68 | , Show u | ||
69 | , Show (NodeId msg) | ||
70 | , Ord (NodeId msg) | ||
71 | , FiniteBits (NodeId msg) | ||
72 | ) => TableParameters msg ip u -> TableKeeper msg ip u -> NodeInfo msg ip u -> Maybe ReflectedIP -> IO () | ||
73 | -} | ||
74 | {- | ||
75 | let showTable = do | ||
76 | t <- atomically $ fmap myBuckets <$> readTVar routingInfo | ||
77 | let logMsg = "Routing table: " <> pPrint t | ||
78 | logMessage 'D' (render logMsg) | ||
79 | reportPingResult tm n b = showTable | ||
80 | reportArrival tm info ps = showTable | ||
81 | -} | ||
82 | |||
83 | -- | A change occured in the kademlia routing table. | ||
84 | data RoutingTableChanged ni = RoutingTableChanged | ||
85 | { nodeReplaced :: !(Maybe ni) -- Deleted entry. | ||
86 | , nodeInserted :: ni -- New routing table entry. | ||
87 | , nodeTimestamp :: !POSIXTime -- Last-seen time for the new node. | ||
88 | } | ||
89 | deriving (Eq,Ord,Show,Functor,Foldable,Traversable) | ||
90 | |||
91 | data InsertionReporter ni = InsertionReporter | ||
92 | { -- | Called on every inbound packet. | ||
93 | reportArrival :: POSIXTime | ||
94 | -> ni -- ^ Origin of packet. | ||
95 | -> [ni] -- ^ These will be pinged as a result. | ||
96 | -> IO () | ||
97 | -- | Called on every ping probe. | ||
98 | , reportPingResult :: POSIXTime | ||
99 | -> ni -- ^ Who was pinged. | ||
100 | -> Bool -- ^ True if they ponged. | ||
101 | -> IO () | ||
102 | } | ||
103 | |||
104 | contramapIR f ir = InsertionReporter | ||
105 | { reportArrival = \tm ni nis -> reportArrival ir tm (f ni) (map f nis) | ||
106 | , reportPingResult = \tm ni b -> reportPingResult ir tm (f ni) b | ||
107 | } | ||
108 | |||
109 | data KademliaSpace nid ni = KademliaSpace | ||
110 | { kademliaLocation :: ni -> nid | ||
111 | , kademliaTestBit :: nid -> Word -> Bool | ||
112 | } | ||
113 | |||
114 | contramapKS f ks = KademliaSpace | ||
115 | { kademliaLocation = kademliaLocation ks . f | ||
116 | } | ||
117 | |||
118 | -- insertNode param@TableParameters{..} state info witnessed_ip0 = do | ||
119 | insertNode :: | ||
120 | forall ni nid. | ||
121 | (Ord ni) => | ||
122 | |||
123 | -- reporter | ||
124 | InsertionReporter ni | ||
125 | |||
126 | -- nil | ||
127 | -> R.Info ni nid | ||
128 | |||
129 | -- k | ||
130 | -> KademliaSpace nid ni | ||
131 | |||
132 | -- changed | ||
133 | -> (RoutingTableChanged ni -> STM (IO ())) | ||
134 | |||
135 | -- pingProbe | ||
136 | -> (ni -> IO Bool) | ||
137 | |||
138 | -- info | ||
139 | -> ni | ||
140 | |||
141 | -- var | ||
142 | -> TVar (Maybe (R.Info ni nid)) | ||
143 | |||
144 | |||
145 | -> IO () | ||
146 | |||
147 | insertNode | ||
148 | reporter | ||
149 | nil | ||
150 | k | ||
151 | changed | ||
152 | pingProbe | ||
153 | info | ||
154 | var = do | ||
155 | |||
156 | tm <- utcTimeToPOSIXSeconds <$> getCurrentTime | ||
157 | |||
158 | (ps,reaction) <- atomically $ do | ||
159 | tbl <- fromMaybe nil <$> readTVar var | ||
160 | let (inserted, ps,t') = R.updateForInbound (kademliaTestBit k) (kademliaLocation k) tm info $ myBuckets tbl | ||
161 | reaction <- if inserted | ||
162 | then changed $ RoutingTableChanged Nothing info tm | ||
163 | else return $ return () | ||
164 | writeTVar var (Just $ tbl { myBuckets = t' }) | ||
165 | return (ps, reaction) | ||
166 | |||
167 | reportArrival reporter tm info ps | ||
168 | reaction | ||
169 | |||
170 | _ <- fork $ do | ||
171 | myThreadId >>= flip labelThread "pingResults" | ||
172 | forM_ ps $ \n -> do | ||
173 | b <- pingProbe n | ||
174 | reportPingResult reporter tm n b | ||
175 | join $ atomically $ do | ||
176 | tbl <- fromMaybe nil <$> readTVar var | ||
177 | let (replacements, t') = R.updateForPingResult (kademliaTestBit k) (kademliaLocation k) n b $ myBuckets tbl | ||
178 | writeTVar var (Just $ tbl { myBuckets = t' }) | ||
179 | sequence <$> mapM (\(x,(t,y)) -> changed $ RoutingTableChanged (Just x) y t) | ||
180 | replacements | ||
181 | |||
182 | return () | ||