summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Kademlia.hs182
1 files changed, 182 insertions, 0 deletions
diff --git a/Kademlia.hs b/Kademlia.hs
new file mode 100644
index 00000000..d29a3240
--- /dev/null
+++ b/Kademlia.hs
@@ -0,0 +1,182 @@
1{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
2{-# LANGUAGE KindSignatures #-}
3{-# LANGUAGE DeriveFunctor, DeriveTraversable #-}
4-- {-# LANGUAGE TypeFamilies #-}
5{-# LANGUAGE GADTs #-}
6module Kademlia where
7
8import Data.Maybe
9import Network.DHT.Routing as R
10import Data.Time.Clock (getCurrentTime)
11import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
12#ifdef THREAD_DEBUG
13import Control.Concurrent.Lifted.Instrument
14#else
15import GHC.Conc (labelThread)
16import Control.Concurrent.Lifted
17#endif
18
19import Text.PrettyPrint as PP hiding ((<>), ($$))
20import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
21import Data.IP
22import Control.Concurrent.STM
23import Control.Monad
24import Data.Monoid
25import Data.Time.Clock.POSIX (POSIXTime)
26
27{-
28insertNode1 :: forall raw dht u ip.
29 ( Address ip
30 , Default u
31 , Show u
32 , Ord (NodeId dht)
33 , FiniteBits (NodeId dht)
34 , Show (NodeId dht)
35 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
36 , DHT.Kademlia dht
37 , Ord (TransactionID dht)
38 , WireFormat raw dht
39 , Serialize (TransactionID dht)
40 , SerializableTo raw (Response dht (Ping dht))
41 , SerializableTo raw (Query dht (Ping dht))
42 , Ord (NodeId dht)
43 , Show (NodeId dht)
44 , Show (QueryMethod dht)
45 ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ())
46-}
47{-
48insertNode1 = do
49 params = DHT.TableParameters
50 { maxBuckets = R.defaultBucketCount :: Int
51 , fallbackID = myid
52 , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht
53 , logMessage = (\ _ _ -> return ()) {- TODO -} :: Char -> String -> IO ()
54 , pingProbe = error "probe" :: ni -> NodeInfo dht ip u -> IO (Bool, Maybe ReflectedIP)
55 }
56 let state = DHT.TableKeeper
57 { routingInfo = tbl
58 , grokNode = DHT.insertNode params state
59 , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO ()
60 }
61 return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0
62
63-}
64
65{-
66insertNode :: forall msg ip u.
67 ( Address ip
68 , Show u
69 , Show (NodeId msg)
70 , Ord (NodeId msg)
71 , FiniteBits (NodeId msg)
72 ) => TableParameters msg ip u -> TableKeeper msg ip u -> NodeInfo msg ip u -> Maybe ReflectedIP -> IO ()
73-}
74 {-
75 let showTable = do
76 t <- atomically $ fmap myBuckets <$> readTVar routingInfo
77 let logMsg = "Routing table: " <> pPrint t
78 logMessage 'D' (render logMsg)
79 reportPingResult tm n b = showTable
80 reportArrival tm info ps = showTable
81 -}
82
83-- | A change occured in the kademlia routing table.
84data RoutingTableChanged ni = RoutingTableChanged
85 { nodeReplaced :: !(Maybe ni) -- Deleted entry.
86 , nodeInserted :: ni -- New routing table entry.
87 , nodeTimestamp :: !POSIXTime -- Last-seen time for the new node.
88 }
89 deriving (Eq,Ord,Show,Functor,Foldable,Traversable)
90
91data InsertionReporter ni = InsertionReporter
92 { -- | Called on every inbound packet.
93 reportArrival :: POSIXTime
94 -> ni -- ^ Origin of packet.
95 -> [ni] -- ^ These will be pinged as a result.
96 -> IO ()
97 -- | Called on every ping probe.
98 , reportPingResult :: POSIXTime
99 -> ni -- ^ Who was pinged.
100 -> Bool -- ^ True if they ponged.
101 -> IO ()
102 }
103
104contramapIR f ir = InsertionReporter
105 { reportArrival = \tm ni nis -> reportArrival ir tm (f ni) (map f nis)
106 , reportPingResult = \tm ni b -> reportPingResult ir tm (f ni) b
107 }
108
109data KademliaSpace nid ni = KademliaSpace
110 { kademliaLocation :: ni -> nid
111 , kademliaTestBit :: nid -> Word -> Bool
112 }
113
114contramapKS f ks = KademliaSpace
115 { kademliaLocation = kademliaLocation ks . f
116 }
117
118-- insertNode param@TableParameters{..} state info witnessed_ip0 = do
119insertNode ::
120 forall ni nid.
121 (Ord ni) =>
122
123 -- reporter
124 InsertionReporter ni
125
126 -- nil
127 -> R.Info ni nid
128
129 -- k
130 -> KademliaSpace nid ni
131
132 -- changed
133 -> (RoutingTableChanged ni -> STM (IO ()))
134
135 -- pingProbe
136 -> (ni -> IO Bool)
137
138 -- info
139 -> ni
140
141 -- var
142 -> TVar (Maybe (R.Info ni nid))
143
144
145 -> IO ()
146
147insertNode
148 reporter
149 nil
150 k
151 changed
152 pingProbe
153 info
154 var = do
155
156 tm <- utcTimeToPOSIXSeconds <$> getCurrentTime
157
158 (ps,reaction) <- atomically $ do
159 tbl <- fromMaybe nil <$> readTVar var
160 let (inserted, ps,t') = R.updateForInbound (kademliaTestBit k) (kademliaLocation k) tm info $ myBuckets tbl
161 reaction <- if inserted
162 then changed $ RoutingTableChanged Nothing info tm
163 else return $ return ()
164 writeTVar var (Just $ tbl { myBuckets = t' })
165 return (ps, reaction)
166
167 reportArrival reporter tm info ps
168 reaction
169
170 _ <- fork $ do
171 myThreadId >>= flip labelThread "pingResults"
172 forM_ ps $ \n -> do
173 b <- pingProbe n
174 reportPingResult reporter tm n b
175 join $ atomically $ do
176 tbl <- fromMaybe nil <$> readTVar var
177 let (replacements, t') = R.updateForPingResult (kademliaTestBit k) (kademliaLocation k) n b $ myBuckets tbl
178 writeTVar var (Just $ tbl { myBuckets = t' })
179 sequence <$> mapM (\(x,(t,y)) -> changed $ RoutingTableChanged (Just x) y t)
180 replacements
181
182 return ()