diff options
author | joe <joe@jerkface.net> | 2017-06-29 10:37:07 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-06-29 13:00:16 -0400 |
commit | 3195c0877b443e5ccd4d489f03944fc059d4d7aa (patch) | |
tree | 2a05c35a9b43d8f0725c52fc860b30ae191f3871 | |
parent | 05e70386c2248d87a61a8e8267e0211597f2fa88 (diff) |
WIP: Generalizing DHT monad.
-rw-r--r-- | src/Network/BitTorrent/DHT.hs | 42 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 391 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Search.hs | 52 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 114 | ||||
-rw-r--r-- | src/Network/DHT/Mainline.hs | 89 | ||||
-rw-r--r-- | src/Network/DHT/Types.hs | 51 | ||||
-rw-r--r-- | src/Network/DatagramServer.hs | 36 |
7 files changed, 537 insertions, 238 deletions
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index d9328cea..8bc423a3 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs | |||
@@ -15,9 +15,11 @@ | |||
15 | -- <http://www.bittorrent.org/beps/bep_0005.html> | 15 | -- <http://www.bittorrent.org/beps/bep_0005.html> |
16 | -- | 16 | -- |
17 | {-# LANGUAGE FlexibleInstances #-} | 17 | {-# LANGUAGE FlexibleInstances #-} |
18 | {-# LANGUAGE FlexibleContexts #-} | ||
18 | {-# LANGUAGE TemplateHaskell #-} | 19 | {-# LANGUAGE TemplateHaskell #-} |
19 | {-# LANGUAGE TypeOperators #-} | 20 | {-# LANGUAGE TypeOperators #-} |
20 | {-# LANGUAGE ScopedTypeVariables #-} | 21 | {-# LANGUAGE ScopedTypeVariables #-} |
22 | {-# LANGUAGE CPP #-} | ||
21 | module Network.BitTorrent.DHT | 23 | module Network.BitTorrent.DHT |
22 | ( -- * Distributed Hash Table | 24 | ( -- * Distributed Hash Table |
23 | DHT | 25 | DHT |
@@ -37,7 +39,7 @@ module Network.BitTorrent.DHT | |||
37 | , snapshot | 39 | , snapshot |
38 | 40 | ||
39 | -- * Operations | 41 | -- * Operations |
40 | , Network.BitTorrent.DHT.lookup | 42 | -- , Network.BitTorrent.DHT.lookup |
41 | , Network.BitTorrent.DHT.insert | 43 | , Network.BitTorrent.DHT.insert |
42 | , Network.BitTorrent.DHT.delete | 44 | , Network.BitTorrent.DHT.delete |
43 | 45 | ||
@@ -50,7 +52,7 @@ module Network.BitTorrent.DHT | |||
50 | , closeNode | 52 | , closeNode |
51 | 53 | ||
52 | -- ** Monad | 54 | -- ** Monad |
53 | , MonadDHT (..) | 55 | -- , MonadDHT (..) |
54 | , runDHT | 56 | , runDHT |
55 | ) where | 57 | ) where |
56 | 58 | ||
@@ -81,11 +83,13 @@ import qualified Network.DatagramServer as KRPC (listen, Protocol(..)) | |||
81 | -- DHT types | 83 | -- DHT types |
82 | -----------------------------------------------------------------------} | 84 | -----------------------------------------------------------------------} |
83 | 85 | ||
86 | #if 0 | ||
84 | class MonadDHT m where | 87 | class MonadDHT m where |
85 | liftDHT :: DHT IPv4 a -> m a | 88 | liftDHT :: DHT raw dht u IPv4 a -> m a |
86 | 89 | ||
87 | instance MonadDHT (DHT IPv4) where | 90 | instance MonadDHT (DHT raw dht u IPv4) where |
88 | liftDHT = id | 91 | liftDHT = id |
92 | #endif | ||
89 | 93 | ||
90 | -- | Convenience method. Pass this to 'dht' to enable full logging. | 94 | -- | Convenience method. Pass this to 'dht' to enable full logging. |
91 | fullLogging :: LogSource -> LogLevel -> Bool | 95 | fullLogging :: LogSource -> LogLevel -> Bool |
@@ -96,7 +100,7 @@ dht :: (Ord ip, Address ip) | |||
96 | => Options -- ^ normally you need to use 'Data.Default.def'; | 100 | => Options -- ^ normally you need to use 'Data.Default.def'; |
97 | -> NodeAddr ip -- ^ address to bind this node; | 101 | -> NodeAddr ip -- ^ address to bind this node; |
98 | -> (LogSource -> LogLevel -> Bool) -- ^ use 'fullLogging' as a noisy default | 102 | -> (LogSource -> LogLevel -> Bool) -- ^ use 'fullLogging' as a noisy default |
99 | -> DHT ip a -- ^ actions to run: 'bootstrap', 'lookup', etc; | 103 | -> DHT raw dht u ip a -- ^ actions to run: 'bootstrap', 'lookup', etc; |
100 | -> IO a -- ^ result. | 104 | -> IO a -- ^ result. |
101 | dht opts addr logfilter action = do | 105 | dht opts addr logfilter action = do |
102 | runStderrLoggingT $ filterLogger logfilter $ LoggingT $ \ logger -> do | 106 | runStderrLoggingT $ filterLogger logfilter $ LoggingT $ \ logger -> do |
@@ -175,7 +179,7 @@ resolveHostName NodeAddr {..} = do | |||
175 | -- | 179 | -- |
176 | -- This operation do block, use | 180 | -- This operation do block, use |
177 | -- 'Control.Concurrent.Async.Lifted.async' if needed. | 181 | -- 'Control.Concurrent.Async.Lifted.async' if needed. |
178 | bootstrap :: forall ip. Address ip => Maybe BS.ByteString -> [NodeAddr ip] -> DHT ip () | 182 | bootstrap :: forall raw dht u ip. Address ip => Maybe BS.ByteString -> [NodeAddr ip] -> DHT raw dht u ip () |
179 | bootstrap mbs startNodes = do | 183 | bootstrap mbs startNodes = do |
180 | restored <- | 184 | restored <- |
181 | case decode <$> mbs of | 185 | case decode <$> mbs of |
@@ -187,8 +191,8 @@ bootstrap mbs startNodes = do | |||
187 | $(logInfoS) "bootstrap" "Start node bootstrapping" | 191 | $(logInfoS) "bootstrap" "Start node bootstrapping" |
188 | let searchAll aliveNodes = do | 192 | let searchAll aliveNodes = do |
189 | nid <- myNodeIdAccordingTo (error "FIXME") | 193 | nid <- myNodeIdAccordingTo (error "FIXME") |
190 | nss <- C.sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume | 194 | ns <- bgsearch ioFindNodes nid |
191 | return ( nss :: [[NodeInfo KMessageOf ip ()]] ) | 195 | return ( ns :: [NodeInfo KMessageOf ip ()] ) |
192 | input_nodes <- (restored ++) . T.toList <$> getTable | 196 | input_nodes <- (restored ++) . T.toList <$> getTable |
193 | -- Step 1: Use iterative searches to flesh out the table.. | 197 | -- Step 1: Use iterative searches to flesh out the table.. |
194 | do let knowns = map (map $ nodeAddr . fst) input_nodes | 198 | do let knowns = map (map $ nodeAddr . fst) input_nodes |
@@ -200,10 +204,10 @@ bootstrap mbs startNodes = do | |||
200 | -- If our cached nodes are alive and our IP address did not change, it's possible | 204 | -- If our cached nodes are alive and our IP address did not change, it's possible |
201 | -- we are already bootsrapped, so no need to do any searches. | 205 | -- we are already bootsrapped, so no need to do any searches. |
202 | when (not b) $ do | 206 | when (not b) $ do |
203 | nss <- searchAll $ take 2 alive_knowns | 207 | ns <- searchAll $ take 2 alive_knowns |
204 | -- We only use the supplied bootstrap nodes when we don't know of any | 208 | -- We only use the supplied bootstrap nodes when we don't know of any |
205 | -- others to try. | 209 | -- others to try. |
206 | when (null nss) $ do | 210 | when (null ns) $ do |
207 | -- TODO filter duplicated in startNodes list | 211 | -- TODO filter duplicated in startNodes list |
208 | -- TODO retransmissions for startNodes | 212 | -- TODO retransmissions for startNodes |
209 | (aliveNodes,_) <- unzip <$> queryParallel (pingQ <$> startNodes) | 213 | (aliveNodes,_) <- unzip <$> queryParallel (pingQ <$> startNodes) |
@@ -243,7 +247,7 @@ bootstrap mbs startNodes = do | |||
243 | -- | 247 | -- |
244 | -- This operation do not block. | 248 | -- This operation do not block. |
245 | -- | 249 | -- |
246 | isBootstrapped :: Eq ip => DHT ip Bool | 250 | isBootstrapped :: Eq ip => DHT raw dht u ip Bool |
247 | isBootstrapped = T.full <$> getTable | 251 | isBootstrapped = T.full <$> getTable |
248 | 252 | ||
249 | {----------------------------------------------------------------------- | 253 | {----------------------------------------------------------------------- |
@@ -254,7 +258,11 @@ isBootstrapped = T.full <$> getTable | |||
254 | -- | 258 | -- |
255 | -- This is blocking operation, use | 259 | -- This is blocking operation, use |
256 | -- 'Control.Concurrent.Async.Lifted.async' if needed. | 260 | -- 'Control.Concurrent.Async.Lifted.async' if needed. |
257 | snapshot :: Address ip => DHT ip BS.ByteString | 261 | snapshot :: ( Address ip |
262 | , Ord (NodeId dht) | ||
263 | , Serialize u | ||
264 | , Serialize (NodeId dht) | ||
265 | ) => DHT raw dht u ip BS.ByteString | ||
258 | snapshot = do | 266 | snapshot = do |
259 | tbl <- getTable | 267 | tbl <- getTable |
260 | return $ encode tbl | 268 | return $ encode tbl |
@@ -263,15 +271,19 @@ snapshot = do | |||
263 | -- Operations | 271 | -- Operations |
264 | -----------------------------------------------------------------------} | 272 | -----------------------------------------------------------------------} |
265 | 273 | ||
274 | #if 0 | ||
275 | |||
266 | -- | Get list of peers which downloading this torrent. | 276 | -- | Get list of peers which downloading this torrent. |
267 | -- | 277 | -- |
268 | -- This operation is incremental and do block. | 278 | -- This operation is incremental and do block. |
269 | -- | 279 | -- |
270 | lookup :: Address ip => InfoHash -> DHT ip `C.Source` [PeerAddr ip] | 280 | lookup :: Address ip => InfoHash -> DHT raw dht u ip `C.Source` [PeerAddr ip] |
271 | lookup topic = do -- TODO retry getClosest if bucket is empty | 281 | lookup topic = do -- TODO retry getClosest if bucket is empty |
272 | closest <- lift $ getClosest topic | 282 | closest <- lift $ getClosest topic |
273 | C.sourceList [closest] $= search topic (getPeersQ topic) | 283 | C.sourceList [closest] $= search topic (getPeersQ topic) |
274 | 284 | ||
285 | #endif | ||
286 | |||
275 | -- TODO do not republish if the topic is already in announceSet | 287 | -- TODO do not republish if the topic is already in announceSet |
276 | 288 | ||
277 | -- | Announce that /this/ peer may have some pieces of the specified | 289 | -- | Announce that /this/ peer may have some pieces of the specified |
@@ -281,7 +293,7 @@ lookup topic = do -- TODO retry getClosest if bucket is empty | |||
281 | -- This operation is synchronous and do block, use | 293 | -- This operation is synchronous and do block, use |
282 | -- 'Control.Concurrent.Async.Lifted.async' if needed. | 294 | -- 'Control.Concurrent.Async.Lifted.async' if needed. |
283 | -- | 295 | -- |
284 | insert :: Address ip => InfoHash -> PortNumber -> DHT ip () | 296 | insert :: Address ip => InfoHash -> PortNumber -> DHT raw dht u ip () |
285 | insert ih p = do | 297 | insert ih p = do |
286 | publish ih p | 298 | publish ih p |
287 | insertTopic ih p | 299 | insertTopic ih p |
@@ -290,6 +302,6 @@ insert ih p = do | |||
290 | -- | 302 | -- |
291 | -- This operation is atomic and may block for a while. | 303 | -- This operation is atomic and may block for a while. |
292 | -- | 304 | -- |
293 | delete :: InfoHash -> PortNumber -> DHT ip () | 305 | delete :: InfoHash -> PortNumber -> DHT raw dht u ip () |
294 | delete = deleteTopic | 306 | delete = deleteTopic |
295 | {-# INLINE delete #-} | 307 | {-# INLINE delete #-} |
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 254b347c..e5d9bd5f 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs | |||
@@ -41,11 +41,13 @@ module Network.BitTorrent.DHT.Query | |||
41 | -- concatenate its responses, optionally yielding result and | 41 | -- concatenate its responses, optionally yielding result and |
42 | -- continue to the next iteration. | 42 | -- continue to the next iteration. |
43 | , Search | 43 | , Search |
44 | , search | 44 | -- , search |
45 | , publish | 45 | , publish |
46 | , ioFindNode | 46 | , ioFindNode |
47 | , ioFindNodes | ||
47 | , ioGetPeers | 48 | , ioGetPeers |
48 | , isearch | 49 | , isearch |
50 | , bgsearch | ||
49 | 51 | ||
50 | -- ** Routing table | 52 | -- ** Routing table |
51 | , insertNode | 53 | , insertNode |
@@ -57,6 +59,8 @@ module Network.BitTorrent.DHT.Query | |||
57 | , (<@>) | 59 | , (<@>) |
58 | ) where | 60 | ) where |
59 | 61 | ||
62 | import Data.Bits | ||
63 | import Data.Default | ||
60 | #ifdef THREAD_DEBUG | 64 | #ifdef THREAD_DEBUG |
61 | import Control.Concurrent.Lifted.Instrument hiding (yield) | 65 | import Control.Concurrent.Lifted.Instrument hiding (yield) |
62 | #else | 66 | #else |
@@ -102,30 +106,43 @@ import Network.DatagramServer.Tox | |||
102 | #endif | 106 | #endif |
103 | import Network.Address hiding (NodeId) | 107 | import Network.Address hiding (NodeId) |
104 | import Network.DatagramServer.Types as RPC hiding (Query,Response) | 108 | import Network.DatagramServer.Types as RPC hiding (Query,Response) |
109 | import Network.DHT.Types | ||
105 | import Control.Monad.Trans.Control | 110 | import Control.Monad.Trans.Control |
111 | import Data.Typeable | ||
112 | import Data.Serialize | ||
113 | import System.IO.Unsafe (unsafeInterleaveIO) | ||
114 | import Data.String | ||
106 | 115 | ||
107 | {----------------------------------------------------------------------- | 116 | {----------------------------------------------------------------------- |
108 | -- Handlers | 117 | -- Handlers |
109 | -----------------------------------------------------------------------} | 118 | -----------------------------------------------------------------------} |
110 | 119 | ||
120 | {- | ||
111 | nodeHandler :: ( Address ip | 121 | nodeHandler :: ( Address ip |
112 | , KRPC (Query a) (Response b) | 122 | , KRPC (Query KMessageOf a) (Response KMessageOf b) |
113 | ) | 123 | ) |
114 | => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler | 124 | => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler |
125 | -} | ||
126 | nodeHandler :: | ||
127 | (Address addr, WireFormat raw msg, Pretty (NodeInfo dht addr u), | ||
128 | Default u, | ||
129 | IsString t, Functor msg, | ||
130 | SerializableTo raw (Response dht r), | ||
131 | SerializableTo raw (Query dht q)) => | ||
132 | (NodeInfo dht addr u -> Maybe ReflectedIP -> IO ()) | ||
133 | -> (NodeAddr addr -> IO (NodeId dht)) | ||
134 | -> (Char -> t -> Text -> IO ()) | ||
135 | -> QueryMethod msg | ||
136 | -> (NodeAddr addr -> q -> IO r) | ||
137 | -> Handler IO msg raw | ||
115 | nodeHandler insertNode myNodeIdAccordingTo logm method action = handler method $ \ sockAddr qry -> do | 138 | nodeHandler insertNode myNodeIdAccordingTo logm method action = handler method $ \ sockAddr qry -> do |
116 | #ifdef VERSION_bencoding | ||
117 | let remoteId = queringNodeId qry | 139 | let remoteId = queringNodeId qry |
118 | read_only = queryIsReadOnly qry | 140 | read_only = queryIsReadOnly qry |
119 | q = queryParams qry | 141 | q = queryParams qry |
120 | #else | ||
121 | let remoteId = msgClient qry | ||
122 | read_only = False | ||
123 | q = msgPayload qry | ||
124 | #endif | ||
125 | case fromSockAddr sockAddr of | 142 | case fromSockAddr sockAddr of |
126 | Nothing -> throwIO BadAddress | 143 | Nothing -> throwIO BadAddress |
127 | Just naddr -> do | 144 | Just naddr -> do |
128 | let ni = NodeInfo remoteId naddr () | 145 | let ni = NodeInfo remoteId naddr def |
129 | -- Do not route read-only nodes. (bep 43) | 146 | -- Do not route read-only nodes. (bep 43) |
130 | if read_only | 147 | if read_only |
131 | then logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) | 148 | then logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) |
@@ -135,13 +152,13 @@ nodeHandler insertNode myNodeIdAccordingTo logm method action = handler method $ | |||
135 | <*> action naddr q | 152 | <*> action naddr q |
136 | 153 | ||
137 | -- | Default 'Ping' handler. | 154 | -- | Default 'Ping' handler. |
138 | pingH :: NodeAddr ip -> Ping -> IO Ping | 155 | pingH :: DHT.Kademlia dht => Proxy dht -> NodeAddr ip -> Ping dht -> IO (Ping dht) |
139 | pingH _ Ping = return Ping | 156 | pingH dht _ _ = return (DHT.pongMessage dht) |
140 | -- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True } | 157 | -- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True } |
141 | 158 | ||
142 | -- | Default 'FindNode' handler. | 159 | -- | Default 'FindNode' handler. |
143 | findNodeH :: (NodeId KMessageOf -> IO [NodeInfo KMessageOf ip ()]) -> NodeAddr ip -> FindNode ip -> IO (NodeFound ip) | 160 | findNodeH :: Kademlia dht => (NodeId dht -> IO [NodeInfo dht ip u]) -> NodeAddr ip -> FindNode dht ip -> IO (NodeFound dht ip) |
144 | findNodeH getclosest _ (FindNode nid) = NodeFound <$> getclosest nid | 161 | findNodeH getclosest _ msg = foundNodesMessage . L.map (fmap (const ())) <$> getclosest (findWho msg) |
145 | 162 | ||
146 | -- | Default 'GetPeers' handler. | 163 | -- | Default 'GetPeers' handler. |
147 | getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip) | 164 | getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip) |
@@ -162,51 +179,100 @@ announceH peers toks naddr @ NodeAddr {..} (Announce {..}) = do | |||
162 | insertPeer peers topic announcedName peerAddr | 179 | insertPeer peers topic announcedName peerAddr |
163 | return Announced | 180 | return Announced |
164 | 181 | ||
182 | -- | Includes all Kademlia-related handlers. | ||
183 | kademliaHandlers :: forall raw dht u ip. (Eq ip, Ord ip, Address ip | ||
184 | , Ord (TransactionID dht) | ||
185 | , Ord (NodeId dht) | ||
186 | , Show u | ||
187 | , SerializableTo raw (Response dht (Ping dht)) | ||
188 | , SerializableTo raw (Query dht (Ping dht)) | ||
189 | , Show (QueryMethod dht) | ||
190 | , Show (NodeId dht) | ||
191 | , FiniteBits (NodeId dht) | ||
192 | , Default u | ||
193 | , Serialize (TransactionID dht) | ||
194 | , WireFormat raw dht | ||
195 | , Kademlia dht | ||
196 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
197 | , Functor dht | ||
198 | , Pretty (NodeInfo dht ip u) | ||
199 | , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
200 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
201 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
202 | ) => LogFun -> DHT raw dht u ip [Handler IO dht raw] | ||
203 | -- kademliaHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler] | ||
204 | kademliaHandlers logger = do | ||
205 | groknode <- insertNode1 | ||
206 | mynid <- myNodeIdAccordingTo1 | ||
207 | let handler :: ( KRPC (Query dht a) (Response dht b) | ||
208 | , SerializableTo raw (Response dht b) | ||
209 | , SerializableTo raw (Query dht a) | ||
210 | ) => QueryMethod dht -> (NodeAddr ip -> a -> IO b) -> Handler IO dht raw | ||
211 | handler = nodeHandler groknode mynid (logt logger) | ||
212 | dht = Proxy :: Proxy dht | ||
213 | getclosest <- getClosest1 | ||
214 | return [ handler (namePing dht) $ pingH dht | ||
215 | , handler (nameFindNodes dht) $ findNodeH getclosest | ||
216 | ] | ||
217 | |||
218 | |||
165 | -- | Includes all default query handlers. | 219 | -- | Includes all default query handlers. |
166 | defaultHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT ip [NodeHandler] | 220 | defaultHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler] |
167 | defaultHandlers logger = do | 221 | defaultHandlers logger = do |
168 | groknode <- insertNode1 | 222 | groknode <- insertNode1 |
169 | toks <- asks sessionTokens | ||
170 | getclosest <- getClosest1 | ||
171 | mynid <- myNodeIdAccordingTo1 | 223 | mynid <- myNodeIdAccordingTo1 |
224 | let handler :: KRPC (Query KMessageOf a) (Response KMessageOf b) => QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler | ||
225 | handler = nodeHandler groknode mynid (logt logger) | ||
226 | toks <- asks sessionTokens | ||
172 | peers <- asks contactInfo | 227 | peers <- asks contactInfo |
173 | getpeers <- getPeerList1 | 228 | getpeers <- getPeerList1 |
174 | let handler :: KRPC (Query a) (Response b) => QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler | 229 | hs <- kademliaHandlers logger |
175 | handler = nodeHandler groknode mynid (logt logger) | 230 | return $ hs ++ [ handler "get_peers" $ getPeersH getpeers toks |
176 | return [ handler "ping" $ pingH | 231 | , handler "announce_peer" $ announceH peers toks ] |
177 | , handler "find-nodes" $ findNodeH getclosest | ||
178 | , handler "get_peers" $ getPeersH getpeers toks | ||
179 | , handler "announce_peer" $ announceH peers toks ] | ||
180 | 232 | ||
181 | {----------------------------------------------------------------------- | 233 | {----------------------------------------------------------------------- |
182 | -- Basic queries | 234 | -- Basic queries |
183 | -----------------------------------------------------------------------} | 235 | -----------------------------------------------------------------------} |
184 | 236 | ||
185 | type Iteration ip o = NodeInfo KMessageOf ip () -> DHT ip (Either [NodeInfo KMessageOf ip ()] [o ip]) | 237 | type Iteration raw dht u ip o = NodeInfo dht ip u -> DHT raw dht u ip (Either [NodeInfo dht ip u] [o ip]) |
186 | 238 | ||
187 | -- | The most basic query. May be used to check if the given node is | 239 | -- | The most basic query. May be used to check if the given node is |
188 | -- alive or get its 'NodeId'. | 240 | -- alive or get its 'NodeId'. |
189 | pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo KMessageOf ip (), Maybe ReflectedIP) | 241 | pingQ :: forall raw dht u ip. |
242 | ( DHT.Kademlia dht | ||
243 | , Address ip | ||
244 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
245 | , Default u | ||
246 | , Show u | ||
247 | , Ord (TransactionID dht) | ||
248 | , Serialize (TransactionID dht) | ||
249 | , WireFormat raw dht | ||
250 | , SerializableTo raw (Response dht (Ping dht)) | ||
251 | , SerializableTo raw (Query dht (Ping dht)) | ||
252 | , Ord (NodeId dht) | ||
253 | , FiniteBits (NodeId dht) | ||
254 | , Show (NodeId dht) | ||
255 | , Show (QueryMethod dht) | ||
256 | ) => NodeAddr ip -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP) | ||
190 | pingQ addr = do | 257 | pingQ addr = do |
191 | #ifdef VERSION_bencoding | 258 | let ping = DHT.pingMessage (Proxy :: Proxy dht) |
192 | (nid, Ping, mip) <- queryNode' addr Ping | 259 | (nid, pong, mip) <- queryNode' addr ping |
193 | #else | 260 | let _ = pong `asTypeOf` ping |
194 | (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} | 261 | -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} |
195 | #endif | 262 | return (NodeInfo nid addr def, mip) |
196 | return (NodeInfo nid addr (), mip) | ||
197 | 263 | ||
198 | -- TODO [robustness] match range of returned node ids with the | 264 | -- TODO [robustness] match range of returned node ids with the |
199 | -- expected range and either filter bad nodes or discard response at | 265 | -- expected range and either filter bad nodes or discard response at |
200 | -- all throwing an exception | 266 | -- all throwing an exception |
201 | -- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo | 267 | -- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo |
202 | findNodeQ key NodeInfo {..} = do | 268 | findNodeQ proxy key NodeInfo {..} = do |
203 | NodeFound closest <- FindNode (toNodeId key) <@> nodeAddr | 269 | closest <- fmap DHT.foundNodes $ DHT.findNodeMessage proxy key <@> nodeAddr |
204 | $(logInfoS) "findNodeQ" $ "NodeFound\n" | 270 | $(logInfoS) "findNodeQ" $ "NodeFound\n" |
205 | <> T.pack (L.unlines $ L.map ((' ' :) . show . pPrint) closest) | 271 | <> T.pack (L.unlines $ L.map ((' ' :) . show . pPrint) closest) |
206 | return $ Right closest | 272 | return $ Right closest |
207 | 273 | ||
208 | #ifdef VERSION_bencoding | 274 | #ifdef VERSION_bencoding |
209 | getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr | 275 | getPeersQ :: Address ip => InfoHash -> Iteration BValue KMessageOf () ip PeerAddr |
210 | getPeersQ topic NodeInfo {..} = do | 276 | getPeersQ topic NodeInfo {..} = do |
211 | GotPeers {..} <- GetPeers topic <@> nodeAddr | 277 | GotPeers {..} <- GetPeers topic <@> nodeAddr |
212 | let dist = distance (toNodeId topic) nodeId | 278 | let dist = distance (toNodeId topic) nodeId |
@@ -215,7 +281,7 @@ getPeersQ topic NodeInfo {..} = do | |||
215 | <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" } | 281 | <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" } |
216 | return peers | 282 | return peers |
217 | 283 | ||
218 | announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr | 284 | announceQ :: Address ip => InfoHash -> PortNumber -> Iteration BValue KMessageOf () ip NodeAddr |
219 | announceQ ih p NodeInfo {..} = do | 285 | announceQ ih p NodeInfo {..} = do |
220 | GotPeers {..} <- GetPeers ih <@> nodeAddr | 286 | GotPeers {..} <- GetPeers ih <@> nodeAddr |
221 | case peers of | 287 | case peers of |
@@ -232,7 +298,7 @@ announceQ ih p NodeInfo {..} = do | |||
232 | -----------------------------------------------------------------------} | 298 | -----------------------------------------------------------------------} |
233 | 299 | ||
234 | 300 | ||
235 | ioGetPeers :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip])) | 301 | ioGetPeers :: Address ip => InfoHash -> DHT BValue KMessageOf () ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip])) |
236 | ioGetPeers ih = do | 302 | ioGetPeers ih = do |
237 | session <- ask | 303 | session <- ask |
238 | return $ \ni -> runDHT session $ do | 304 | return $ \ni -> runDHT session $ do |
@@ -241,17 +307,71 @@ ioGetPeers ih = do | |||
241 | Right e -> return $ either (,[]) ([],) e | 307 | Right e -> return $ either (,[]) ([],) e |
242 | Left e -> let _ = e :: QueryFailure in return ([],[]) | 308 | Left e -> let _ = e :: QueryFailure in return ([],[]) |
243 | 309 | ||
244 | ioFindNode :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [NodeInfo KMessageOf ip ()])) | 310 | ioFindNode :: ( DHT.Kademlia dht |
311 | , WireFormat raw dht | ||
312 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
313 | , Address ip | ||
314 | , Default u | ||
315 | , Show u | ||
316 | , Show (QueryMethod dht) | ||
317 | , TableKey dht infohash | ||
318 | , Eq (NodeId dht) | ||
319 | , Ord (NodeId dht) | ||
320 | , FiniteBits (NodeId dht) | ||
321 | , Show (NodeId dht) | ||
322 | , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
323 | , Ord (TransactionID dht) | ||
324 | , Serialize (TransactionID dht) | ||
325 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
326 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
327 | , SerializableTo raw (Response dht (Ping dht)) | ||
328 | , SerializableTo raw (Query dht (Ping dht)) | ||
329 | ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u])) | ||
245 | ioFindNode ih = do | 330 | ioFindNode ih = do |
246 | session <- ask | 331 | session <- ask |
247 | return $ \ni -> runDHT session $ do | 332 | return $ \ni -> runDHT session $ do |
248 | NodeFound ns <- FindNode (toNodeId ih) <@> nodeAddr ni | 333 | ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> nodeAddr ni |
249 | return $ L.partition (\n -> nodeId n /= toNodeId ih) ns | 334 | let ns' = L.map (fmap (const def)) ns |
250 | 335 | return $ L.partition (\n -> nodeId n /= toNodeId ih) ns' | |
251 | isearch :: (Ord r, Ord ip) => | 336 | |
252 | (InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [r]))) | 337 | |
253 | -> InfoHash | 338 | -- | Like ioFindNode, but considers all found nodes to be 'Right' results. |
254 | -> DHT ip (ThreadId, Search.IterativeSearch ip r) | 339 | ioFindNodes :: ( DHT.Kademlia dht |
340 | , WireFormat raw dht | ||
341 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
342 | , Address ip | ||
343 | , Default u | ||
344 | , Show u | ||
345 | , Show (QueryMethod dht) | ||
346 | , TableKey dht infohash | ||
347 | , Eq (NodeId dht) | ||
348 | , Ord (NodeId dht) | ||
349 | , FiniteBits (NodeId dht) | ||
350 | , Show (NodeId dht) | ||
351 | , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
352 | , Ord (TransactionID dht) | ||
353 | , Serialize (TransactionID dht) | ||
354 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
355 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
356 | , SerializableTo raw (Response dht (Ping dht)) | ||
357 | , SerializableTo raw (Query dht (Ping dht)) | ||
358 | ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u])) | ||
359 | ioFindNodes ih = do | ||
360 | session <- ask | ||
361 | return $ \ni -> runDHT session $ do | ||
362 | ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> nodeAddr ni | ||
363 | let ns' = L.map (fmap (const def)) ns | ||
364 | return ([], ns') | ||
365 | |||
366 | isearch :: ( Ord r | ||
367 | , Ord ip | ||
368 | , Ord (NodeId dht) | ||
369 | , FiniteBits (NodeId dht) | ||
370 | , TableKey dht ih | ||
371 | , Show ih) => | ||
372 | (ih -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r]))) | ||
373 | -> ih | ||
374 | -> DHT raw dht u ip (ThreadId, Search.IterativeSearch dht u ip r) | ||
255 | isearch f ih = do | 375 | isearch f ih = do |
256 | qry <- f ih | 376 | qry <- f ih |
257 | ns <- kclosest 8 ih <$> getTable | 377 | ns <- kclosest 8 ih <$> getTable |
@@ -263,8 +383,25 @@ isearch f ih = do | |||
263 | -- atomically \$ readTVar (Search.searchResults s) | 383 | -- atomically \$ readTVar (Search.searchResults s) |
264 | return (a, s) | 384 | return (a, s) |
265 | 385 | ||
266 | 386 | -- | Background search: fill a lazy list using a background thread. | |
267 | type Search ip o = Conduit [NodeInfo KMessageOf ip ()] (DHT ip) [o KMessageOf ip ()] | 387 | bgsearch f ih = do |
388 | (tid, s) <- isearch f ih | ||
389 | let again shown = do | ||
390 | (chk,fin) <- atomically $ do | ||
391 | r <- (Set.\\ shown) <$> readTVar (Search.searchResults s) | ||
392 | if not $ Set.null r | ||
393 | then (,) r <$> Search.searchIsFinished s | ||
394 | else Search.searchIsFinished s >>= check >> return (Set.empty,True) | ||
395 | let ps = Set.toList chk | ||
396 | if fin then return ps | ||
397 | else do | ||
398 | xs <- unsafeInterleaveIO $ again (shown `Set.union` chk) | ||
399 | return $ ps ++ xs | ||
400 | liftIO $ again Set.empty | ||
401 | |||
402 | type Search raw dht u ip o = Conduit [NodeInfo dht ip u] (DHT raw dht u ip) [o dht ip u] | ||
403 | |||
404 | #if 0 | ||
268 | 405 | ||
269 | -- TODO: use reorder and filter (Traversal option) leftovers | 406 | -- TODO: use reorder and filter (Traversal option) leftovers |
270 | -- search :: k -> IterationI ip o -> Search ip o | 407 | -- search :: k -> IterationI ip o -> Search ip o |
@@ -275,17 +412,36 @@ search _ action = do | |||
275 | let (nodes, results) = partitionEithers responses | 412 | let (nodes, results) = partitionEithers responses |
276 | $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results))) | 413 | $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results))) |
277 | leftover $ L.concat nodes | 414 | leftover $ L.concat nodes |
278 | mapM_ yield results | 415 | let r = mapM_ yield results |
279 | 416 | _ = (action,r) :: (a -> DHT raw dht u ip (Either [a] o), ConduitM [a] o (DHT raw dht u ip) ()) | |
280 | publish :: Address ip => InfoHash -> PortNumber -> DHT ip () | 417 | r |
281 | publish ih p = do | ||
282 | nodes <- getClosest ih | ||
283 | r <- asks (optReplication . options) | ||
284 | _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r | ||
285 | return () | ||
286 | 418 | ||
419 | #endif | ||
287 | 420 | ||
288 | probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) | 421 | publish = error "todo" |
422 | -- publish :: Address ip => InfoHash -> PortNumber -> DHT BValue KMessageOf () ip () | ||
423 | -- publish ih p = do | ||
424 | -- nodes <- getClosest ih | ||
425 | -- r <- asks (optReplication . options) | ||
426 | -- _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r | ||
427 | -- return () | ||
428 | |||
429 | |||
430 | probeNode :: ( Default u | ||
431 | , Show u | ||
432 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
433 | , DHT.Kademlia dht | ||
434 | , Address ip | ||
435 | , Ord (TransactionID dht) | ||
436 | , Serialize (TransactionID dht) | ||
437 | , WireFormat raw dht | ||
438 | , SerializableTo raw (Response dht (Ping dht)) | ||
439 | , SerializableTo raw (Query dht (Ping dht)) | ||
440 | , Ord (NodeId dht) | ||
441 | , FiniteBits (NodeId dht) | ||
442 | , Show (NodeId dht) | ||
443 | , Show (QueryMethod dht) | ||
444 | ) => NodeAddr ip -> DHT raw dht u ip (Bool , Maybe ReflectedIP) | ||
289 | probeNode addr = do | 445 | probeNode addr = do |
290 | $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr))) | 446 | $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr))) |
291 | result <- try $ pingQ addr | 447 | result <- try $ pingQ addr |
@@ -293,8 +449,16 @@ probeNode addr = do | |||
293 | return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result | 449 | return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result |
294 | 450 | ||
295 | 451 | ||
452 | refreshNodes :: forall raw dht u ip. | ||
453 | ( Address ip | ||
454 | , Ord (NodeId dht) | ||
455 | , Default u | ||
456 | , FiniteBits (NodeId dht) | ||
457 | , Pretty (NodeId dht) | ||
458 | , DHT.Kademlia dht ) => NodeId dht -> DHT raw dht u ip () -- [NodeInfo KMessageOf ip ()] | ||
459 | refreshNodes _ = return () -- TODO | ||
460 | #if 0 | ||
296 | -- FIXME do not use getClosest sinse we should /refresh/ them | 461 | -- FIXME do not use getClosest sinse we should /refresh/ them |
297 | refreshNodes :: Address ip => NodeId KMessageOf -> DHT ip () -- [NodeInfo KMessageOf ip ()] | ||
298 | refreshNodes nid = do | 462 | refreshNodes nid = do |
299 | $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid))) | 463 | $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid))) |
300 | nodes <- getClosest nid | 464 | nodes <- getClosest nid |
@@ -304,7 +468,7 @@ refreshNodes nid = do | |||
304 | -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) () | 468 | -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) () |
305 | -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) () | 469 | -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) () |
306 | -- nss <- sourceList [[addr]] \$= search nid (findNodeQ nid) $$ C.consume | 470 | -- nss <- sourceList [[addr]] \$= search nid (findNodeQ nid) $$ C.consume |
307 | nss <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume | 471 | nss <- sourceList [nodes] $= search nid (findNodeQ (Proxy :: Proxy dht) nid) $$ C.consume |
308 | $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes." | 472 | $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes." |
309 | _ <- queryParallel $ flip L.map (L.concat nss) $ \n -> do | 473 | _ <- queryParallel $ flip L.map (L.concat nss) $ \n -> do |
310 | $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) | 474 | $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) |
@@ -312,8 +476,9 @@ refreshNodes nid = do | |||
312 | -- pingQ takes care of inserting the node. | 476 | -- pingQ takes care of inserting the node. |
313 | return () | 477 | return () |
314 | return () -- \$ L.concat nss | 478 | return () -- \$ L.concat nss |
479 | #endif | ||
315 | 480 | ||
316 | logc :: Char -> String -> DHT ip () | 481 | logc :: Char -> String -> DHT raw dht u ip () |
317 | logc 'D' = $(logDebugS) "insertNode" . T.pack | 482 | logc 'D' = $(logDebugS) "insertNode" . T.pack |
318 | logc 'W' = $(logWarnS) "insertNode" . T.pack | 483 | logc 'W' = $(logWarnS) "insertNode" . T.pack |
319 | logc 'I' = $(logInfoS) "insertNode" . T.pack | 484 | logc 'I' = $(logInfoS) "insertNode" . T.pack |
@@ -321,12 +486,46 @@ logc c = $(logInfoS) "insertNode" . T.pack . (c :) . (':' :) | |||
321 | 486 | ||
322 | -- | This operation do not block but acquire exclusive access to | 487 | -- | This operation do not block but acquire exclusive access to |
323 | -- routing table. | 488 | -- routing table. |
324 | insertNode :: forall ip. Address ip => NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> DHT ip () | 489 | insertNode :: forall raw dht u ip. |
490 | ( Address ip | ||
491 | , Ord (NodeId dht) | ||
492 | , FiniteBits (NodeId dht) | ||
493 | , Show (NodeId dht) | ||
494 | , Default u | ||
495 | , Show u | ||
496 | , DHT.Kademlia dht | ||
497 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
498 | , Ord (TransactionID dht) | ||
499 | , WireFormat raw dht | ||
500 | , Serialize (TransactionID dht) | ||
501 | , SerializableTo raw (Response dht (Ping dht)) | ||
502 | , SerializableTo raw (Query dht (Ping dht)) | ||
503 | , Ord (NodeId dht) | ||
504 | , Show (NodeId dht) | ||
505 | , Show (QueryMethod dht) | ||
506 | ) => NodeInfo dht ip u -> Maybe ReflectedIP -> DHT raw dht u ip () | ||
325 | insertNode info witnessed_ip0 = do | 507 | insertNode info witnessed_ip0 = do |
326 | f <- insertNode1 | 508 | f <- insertNode1 |
327 | liftIO $ f info witnessed_ip0 | 509 | liftIO $ f info witnessed_ip0 |
328 | 510 | ||
329 | insertNode1 :: forall ip. Address ip => DHT ip (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) | 511 | insertNode1 :: forall raw dht u ip. |
512 | ( Address ip | ||
513 | , Default u | ||
514 | , Show u | ||
515 | , Ord (NodeId dht) | ||
516 | , FiniteBits (NodeId dht) | ||
517 | , Show (NodeId dht) | ||
518 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
519 | , DHT.Kademlia dht | ||
520 | , Ord (TransactionID dht) | ||
521 | , WireFormat raw dht | ||
522 | , Serialize (TransactionID dht) | ||
523 | , SerializableTo raw (Response dht (Ping dht)) | ||
524 | , SerializableTo raw (Query dht (Ping dht)) | ||
525 | , Ord (NodeId dht) | ||
526 | , Show (NodeId dht) | ||
527 | , Show (QueryMethod dht) | ||
528 | ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ()) | ||
330 | insertNode1 = do | 529 | insertNode1 = do |
331 | bc <- optBucketCount <$> asks options | 530 | bc <- optBucketCount <$> asks options |
332 | nid <- asks tentativeNodeId | 531 | nid <- asks tentativeNodeId |
@@ -335,15 +534,17 @@ insertNode1 = do | |||
335 | dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state. | 534 | dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state. |
336 | probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) | 535 | probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) |
337 | let probe n = probe0 n >>= runDHT dht_node_state . restoreM | 536 | let probe n = probe0 n >>= runDHT dht_node_state . restoreM |
537 | {- | ||
338 | changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive | 538 | changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive |
339 | ip <- fromSockAddr ip0 :: Maybe ip | 539 | ip <- fromSockAddr ip0 :: Maybe ip |
340 | listToMaybe | 540 | listToMaybe |
341 | $ rank id (nodeId $ foreignNode arrival) | 541 | $ rank id (nodeId $ foreignNode arrival) |
342 | $ bep42s ip (DHT.fallbackID params) -- warning: recursive | 542 | $ bep42s ip (DHT.fallbackID params) -- warning: recursive |
543 | -} | ||
343 | params = DHT.TableParameters | 544 | params = DHT.TableParameters |
344 | { maxBuckets = bc :: Int | 545 | { maxBuckets = bc :: Int |
345 | , fallbackID = nid :: NodeId KMessageOf | 546 | , fallbackID = nid :: NodeId dht |
346 | , adjustID = changeip :: SockAddr -> Event KMessageOf ip () -> NodeId KMessageOf | 547 | , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht |
347 | , logMessage = logm :: Char -> String -> IO () | 548 | , logMessage = logm :: Char -> String -> IO () |
348 | , pingProbe = probe :: NodeAddr ip -> IO (Bool, Maybe ReflectedIP) | 549 | , pingProbe = probe :: NodeAddr ip -> IO (Bool, Maybe ReflectedIP) |
349 | } | 550 | } |
@@ -356,25 +557,75 @@ insertNode1 = do | |||
356 | return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0 | 557 | return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0 |
357 | 558 | ||
358 | -- | Throws exception if node is not responding. | 559 | -- | Throws exception if node is not responding. |
359 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) | 560 | queryNode :: forall raw dht u a b ip. |
360 | => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b) | 561 | ( Address ip |
562 | , KRPC (Query dht a) (Response dht b) | ||
563 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
564 | , Default u | ||
565 | , Show u | ||
566 | , DHT.Kademlia dht | ||
567 | , Ord (TransactionID dht) | ||
568 | , Serialize (TransactionID dht) | ||
569 | , WireFormat raw dht | ||
570 | , SerializableTo raw (Response dht b) | ||
571 | , SerializableTo raw (Query dht a) | ||
572 | , Ord (NodeId dht) | ||
573 | , FiniteBits (NodeId dht) | ||
574 | , Show (NodeId dht) | ||
575 | , Show (QueryMethod dht) | ||
576 | , SerializableTo raw (Response dht (Ping dht)) | ||
577 | , SerializableTo raw (Query dht (Ping dht)) | ||
578 | ) => NodeAddr ip -> a -> DHT raw dht u ip (NodeId dht, b) | ||
361 | queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q | 579 | queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q |
362 | 580 | ||
363 | queryNode' :: forall a b ip. Address ip => KRPC (Query a) (Response b) | 581 | queryNode' :: forall raw dht u a b ip. |
364 | => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b, Maybe ReflectedIP) | 582 | ( Address ip |
583 | , Default u | ||
584 | , Show u | ||
585 | , DHT.Kademlia dht | ||
586 | , KRPC (Query dht a) (Response dht b) | ||
587 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
588 | , Ord (TransactionID dht) | ||
589 | , Serialize (TransactionID dht) | ||
590 | , WireFormat raw dht | ||
591 | , SerializableTo raw (Response dht b) | ||
592 | , SerializableTo raw (Query dht a) | ||
593 | , Ord (NodeId dht) | ||
594 | , FiniteBits (NodeId dht) | ||
595 | , Show (NodeId dht) | ||
596 | , Show (QueryMethod dht) | ||
597 | , SerializableTo raw (Response dht (Ping dht)) | ||
598 | , SerializableTo raw (Query dht (Ping dht)) | ||
599 | ) => NodeAddr ip -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP) | ||
365 | queryNode' addr q = do | 600 | queryNode' addr q = do |
366 | nid <- myNodeIdAccordingTo addr | 601 | nid <- myNodeIdAccordingTo addr |
367 | let read_only = False -- TODO: check for NAT issues. (BEP 43) | 602 | let read_only = False -- TODO: check for NAT issues. (BEP 43) |
368 | let KRPC.Method name = KRPC.method :: KRPC.Method (Query a) (Response b) | 603 | let KRPC.Method name = KRPC.method :: KRPC.Method (Query dht a) (Response dht b) |
369 | mgr <- asks manager | 604 | mgr <- asks manager |
370 | (Response remoteId r, witnessed_ip) <- liftIO $ query' mgr name (toSockAddr addr) (Query nid read_only q) | 605 | (Response remoteId r, witnessed_ip) <- liftIO $ query' mgr (error "TODO: name") (toSockAddr addr) (Query nid read_only q) |
371 | -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) | 606 | -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) |
372 | -- <> " by " <> T.pack (show (toSockAddr addr)) | 607 | -- <> " by " <> T.pack (show (toSockAddr addr)) |
373 | _ <- insertNode (NodeInfo remoteId addr ()) witnessed_ip | 608 | _ <- insertNode (NodeInfo remoteId addr def) witnessed_ip |
374 | return (remoteId, r, witnessed_ip) | 609 | return (remoteId, r, witnessed_ip) |
375 | 610 | ||
376 | -- | Infix version of 'queryNode' function. | 611 | -- | Infix version of 'queryNode' function. |
377 | (<@>) :: Address ip => KRPC (Query a) (Response b) | 612 | (<@>) :: ( Address ip |
378 | => a -> NodeAddr ip -> DHT ip b | 613 | , KRPC (Query dht a) (Response dht b) |
614 | , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
615 | , Default u | ||
616 | , Show u | ||
617 | , Show (QueryMethod dht) | ||
618 | , Ord (NodeId dht) | ||
619 | , FiniteBits (NodeId dht) | ||
620 | , Show (NodeId dht) | ||
621 | , Ord (TransactionID dht) | ||
622 | , Serialize (TransactionID dht) | ||
623 | , SerializableTo raw (Response dht b) | ||
624 | , SerializableTo raw (Query dht a) | ||
625 | , SerializableTo raw (Response dht (Ping dht)) | ||
626 | , SerializableTo raw (Query dht (Ping dht)) | ||
627 | , WireFormat raw dht | ||
628 | , Kademlia dht | ||
629 | ) => a -> NodeAddr ip -> DHT raw dht u ip b | ||
379 | q <@> addr = snd <$> queryNode addr q | 630 | q <@> addr = snd <$> queryNode addr q |
380 | {-# INLINE (<@>) #-} | 631 | {-# INLINE (<@>) #-} |
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs index f5cd7834..356f6fd9 100644 --- a/src/Network/BitTorrent/DHT/Search.hs +++ b/src/Network/BitTorrent/DHT/Search.hs | |||
@@ -2,6 +2,7 @@ | |||
2 | {-# LANGUAGE PatternSynonyms #-} | 2 | {-# LANGUAGE PatternSynonyms #-} |
3 | {-# LANGUAGE RecordWildCards #-} | 3 | {-# LANGUAGE RecordWildCards #-} |
4 | {-# LANGUAGE ScopedTypeVariables #-} | 4 | {-# LANGUAGE ScopedTypeVariables #-} |
5 | {-# LANGUAGE FlexibleContexts #-} | ||
5 | module Network.BitTorrent.DHT.Search where | 6 | module Network.BitTorrent.DHT.Search where |
6 | 7 | ||
7 | import Control.Concurrent | 8 | import Control.Concurrent |
@@ -25,27 +26,23 @@ import qualified Data.Wrapper.PSQ as PSQ | |||
25 | ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ) | 26 | ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ) |
26 | import Network.Address hiding (NodeId) | 27 | import Network.Address hiding (NodeId) |
27 | import Network.DatagramServer.Types | 28 | import Network.DatagramServer.Types |
28 | #ifdef VERSION_bencoding | 29 | import Data.Bits |
29 | import Network.DatagramServer.Mainline (KMessageOf) | ||
30 | type Ann = () | ||
31 | #else | ||
32 | import Network.DatagramServer.Tox as Tox | ||
33 | type KMessageOf = Tox.Message | ||
34 | type Ann = Bool | ||
35 | #endif | ||
36 | 30 | ||
37 | data IterativeSearch ip r = IterativeSearch | 31 | data IterativeSearch dht u ip r = IterativeSearch |
38 | { searchTarget :: NodeId KMessageOf | 32 | { searchTarget :: NodeId dht |
39 | , searchQuery :: NodeInfo KMessageOf ip Ann -> IO ([NodeInfo KMessageOf ip Ann], [r]) | 33 | , searchQuery :: NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r]) |
40 | , searchPendingCount :: TVar Int | 34 | , searchPendingCount :: TVar Int |
41 | , searchQueued :: TVar (MinMaxPSQ (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf))) | 35 | , searchQueued :: TVar (MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht))) |
42 | , searchInformant :: TVar (MinMaxPSQ (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf))) | 36 | , searchInformant :: TVar (MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht))) |
43 | , searchVisited :: TVar (Set (NodeAddr ip)) | 37 | , searchVisited :: TVar (Set (NodeAddr ip)) |
44 | , searchResults :: TVar (Set r) | 38 | , searchResults :: TVar (Set r) |
45 | } | 39 | } |
46 | 40 | ||
47 | newSearch :: Eq ip => (NodeInfo KMessageOf ip Ann -> IO ([NodeInfo KMessageOf ip Ann], [r])) | 41 | newSearch :: ( Eq ip |
48 | -> NodeId KMessageOf -> [NodeInfo KMessageOf ip Ann] -> IO (IterativeSearch ip r) | 42 | , Ord (NodeId dht) |
43 | , FiniteBits (NodeId dht) | ||
44 | ) => (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r])) | ||
45 | -> NodeId dht -> [NodeInfo dht ip u] -> IO (IterativeSearch dht u ip r) | ||
49 | newSearch qry target ns = atomically $ do | 46 | newSearch qry target ns = atomically $ do |
50 | c <- newTVar 0 | 47 | c <- newTVar 0 |
51 | q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns | 48 | q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns |
@@ -60,9 +57,14 @@ searchAlpha = 3 | |||
60 | searchK :: Int | 57 | searchK :: Int |
61 | searchK = 8 | 58 | searchK = 8 |
62 | 59 | ||
63 | sendQuery :: forall a ip. (Ord a, Ord ip) => | 60 | sendQuery :: forall a ip dht u. |
64 | IterativeSearch ip a | 61 | ( Ord a |
65 | -> Binding (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf)) | 62 | , Ord ip |
63 | , Ord (NodeId dht) | ||
64 | , FiniteBits (NodeId dht) | ||
65 | ) => | ||
66 | IterativeSearch dht u ip a | ||
67 | -> Binding (NodeInfo dht ip u) (NodeDistance (NodeId dht)) | ||
66 | -> IO () | 68 | -> IO () |
67 | sendQuery IterativeSearch{..} (ni :-> d) = do | 69 | sendQuery IterativeSearch{..} (ni :-> d) = do |
68 | (ns,rs) <- handle (\(SomeException e) -> return ([],[])) | 70 | (ns,rs) <- handle (\(SomeException e) -> return ([],[])) |
@@ -71,9 +73,9 @@ sendQuery IterativeSearch{..} (ni :-> d) = do | |||
71 | modifyTVar searchPendingCount pred | 73 | modifyTVar searchPendingCount pred |
72 | vs <- readTVar searchVisited | 74 | vs <- readTVar searchVisited |
73 | -- We only queue a node if it is not yet visited | 75 | -- We only queue a node if it is not yet visited |
74 | let insertFoundNode :: NodeInfo KMessageOf ip u | 76 | let insertFoundNode :: NodeInfo dht ip u |
75 | -> MinMaxPSQ (NodeInfo KMessageOf ip u) (NodeDistance (NodeId KMessageOf)) | 77 | -> MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht)) |
76 | -> MinMaxPSQ (NodeInfo KMessageOf ip u) (NodeDistance (NodeId KMessageOf)) | 78 | -> MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht)) |
77 | insertFoundNode n q | 79 | insertFoundNode n q |
78 | | nodeAddr n `Set.member` vs = q | 80 | | nodeAddr n `Set.member` vs = q |
79 | | otherwise = MM.insertTake searchK n (distance searchTarget $ nodeId n) q | 81 | | otherwise = MM.insertTake searchK n (distance searchTarget $ nodeId n) q |
@@ -82,7 +84,9 @@ sendQuery IterativeSearch{..} (ni :-> d) = do | |||
82 | modifyTVar searchResults $ \s -> foldr Set.insert s rs | 84 | modifyTVar searchResults $ \s -> foldr Set.insert s rs |
83 | 85 | ||
84 | 86 | ||
85 | searchIsFinished :: Ord ip => IterativeSearch ip r -> STM Bool | 87 | searchIsFinished :: ( Ord ip |
88 | , Ord (NodeId dht) | ||
89 | ) => IterativeSearch dht u ip r -> STM Bool | ||
86 | searchIsFinished IterativeSearch{..} = do | 90 | searchIsFinished IterativeSearch{..} = do |
87 | q <- readTVar searchQueued | 91 | q <- readTVar searchQueued |
88 | cnt <- readTVar searchPendingCount | 92 | cnt <- readTVar searchPendingCount |
@@ -94,8 +98,8 @@ searchIsFinished IterativeSearch{..} = do | |||
94 | <= PSQ.prio (fromJust $ MM.findMin q)))) | 98 | <= PSQ.prio (fromJust $ MM.findMin q)))) |
95 | 99 | ||
96 | search :: | 100 | search :: |
97 | (Ord r, Ord ip) => | 101 | (Ord r, Ord ip, Ord (NodeId dht), FiniteBits (NodeId dht)) => |
98 | IterativeSearch ip r -> IO () | 102 | IterativeSearch dht u ip r -> IO () |
99 | search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do | 103 | search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do |
100 | fix $ \again -> do | 104 | fix $ \again -> do |
101 | join $ atomically $ do | 105 | join $ atomically $ do |
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index d4794038..f96ba707 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -96,6 +96,7 @@ import Control.Monad.Trans.Control | |||
96 | import Control.Monad.Trans.Resource | 96 | import Control.Monad.Trans.Resource |
97 | import Data.Typeable | 97 | import Data.Typeable |
98 | import Data.String | 98 | import Data.String |
99 | import Data.Bits | ||
99 | import Data.ByteString | 100 | import Data.ByteString |
100 | import Data.Conduit.Lazy | 101 | import Data.Conduit.Lazy |
101 | import Data.Default | 102 | import Data.Default |
@@ -265,26 +266,17 @@ type AnnounceSet = Set (InfoHash, PortNumber) | |||
265 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () | 266 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () |
266 | 267 | ||
267 | -- | DHT session keep track state of /this/ node. | 268 | -- | DHT session keep track state of /this/ node. |
268 | data Node ip = Node | 269 | data Node raw dht u ip = Node |
269 | { -- | Session configuration; | 270 | { -- | Session configuration; |
270 | options :: !Options | 271 | options :: !Options |
271 | 272 | ||
272 | -- | Pseudo-unique self-assigned session identifier. This value is | 273 | -- | Pseudo-unique self-assigned session identifier. This value is |
273 | -- constant during DHT session and (optionally) between sessions. | 274 | -- constant during DHT session and (optionally) between sessions. |
274 | #ifdef VERSION_bencoding | 275 | , tentativeNodeId :: !(NodeId dht) |
275 | , tentativeNodeId :: !(NodeId KMessageOf) | ||
276 | #else | ||
277 | , tentativeNodeId :: !(NodeId Tox.Message) | ||
278 | #endif | ||
279 | 276 | ||
280 | , resources :: !InternalState | 277 | , resources :: !InternalState |
281 | #ifdef VERSION_bencoding | 278 | , manager :: !(Manager raw dht) -- ^ RPC manager; |
282 | , manager :: !(Manager IO BValue KMessageOf) -- ^ RPC manager; | 279 | , routingInfo :: !(TVar (Maybe (R.Info dht ip u))) -- ^ search table; |
283 | , routingInfo :: !(TVar (Maybe (R.Info KMessageOf ip ()))) -- ^ search table; | ||
284 | #else | ||
285 | , manager :: !(Manager (DHT ip) ByteString Tox.Message) -- ^ RPC manager; | ||
286 | , routingInfo :: !(TVar (Maybe (R.Info Tox.Message ip Bool))) -- ^ search table; | ||
287 | #endif | ||
288 | , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; | 280 | , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; |
289 | , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; | 281 | , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; |
290 | , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. | 282 | , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. |
@@ -293,23 +285,23 @@ data Node ip = Node | |||
293 | 285 | ||
294 | -- | DHT keep track current session and proper resource allocation for | 286 | -- | DHT keep track current session and proper resource allocation for |
295 | -- safe multithreading. | 287 | -- safe multithreading. |
296 | newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) IO a } | 288 | newtype DHT raw dht u ip a = DHT { unDHT :: ReaderT (Node raw dht u ip) IO a } |
297 | deriving ( Functor, Applicative, Monad, MonadIO | 289 | deriving ( Functor, Applicative, Monad, MonadIO |
298 | , MonadBase IO, MonadReader (Node ip), MonadThrow | 290 | , MonadBase IO, MonadReader (Node raw dht u ip), MonadThrow |
299 | ) | 291 | ) |
300 | 292 | ||
301 | #if MIN_VERSION_monad_control(1,0,0) | 293 | #if MIN_VERSION_monad_control(1,0,0) |
302 | newtype DHTStM ip a = StM { | 294 | newtype DHTStM raw dht u ip a = StM { |
303 | unSt :: StM (ReaderT (Node ip) IO) a | 295 | unSt :: StM (ReaderT (Node raw dht u ip) IO) a |
304 | } | 296 | } |
305 | #endif | 297 | #endif |
306 | 298 | ||
307 | instance MonadBaseControl IO (DHT ip) where | 299 | instance MonadBaseControl IO (DHT raw dht u ip) where |
308 | #if MIN_VERSION_monad_control(1,0,0) | 300 | #if MIN_VERSION_monad_control(1,0,0) |
309 | type StM (DHT ip) a = DHTStM ip a | 301 | type StM (DHT raw dht u ip) a = DHTStM raw dht u ip a |
310 | #else | 302 | #else |
311 | newtype StM (DHT ip) a = StM { | 303 | newtype StM (DHT raw dht u ip) a = StM { |
312 | unSt :: StM (ReaderT (Node ip) IO) a | 304 | unSt :: StM (ReaderT (Node raw dht u ip) IO) a |
313 | } | 305 | } |
314 | #endif | 306 | #endif |
315 | liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> | 307 | liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> |
@@ -321,22 +313,22 @@ instance MonadBaseControl IO (DHT ip) where | |||
321 | 313 | ||
322 | -- | Check is it is possible to run 'queryNode' or handle pending | 314 | -- | Check is it is possible to run 'queryNode' or handle pending |
323 | -- query from remote node. | 315 | -- query from remote node. |
324 | instance MonadActive (DHT ip) where | 316 | instance MonadActive (DHT raw dht u ip) where |
325 | monadActive = getManager >>= liftIO . isActive | 317 | monadActive = getManager >>= liftIO . isActive |
326 | {-# INLINE monadActive #-} | 318 | {-# INLINE monadActive #-} |
327 | 319 | ||
328 | -- | All allocated resources will be closed at 'closeNode'. | 320 | -- | All allocated resources will be closed at 'closeNode'. |
329 | instance MonadResource (DHT ip) where | 321 | instance MonadResource (DHT raw dht u ip) where |
330 | liftResourceT m = do | 322 | liftResourceT m = do |
331 | s <- asks resources | 323 | s <- asks resources |
332 | liftIO $ runInternalState m s | 324 | liftIO $ runInternalState m s |
333 | 325 | ||
334 | -- instance MonadKRPC (DHT ip) (DHT ip) BValue KMessageOf where | 326 | -- instance MonadKRPC (DHT raw dht u ip) (DHT raw dht u ip) BValue KMessageOf where |
335 | 327 | ||
336 | getManager :: DHT ip (Manager IO BValue KMessageOf) | 328 | getManager :: DHT raw dht u ip (Manager raw dht) |
337 | getManager = asks manager | 329 | getManager = asks manager |
338 | 330 | ||
339 | instance MonadLogger (DHT ip) where | 331 | instance MonadLogger (DHT raw dht u ip) where |
340 | monadLoggerLog loc src lvl msg = do | 332 | monadLoggerLog loc src lvl msg = do |
341 | logger <- asks loggerFun | 333 | logger <- asks loggerFun |
342 | liftIO $ logger loc src lvl (toLogStr msg) | 334 | liftIO $ logger loc src lvl (toLogStr msg) |
@@ -344,7 +336,7 @@ instance MonadLogger (DHT ip) where | |||
344 | #ifdef VERSION_bencoding | 336 | #ifdef VERSION_bencoding |
345 | type NodeHandler = Handler IO KMessageOf BValue | 337 | type NodeHandler = Handler IO KMessageOf BValue |
346 | #else | 338 | #else |
347 | type NodeHandler ip = Handler (DHT ip) Tox.Message ByteString | 339 | type NodeHandler ip = Handler (DHT raw dht u ip) Tox.Message ByteString |
348 | #endif | 340 | #endif |
349 | 341 | ||
350 | logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO () | 342 | logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO () |
@@ -376,17 +368,16 @@ locFromCS cs = case getCallStack cs of | |||
376 | -- | Run DHT session. You /must/ properly close session using | 368 | -- | Run DHT session. You /must/ properly close session using |
377 | -- 'closeNode' function, otherwise socket or other scarce resources may | 369 | -- 'closeNode' function, otherwise socket or other scarce resources may |
378 | -- leak. | 370 | -- leak. |
379 | newNode :: Address ip | 371 | newNode :: ( Address ip |
372 | , FiniteBits (NodeId dht) | ||
373 | , Serialize (NodeId dht) | ||
374 | ) | ||
380 | => -- [NodeHandler] -- ^ handlers to run on accepted queries; | 375 | => -- [NodeHandler] -- ^ handlers to run on accepted queries; |
381 | Options -- ^ various dht options; | 376 | Options -- ^ various dht options; |
382 | -> NodeAddr ip -- ^ node address to bind; | 377 | -> NodeAddr ip -- ^ node address to bind; |
383 | -> LogFun -- ^ invoked on log messages; | 378 | -> LogFun -- ^ invoked on log messages; |
384 | #ifdef VERSION_bencoding | 379 | -> Maybe (NodeId dht) -- ^ use this NodeId, if not given a new one is generated. |
385 | -> Maybe (NodeId KMessageOf) -- ^ use this NodeId, if not given a new one is generated. | 380 | -> IO (Node raw dht u ip) -- ^ a new DHT node running at given address. |
386 | #else | ||
387 | -> Maybe (NodeId Tox.Message) -- ^ use this NodeId, if not given a new one is generated. | ||
388 | #endif | ||
389 | -> IO (Node ip) -- ^ a new DHT node running at given address. | ||
390 | newNode opts naddr logger mbid = do | 381 | newNode opts naddr logger mbid = do |
391 | s <- createInternalState | 382 | s <- createInternalState |
392 | runInternalState initNode s | 383 | runInternalState initNode s |
@@ -409,11 +400,11 @@ newNode opts naddr logger mbid = do | |||
409 | 400 | ||
410 | -- | Some resources like listener thread may live for | 401 | -- | Some resources like listener thread may live for |
411 | -- some short period of time right after this DHT session closed. | 402 | -- some short period of time right after this DHT session closed. |
412 | closeNode :: Node ip -> IO () | 403 | closeNode :: Node raw dht u ip -> IO () |
413 | closeNode Node {..} = closeInternalState resources | 404 | closeNode Node {..} = closeInternalState resources |
414 | 405 | ||
415 | -- | Run DHT operation on the given session. | 406 | -- | Run DHT operation on the given session. |
416 | runDHT :: Node ip -> DHT ip a -> IO a | 407 | runDHT :: Node raw dht u ip -> DHT raw dht u ip a -> IO a |
417 | runDHT node action = runReaderT (unDHT action) node | 408 | runDHT node action = runReaderT (unDHT action) node |
418 | {-# INLINE runDHT #-} | 409 | {-# INLINE runDHT #-} |
419 | 410 | ||
@@ -453,24 +444,20 @@ checkToken sessionTokens addr questionableToken = do | |||
453 | -----------------------------------------------------------------------} | 444 | -----------------------------------------------------------------------} |
454 | 445 | ||
455 | -- | This nodes externally routable address reported by remote peers. | 446 | -- | This nodes externally routable address reported by remote peers. |
456 | routableAddress :: DHT ip (Maybe SockAddr) | 447 | routableAddress :: DHT raw dht u ip (Maybe SockAddr) |
457 | routableAddress = do | 448 | routableAddress = do |
458 | info <- asks routingInfo >>= liftIO . atomically . readTVar | 449 | info <- asks routingInfo >>= liftIO . atomically . readTVar |
459 | return $ myAddress <$> info | 450 | return $ myAddress <$> info |
460 | 451 | ||
461 | -- | The current NodeId that the given remote node should know us by. | 452 | -- | The current NodeId that the given remote node should know us by. |
462 | #ifdef VERSION_bencoding | 453 | myNodeIdAccordingTo :: NodeAddr ip -> DHT raw dht u ip (NodeId dht) |
463 | myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId KMessageOf) | ||
464 | #else | ||
465 | myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId Tox.Message) | ||
466 | #endif | ||
467 | myNodeIdAccordingTo _ = do | 454 | myNodeIdAccordingTo _ = do |
468 | info <- asks routingInfo >>= liftIO . atomically . readTVar | 455 | info <- asks routingInfo >>= liftIO . atomically . readTVar |
469 | maybe (asks tentativeNodeId) | 456 | maybe (asks tentativeNodeId) |
470 | (return . myNodeId) | 457 | (return . myNodeId) |
471 | info | 458 | info |
472 | 459 | ||
473 | myNodeIdAccordingTo1 :: DHT ip ( NodeAddr ip -> IO (NodeId KMessageOf) ) | 460 | myNodeIdAccordingTo1 :: DHT raw dht u ip ( NodeAddr ip -> IO (NodeId dht) ) |
474 | myNodeIdAccordingTo1 = do | 461 | myNodeIdAccordingTo1 = do |
475 | var <- asks routingInfo | 462 | var <- asks routingInfo |
476 | tid <- asks tentativeNodeId | 463 | tid <- asks tentativeNodeId |
@@ -480,11 +467,7 @@ myNodeIdAccordingTo1 = do | |||
480 | 467 | ||
481 | -- | Get current routing table. Normally you don't need to use this | 468 | -- | Get current routing table. Normally you don't need to use this |
482 | -- function, but it can be usefull for debugging and profiling purposes. | 469 | -- function, but it can be usefull for debugging and profiling purposes. |
483 | #ifdef VERSION_bencoding | 470 | getTable :: Eq ip => DHT raw dht u ip (Table dht ip u) |
484 | getTable :: Eq ip => DHT ip (Table KMessageOf ip ()) | ||
485 | #else | ||
486 | getTable :: Eq ip => DHT ip (Table Tox.Message ip Bool) | ||
487 | #endif | ||
488 | getTable = do | 471 | getTable = do |
489 | Node { tentativeNodeId = myId | 472 | Node { tentativeNodeId = myId |
490 | , routingInfo = var | 473 | , routingInfo = var |
@@ -492,18 +475,18 @@ getTable = do | |||
492 | let nil = nullTable myId (optBucketCount opts) | 475 | let nil = nullTable myId (optBucketCount opts) |
493 | liftIO (maybe nil R.myBuckets <$> atomically (readTVar var)) | 476 | liftIO (maybe nil R.myBuckets <$> atomically (readTVar var)) |
494 | 477 | ||
495 | getSwarms :: Ord ip => DHT ip [ (InfoHash, Int, Maybe ByteString) ] | 478 | getSwarms :: Ord ip => DHT raw dht u ip [ (InfoHash, Int, Maybe ByteString) ] |
496 | getSwarms = do | 479 | getSwarms = do |
497 | store <- asks contactInfo >>= liftIO . atomically . readTVar | 480 | store <- asks contactInfo >>= liftIO . atomically . readTVar |
498 | return $ P.knownSwarms store | 481 | return $ P.knownSwarms store |
499 | 482 | ||
500 | savePeerStore :: (Ord ip, Address ip) => DHT ip ByteString | 483 | savePeerStore :: (Ord ip, Address ip) => DHT raw dht u ip ByteString |
501 | savePeerStore = do | 484 | savePeerStore = do |
502 | var <- asks contactInfo | 485 | var <- asks contactInfo |
503 | peers <- liftIO $ atomically $ readTVar var | 486 | peers <- liftIO $ atomically $ readTVar var |
504 | return $ S.encode peers | 487 | return $ S.encode peers |
505 | 488 | ||
506 | mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT ip () | 489 | mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw dht u ip () |
507 | mergeSavedPeers bs = do | 490 | mergeSavedPeers bs = do |
508 | var <- asks contactInfo | 491 | var <- asks contactInfo |
509 | case S.decode bs of | 492 | case S.decode bs of |
@@ -511,7 +494,7 @@ mergeSavedPeers bs = do | |||
511 | Left _ -> return () | 494 | Left _ -> return () |
512 | 495 | ||
513 | 496 | ||
514 | allPeers :: Ord ip => InfoHash -> DHT ip [ PeerAddr ip ] | 497 | allPeers :: Ord ip => InfoHash -> DHT raw dht u ip [ PeerAddr ip ] |
515 | allPeers ih = do | 498 | allPeers ih = do |
516 | store <- asks contactInfo >>= liftIO . atomically . readTVar | 499 | store <- asks contactInfo >>= liftIO . atomically . readTVar |
517 | return $ P.lookup ih store | 500 | return $ P.lookup ih store |
@@ -521,18 +504,20 @@ allPeers ih = do | |||
521 | -- | 504 | -- |
522 | -- This operation used for 'find_nodes' query. | 505 | -- This operation used for 'find_nodes' query. |
523 | -- | 506 | -- |
524 | #ifdef VERSION_bencoding | 507 | getClosest :: ( Eq ip |
525 | getClosest :: Eq ip => TableKey KMessageOf k => k -> DHT ip [NodeInfo KMessageOf ip ()] | 508 | , Ord (NodeId dht) |
526 | #else | 509 | , FiniteBits (NodeId dht) |
527 | getClosest :: Eq ip => TableKey Tox.Message k => k -> DHT ip [NodeInfo Tox.Message ip Bool] | 510 | , TableKey dht k ) => |
528 | #endif | 511 | k -> DHT raw dht u ip [NodeInfo dht ip u] |
529 | getClosest node = do | 512 | getClosest node = do |
530 | k <- asks (optK . options) | 513 | k <- asks (optK . options) |
531 | kclosest k node <$> getTable | 514 | kclosest k node <$> getTable |
532 | 515 | ||
533 | getClosest1 :: ( Eq ip | 516 | getClosest1 :: ( Eq ip |
534 | , TableKey KMessageOf k | 517 | , Ord (NodeId dht) |
535 | ) => DHT ip (k -> IO [NodeInfo KMessageOf ip ()]) | 518 | , FiniteBits (NodeId dht) |
519 | , TableKey dht k | ||
520 | ) => DHT raw dht u ip (k -> IO [NodeInfo dht ip u]) | ||
536 | getClosest1 = do | 521 | getClosest1 = do |
537 | k <- asks (optK . options) | 522 | k <- asks (optK . options) |
538 | nobkts <- asks (optBucketCount . options) | 523 | nobkts <- asks (optBucketCount . options) |
@@ -574,13 +559,12 @@ getTimestamp = do | |||
574 | -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime))) | 559 | -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime))) |
575 | return $ utcTimeToPOSIXSeconds utcTime | 560 | return $ utcTimeToPOSIXSeconds utcTime |
576 | 561 | ||
577 | |||
578 | #ifdef VERSION_bencoding | 562 | #ifdef VERSION_bencoding |
579 | -- | Prepare result for 'get_peers' query. | 563 | -- | Prepare result for 'get_peers' query. |
580 | -- | 564 | -- |
581 | -- This operation use 'getClosest' as failback so it may block. | 565 | -- This operation use 'getClosest' as failback so it may block. |
582 | -- | 566 | -- |
583 | getPeerList :: Ord ip => InfoHash -> DHT ip (PeerList ip) | 567 | getPeerList :: Ord ip => InfoHash -> DHT raw KMessageOf () ip (PeerList ip) |
584 | getPeerList ih = do | 568 | getPeerList ih = do |
585 | var <- asks contactInfo | 569 | var <- asks contactInfo |
586 | ps <- liftIO $ lookupPeers var ih | 570 | ps <- liftIO $ lookupPeers var ih |
@@ -588,7 +572,7 @@ getPeerList ih = do | |||
588 | then Left <$> getClosest ih | 572 | then Left <$> getClosest ih |
589 | else return (Right ps) | 573 | else return (Right ps) |
590 | 574 | ||
591 | getPeerList1 :: Ord ip => DHT ip (InfoHash -> IO (PeerList ip)) | 575 | getPeerList1 :: Ord ip => DHT raw KMessageOf () ip (InfoHash -> IO (PeerList ip)) |
592 | getPeerList1 = do | 576 | getPeerList1 = do |
593 | var <- asks contactInfo | 577 | var <- asks contactInfo |
594 | getclosest <- getClosest1 | 578 | getclosest <- getClosest1 |
@@ -599,12 +583,12 @@ getPeerList1 = do | |||
599 | else return (Right ps) | 583 | else return (Right ps) |
600 | 584 | ||
601 | 585 | ||
602 | insertTopic :: InfoHash -> PortNumber -> DHT ip () | 586 | insertTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () |
603 | insertTopic ih p = do | 587 | insertTopic ih p = do |
604 | var <- asks announceInfo | 588 | var <- asks announceInfo |
605 | liftIO $ atomically $ modifyTVar' var (S.insert (ih, p)) | 589 | liftIO $ atomically $ modifyTVar' var (S.insert (ih, p)) |
606 | 590 | ||
607 | deleteTopic :: InfoHash -> PortNumber -> DHT ip () | 591 | deleteTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () |
608 | deleteTopic ih p = do | 592 | deleteTopic ih p = do |
609 | var <- asks announceInfo | 593 | var <- asks announceInfo |
610 | liftIO $ atomically $ modifyTVar' var (S.delete (ih, p)) | 594 | liftIO $ atomically $ modifyTVar' var (S.delete (ih, p)) |
@@ -616,7 +600,7 @@ deleteTopic ih p = do | |||
616 | -----------------------------------------------------------------------} | 600 | -----------------------------------------------------------------------} |
617 | 601 | ||
618 | -- | Failed queries are ignored. | 602 | -- | Failed queries are ignored. |
619 | queryParallel :: [DHT ip a] -> DHT ip [a] | 603 | queryParallel :: [DHT raw dht u ip a] -> DHT raw dht u ip [a] |
620 | queryParallel queries = do | 604 | queryParallel queries = do |
621 | -- TODO: use alpha | 605 | -- TODO: use alpha |
622 | -- alpha <- asks (optAlpha . options) | 606 | -- alpha <- asks (optAlpha . options) |
diff --git a/src/Network/DHT/Mainline.hs b/src/Network/DHT/Mainline.hs index d118ceb0..29d4231d 100644 --- a/src/Network/DHT/Mainline.hs +++ b/src/Network/DHT/Mainline.hs | |||
@@ -123,6 +123,8 @@ import Network.BitTorrent.DHT.Token | |||
123 | import Network.DatagramServer () | 123 | import Network.DatagramServer () |
124 | #endif | 124 | #endif |
125 | import Network.DatagramServer.Types hiding (Query,Response) | 125 | import Network.DatagramServer.Types hiding (Query,Response) |
126 | import Network.DHT.Types | ||
127 | import Network.DHT.Routing | ||
126 | 128 | ||
127 | {----------------------------------------------------------------------- | 129 | {----------------------------------------------------------------------- |
128 | -- envelopes | 130 | -- envelopes |
@@ -140,15 +142,7 @@ read_only_key = "ro" | |||
140 | 142 | ||
141 | 143 | ||
142 | #ifdef VERSION_bencoding | 144 | #ifdef VERSION_bencoding |
143 | -- | All queries have an \"id\" key and value containing the node ID | 145 | instance BEncode a => BEncode (Query KMessageOf a) where |
144 | -- of the querying node. | ||
145 | data Query a = Query | ||
146 | { queringNodeId :: NodeId KMessageOf -- ^ node id of /quering/ node; | ||
147 | , queryIsReadOnly :: Bool -- ^ node is read-only as per BEP 43 | ||
148 | , queryParams :: a -- ^ query parameters. | ||
149 | } deriving (Show, Eq, Typeable) | ||
150 | |||
151 | instance BEncode a => BEncode (Query a) where | ||
152 | toBEncode Query {..} = toDict $ | 146 | toBEncode Query {..} = toDict $ |
153 | BDict.union ( node_id_key .=! queringNodeId | 147 | BDict.union ( node_id_key .=! queringNodeId |
154 | .: read_only_key .=? bool Nothing (Just (1 :: Integer)) queryIsReadOnly | 148 | .: read_only_key .=? bool Nothing (Just (1 :: Integer)) queryIsReadOnly |
@@ -167,14 +161,7 @@ data Query a = Query a | |||
167 | #endif | 161 | #endif |
168 | 162 | ||
169 | #ifdef VERSION_bencoding | 163 | #ifdef VERSION_bencoding |
170 | -- | All responses have an \"id\" key and value containing the node ID | 164 | instance BEncode a => BEncode (Response KMessageOf a) where |
171 | -- of the responding node. | ||
172 | data Response a = Response | ||
173 | { queredNodeId :: NodeId KMessageOf -- ^ node id of /quered/ node; | ||
174 | , responseVals :: a -- ^ query result. | ||
175 | } deriving (Show, Eq, Typeable) | ||
176 | |||
177 | instance BEncode a => BEncode (Response a) where | ||
178 | toBEncode = toBEncode . toQuery | 165 | toBEncode = toBEncode . toQuery |
179 | where | 166 | where |
180 | toQuery (Response nid a) = Query nid False a | 167 | toQuery (Response nid a) = Query nid False a |
@@ -183,28 +170,23 @@ instance BEncode a => BEncode (Response a) where | |||
183 | where | 170 | where |
184 | fromQuery (Query nid _ a) = Response nid a | 171 | fromQuery (Query nid _ a) = Response nid a |
185 | #else | 172 | #else |
186 | data Response a = Response a | 173 | data Response KMessageOf a = Response KMessageOf a |
187 | #endif | 174 | #endif |
188 | 175 | ||
189 | {----------------------------------------------------------------------- | 176 | {----------------------------------------------------------------------- |
190 | -- ping method | 177 | -- ping method |
191 | -----------------------------------------------------------------------} | 178 | -----------------------------------------------------------------------} |
192 | 179 | ||
193 | -- | The most basic query is a ping. Ping query is used to check if a | 180 | -- / The most basic query is a ping. Ping query is used to check if a |
194 | -- quered node is still alive. | 181 | -- quered node is still alive. |
195 | #ifdef VERSION_bencoding | 182 | -- data Ping = Ping Tox.Nonce8 deriving (Show, Eq, Typeable) |
196 | data Ping = Ping | ||
197 | #else | ||
198 | data Ping = Ping Tox.Nonce8 | ||
199 | #endif | ||
200 | deriving (Show, Eq, Typeable) | ||
201 | 183 | ||
202 | #ifdef VERSION_bencoding | 184 | #ifdef VERSION_bencoding |
203 | instance BEncode Ping where | 185 | instance BEncode (Ping KMessageOf) where |
204 | toBEncode Ping = toDict endDict | 186 | toBEncode Ping = toDict endDict |
205 | fromBEncode _ = pure Ping | 187 | fromBEncode _ = pure Ping |
206 | #else | 188 | #else |
207 | instance Serialize (Query Ping) where | 189 | instance Serialize (Query (Ping KMessageOf)) where |
208 | get = do | 190 | get = do |
209 | b <- get | 191 | b <- get |
210 | when ( (b::Word8) /= 0) $ fail "Bad ping request" | 192 | when ( (b::Word8) /= 0) $ fail "Bad ping request" |
@@ -225,7 +207,7 @@ instance Serialize (Response Ping) where | |||
225 | #endif | 207 | #endif |
226 | 208 | ||
227 | -- | \"q\" = \"ping\" | 209 | -- | \"q\" = \"ping\" |
228 | instance KRPC (Query Ping) (Response Ping) where | 210 | instance KRPC (Query KMessageOf (Ping KMessageOf)) (Response KMessageOf (Ping KMessageOf)) where |
229 | #ifdef VERSION_bencoding | 211 | #ifdef VERSION_bencoding |
230 | method = "ping" | 212 | method = "ping" |
231 | #else | 213 | #else |
@@ -236,24 +218,20 @@ instance KRPC (Query Ping) (Response Ping) where | |||
236 | -- find_node method | 218 | -- find_node method |
237 | -----------------------------------------------------------------------} | 219 | -----------------------------------------------------------------------} |
238 | 220 | ||
239 | -- | Find node is used to find the contact information for a node | 221 | -- / Find node is used to find the contact information for a node |
240 | -- given its ID. | 222 | -- given its ID. |
241 | #ifdef VERSION_bencoding | 223 | -- data FindNode KMessageOf ip = FindNode (NodeId Tox.Message) Tox.Nonce8 -- Tox: Get Nodes |
242 | newtype FindNode ip = FindNode (NodeId KMessageOf) | 224 | -- deriving (Show, Eq, Typeable) |
243 | #else | ||
244 | data FindNode ip = FindNode (NodeId Tox.Message) Tox.Nonce8 -- Tox: Get Nodes | ||
245 | #endif | ||
246 | deriving (Show, Eq, Typeable) | ||
247 | 225 | ||
248 | target_key :: BKey | 226 | target_key :: BKey |
249 | target_key = "target" | 227 | target_key = "target" |
250 | 228 | ||
251 | #ifdef VERSION_bencoding | 229 | #ifdef VERSION_bencoding |
252 | instance Typeable ip => BEncode (FindNode ip) where | 230 | instance Typeable ip => BEncode (FindNode KMessageOf ip) where |
253 | toBEncode (FindNode nid) = toDict $ target_key .=! nid .: endDict | 231 | toBEncode (FindNode nid) = toDict $ target_key .=! nid .: endDict |
254 | fromBEncode = fromDict $ FindNode <$>! target_key | 232 | fromBEncode = fromDict $ FindNode <$>! target_key |
255 | #else | 233 | #else |
256 | instance Serialize (Query (FindNode ip)) where | 234 | instance Serialize (Query KMessageOf (FindNode KMessageOf ip)) where |
257 | get = do | 235 | get = do |
258 | nid <- get | 236 | nid <- get |
259 | nonce <- get | 237 | nonce <- get |
@@ -268,12 +246,11 @@ instance Serialize (Query (FindNode ip)) where | |||
268 | -- nodes in its own routing table. | 246 | -- nodes in its own routing table. |
269 | -- | 247 | -- |
270 | #ifdef VERSION_bencoding | 248 | #ifdef VERSION_bencoding |
271 | newtype NodeFound ip = NodeFound [NodeInfo KMessageOf ip ()] | 249 | -- newtype NodeFound KMessageOf ip = NodeFound [NodeInfo KMessageOf ip ()] deriving (Show, Eq, Typeable) |
272 | #else | 250 | #else |
273 | data NodeFound ip = NodeFound [Tox.NodeFormat] Tox.Nonce8 | 251 | data NodeFound KMessageOf ip = NodeFound [Tox.NodeFormat] Tox.Nonce8 deriving (Show, Eq, Typeable) |
274 | #endif | 252 | #endif |
275 | -- Tox: send_nodes | 253 | -- Tox: send_nodes |
276 | deriving (Show, Eq, Typeable) | ||
277 | 254 | ||
278 | nodes_key :: BKey | 255 | nodes_key :: BKey |
279 | nodes_key = "nodes" | 256 | nodes_key = "nodes" |
@@ -290,7 +267,7 @@ binary k = field (req k) >>= either (fail . format) return . | |||
290 | where | 267 | where |
291 | format str = "fail to deserialize " ++ show k ++ " field: " ++ str | 268 | format str = "fail to deserialize " ++ show k ++ " field: " ++ str |
292 | 269 | ||
293 | instance Address ip => BEncode (NodeFound ip) where | 270 | instance Address ip => BEncode (NodeFound KMessageOf ip) where |
294 | toBEncode (NodeFound ns) = toDict $ | 271 | toBEncode (NodeFound ns) = toDict $ |
295 | nodes_key .=! runPut (mapM_ put ns) | 272 | nodes_key .=! runPut (mapM_ put ns) |
296 | .: endDict | 273 | .: endDict |
@@ -298,7 +275,7 @@ instance Address ip => BEncode (NodeFound ip) where | |||
298 | -- TODO: handle IPv6 by reading the "nodes6" key (see bep 32) | 275 | -- TODO: handle IPv6 by reading the "nodes6" key (see bep 32) |
299 | fromBEncode bval = NodeFound <$> (traverse from4 =<< fromDict (binary nodes_key) bval) | 276 | fromBEncode bval = NodeFound <$> (traverse from4 =<< fromDict (binary nodes_key) bval) |
300 | #else | 277 | #else |
301 | instance Serialize (Response (NodeFound ip)) where | 278 | instance Serialize (Response KMessageOf (NodeFound KMessageOf ip)) where |
302 | get = do | 279 | get = do |
303 | count <- get :: Get Word8 | 280 | count <- get :: Get Word8 |
304 | nodes <- sequence $ replicate (fromIntegral count) get | 281 | nodes <- sequence $ replicate (fromIntegral count) get |
@@ -314,7 +291,7 @@ instance Serialize (Response (NodeFound ip)) where | |||
314 | 291 | ||
315 | -- | \"q\" == \"find_node\" | 292 | -- | \"q\" == \"find_node\" |
316 | instance (Address ip, Typeable ip) | 293 | instance (Address ip, Typeable ip) |
317 | => KRPC (Query (FindNode ip)) (Response (NodeFound ip)) where | 294 | => KRPC (Query KMessageOf (FindNode KMessageOf ip)) (Response KMessageOf (NodeFound KMessageOf ip)) where |
318 | #ifdef VERSION_bencoding | 295 | #ifdef VERSION_bencoding |
319 | method = "find_node" | 296 | method = "find_node" |
320 | #else | 297 | #else |
@@ -383,7 +360,7 @@ instance (Typeable ip, Serialize ip) => BEncode (GotPeers ip) where | |||
383 | 360 | ||
384 | -- | \"q" = \"get_peers\" | 361 | -- | \"q" = \"get_peers\" |
385 | instance (Typeable ip, Serialize ip) => | 362 | instance (Typeable ip, Serialize ip) => |
386 | KRPC (Query (GetPeers ip)) (Response (GotPeers ip)) where | 363 | KRPC (Query KMessageOf (GetPeers ip)) (Response KMessageOf (GotPeers ip)) where |
387 | method = "get_peers" | 364 | method = "get_peers" |
388 | 365 | ||
389 | {----------------------------------------------------------------------- | 366 | {----------------------------------------------------------------------- |
@@ -455,7 +432,7 @@ instance BEncode Announced where | |||
455 | fromBEncode _ = pure Announced | 432 | fromBEncode _ = pure Announced |
456 | 433 | ||
457 | -- | \"q" = \"announce\" | 434 | -- | \"q" = \"announce\" |
458 | instance KRPC (Query Announce) (Response Announced) where | 435 | instance KRPC (Query KMessageOf Announce) (Response KMessageOf Announced) where |
459 | method = "announce_peer" | 436 | method = "announce_peer" |
460 | 437 | ||
461 | -- endif VERSION_bencoding | 438 | -- endif VERSION_bencoding |
@@ -495,3 +472,25 @@ bep42 addr (NodeId r) | |||
495 | where msk | BS.length ip == 4 = ip4mask | 472 | where msk | BS.length ip == 4 = ip4mask |
496 | | otherwise = ip6mask | 473 | | otherwise = ip6mask |
497 | 474 | ||
475 | instance Kademlia KMessageOf where | ||
476 | data Ping KMessageOf = Ping | ||
477 | deriving (Show, Eq, Typeable) | ||
478 | newtype FindNode KMessageOf ip = FindNode (NodeId KMessageOf) | ||
479 | deriving (Show, Eq, Typeable) | ||
480 | newtype NodeFound KMessageOf ip = NodeFound [NodeInfo KMessageOf ip ()] | ||
481 | deriving (Show, Eq, Typeable) | ||
482 | pingMessage _ = Ping | ||
483 | pongMessage _ = Ping | ||
484 | findNodeMessage _ k = FindNode (toNodeId k) | ||
485 | foundNodes (NodeFound ns) = ns | ||
486 | |||
487 | dhtAdjustID _ fallback ip0 arrival | ||
488 | = fromMaybe fallback $ do | ||
489 | ip <- fromSockAddr ip0 -- :: Maybe ip | ||
490 | let _ = ip `asTypeOf` nodeAddr (foreignNode arrival) | ||
491 | listToMaybe | ||
492 | $ rank id (nodeId $ foreignNode arrival) | ||
493 | $ bep42s ip fallback | ||
494 | |||
495 | namePing _ = "ping" | ||
496 | nameFindNodes _ = "find-nodes" | ||
diff --git a/src/Network/DHT/Types.hs b/src/Network/DHT/Types.hs index ed2dc175..79f9e1d3 100644 --- a/src/Network/DHT/Types.hs +++ b/src/Network/DHT/Types.hs | |||
@@ -1,8 +1,17 @@ | |||
1 | module Network.DHT.Types where | 1 | {-# LANGUAGE TypeFamilies #-} |
2 | {-# LANGUAGE ScopedTypeVariables #-} | ||
3 | {-# LANGUAGE StandaloneDeriving #-} | ||
4 | {-# LANGUAGE FlexibleContexts #-} | ||
5 | module Network.DHT.Types | ||
6 | ( module Network.DHT.Types | ||
7 | , TableKey | ||
8 | , toNodeId | ||
9 | ) where | ||
2 | 10 | ||
3 | import Network.Socket (SockAddr) | 11 | import Network.Socket (SockAddr) |
4 | import Network.DatagramServer.Types | 12 | import Network.DatagramServer.Types |
5 | import Network.DHT.Routing | 13 | import Network.DHT.Routing |
14 | import Data.Typeable | ||
6 | 15 | ||
7 | data TableParameters msg ip u = TableParameters | 16 | data TableParameters msg ip u = TableParameters |
8 | { maxBuckets :: Int | 17 | { maxBuckets :: Int |
@@ -11,3 +20,43 @@ data TableParameters msg ip u = TableParameters | |||
11 | , logMessage :: Char -> String -> IO () | 20 | , logMessage :: Char -> String -> IO () |
12 | , adjustID :: SockAddr -> Event msg ip u -> NodeId msg | 21 | , adjustID :: SockAddr -> Event msg ip u -> NodeId msg |
13 | } | 22 | } |
23 | |||
24 | -- | All queries have an \"id\" key and value containing the node ID | ||
25 | -- of the querying node. | ||
26 | data Query dht a = Query | ||
27 | { queringNodeId :: NodeId dht -- ^ node id of /quering/ node; | ||
28 | , queryIsReadOnly :: Bool -- ^ node is read-only as per BEP 43 | ||
29 | , queryParams :: a -- ^ query parameters. | ||
30 | } deriving (Typeable) | ||
31 | |||
32 | deriving instance (Eq (NodeId dht), Eq a ) => Eq (Query dht a) | ||
33 | deriving instance (Show (NodeId dht), Show a ) => Show (Query dht a) | ||
34 | |||
35 | -- | All responses have an \"id\" key and value containing the node ID | ||
36 | -- of the responding node. | ||
37 | data Response dht a = Response | ||
38 | { queredNodeId :: NodeId dht -- ^ node id of /quered/ node; | ||
39 | , responseVals :: a -- ^ query result. | ||
40 | } deriving (Typeable) | ||
41 | |||
42 | deriving instance (Eq (NodeId dht), Eq a ) => Eq (Response dht a) | ||
43 | deriving instance (Show (NodeId dht), Show a ) => Show (Response dht a) | ||
44 | |||
45 | |||
46 | class Kademlia dht where | ||
47 | -- | The most basic query is a ping. Ping query is used to check if a | ||
48 | -- quered node is still alive. | ||
49 | data Ping dht | ||
50 | -- | Find node is used to find the contact information for a node | ||
51 | -- given its ID. | ||
52 | data FindNode dht ip | ||
53 | data NodeFound dht ip | ||
54 | pingMessage :: Proxy dht -> Ping dht | ||
55 | pongMessage :: Proxy dht -> Ping dht | ||
56 | findNodeMessage :: TableKey dht k => Proxy dht -> k -> FindNode dht ip | ||
57 | foundNodesMessage :: [NodeInfo dht ip ()] -> NodeFound dht ip | ||
58 | foundNodes :: NodeFound dht ip -> [NodeInfo dht ip ()] | ||
59 | findWho :: FindNode dht ip -> NodeId dht | ||
60 | dhtAdjustID :: Address ip => Proxy dht -> NodeId dht -> SockAddr -> Event dht ip u -> NodeId dht | ||
61 | namePing :: Proxy dht -> QueryMethod dht | ||
62 | nameFindNodes :: Proxy dht -> QueryMethod dht | ||
diff --git a/src/Network/DatagramServer.hs b/src/Network/DatagramServer.hs index 2140e2cd..91efa443 100644 --- a/src/Network/DatagramServer.hs +++ b/src/Network/DatagramServer.hs | |||
@@ -190,13 +190,13 @@ type Handler h msg v = (QueryMethod msg, HandlerBody h msg v) | |||
190 | 190 | ||
191 | -- | Keep track pending queries made by /this/ node and handle queries | 191 | -- | Keep track pending queries made by /this/ node and handle queries |
192 | -- made by /remote/ nodes. | 192 | -- made by /remote/ nodes. |
193 | data Manager h raw msg = Manager | 193 | data Manager raw msg = Manager |
194 | { sock :: !Socket | 194 | { sock :: !Socket |
195 | , options :: !Options | 195 | , options :: !Options |
196 | , listenerThread :: !(MVar ThreadId) | 196 | , listenerThread :: !(MVar ThreadId) |
197 | , transactionCounter :: {-# UNPACK #-} !TransactionCounter | 197 | , transactionCounter :: {-# UNPACK #-} !TransactionCounter |
198 | , pendingCalls :: {-# UNPACK #-} !(PendingCalls msg raw) | 198 | , pendingCalls :: {-# UNPACK #-} !(PendingCalls msg raw) |
199 | , handlers :: [Handler h msg raw] -- TODO delete this, it's not used | 199 | -- , handlers :: [Handler h msg raw] -- TODO delete this, it's not used |
200 | , logMsg :: Char -> String -> T.Text -> IO () | 200 | , logMsg :: Char -> String -> T.Text -> IO () |
201 | } | 201 | } |
202 | 202 | ||
@@ -212,14 +212,14 @@ newManager :: Options -- ^ various protocol options; | |||
212 | -> (Char -> String -> T.Text -> IO ()) -- ^ loging function | 212 | -> (Char -> String -> T.Text -> IO ()) -- ^ loging function |
213 | -> SockAddr -- ^ address to listen on; | 213 | -> SockAddr -- ^ address to listen on; |
214 | -> [Handler h msg raw] -- ^ handlers to run on incoming queries. | 214 | -> [Handler h msg raw] -- ^ handlers to run on incoming queries. |
215 | -> IO (Manager h raw msg) -- ^ new rpc manager. | 215 | -> IO (Manager raw msg) -- ^ new rpc manager. |
216 | newManager opts @ Options {..} logmsg servAddr handlers = do | 216 | newManager opts @ Options {..} logmsg servAddr handlers = do |
217 | validateOptions opts | 217 | validateOptions opts |
218 | sock <- bindServ | 218 | sock <- bindServ |
219 | tref <- newEmptyMVar | 219 | tref <- newEmptyMVar |
220 | tran <- newIORef optSeedTransaction | 220 | tran <- newIORef optSeedTransaction |
221 | calls <- newIORef M.empty | 221 | calls <- newIORef M.empty |
222 | return $ Manager sock opts tref tran calls handlers logmsg | 222 | return $ Manager sock opts tref tran calls logmsg |
223 | where | 223 | where |
224 | bindServ = do | 224 | bindServ = do |
225 | let family = sockAddrFamily servAddr | 225 | let family = sockAddrFamily servAddr |
@@ -230,7 +230,7 @@ newManager opts @ Options {..} logmsg servAddr handlers = do | |||
230 | return sock | 230 | return sock |
231 | 231 | ||
232 | -- | Unblock all pending calls and close socket. | 232 | -- | Unblock all pending calls and close socket. |
233 | closeManager :: Manager m raw msg -> IO () | 233 | closeManager :: Manager raw msg -> IO () |
234 | closeManager Manager {..} = do | 234 | closeManager Manager {..} = do |
235 | maybe (return ()) killThread =<< tryTakeMVar listenerThread | 235 | maybe (return ()) killThread =<< tryTakeMVar listenerThread |
236 | -- TODO unblock calls | 236 | -- TODO unblock calls |
@@ -238,7 +238,7 @@ closeManager Manager {..} = do | |||
238 | 238 | ||
239 | -- | Check if the manager is still active. Manager becomes active | 239 | -- | Check if the manager is still active. Manager becomes active |
240 | -- until 'closeManager' called. | 240 | -- until 'closeManager' called. |
241 | isActive :: Manager m raw msg -> IO Bool | 241 | isActive :: Manager raw msg -> IO Bool |
242 | isActive Manager {..} = liftIO $ isBound sock | 242 | isActive Manager {..} = liftIO $ isBound sock |
243 | {-# INLINE isActive #-} | 243 | {-# INLINE isActive #-} |
244 | 244 | ||
@@ -246,7 +246,7 @@ isActive Manager {..} = liftIO $ isBound sock | |||
246 | -- | Normally you should use Control.Monad.Trans.Resource.allocate | 246 | -- | Normally you should use Control.Monad.Trans.Resource.allocate |
247 | -- function. | 247 | -- function. |
248 | withManager :: Options -> SockAddr -> [Handler h msg raw] | 248 | withManager :: Options -> SockAddr -> [Handler h msg raw] |
249 | -> (Manager h raw msg -> IO a) -> IO a | 249 | -> (Manager raw msg -> IO a) -> IO a |
250 | withManager opts addr hs = bracket (newManager opts addr hs) closeManager | 250 | withManager opts addr hs = bracket (newManager opts addr hs) closeManager |
251 | #endif | 251 | #endif |
252 | 252 | ||
@@ -289,7 +289,7 @@ genTransactionId ref = do | |||
289 | uniqueTransactionId cur | 289 | uniqueTransactionId cur |
290 | 290 | ||
291 | -- | How many times 'query' call have been performed. | 291 | -- | How many times 'query' call have been performed. |
292 | getQueryCount :: Manager h raw msg -> IO Int | 292 | getQueryCount :: Manager raw msg -> IO Int |
293 | getQueryCount mgr@Manager{..} = do | 293 | getQueryCount mgr@Manager{..} = do |
294 | curTrans <- readIORef transactionCounter | 294 | curTrans <- readIORef transactionCounter |
295 | return $ curTrans - optSeedTransaction options | 295 | return $ curTrans - optSeedTransaction options |
@@ -320,21 +320,21 @@ sendQuery sock addr q = handle sockError $ sendMessage sock addr q | |||
320 | -- This function should throw 'QueryFailure' exception if quered node | 320 | -- This function should throw 'QueryFailure' exception if quered node |
321 | -- respond with @error@ message or the query timeout expires. | 321 | -- respond with @error@ message or the query timeout expires. |
322 | -- | 322 | -- |
323 | query :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager h raw msg -> QueryMethod msg -> SockAddr -> a -> IO b | 323 | query :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager raw msg -> QueryMethod msg -> SockAddr -> a -> IO b |
324 | query mgr meth addr params = queryK mgr meth addr params (\_ x _ -> x) | 324 | query mgr meth addr params = queryK mgr meth addr params (\_ x _ -> x) |
325 | 325 | ||
326 | -- | Like 'query' but possibly returns your externally routable IP address. | 326 | -- | Like 'query' but possibly returns your externally routable IP address. |
327 | query' :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager h raw msg -> QueryMethod msg -> SockAddr -> a -> IO (b, Maybe ReflectedIP) | 327 | query' :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager raw msg -> QueryMethod msg -> SockAddr -> a -> IO (b, Maybe ReflectedIP) |
328 | query' mgr meth addr params = queryK mgr meth addr params (const (,)) | 328 | query' mgr meth addr params = queryK mgr meth addr params (const (,)) |
329 | 329 | ||
330 | -- | Enqueue a query, but give us the complete BEncoded content sent by the | 330 | -- | Enqueue a query, but give us the complete BEncoded content sent by the |
331 | -- remote Node. This is useful for handling extensions that this library does | 331 | -- remote Node. This is useful for handling extensions that this library does |
332 | -- not otherwise support. | 332 | -- not otherwise support. |
333 | queryRaw :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager h raw msg -> QueryMethod msg -> SockAddr -> a -> IO (b, raw) | 333 | queryRaw :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager raw msg -> QueryMethod msg -> SockAddr -> a -> IO (b, raw) |
334 | queryRaw mgr meth addr params = queryK mgr meth addr params (\raw x _ -> (x,raw)) | 334 | queryRaw mgr meth addr params = queryK mgr meth addr params (\raw x _ -> (x,raw)) |
335 | 335 | ||
336 | queryK :: forall h a b x raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => | 336 | queryK :: forall h a b x raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => |
337 | Manager h raw msg -> QueryMethod msg -> SockAddr -> a -> (raw -> b -> Maybe ReflectedIP -> x) -> IO x | 337 | Manager raw msg -> QueryMethod msg -> SockAddr -> a -> (raw -> b -> Maybe ReflectedIP -> x) -> IO x |
338 | queryK mgr@Manager{..} meth addr params kont = do | 338 | queryK mgr@Manager{..} meth addr params kont = do |
339 | tid <- liftIO $ genTransactionId transactionCounter | 339 | tid <- liftIO $ genTransactionId transactionCounter |
340 | -- let queryMethod = method :: Method a b | 340 | -- let queryMethod = method :: Method a b |
@@ -424,7 +424,7 @@ handler name body = (name, wrapper) | |||
424 | runHandler :: ( Envelope msg | 424 | runHandler :: ( Envelope msg |
425 | , Show (QueryMethod msg) | 425 | , Show (QueryMethod msg) |
426 | , Serialize (TransactionID msg)) | 426 | , Serialize (TransactionID msg)) |
427 | => Manager IO raw msg -> QueryMethod msg -> HandlerBody IO msg raw -> SockAddr -> msg raw -> IO (KResult msg raw) | 427 | => Manager raw msg -> QueryMethod msg -> HandlerBody IO msg raw -> SockAddr -> msg raw -> IO (KResult msg raw) |
428 | runHandler mgr@Manager{..} meth h addr m = Lifted.catches wrapper failbacks | 428 | runHandler mgr@Manager{..} meth h addr m = Lifted.catches wrapper failbacks |
429 | where | 429 | where |
430 | signature = querySignature meth (envelopeTransaction m) addr | 430 | signature = querySignature meth (envelopeTransaction m) addr |
@@ -462,7 +462,7 @@ dispatchHandler :: ( Eq (QueryMethod msg) | |||
462 | , Show (QueryMethod msg) | 462 | , Show (QueryMethod msg) |
463 | , Serialize (TransactionID msg) | 463 | , Serialize (TransactionID msg) |
464 | , Envelope msg | 464 | , Envelope msg |
465 | ) => Manager IO raw msg -> [Handler IO msg raw] -> QueryMethod msg -> msg raw -> SockAddr -> IO (KResult msg raw) | 465 | ) => Manager raw msg -> [Handler IO msg raw] -> QueryMethod msg -> msg raw -> SockAddr -> IO (KResult msg raw) |
466 | dispatchHandler mgr handlers meth q addr = do | 466 | dispatchHandler mgr handlers meth q addr = do |
467 | case L.lookup meth handlers of | 467 | case L.lookup meth handlers of |
468 | Nothing -> return $ Left $ KError MethodUnknown ("Unknown method " <> BC.pack (show meth)) (envelopeTransaction q) | 468 | Nothing -> return $ Left $ KError MethodUnknown ("Unknown method " <> BC.pack (show meth)) (envelopeTransaction q) |
@@ -483,7 +483,7 @@ handleQuery :: ( WireFormat raw msg | |||
483 | , Eq (QueryMethod msg) | 483 | , Eq (QueryMethod msg) |
484 | , Show (QueryMethod msg) | 484 | , Show (QueryMethod msg) |
485 | , Serialize (TransactionID msg) | 485 | , Serialize (TransactionID msg) |
486 | ) => Manager IO raw msg -> [Handler IO msg raw] -> QueryMethod msg -> raw -> msg raw -> SockAddr -> IO () | 486 | ) => Manager raw msg -> [Handler IO msg raw] -> QueryMethod msg -> raw -> msg raw -> SockAddr -> IO () |
487 | handleQuery mgr@Manager{..} hs meth raw q addr = void $ fork $ do | 487 | handleQuery mgr@Manager{..} hs meth raw q addr = void $ fork $ do |
488 | myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" | 488 | myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" |
489 | res <- dispatchHandler mgr hs meth q addr | 489 | res <- dispatchHandler mgr hs meth q addr |
@@ -501,7 +501,7 @@ handleQuery mgr@Manager{..} hs meth raw q addr = void $ fork $ do | |||
501 | 501 | ||
502 | handleResponse :: ( Ord (TransactionID msg) | 502 | handleResponse :: ( Ord (TransactionID msg) |
503 | , Envelope msg | 503 | , Envelope msg |
504 | ) => Manager IO raw msg -> raw -> KResult msg raw -> SockAddr -> IO () | 504 | ) => Manager raw msg -> raw -> KResult msg raw -> SockAddr -> IO () |
505 | handleResponse mgr@Manager{..} raw result addr = do | 505 | handleResponse mgr@Manager{..} raw result addr = do |
506 | liftIO $ do | 506 | liftIO $ do |
507 | let resultId = either errorId envelopeTransaction result | 507 | let resultId = either errorId envelopeTransaction result |
@@ -520,7 +520,7 @@ listener :: forall raw msg. | |||
520 | , Eq (QueryMethod msg) | 520 | , Eq (QueryMethod msg) |
521 | , Show (QueryMethod msg) | 521 | , Show (QueryMethod msg) |
522 | , Serialize (TransactionID msg) | 522 | , Serialize (TransactionID msg) |
523 | ) => Manager IO raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO () | 523 | ) => Manager raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO () |
524 | listener mgr@Manager{..} hs p = do | 524 | listener mgr@Manager{..} hs p = do |
525 | fix $ \again -> do | 525 | fix $ \again -> do |
526 | let ctx = error "TODO TOX ToxCipherContext or () for Mainline" | 526 | let ctx = error "TODO TOX ToxCipherContext or () for Mainline" |
@@ -551,7 +551,7 @@ listen :: ( WireFormat raw msg | |||
551 | , Eq (QueryMethod msg) | 551 | , Eq (QueryMethod msg) |
552 | , Show (QueryMethod msg) | 552 | , Show (QueryMethod msg) |
553 | , Serialize (TransactionID msg) | 553 | , Serialize (TransactionID msg) |
554 | ) => Manager IO raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO () | 554 | ) => Manager raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO () |
555 | listen mgr@Manager{..} hs p = do | 555 | listen mgr@Manager{..} hs p = do |
556 | tid <- fork $ do | 556 | tid <- fork $ do |
557 | myThreadId >>= liftIO . flip labelThread "KRPC.listen" | 557 | myThreadId >>= liftIO . flip labelThread "KRPC.listen" |