diff options
-rw-r--r-- | src/Network/BitTorrent/DHT.hs | 362 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 750 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 571 | ||||
-rw-r--r-- | src/Network/DHT.hs | 125 | ||||
-rw-r--r-- | src/Network/DHT/Mainline.hs | 579 | ||||
-rw-r--r-- | src/Network/DHT/Tox.hs | 112 | ||||
-rw-r--r-- | src/Network/DHT/Types.hs | 176 | ||||
-rw-r--r-- | src/Network/DatagramServer.hs | 608 | ||||
-rw-r--r-- | src/Network/DatagramServer/Error.hs | 68 | ||||
-rw-r--r-- | src/Network/DatagramServer/Mainline.hs | 404 | ||||
-rw-r--r-- | src/Network/DatagramServer/Tox.hs | 554 | ||||
-rw-r--r-- | src/Network/KRPC/Method.hs | 40 |
12 files changed, 0 insertions, 4349 deletions
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs deleted file mode 100644 index 1a67c7c4..00000000 --- a/src/Network/BitTorrent/DHT.hs +++ /dev/null | |||
@@ -1,362 +0,0 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- BitTorrent uses a \"distributed sloppy hash table\" (DHT) for | ||
9 | -- storing peer contact information for \"trackerless\" torrents. In | ||
10 | -- effect, each peer becomes a tracker. | ||
11 | -- | ||
12 | -- Normally you don't need to import other DHT modules. | ||
13 | -- | ||
14 | -- For more info see: | ||
15 | -- <http://www.bittorrent.org/beps/bep_0005.html> | ||
16 | -- | ||
17 | {-# LANGUAGE FlexibleInstances #-} | ||
18 | {-# LANGUAGE FlexibleContexts #-} | ||
19 | {-# LANGUAGE TemplateHaskell #-} | ||
20 | {-# LANGUAGE TypeOperators #-} | ||
21 | {-# LANGUAGE ScopedTypeVariables #-} | ||
22 | {-# LANGUAGE CPP #-} | ||
23 | module Network.BitTorrent.DHT | ||
24 | ( -- * Distributed Hash Table | ||
25 | DHT | ||
26 | , Options (..) | ||
27 | , fullLogging | ||
28 | , dht | ||
29 | |||
30 | -- * Bootstrapping | ||
31 | -- $bootstrapping-terms | ||
32 | , tNodes | ||
33 | , defaultBootstrapNodes | ||
34 | , resolveHostName | ||
35 | , bootstrap | ||
36 | , isBootstrapped | ||
37 | |||
38 | -- * Initialization | ||
39 | , snapshot | ||
40 | |||
41 | -- * Operations | ||
42 | -- , Network.BitTorrent.DHT.lookup | ||
43 | , Network.BitTorrent.DHT.insert | ||
44 | , Network.BitTorrent.DHT.delete | ||
45 | |||
46 | -- * Embedding | ||
47 | -- ** Session | ||
48 | , LogFun | ||
49 | , Node | ||
50 | , defaultHandlers | ||
51 | , newNode | ||
52 | , closeNode | ||
53 | |||
54 | -- ** Monad | ||
55 | -- , MonadDHT (..) | ||
56 | , runDHT | ||
57 | ) where | ||
58 | |||
59 | import Control.Monad.Logger | ||
60 | import Control.Monad.Reader | ||
61 | import Control.Exception | ||
62 | import qualified Data.ByteString as BS | ||
63 | import Data.Conduit as C | ||
64 | import qualified Data.Conduit.List as C | ||
65 | import Data.Serialize | ||
66 | import Network.Socket | ||
67 | import Text.PrettyPrint.HughesPJClass as PP (pPrint,render) | ||
68 | |||
69 | import Data.Torrent | ||
70 | import Network.Address | ||
71 | import Network.BitTorrent.DHT.Query | ||
72 | import Network.BitTorrent.DHT.Session | ||
73 | import Network.DHT.Routing as T hiding (null) | ||
74 | import qualified Data.Text as Text | ||
75 | import Data.Typeable | ||
76 | import Data.Monoid | ||
77 | import Network.DatagramServer.Mainline (KMessageOf) | ||
78 | import qualified Network.DatagramServer as KRPC (listen, Protocol(..)) | ||
79 | import Network.DatagramServer.Types | ||
80 | import Network.DHT.Types | ||
81 | import Data.Bits | ||
82 | import Data.Default | ||
83 | import Text.PrettyPrint.HughesPJClass hiding (($$), (<>)) | ||
84 | import Network.KRPC.Method | ||
85 | |||
86 | {----------------------------------------------------------------------- | ||
87 | -- DHT types | ||
88 | -----------------------------------------------------------------------} | ||
89 | |||
90 | #if 0 | ||
91 | class MonadDHT m where | ||
92 | liftDHT :: DHT raw dht u IPv4 a -> m a | ||
93 | |||
94 | instance MonadDHT (DHT raw dht u IPv4) where | ||
95 | liftDHT = id | ||
96 | #endif | ||
97 | |||
98 | -- | Convenience method. Pass this to 'dht' to enable full logging. | ||
99 | fullLogging :: LogSource -> LogLevel -> Bool | ||
100 | fullLogging _ _ = True | ||
101 | |||
102 | -- | Run DHT on specified port. <add note about resources> | ||
103 | dht :: | ||
104 | ( Ord ip | ||
105 | , Address ip | ||
106 | , Functor dht | ||
107 | , Ord (NodeId dht) | ||
108 | , FiniteBits (NodeId dht) | ||
109 | , Serialize (NodeId dht) | ||
110 | , Show (NodeId dht) | ||
111 | , SerializableTo raw (Response dht (Ping dht)) | ||
112 | , SerializableTo raw (Query dht (Ping dht)) | ||
113 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
114 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
115 | , Ord (TransactionID dht) | ||
116 | , Serialize (TransactionID dht) | ||
117 | , Eq (QueryMethod dht) | ||
118 | , Show (QueryMethod dht) | ||
119 | , Pretty (NodeInfo dht ip u) | ||
120 | , Kademlia dht | ||
121 | , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
122 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
123 | , DataHandlers raw dht | ||
124 | , WireFormat raw dht | ||
125 | , Show u | ||
126 | , Default u | ||
127 | , Typeable dht | ||
128 | ) | ||
129 | => Options -- ^ normally you need to use 'Data.Default.def'; | ||
130 | -> NodeAddr ip -- ^ address to bind this node; | ||
131 | -> (LogSource -> LogLevel -> Bool) -- ^ use 'fullLogging' as a noisy default | ||
132 | -> DHT raw dht u ip a -- ^ actions to run: 'bootstrap', 'lookup', etc; | ||
133 | -> IO a -- ^ result. | ||
134 | dht opts addr logfilter action = do | ||
135 | runStderrLoggingT $ filterLogger logfilter $ LoggingT $ \ logger -> do | ||
136 | bracket (newNode opts addr logger Nothing) closeNode $ | ||
137 | \ node -> runDHT node $ do | ||
138 | hs <- defaultHandlers logger | ||
139 | m <- asks manager | ||
140 | liftIO $ KRPC.listen m hs (KRPC.Protocol Proxy Proxy) | ||
141 | action | ||
142 | {-# INLINE dht #-} | ||
143 | |||
144 | {----------------------------------------------------------------------- | ||
145 | -- Bootstrapping | ||
146 | -----------------------------------------------------------------------} | ||
147 | -- $bootstrapping-terms | ||
148 | -- | ||
149 | -- [@Bootstrapping@] DHT @bootstrapping@ is the process of filling | ||
150 | -- routing 'Table' by /good/ nodes. | ||
151 | -- | ||
152 | -- [@Bootstrapping time@] Bootstrapping process can take up to 5 | ||
153 | -- minutes. Bootstrapping should only happen at first application | ||
154 | -- startup, if possible you should use 'snapshot' & 'restore' | ||
155 | -- mechanism which must work faster. | ||
156 | -- | ||
157 | -- [@Bootstrap nodes@] DHT @bootstrap node@ is either: | ||
158 | -- | ||
159 | -- * a specialized high performance node maintained by bittorrent | ||
160 | -- software authors\/maintainers, like those listed in | ||
161 | -- 'defaultBootstrapNodes'. /Specialized/ means that those nodes | ||
162 | -- may not support 'insert' queries and is running for the sake of | ||
163 | -- bootstrapping only. | ||
164 | -- | ||
165 | -- * an ordinary bittorrent client running DHT node. The list of | ||
166 | -- such bootstrapping nodes usually obtained from | ||
167 | -- 'Data.Torrent.tNodes' field or | ||
168 | -- 'Network.BitTorrent.Exchange.Message.Port' messages. | ||
169 | |||
170 | -- Do not include the following hosts in the default bootstrap nodes list: | ||
171 | -- | ||
172 | -- * "dht.aelitis.com" and "dht6.azureusplatform.com" - since | ||
173 | -- Azureus client have a different (and probably incompatible) DHT | ||
174 | -- protocol implementation. | ||
175 | -- | ||
176 | -- * "router.utorrent.com" since it is just an alias to | ||
177 | -- "router.bittorrent.com". | ||
178 | -- XXX: ignoring this advise as it resolves to a different | ||
179 | -- ip address for me. | ||
180 | |||
181 | -- | List of bootstrap nodes maintained by different bittorrent | ||
182 | -- software authors. | ||
183 | defaultBootstrapNodes :: [NodeAddr HostName] | ||
184 | defaultBootstrapNodes = | ||
185 | [ NodeAddr "router.bittorrent.com" 6881 -- by BitTorrent Inc. | ||
186 | |||
187 | -- doesn't work at the moment (use git blame) of commit | ||
188 | , NodeAddr "dht.transmissionbt.com" 6881 -- by Transmission project | ||
189 | |||
190 | , NodeAddr "router.utorrent.com" 6881 | ||
191 | ] | ||
192 | |||
193 | -- TODO Multihomed hosts | ||
194 | |||
195 | -- | Resolve either a numeric network address or a hostname to a | ||
196 | -- numeric IP address of the node. Usually used to resolve | ||
197 | -- 'defaultBootstrapNodes' or 'Data.Torrent.tNodes' lists. | ||
198 | resolveHostName :: NodeAddr HostName -> IO (NodeAddr IPv4) | ||
199 | resolveHostName NodeAddr {..} = do | ||
200 | let hints = defaultHints { addrFamily = AF_INET, addrSocketType = Datagram } | ||
201 | -- getAddrInfo throws exception on empty list, so the pattern matching never fail | ||
202 | info : _ <- getAddrInfo (Just hints) (Just nodeHost) (Just (show nodePort)) | ||
203 | case fromSockAddr (addrAddress info) of | ||
204 | Nothing -> error "resolveNodeAddr: impossible" | ||
205 | Just addr -> return addr | ||
206 | |||
207 | -- | One good node may be sufficient. | ||
208 | -- | ||
209 | -- This operation do block, use | ||
210 | -- 'Control.Concurrent.Async.Lifted.async' if needed. | ||
211 | bootstrap :: forall raw dht u ip. | ||
212 | ( Ord ip | ||
213 | , Address ip | ||
214 | , Functor dht | ||
215 | , Ord (NodeId dht) | ||
216 | , FiniteBits (NodeId dht) | ||
217 | , Serialize (NodeId dht) | ||
218 | , Show (NodeId dht) | ||
219 | , Pretty (NodeId dht) | ||
220 | , SerializableTo raw (Response dht (Ping dht)) | ||
221 | , SerializableTo raw (Query dht (Ping dht)) | ||
222 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
223 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
224 | , Ord (TransactionID dht) | ||
225 | , Serialize (TransactionID dht) | ||
226 | , Eq (QueryMethod dht) | ||
227 | , Show (QueryMethod dht) | ||
228 | , Pretty (NodeInfo dht ip u) | ||
229 | , Kademlia dht | ||
230 | , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
231 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
232 | , DataHandlers raw dht | ||
233 | , WireFormat raw dht | ||
234 | , Show u | ||
235 | , Default u | ||
236 | , Serialize u | ||
237 | ) => Maybe BS.ByteString -> [PacketDestination dht] -> DHT raw dht u ip () | ||
238 | bootstrap mbs startNodes = do | ||
239 | restored <- | ||
240 | case decode <$> mbs of | ||
241 | Just (Right tbl) -> return (T.toList tbl) | ||
242 | Just (Left e) -> do $(logWarnS) "restore" (Text.pack e) | ||
243 | return [] | ||
244 | Nothing -> return [] | ||
245 | |||
246 | $(logInfoS) "bootstrap" "Start node bootstrapping" | ||
247 | let searchAll aliveNodes = do | ||
248 | nid <- myNodeIdAccordingTo (error "FIXME") | ||
249 | ns <- bgsearch ioFindNodes nid | ||
250 | return ( ns :: [NodeInfo dht ip u] ) | ||
251 | input_nodes <- (restored ++) . T.toList <$> getTable | ||
252 | -- Step 1: Use iterative searches to flesh out the table.. | ||
253 | do let knowns = map (map $ fst) input_nodes | ||
254 | -- Below, we reverse the nodes since the table serialization puts the | ||
255 | -- nearest nodes last and we want to choose a similar node id to bootstrap | ||
256 | -- faster. | ||
257 | (alive_knowns,_) <- unzip <$> queryParallel (pingQ <$> reverse (concat knowns)) | ||
258 | b <- isBootstrapped | ||
259 | -- If our cached nodes are alive and our IP address did not change, it's possible | ||
260 | -- we are already bootsrapped, so no need to do any searches. | ||
261 | when (not b) $ do | ||
262 | ns <- searchAll $ take 2 alive_knowns | ||
263 | -- We only use the supplied bootstrap nodes when we don't know of any | ||
264 | -- others to try. | ||
265 | when (null ns) $ do | ||
266 | -- TODO filter duplicated in startNodes list | ||
267 | -- TODO retransmissions for startNodes | ||
268 | (aliveNodes,_) <- unzip <$> queryParallel (coldPingQ <$> startNodes) | ||
269 | _ <- searchAll $ take 2 aliveNodes | ||
270 | return () | ||
271 | -- Step 2: Repeatedly refresh incomplete buckets until the table is full. | ||
272 | maxbuckets <- asks $ optBucketCount . options | ||
273 | flip fix 0 $ \loop icnt -> do | ||
274 | tbl <- getTable | ||
275 | let unfull = filter ((/=defaultBucketSize) . snd) | ||
276 | us = zip | ||
277 | -- is_last = True for the last bucket | ||
278 | (True:repeat False) | ||
279 | -- Only non-full buckets unless it is the last one and the | ||
280 | -- maximum number of buckets has not been reached. | ||
281 | $ case reverse $ zip [0..] $ T.shape tbl of | ||
282 | p@(n,_):ps | n+1==maxbuckets -> unfull (p:ps) | ||
283 | p:ps -> p:unfull ps | ||
284 | [] -> [] | ||
285 | forM_ us $ \(is_last,(index,_)) -> do | ||
286 | nid <- myNodeIdAccordingTo (error "FIXME") | ||
287 | sample <- liftIO $ genBucketSample nid (bucketRange index is_last) | ||
288 | $(logDebugS) "bootstrapping" | ||
289 | $ "BOOTSTRAP sample" | ||
290 | <> Text.pack (show (is_last,index,T.shape tbl)) | ||
291 | <> " " <> Text.pack (render $ pPrint sample) | ||
292 | refreshNodes sample | ||
293 | $(logDebugS) "bootstrapping" | ||
294 | $ "BOOTSTRAP finished iteration " | ||
295 | <> Text.pack (show (icnt,T.shape tbl,us,defaultBucketSize)) | ||
296 | when (not (null us) && icnt < div (3*maxbuckets) 2) | ||
297 | $ loop (succ icnt) | ||
298 | $(logInfoS) "bootstrap" "Node bootstrapping finished" | ||
299 | |||
300 | -- | Check if this node is already bootstrapped. | ||
301 | -- @bootstrap [good_node] >> isBootstrapped@@ should always return 'True'. | ||
302 | -- | ||
303 | -- This operation do not block. | ||
304 | -- | ||
305 | isBootstrapped :: Eq ip => DHT raw dht u ip Bool | ||
306 | isBootstrapped = T.full <$> getTable | ||
307 | |||
308 | {----------------------------------------------------------------------- | ||
309 | -- Initialization | ||
310 | -----------------------------------------------------------------------} | ||
311 | |||
312 | -- | Serialize current DHT session to byte string. | ||
313 | -- | ||
314 | -- This is blocking operation, use | ||
315 | -- 'Control.Concurrent.Async.Lifted.async' if needed. | ||
316 | snapshot :: ( Address ip | ||
317 | , Ord (NodeId dht) | ||
318 | , Serialize u | ||
319 | , Serialize (NodeId dht) | ||
320 | ) => DHT raw dht u ip BS.ByteString | ||
321 | snapshot = do | ||
322 | tbl <- getTable | ||
323 | return $ encode tbl | ||
324 | |||
325 | {----------------------------------------------------------------------- | ||
326 | -- Operations | ||
327 | -----------------------------------------------------------------------} | ||
328 | |||
329 | #if 0 | ||
330 | |||
331 | -- | Get list of peers which downloading this torrent. | ||
332 | -- | ||
333 | -- This operation is incremental and do block. | ||
334 | -- | ||
335 | lookup :: Address ip => InfoHash -> DHT raw dht u ip `C.Source` [PeerAddr ip] | ||
336 | lookup topic = do -- TODO retry getClosest if bucket is empty | ||
337 | closest <- lift $ getClosest topic | ||
338 | C.sourceList [closest] $= search topic (getPeersQ topic) | ||
339 | |||
340 | #endif | ||
341 | |||
342 | -- TODO do not republish if the topic is already in announceSet | ||
343 | |||
344 | -- | Announce that /this/ peer may have some pieces of the specified | ||
345 | -- torrent. DHT will reannounce this data periodically using | ||
346 | -- 'optReannounce' interval. | ||
347 | -- | ||
348 | -- This operation is synchronous and do block, use | ||
349 | -- 'Control.Concurrent.Async.Lifted.async' if needed. | ||
350 | -- | ||
351 | insert :: Address ip => InfoHash -> PortNumber -> DHT raw dht u ip () | ||
352 | insert ih p = do | ||
353 | publish ih p | ||
354 | insertTopic ih p | ||
355 | |||
356 | -- | Stop announcing /this/ peer for the specified torrent. | ||
357 | -- | ||
358 | -- This operation is atomic and may block for a while. | ||
359 | -- | ||
360 | delete :: InfoHash -> PortNumber -> DHT raw dht u ip () | ||
361 | delete = deleteTopic | ||
362 | {-# INLINE delete #-} | ||
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs deleted file mode 100644 index 003bb5b9..00000000 --- a/src/Network/BitTorrent/DHT/Query.hs +++ /dev/null | |||
@@ -1,750 +0,0 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2014 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- This module provides functions to interact with other nodes. | ||
9 | -- Normally, you don't need to import this module, use | ||
10 | -- "Network.BitTorrent.DHT" instead. | ||
11 | -- | ||
12 | {-# LANGUAGE CPP #-} | ||
13 | {-# LANGUAGE FlexibleContexts #-} | ||
14 | {-# LANGUAGE ScopedTypeVariables #-} | ||
15 | {-# LANGUAGE TemplateHaskell #-} | ||
16 | {-# LANGUAGE TupleSections #-} | ||
17 | {-# LANGUAGE PartialTypeSignatures #-} | ||
18 | {-# LANGUAGE GADTs #-} | ||
19 | {-# LANGUAGE RankNTypes #-} | ||
20 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
21 | module Network.BitTorrent.DHT.Query | ||
22 | ( -- * Handler | ||
23 | -- | To bind specific set of handlers you need to pass | ||
24 | -- handler list to the 'startNode' function. | ||
25 | pingH | ||
26 | , findNodeH | ||
27 | , getPeersH | ||
28 | , announceH | ||
29 | , defaultHandlers | ||
30 | |||
31 | -- * Query | ||
32 | -- ** Basic | ||
33 | -- | A basic query perform a single request expecting a | ||
34 | -- single response. | ||
35 | , Iteration | ||
36 | , pingQ | ||
37 | , coldPingQ | ||
38 | , findNodeQ | ||
39 | , getPeersQ | ||
40 | , announceQ | ||
41 | |||
42 | -- ** Iterative | ||
43 | -- | An iterative query perform multiple basic queries, | ||
44 | -- concatenate its responses, optionally yielding result and | ||
45 | -- continue to the next iteration. | ||
46 | , Search | ||
47 | -- , search | ||
48 | , publish | ||
49 | , ioFindNode | ||
50 | , ioFindNodes | ||
51 | , ioGetPeers | ||
52 | , isearch | ||
53 | , bgsearch | ||
54 | |||
55 | -- ** Routing table | ||
56 | , insertNode | ||
57 | , refreshNodes | ||
58 | |||
59 | -- ** Messaging | ||
60 | , queryNode | ||
61 | , queryNode' | ||
62 | , (<@>) | ||
63 | ) where | ||
64 | |||
65 | import Data.Bits | ||
66 | import Data.Default | ||
67 | #ifdef THREAD_DEBUG | ||
68 | import Control.Concurrent.Lifted.Instrument hiding (yield) | ||
69 | #else | ||
70 | import GHC.Conc (labelThread) | ||
71 | import Control.Concurrent.Lifted hiding (yield) | ||
72 | #endif | ||
73 | import Control.Exception.Lifted hiding (Handler) | ||
74 | import Control.Monad.Reader | ||
75 | import Control.Monad.Logger | ||
76 | import Data.Maybe | ||
77 | import Data.Conduit | ||
78 | import Data.Conduit.List as C hiding (mapMaybe, mapM_) | ||
79 | import Data.Either | ||
80 | import Data.List as L | ||
81 | import Data.Monoid | ||
82 | import Data.Text as T | ||
83 | import qualified Data.Set as Set | ||
84 | ;import Data.Set (Set) | ||
85 | import Network | ||
86 | import Text.PrettyPrint as PP hiding ((<>), ($$)) | ||
87 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | ||
88 | import Data.Time | ||
89 | import Data.Time.Clock.POSIX | ||
90 | import Data.Hashable (Hashable) | ||
91 | import Data.Serialize | ||
92 | import Data.Hashable | ||
93 | |||
94 | import Network.DatagramServer as KRPC hiding (Options, def) | ||
95 | import Network.KRPC.Method as KRPC | ||
96 | import Network.DatagramServer.Mainline (ReflectedIP(..), QueryExtra(..), ResponseExtra(..)) | ||
97 | import Network.DatagramServer (QueryFailure(..)) | ||
98 | import Data.Torrent | ||
99 | import qualified Network.DHT as DHT | ||
100 | import Network.DHT.Mainline | ||
101 | import Network.DHT.Routing as R | ||
102 | import Network.BitTorrent.DHT.Session | ||
103 | import Control.Concurrent.STM | ||
104 | import qualified Network.BitTorrent.DHT.Search as Search | ||
105 | #ifdef VERSION_bencoding | ||
106 | import Data.BEncode (BValue) | ||
107 | import Network.DatagramServer.Mainline (KMessageOf) | ||
108 | #else | ||
109 | import Data.ByteString (ByteString) | ||
110 | import Network.DatagramServer.Tox | ||
111 | #endif | ||
112 | import Network.Address hiding (NodeId) | ||
113 | import Network.DatagramServer.Types as RPC hiding (Query,Response) | ||
114 | import Network.DHT.Types | ||
115 | import Control.Monad.Trans.Control | ||
116 | import Data.Typeable | ||
117 | import Data.Serialize | ||
118 | import System.IO.Unsafe (unsafeInterleaveIO) | ||
119 | import Data.String | ||
120 | |||
121 | |||
122 | {----------------------------------------------------------------------- | ||
123 | -- Handlers | ||
124 | -----------------------------------------------------------------------} | ||
125 | |||
126 | {- | ||
127 | nodeHandler :: ( Address ip | ||
128 | , KRPC dht (Query KMessageOf a) (Response KMessageOf b) | ||
129 | ) | ||
130 | => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler | ||
131 | -} | ||
132 | nodeHandler :: forall raw dht addr u t q r. | ||
133 | (Address addr, WireFormat raw dht, Pretty (NodeInfo dht addr u), | ||
134 | Default u, | ||
135 | IsString t, Functor dht, | ||
136 | KRPC dht (Query dht q) (Response dht r), | ||
137 | SerializableTo raw (Response dht r), | ||
138 | SerializableTo raw (Query dht q), | ||
139 | Show (QueryMethod dht)) => | ||
140 | (NodeInfo dht addr u -> Maybe ReflectedIP -> IO ()) | ||
141 | -> (NodeAddr addr -> IO (NodeId dht)) | ||
142 | -> (Char -> t -> Text -> IO ()) | ||
143 | -> DHTData dht addr | ||
144 | -> QueryMethod dht | ||
145 | -> (NodeAddr addr -> q -> IO r) | ||
146 | -> Handler IO dht raw | ||
147 | nodeHandler insertNode myNodeIdAccordingTo logm dta method action = handler (\sockaddr -> myNodeIdAccordingTo (error "todo")) method $ \ sockAddr msg -> do | ||
148 | let remoteId = messageSender (msg :: dht (Query dht q)) resptype | ||
149 | qextra = queryExtra qry | ||
150 | resptype = Proxy :: Proxy (Response dht r) | ||
151 | q = queryParams qry | ||
152 | qry = envelopePayload msg :: Query dht q | ||
153 | case fromSockAddr sockAddr of | ||
154 | Nothing -> throwIO BadAddress | ||
155 | Just naddr -> do | ||
156 | logm 'D' "nodeHandler" $ "Received query: " <> T.pack (show $ method) | ||
157 | me <- myNodeIdAccordingTo naddr | ||
158 | rextra <- liftIO $ makeResponseExtra dta me qry resptype | ||
159 | let ni = NodeInfo remoteId naddr def | ||
160 | -- Do not route read-only nodes. (bep 43) | ||
161 | if fromRoutableNode qextra | ||
162 | then insertNode ni Nothing >> return () -- TODO need to block. why? | ||
163 | else logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) | ||
164 | Response | ||
165 | <$> pure rextra | ||
166 | <*> action naddr q | ||
167 | |||
168 | -- | Default 'Ping' handler. | ||
169 | pingH :: DHT.Kademlia dht => Proxy dht -> NodeAddr ip -> Ping dht -> IO (Ping dht) | ||
170 | pingH dht _ _ = return (DHT.pongMessage dht) | ||
171 | -- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True } | ||
172 | |||
173 | -- | Default 'FindNode' handler. | ||
174 | findNodeH :: Kademlia dht => (NodeId dht -> IO [NodeInfo dht ip u]) -> NodeAddr ip -> FindNode dht ip -> IO (NodeFound dht ip) | ||
175 | findNodeH getclosest _ msg = foundNodesMessage . L.map (fmap (const ())) <$> getclosest (findWho msg) | ||
176 | |||
177 | -- | Default 'GetPeers' handler. | ||
178 | getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip) | ||
179 | getPeersH getPeerList toks naddr (GetPeers ih) = do | ||
180 | ps <- getPeerList ih | ||
181 | tok <- grantToken toks naddr | ||
182 | return $ GotPeers ps tok | ||
183 | |||
184 | -- | Default 'Announce' handler. | ||
185 | announceH :: ( Ord ip, Hashable ip ) => TVar (PeerStore ip) -> TVar SessionTokens -> NodeAddr ip -> Announce -> IO Announced | ||
186 | announceH peers toks naddr @ NodeAddr {..} (Announce {..}) = do | ||
187 | valid <- checkToken toks naddr sessionToken | ||
188 | unless valid $ do | ||
189 | throwIO $ InvalidParameter "token" | ||
190 | |||
191 | let annPort = if impliedPort then nodePort else port | ||
192 | peerAddr = PeerAddr Nothing nodeHost annPort | ||
193 | insertPeer peers topic announcedName peerAddr | ||
194 | return Announced | ||
195 | |||
196 | -- | Includes all Kademlia-related handlers. | ||
197 | kademliaHandlers :: forall raw dht u ip. (Eq ip, Ord ip, Address ip | ||
198 | , Ord (TransactionID dht) | ||
199 | , Ord (NodeId dht) | ||
200 | , Show u | ||
201 | , SerializableTo raw (Response dht (Ping dht)) | ||
202 | , SerializableTo raw (Query dht (Ping dht)) | ||
203 | , Show (QueryMethod dht) | ||
204 | , Show (NodeId dht) | ||
205 | , FiniteBits (NodeId dht) | ||
206 | , Default u | ||
207 | , Serialize (TransactionID dht) | ||
208 | , WireFormat raw dht | ||
209 | , Kademlia dht | ||
210 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
211 | , Functor dht | ||
212 | , Pretty (NodeInfo dht ip u) | ||
213 | , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
214 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
215 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
216 | ) => LogFun -> DHT raw dht u ip [Handler IO dht raw] | ||
217 | -- kademliaHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler] | ||
218 | kademliaHandlers logger = do | ||
219 | groknode <- insertNode1 | ||
220 | mynid <- myNodeIdAccordingTo1 | ||
221 | dta <- asks dhtData | ||
222 | let handler :: ( KRPC dht (Query dht a) (Response dht b) | ||
223 | , SerializableTo raw (Response dht b) | ||
224 | , SerializableTo raw (Query dht a) | ||
225 | ) => QueryMethod dht -> (NodeAddr ip -> a -> IO b) -> Handler IO dht raw | ||
226 | handler = nodeHandler groknode mynid (logt logger) dta | ||
227 | dht = Proxy :: Proxy dht | ||
228 | getclosest <- getClosest1 | ||
229 | return [ handler (namePing dht) $ pingH dht | ||
230 | , handler (nameFindNodes dht) $ findNodeH getclosest | ||
231 | ] | ||
232 | |||
233 | instance DataHandlers BValue KMessageOf where | ||
234 | dataHandlers = bthandlers | ||
235 | |||
236 | bthandlers :: | ||
237 | ( Ord ip , Hashable ip, Typeable ip, Serialize ip) => | ||
238 | (NodeId KMessageOf -> IO [NodeInfo KMessageOf ip ()]) | ||
239 | -> DHTData KMessageOf ip | ||
240 | -> [MethodHandler BValue KMessageOf ip] | ||
241 | bthandlers getclosest dta = | ||
242 | [ MethodHandler "get_peers" $ getPeersH (getpeers dta) (sessionTokens dta) | ||
243 | , MethodHandler "announce_peer" $ announceH (contactInfo dta) (sessionTokens dta) | ||
244 | ] | ||
245 | where | ||
246 | getpeers dta ih = do | ||
247 | ps <- lookupPeers (contactInfo dta) ih | ||
248 | if L.null ps | ||
249 | then Left <$> getclosest (toNodeId ih) | ||
250 | else return (Right ps) | ||
251 | |||
252 | |||
253 | -- | Includes all default query handlers. | ||
254 | defaultHandlers :: forall raw dht u ip. | ||
255 | ( Ord (TransactionID dht) | ||
256 | , Ord (NodeId dht) | ||
257 | , Show u | ||
258 | , SerializableTo raw (Response dht (Ping dht)) | ||
259 | , SerializableTo raw (Query dht (Ping dht)) | ||
260 | , Show (QueryMethod dht) | ||
261 | , Show (NodeId dht) | ||
262 | , FiniteBits (NodeId dht) | ||
263 | , Default u | ||
264 | , Serialize (TransactionID dht) | ||
265 | , WireFormat raw dht | ||
266 | , Kademlia dht | ||
267 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
268 | , Functor dht | ||
269 | , Pretty (NodeInfo dht ip u) | ||
270 | , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
271 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
272 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
273 | , Eq ip, Ord ip, Address ip, DataHandlers raw dht | ||
274 | ) => LogFun -> DHT raw dht u ip [Handler IO dht raw] | ||
275 | defaultHandlers logger = do | ||
276 | groknode <- insertNode1 | ||
277 | mynid <- myNodeIdAccordingTo1 | ||
278 | dta <- asks dhtData | ||
279 | let handler :: MethodHandler raw dht ip -> Handler IO dht raw | ||
280 | handler (MethodHandler name action) = nodeHandler groknode mynid (logt logger) dta name action | ||
281 | getclosest <- getClosest1 | ||
282 | hs <- kademliaHandlers logger | ||
283 | return $ hs ++ L.map handler (dataHandlers (fmap (fmap (fmap (const ()))) . getclosest) dta) | ||
284 | |||
285 | {----------------------------------------------------------------------- | ||
286 | -- Basic queries | ||
287 | -----------------------------------------------------------------------} | ||
288 | |||
289 | type Iteration raw dht u ip o = NodeInfo dht ip u -> DHT raw dht u ip (Either [NodeInfo dht ip u] [o ip]) | ||
290 | |||
291 | -- | The most basic query. May be used to check if the given node is | ||
292 | -- alive or get its 'NodeId'. | ||
293 | pingQ :: forall raw dht u ip. | ||
294 | ( DHT.Kademlia dht | ||
295 | , Address ip | ||
296 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
297 | , Default u | ||
298 | , Show u | ||
299 | , Ord (TransactionID dht) | ||
300 | , Serialize (TransactionID dht) | ||
301 | , WireFormat raw dht | ||
302 | , SerializableTo raw (Response dht (Ping dht)) | ||
303 | , SerializableTo raw (Query dht (Ping dht)) | ||
304 | , Ord (NodeId dht) | ||
305 | , FiniteBits (NodeId dht) | ||
306 | , Show (NodeId dht) | ||
307 | , Show (QueryMethod dht) | ||
308 | ) => NodeInfo dht ip u -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP) | ||
309 | pingQ ni = do | ||
310 | let ping = DHT.pingMessage (Proxy :: Proxy dht) | ||
311 | (nid, pong, mip) <- queryNode' ni ping | ||
312 | let _ = pong `asTypeOf` ping | ||
313 | -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} | ||
314 | return (NodeInfo nid (nodeAddr ni) def, mip) | ||
315 | |||
316 | -- | The most basic query. May be used to check if the given node is | ||
317 | -- alive or get its 'NodeId'. | ||
318 | coldPingQ :: forall raw dht u ip. | ||
319 | ( DHT.Kademlia dht | ||
320 | , Address ip | ||
321 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
322 | , Default u | ||
323 | , Show u | ||
324 | , Ord (TransactionID dht) | ||
325 | , Serialize (TransactionID dht) | ||
326 | , WireFormat raw dht | ||
327 | , SerializableTo raw (Response dht (Ping dht)) | ||
328 | , SerializableTo raw (Query dht (Ping dht)) | ||
329 | , Ord (NodeId dht) | ||
330 | , FiniteBits (NodeId dht) | ||
331 | , Show (NodeId dht) | ||
332 | , Show (QueryMethod dht) | ||
333 | ) => PacketDestination dht -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP) | ||
334 | coldPingQ dest = do | ||
335 | let ping = DHT.pingMessage (Proxy :: Proxy dht) | ||
336 | naddr <- maybe (throwIO $ QueryFailed ProtocolError "unable to construct NodeAddr from PacketDestination") | ||
337 | return | ||
338 | $ fromAddr dest | ||
339 | (nid, pong, mip) <- coldQueryNode' naddr dest ping | ||
340 | let _ = pong `asTypeOf` ping | ||
341 | -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} | ||
342 | return (NodeInfo nid naddr def, mip) | ||
343 | |||
344 | -- TODO [robustness] match range of returned node ids with the | ||
345 | -- expected range and either filter bad nodes or discard response at | ||
346 | -- all throwing an exception | ||
347 | -- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo | ||
348 | findNodeQ proxy key ni = do | ||
349 | closest <- fmap DHT.foundNodes $ DHT.findNodeMessage proxy key <@> ni | ||
350 | $(logInfoS) "findNodeQ" $ "NodeFound\n" | ||
351 | <> T.pack (L.unlines $ L.map ((' ' :) . show . pPrint) closest) | ||
352 | return $ Right closest | ||
353 | |||
354 | #ifdef VERSION_bencoding | ||
355 | getPeersQ :: Address ip => InfoHash -> Iteration BValue KMessageOf () ip PeerAddr | ||
356 | getPeersQ topic ni = do | ||
357 | GotPeers {..} <- GetPeers topic <@> ni | ||
358 | let dist = distance (toNodeId topic) (nodeId ni) | ||
359 | $(logInfoS) "getPeersQ" $ T.pack | ||
360 | $ "distance: " <> render (pPrint dist) <> " , result: " | ||
361 | <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" } | ||
362 | return peers | ||
363 | |||
364 | announceQ :: Address ip => InfoHash -> PortNumber -> Iteration BValue KMessageOf () ip NodeAddr | ||
365 | announceQ ih p ni = do | ||
366 | GotPeers {..} <- GetPeers ih <@> ni | ||
367 | case peers of | ||
368 | Left ns | ||
369 | | False -> undefined -- TODO check if we can announce | ||
370 | | otherwise -> return (Left ns) | ||
371 | Right _ -> do -- TODO *probably* add to peer cache | ||
372 | Announced <- Announce False ih Nothing p grantedToken <@> ni | ||
373 | return (Right [nodeAddr ni]) | ||
374 | #endif | ||
375 | |||
376 | {----------------------------------------------------------------------- | ||
377 | -- Iterative queries | ||
378 | -----------------------------------------------------------------------} | ||
379 | |||
380 | |||
381 | ioGetPeers :: Address ip => InfoHash -> DHT BValue KMessageOf () ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip])) | ||
382 | ioGetPeers ih = do | ||
383 | session <- ask | ||
384 | return $ \ni -> runDHT session $ do | ||
385 | r <- try $ getPeersQ ih ni | ||
386 | case r of | ||
387 | Right e -> return $ either (,[]) ([],) e | ||
388 | Left e -> let _ = e :: QueryFailure in return ([],[]) | ||
389 | |||
390 | ioFindNode :: ( DHT.Kademlia dht | ||
391 | , WireFormat raw dht | ||
392 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
393 | , Address ip | ||
394 | , Default u | ||
395 | , Show u | ||
396 | , Show (QueryMethod dht) | ||
397 | , TableKey dht infohash | ||
398 | , Eq (NodeId dht) | ||
399 | , Ord (NodeId dht) | ||
400 | , FiniteBits (NodeId dht) | ||
401 | , Show (NodeId dht) | ||
402 | , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
403 | , Ord (TransactionID dht) | ||
404 | , Serialize (TransactionID dht) | ||
405 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
406 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
407 | , SerializableTo raw (Response dht (Ping dht)) | ||
408 | , SerializableTo raw (Query dht (Ping dht)) | ||
409 | ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u])) | ||
410 | ioFindNode ih = do | ||
411 | session <- ask | ||
412 | return $ \ni -> runDHT session $ do | ||
413 | ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> ni | ||
414 | let ns' = L.map (fmap (const def)) ns | ||
415 | return $ L.partition (\n -> nodeId n /= toNodeId ih) ns' | ||
416 | |||
417 | |||
418 | -- | Like ioFindNode, but considers all found nodes to be 'Right' results. | ||
419 | ioFindNodes :: ( DHT.Kademlia dht | ||
420 | , WireFormat raw dht | ||
421 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
422 | , Address ip | ||
423 | , Default u | ||
424 | , Show u | ||
425 | , Show (QueryMethod dht) | ||
426 | , TableKey dht infohash | ||
427 | , Eq (NodeId dht) | ||
428 | , Ord (NodeId dht) | ||
429 | , FiniteBits (NodeId dht) | ||
430 | , Show (NodeId dht) | ||
431 | , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
432 | , Ord (TransactionID dht) | ||
433 | , Serialize (TransactionID dht) | ||
434 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
435 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
436 | , SerializableTo raw (Response dht (Ping dht)) | ||
437 | , SerializableTo raw (Query dht (Ping dht)) | ||
438 | ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u])) | ||
439 | ioFindNodes ih = do | ||
440 | session <- ask | ||
441 | return $ \ni -> runDHT session $ do | ||
442 | ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> ni | ||
443 | let ns' = L.map (fmap (const def)) ns | ||
444 | return ([], ns') | ||
445 | |||
446 | isearch :: ( Ord r | ||
447 | , Ord ip | ||
448 | , Ord (NodeId dht) | ||
449 | , FiniteBits (NodeId dht) | ||
450 | , TableKey dht ih | ||
451 | , Show ih) => | ||
452 | (ih -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r]))) | ||
453 | -> ih | ||
454 | -> DHT raw dht u ip (ThreadId, Search.IterativeSearch dht u ip r) | ||
455 | isearch f ih = do | ||
456 | qry <- f ih | ||
457 | ns <- kclosest 8 ih <$> getTable | ||
458 | liftIO $ do s <- Search.newSearch qry (toNodeId ih) ns | ||
459 | a <- fork $ do | ||
460 | tid <- myThreadId | ||
461 | labelThread tid ("search."++show ih) | ||
462 | Search.search s | ||
463 | -- atomically \$ readTVar (Search.searchResults s) | ||
464 | return (a, s) | ||
465 | |||
466 | -- | Background search: fill a lazy list using a background thread. | ||
467 | bgsearch f ih = do | ||
468 | (tid, s) <- isearch f ih | ||
469 | let again shown = do | ||
470 | (chk,fin) <- atomically $ do | ||
471 | r <- (Set.\\ shown) <$> readTVar (Search.searchResults s) | ||
472 | if not $ Set.null r | ||
473 | then (,) r <$> Search.searchIsFinished s | ||
474 | else Search.searchIsFinished s >>= check >> return (Set.empty,True) | ||
475 | let ps = Set.toList chk | ||
476 | if fin then return ps | ||
477 | else do | ||
478 | xs <- unsafeInterleaveIO $ again (shown `Set.union` chk) | ||
479 | return $ ps ++ xs | ||
480 | liftIO $ again Set.empty | ||
481 | |||
482 | type Search raw dht u ip o = Conduit [NodeInfo dht ip u] (DHT raw dht u ip) [o dht ip u] | ||
483 | |||
484 | #if 0 | ||
485 | |||
486 | -- TODO: use reorder and filter (Traversal option) leftovers | ||
487 | -- search :: k -> IterationI ip o -> Search ip o | ||
488 | search _ action = do | ||
489 | awaitForever $ \ batch -> unless (L.null batch) $ do | ||
490 | $(logWarnS) "search" "start query" | ||
491 | responses <- lift $ queryParallel (action <$> batch) | ||
492 | let (nodes, results) = partitionEithers responses | ||
493 | $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results))) | ||
494 | leftover $ L.concat nodes | ||
495 | let r = mapM_ yield results | ||
496 | _ = (action,r) :: (a -> DHT raw dht u ip (Either [a] o), ConduitM [a] o (DHT raw dht u ip) ()) | ||
497 | r | ||
498 | |||
499 | #endif | ||
500 | |||
501 | publish = error "todo" | ||
502 | -- publish :: Address ip => InfoHash -> PortNumber -> DHT BValue KMessageOf () ip () | ||
503 | -- publish ih p = do | ||
504 | -- nodes <- getClosest ih | ||
505 | -- r <- asks (optReplication . options) | ||
506 | -- _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r | ||
507 | -- return () | ||
508 | |||
509 | |||
510 | probeNode :: ( Default u | ||
511 | , Show u | ||
512 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
513 | , DHT.Kademlia dht | ||
514 | , Address ip | ||
515 | , Ord (TransactionID dht) | ||
516 | , Serialize (TransactionID dht) | ||
517 | , WireFormat raw dht | ||
518 | , SerializableTo raw (Response dht (Ping dht)) | ||
519 | , SerializableTo raw (Query dht (Ping dht)) | ||
520 | , Ord (NodeId dht) | ||
521 | , FiniteBits (NodeId dht) | ||
522 | , Show (NodeId dht) | ||
523 | , Show (QueryMethod dht) | ||
524 | ) => NodeInfo dht ip u -> DHT raw dht u ip (Bool , Maybe ReflectedIP) | ||
525 | probeNode addr = do | ||
526 | $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint $ nodeAddr addr))) | ||
527 | result <- try $ pingQ addr | ||
528 | let _ = fmap (const ()) result :: Either QueryFailure () | ||
529 | return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result | ||
530 | |||
531 | |||
532 | refreshNodes :: forall raw dht u ip. | ||
533 | ( Address ip | ||
534 | , Ord (NodeId dht) | ||
535 | , Default u | ||
536 | , FiniteBits (NodeId dht) | ||
537 | , Pretty (NodeId dht) | ||
538 | , DHT.Kademlia dht | ||
539 | , Ord ip | ||
540 | , Ord (TransactionID dht) | ||
541 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
542 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
543 | , SerializableTo raw (Response dht (Ping dht)) | ||
544 | , SerializableTo raw (Query dht (Ping dht)) | ||
545 | , Pretty (NodeInfo dht ip u) | ||
546 | , Show (NodeId dht) | ||
547 | , Show u | ||
548 | , Show (QueryMethod dht) | ||
549 | , Serialize (TransactionID dht) | ||
550 | , WireFormat raw dht | ||
551 | , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
552 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
553 | ) => NodeId dht -> DHT raw dht u ip () -- [NodeInfo KMessageOf ip ()] | ||
554 | -- FIXME do not use getClosest sinse we should /refresh/ them | ||
555 | refreshNodes nid = do | ||
556 | $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid))) | ||
557 | nodes <- getClosest nid | ||
558 | do | ||
559 | -- forM (L.take 1 nodes) \$ \ addr -> do | ||
560 | -- NodeFound ns <- FindNode nid <@> addr | ||
561 | -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) () | ||
562 | -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) () | ||
563 | -- nss <- sourceList [[addr]] \$= search nid (findNodeQ nid) $$ C.consume | ||
564 | -- nss <- sourceList [nodes] \$= search nid (findNodeQ (Proxy :: Proxy dht) nid) $$ C.consume | ||
565 | ns <- bgsearch ioFindNodes nid | ||
566 | $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length ns)) <> " nodes." | ||
567 | _ <- queryParallel $ flip L.map ns $ \n -> do | ||
568 | $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) | ||
569 | pingQ n | ||
570 | -- pingQ takes care of inserting the node. | ||
571 | return () | ||
572 | return () -- \$ L.concat nss | ||
573 | |||
574 | logc :: Char -> String -> DHT raw dht u ip () | ||
575 | logc 'D' = $(logDebugS) "insertNode" . T.pack | ||
576 | logc 'W' = $(logWarnS) "insertNode" . T.pack | ||
577 | logc 'I' = $(logInfoS) "insertNode" . T.pack | ||
578 | logc c = $(logInfoS) "insertNode" . T.pack . (c :) . (':' :) | ||
579 | |||
580 | -- | This operation do not block but acquire exclusive access to | ||
581 | -- routing table. | ||
582 | insertNode :: forall raw dht u ip. | ||
583 | ( Address ip | ||
584 | , Ord (NodeId dht) | ||
585 | , FiniteBits (NodeId dht) | ||
586 | , Show (NodeId dht) | ||
587 | , Default u | ||
588 | , Show u | ||
589 | , DHT.Kademlia dht | ||
590 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
591 | , Ord (TransactionID dht) | ||
592 | , WireFormat raw dht | ||
593 | , Serialize (TransactionID dht) | ||
594 | , SerializableTo raw (Response dht (Ping dht)) | ||
595 | , SerializableTo raw (Query dht (Ping dht)) | ||
596 | , Ord (NodeId dht) | ||
597 | , Show (NodeId dht) | ||
598 | , Show (QueryMethod dht) | ||
599 | ) => NodeInfo dht ip u -> Maybe ReflectedIP -> DHT raw dht u ip () | ||
600 | insertNode info witnessed_ip0 = do | ||
601 | f <- insertNode1 | ||
602 | liftIO $ f info witnessed_ip0 | ||
603 | |||
604 | insertNode1 :: forall raw dht u ip. | ||
605 | ( Address ip | ||
606 | , Default u | ||
607 | , Show u | ||
608 | , Ord (NodeId dht) | ||
609 | , FiniteBits (NodeId dht) | ||
610 | , Show (NodeId dht) | ||
611 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
612 | , DHT.Kademlia dht | ||
613 | , Ord (TransactionID dht) | ||
614 | , WireFormat raw dht | ||
615 | , Serialize (TransactionID dht) | ||
616 | , SerializableTo raw (Response dht (Ping dht)) | ||
617 | , SerializableTo raw (Query dht (Ping dht)) | ||
618 | , Ord (NodeId dht) | ||
619 | , Show (NodeId dht) | ||
620 | , Show (QueryMethod dht) | ||
621 | ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ()) | ||
622 | insertNode1 = do | ||
623 | bc <- optBucketCount <$> asks options | ||
624 | nid <- asks tentativeNodeId | ||
625 | logm0 <- embed_ (uncurry logc) | ||
626 | let logm c = logm0 . (c,) | ||
627 | dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state. | ||
628 | probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) | ||
629 | let probe n = probe0 n >>= runDHT dht_node_state . restoreM | ||
630 | {- | ||
631 | changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive | ||
632 | ip <- fromSockAddr ip0 :: Maybe ip | ||
633 | listToMaybe | ||
634 | $ rank id (nodeId $ foreignNode arrival) | ||
635 | $ bep42s ip (DHT.fallbackID params) -- warning: recursive | ||
636 | -} | ||
637 | params = DHT.TableParameters | ||
638 | { maxBuckets = bc :: Int | ||
639 | , fallbackID = nid :: NodeId dht | ||
640 | , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht | ||
641 | , logMessage = logm :: Char -> String -> IO () | ||
642 | , pingProbe = probe :: NodeInfo dht ip u -> IO (Bool, Maybe ReflectedIP) | ||
643 | } | ||
644 | tbl <- asks routingInfo | ||
645 | let state = DHT.TableKeeper | ||
646 | { routingInfo = tbl | ||
647 | , grokNode = DHT.insertNode params state | ||
648 | , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO () | ||
649 | } | ||
650 | return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0 | ||
651 | |||
652 | -- | Throws exception if node is not responding. | ||
653 | queryNode :: forall raw dht u a b ip. | ||
654 | ( Address ip | ||
655 | , KRPC dht (Query dht a) (Response dht b) | ||
656 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
657 | , Default u | ||
658 | , Show u | ||
659 | , DHT.Kademlia dht | ||
660 | , Ord (TransactionID dht) | ||
661 | , Serialize (TransactionID dht) | ||
662 | , WireFormat raw dht | ||
663 | , SerializableTo raw (Response dht b) | ||
664 | , SerializableTo raw (Query dht a) | ||
665 | , Ord (NodeId dht) | ||
666 | , FiniteBits (NodeId dht) | ||
667 | , Show (NodeId dht) | ||
668 | , Show (QueryMethod dht) | ||
669 | , SerializableTo raw (Response dht (Ping dht)) | ||
670 | , SerializableTo raw (Query dht (Ping dht)) | ||
671 | ) => NodeInfo dht ip u -> a -> DHT raw dht u ip (NodeId dht, b) | ||
672 | queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q | ||
673 | |||
674 | queryNode' :: forall raw dht u a b ip. | ||
675 | ( Address ip | ||
676 | , Default u | ||
677 | , Show u | ||
678 | , DHT.Kademlia dht | ||
679 | , KRPC dht (Query dht a) (Response dht b) | ||
680 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
681 | , Ord (TransactionID dht) | ||
682 | , Serialize (TransactionID dht) | ||
683 | , WireFormat raw dht | ||
684 | , SerializableTo raw (Response dht b) | ||
685 | , SerializableTo raw (Query dht a) | ||
686 | , Ord (NodeId dht) | ||
687 | , FiniteBits (NodeId dht) | ||
688 | , Show (NodeId dht) | ||
689 | , Show (QueryMethod dht) | ||
690 | , SerializableTo raw (Response dht (Ping dht)) | ||
691 | , SerializableTo raw (Query dht (Ping dht)) | ||
692 | ) => NodeInfo dht ip u -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP) | ||
693 | queryNode' ni q = do | ||
694 | let addr = nodeAddr ni | ||
695 | dest = makeAddress (Left $ nodeId ni) (toSockAddr addr) | ||
696 | coldQueryNode' addr dest q | ||
697 | |||
698 | coldQueryNode' :: forall raw dht u a b ip. | ||
699 | ( Address ip | ||
700 | , Default u | ||
701 | , Show u | ||
702 | , DHT.Kademlia dht | ||
703 | , KRPC dht (Query dht a) (Response dht b) | ||
704 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
705 | , Ord (TransactionID dht) | ||
706 | , Serialize (TransactionID dht) | ||
707 | , WireFormat raw dht | ||
708 | , SerializableTo raw (Response dht b) | ||
709 | , SerializableTo raw (Query dht a) | ||
710 | , Ord (NodeId dht) | ||
711 | , FiniteBits (NodeId dht) | ||
712 | , Show (NodeId dht) | ||
713 | , Show (QueryMethod dht) | ||
714 | , SerializableTo raw (Response dht (Ping dht)) | ||
715 | , SerializableTo raw (Query dht (Ping dht)) | ||
716 | ) => NodeAddr ip -> PacketDestination dht -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP) | ||
717 | coldQueryNode' addr dest q = do | ||
718 | nid <- myNodeIdAccordingTo $ fromMaybe (error "TODO: coldQueryNode' myNodeIdAccordingTo") $ fromAddr dest | ||
719 | dta <- asks dhtData | ||
720 | qextra <- liftIO $ makeQueryExtra dta nid (Proxy :: Proxy (Query dht q)) (Proxy :: Proxy (Response dht b)) | ||
721 | let read_only = False -- TODO: check for NAT issues. (BEP 43) | ||
722 | -- let KRPC.Method name = KRPC.method :: KRPC.Method dht (Query dht a) (Response dht b) | ||
723 | mgr <- asks manager | ||
724 | (Response rextra r, remoteId, witnessed_ip) <- liftIO $ query' mgr dest (Query qextra q) | ||
725 | -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) | ||
726 | -- <> " by " <> T.pack (show (toSockAddr addr)) | ||
727 | _ <- insertNode (NodeInfo remoteId addr def) witnessed_ip | ||
728 | return (remoteId, r, witnessed_ip) | ||
729 | |||
730 | -- | Infix version of 'queryNode' function. | ||
731 | (<@>) :: ( Address ip | ||
732 | , KRPC dht (Query dht a) (Response dht b) | ||
733 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
734 | , Default u | ||
735 | , Show u | ||
736 | , Show (QueryMethod dht) | ||
737 | , Ord (NodeId dht) | ||
738 | , FiniteBits (NodeId dht) | ||
739 | , Show (NodeId dht) | ||
740 | , Ord (TransactionID dht) | ||
741 | , Serialize (TransactionID dht) | ||
742 | , SerializableTo raw (Response dht b) | ||
743 | , SerializableTo raw (Query dht a) | ||
744 | , SerializableTo raw (Response dht (Ping dht)) | ||
745 | , SerializableTo raw (Query dht (Ping dht)) | ||
746 | , WireFormat raw dht | ||
747 | , Kademlia dht | ||
748 | ) => a -> NodeInfo dht ip u -> DHT raw dht u ip b | ||
749 | q <@> addr = snd <$> queryNode addr q | ||
750 | {-# INLINE (<@>) #-} | ||
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs deleted file mode 100644 index b775e7d3..00000000 --- a/src/Network/BitTorrent/DHT/Session.hs +++ /dev/null | |||
@@ -1,571 +0,0 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013-2014 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- This module defines internal state of a node instance. You can | ||
9 | -- have multiple nodes per application but usually you don't have | ||
10 | -- to. Normally, you don't need to import this module, use | ||
11 | -- "Network.BitTorrent.DHT" instead. | ||
12 | -- | ||
13 | {-# LANGUAGE CPP #-} | ||
14 | {-# LANGUAGE RecordWildCards #-} | ||
15 | {-# LANGUAGE FlexibleContexts #-} | ||
16 | {-# LANGUAGE FlexibleInstances #-} | ||
17 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
18 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
19 | {-# LANGUAGE ScopedTypeVariables #-} | ||
20 | {-# LANGUAGE TypeFamilies #-} | ||
21 | {-# LANGUAGE TemplateHaskell #-} | ||
22 | module Network.BitTorrent.DHT.Session | ||
23 | ( -- * Options | ||
24 | -- | Use @optFooBar def@ to get default 'Alpha' or 'K'. | ||
25 | Alpha | ||
26 | , K | ||
27 | , Options (..) | ||
28 | |||
29 | -- * Session | ||
30 | , Node | ||
31 | , options | ||
32 | , tentativeNodeId | ||
33 | , myNodeIdAccordingTo | ||
34 | , myNodeIdAccordingTo1 | ||
35 | , routingInfo | ||
36 | , routableAddress | ||
37 | , getTimestamp | ||
38 | -- , SessionTokens | ||
39 | -- , sessionTokens | ||
40 | -- , contactInfo | ||
41 | , dhtData | ||
42 | , PeerStore | ||
43 | , manager | ||
44 | |||
45 | -- ** Initialization | ||
46 | , LogFun | ||
47 | , logt | ||
48 | , NodeHandler | ||
49 | , newNode | ||
50 | , closeNode | ||
51 | |||
52 | -- * DHT | ||
53 | -- | Use @asks options@ to get options passed to 'startNode' | ||
54 | -- or @asks thisNodeId@ to get id of locally running node. | ||
55 | , DHT | ||
56 | , runDHT | ||
57 | |||
58 | -- ** Tokens | ||
59 | -- , grantToken | ||
60 | -- , checkToken | ||
61 | |||
62 | -- ** Routing table | ||
63 | , getTable | ||
64 | , getClosest | ||
65 | , getClosest1 | ||
66 | |||
67 | #ifdef VERSION_bencoding | ||
68 | -- ** Peer storage | ||
69 | , insertPeer | ||
70 | , getPeerList | ||
71 | , getPeerList1 | ||
72 | , lookupPeers | ||
73 | , insertTopic | ||
74 | , deleteTopic | ||
75 | , getSwarms | ||
76 | , savePeerStore | ||
77 | , mergeSavedPeers | ||
78 | , allPeers | ||
79 | #endif | ||
80 | |||
81 | -- ** Messaging | ||
82 | , queryParallel | ||
83 | ) where | ||
84 | |||
85 | import Prelude hiding (ioError) | ||
86 | |||
87 | import Control.Concurrent.STM | ||
88 | #ifdef THREAD_DEBUG | ||
89 | import Control.Concurrent.Async.Lifted.Instrument | ||
90 | #else | ||
91 | import Control.Concurrent.Async.Lifted | ||
92 | #endif | ||
93 | import Control.Exception.Lifted hiding (Handler) | ||
94 | import Control.Monad.Base | ||
95 | import Control.Monad.Logger | ||
96 | import Control.Monad.Reader | ||
97 | import Control.Monad.Trans.Control | ||
98 | import Control.Monad.Trans.Resource | ||
99 | import Data.Typeable | ||
100 | import Data.String | ||
101 | import Data.Bits | ||
102 | import Data.ByteString | ||
103 | import Data.Conduit.Lazy | ||
104 | import Data.Default | ||
105 | import Data.Fixed | ||
106 | import Data.Hashable | ||
107 | import Data.List as L | ||
108 | import Data.Maybe | ||
109 | import Data.Monoid | ||
110 | import Data.Set as S | ||
111 | import Data.Time | ||
112 | import Network (PortNumber) | ||
113 | import System.Random (randomIO) | ||
114 | import Data.Time.Clock.POSIX | ||
115 | import Data.Text as Text | ||
116 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | ||
117 | import Data.Serialize as S | ||
118 | import Network.DHT.Types | ||
119 | import Network.DatagramServer.Types | ||
120 | |||
121 | |||
122 | import Data.Torrent as Torrent | ||
123 | import Network.DatagramServer as KRPC hiding (Options, def) | ||
124 | import qualified Network.DatagramServer as KRPC (def) | ||
125 | #ifdef VERSION_bencoding | ||
126 | import Data.BEncode (BValue) | ||
127 | import Network.DatagramServer.Mainline (KMessageOf) | ||
128 | #else | ||
129 | import Network.DatagramServer.Tox as Tox | ||
130 | #endif | ||
131 | import Network.Address | ||
132 | import Network.BitTorrent.DHT.ContactInfo (PeerStore) | ||
133 | import qualified Network.BitTorrent.DHT.ContactInfo as P | ||
134 | import Network.DHT.Mainline | ||
135 | import Network.DHT.Routing as R | ||
136 | import Network.BitTorrent.DHT.Token as T | ||
137 | import GHC.Stack as GHC | ||
138 | |||
139 | {----------------------------------------------------------------------- | ||
140 | -- Options | ||
141 | -----------------------------------------------------------------------} | ||
142 | |||
143 | -- | Node lookups can proceed asynchronously. | ||
144 | type Alpha = Int | ||
145 | |||
146 | -- NOTE: libtorrent uses 5, azureus uses 10 | ||
147 | -- | The quantity of simultaneous lookups is typically three. | ||
148 | defaultAlpha :: Alpha | ||
149 | defaultAlpha = 3 | ||
150 | |||
151 | -- TODO add replication loop | ||
152 | |||
153 | -- TODO do not insert infohash -> peeraddr if infohash is too far from | ||
154 | -- this node id | ||
155 | {- | ||
156 | data Order | ||
157 | = NearFirst | ||
158 | | FarFirst | ||
159 | | Random | ||
160 | |||
161 | data Traversal | ||
162 | = Greedy -- ^ aggressive short-circuit traversal | ||
163 | | Exhaustive -- ^ | ||
164 | -} | ||
165 | |||
166 | -- | Original Kamelia DHT uses term /publish/ for data replication | ||
167 | -- process. BitTorrent DHT uses term /announce/ since the purpose of | ||
168 | -- the DHT is peer discovery. Later in documentation, we use terms | ||
169 | -- /publish/ and /announce/ interchangible. | ||
170 | data Options = Options | ||
171 | { -- | The degree of parallelism in 'find_node' queries. More | ||
172 | -- parallism lead to faster bootstrapping and lookup operations, | ||
173 | -- but also increase resource usage. | ||
174 | -- | ||
175 | -- Normally this parameter should not exceed 'optK'. | ||
176 | optAlpha :: {-# UNPACK #-} !Alpha | ||
177 | |||
178 | -- | /K/ parameter - number of nodes to return in 'find_node' | ||
179 | -- responses. | ||
180 | , optK :: {-# UNPACK #-} !K | ||
181 | |||
182 | -- | Number of buckets to maintain. This parameter depends on | ||
183 | -- amount of nodes in the DHT network. | ||
184 | , optBucketCount :: {-# UNPACK #-} !BucketCount | ||
185 | |||
186 | -- | RPC timeout. | ||
187 | , optTimeout :: !NominalDiffTime | ||
188 | |||
189 | -- | /R/ parameter - how many target nodes the 'announce' query | ||
190 | -- should affect. | ||
191 | -- | ||
192 | -- A large replica set compensates for inconsistent routing and | ||
193 | -- reduces the need to frequently republish data for | ||
194 | -- persistence. This comes at an increased cost for | ||
195 | -- 'Network.BitTorrent.DHT.insert' in terms of time, nodes | ||
196 | -- contacted, and storage. | ||
197 | , optReplication :: {-# UNPACK #-} !NodeCount | ||
198 | |||
199 | -- | How often this node should republish (or reannounce) its | ||
200 | -- data. | ||
201 | -- | ||
202 | -- Large replica set ('optReplication') should require | ||
203 | -- smaller reannounce intervals ('optReannounce'). | ||
204 | , optReannounce :: !NominalDiffTime | ||
205 | |||
206 | -- | The time it takes for data to expire in the | ||
207 | -- network. Publisher of the data should republish (or | ||
208 | -- reannounce) data to keep it in the network. | ||
209 | -- | ||
210 | -- The /data expired timeout/ should be more than 'optReannounce' | ||
211 | -- interval. | ||
212 | , optDataExpired :: !NominalDiffTime | ||
213 | } deriving (Show, Eq) | ||
214 | |||
215 | -- | Optimal options for bittorrent client. For short-lifetime | ||
216 | -- utilities you most likely need to tune 'optAlpha' and | ||
217 | -- 'optBucketCount'. | ||
218 | instance Default Options where | ||
219 | def = Options | ||
220 | { optAlpha = defaultAlpha | ||
221 | , optK = defaultK | ||
222 | |||
223 | -- see Fig.2 from "BitTorrent Mainline DHT Measurement" paper. | ||
224 | , optBucketCount = defaultBucketCount | ||
225 | |||
226 | -- see Fig.4 from "Profiling a Million User DHT" paper. | ||
227 | , optTimeout = 5 -- seconds | ||
228 | , optReplication = 20 -- nodes | ||
229 | , optReannounce = 15 * 60 | ||
230 | , optDataExpired = 60 * 60 | ||
231 | } | ||
232 | |||
233 | seconds :: NominalDiffTime -> Int | ||
234 | seconds dt = fromEnum (realToFrac dt :: Uni) | ||
235 | {----------------------------------------------------------------------- | ||
236 | -- Session | ||
237 | -----------------------------------------------------------------------} | ||
238 | |||
239 | -- | A set of torrents this peer intends to share. | ||
240 | type AnnounceSet = Set (InfoHash, PortNumber) | ||
241 | |||
242 | -- | Logger function. | ||
243 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () | ||
244 | |||
245 | -- | DHT session keep track state of /this/ node. | ||
246 | data Node raw dht u ip = Node | ||
247 | { -- | Session configuration; | ||
248 | options :: !Options | ||
249 | |||
250 | -- | Pseudo-unique self-assigned session identifier. This value is | ||
251 | -- constant during DHT session and (optionally) between sessions. | ||
252 | , tentativeNodeId :: !(NodeId dht) | ||
253 | |||
254 | , resources :: !InternalState | ||
255 | , manager :: !(Manager raw dht) -- ^ RPC manager; | ||
256 | , routingInfo :: !(TVar (Maybe (R.Info dht ip u))) -- ^ search table; | ||
257 | , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; | ||
258 | , dhtData :: DHTData dht ip | ||
259 | , loggerFun :: !LogFun | ||
260 | } | ||
261 | |||
262 | -- | DHT keep track current session and proper resource allocation for | ||
263 | -- safe multithreading. | ||
264 | newtype DHT raw dht u ip a = DHT { unDHT :: ReaderT (Node raw dht u ip) IO a } | ||
265 | deriving ( Functor, Applicative, Monad, MonadIO | ||
266 | , MonadBase IO, MonadReader (Node raw dht u ip), MonadThrow | ||
267 | ) | ||
268 | |||
269 | #if MIN_VERSION_monad_control(1,0,0) | ||
270 | newtype DHTStM raw dht u ip a = StM { | ||
271 | unSt :: StM (ReaderT (Node raw dht u ip) IO) a | ||
272 | } | ||
273 | #endif | ||
274 | |||
275 | instance MonadBaseControl IO (DHT raw dht u ip) where | ||
276 | #if MIN_VERSION_monad_control(1,0,0) | ||
277 | type StM (DHT raw dht u ip) a = DHTStM raw dht u ip a | ||
278 | #else | ||
279 | newtype StM (DHT raw dht u ip) a = StM { | ||
280 | unSt :: StM (ReaderT (Node raw dht u ip) IO) a | ||
281 | } | ||
282 | #endif | ||
283 | liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> | ||
284 | cc $ \ (DHT m) -> StM <$> cc' m | ||
285 | {-# INLINE liftBaseWith #-} | ||
286 | |||
287 | restoreM = DHT . restoreM . unSt | ||
288 | {-# INLINE restoreM #-} | ||
289 | |||
290 | -- | Check is it is possible to run 'queryNode' or handle pending | ||
291 | -- query from remote node. | ||
292 | instance MonadActive (DHT raw dht u ip) where | ||
293 | monadActive = getManager >>= liftIO . isActive | ||
294 | {-# INLINE monadActive #-} | ||
295 | |||
296 | -- | All allocated resources will be closed at 'closeNode'. | ||
297 | instance MonadResource (DHT raw dht u ip) where | ||
298 | liftResourceT m = do | ||
299 | s <- asks resources | ||
300 | liftIO $ runInternalState m s | ||
301 | |||
302 | -- instance MonadKRPC (DHT raw dht u ip) (DHT raw dht u ip) BValue KMessageOf where | ||
303 | |||
304 | getManager :: DHT raw dht u ip (Manager raw dht) | ||
305 | getManager = asks manager | ||
306 | |||
307 | instance MonadLogger (DHT raw dht u ip) where | ||
308 | monadLoggerLog loc src lvl msg = do | ||
309 | logger <- asks loggerFun | ||
310 | liftIO $ logger loc src lvl (toLogStr msg) | ||
311 | |||
312 | #ifdef VERSION_bencoding | ||
313 | type NodeHandler = Handler IO KMessageOf BValue | ||
314 | #else | ||
315 | type NodeHandler ip = Handler (DHT raw dht u ip) Tox.Message ByteString | ||
316 | #endif | ||
317 | |||
318 | logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO () | ||
319 | logt lf c m txt = lf (locFromCS callStack) (fromString m) (lvl c) (fromString $ Text.unpack txt) | ||
320 | where | ||
321 | lvl 'D' = LevelDebug | ||
322 | lvl 'I' = LevelInfo | ||
323 | lvl 'W' = LevelWarn | ||
324 | lvl 'E' = LevelError | ||
325 | lvl ch = LevelOther $ Text.cons ch Text.empty | ||
326 | |||
327 | mkLoggerLoc :: GHC.SrcLoc -> Loc | ||
328 | mkLoggerLoc loc = | ||
329 | Loc { loc_filename = GHC.srcLocFile loc | ||
330 | , loc_package = GHC.srcLocPackage loc | ||
331 | , loc_module = GHC.srcLocModule loc | ||
332 | , loc_start = ( GHC.srcLocStartLine loc | ||
333 | , GHC.srcLocStartCol loc) | ||
334 | , loc_end = ( GHC.srcLocEndLine loc | ||
335 | , GHC.srcLocEndCol loc) | ||
336 | } | ||
337 | |||
338 | locFromCS :: GHC.CallStack -> Loc | ||
339 | locFromCS cs = case getCallStack cs of | ||
340 | ((_, loc):_) -> mkLoggerLoc loc | ||
341 | _ -> Loc "<unknown>" "<unknown>" "<unknown>" (0,0) (0,0) | ||
342 | |||
343 | |||
344 | -- | Run DHT session. You /must/ properly close session using | ||
345 | -- 'closeNode' function, otherwise socket or other scarce resources may | ||
346 | -- leak. | ||
347 | newNode :: forall raw dht ip u. | ||
348 | ( Address ip | ||
349 | , FiniteBits (NodeId dht) | ||
350 | , Serialize (NodeId dht) | ||
351 | , Kademlia dht | ||
352 | , WireFormat raw dht | ||
353 | ) | ||
354 | => -- [NodeHandler] -- ^ handlers to run on accepted queries; | ||
355 | Options -- ^ various dht options; | ||
356 | -> NodeAddr ip -- ^ node address to bind; | ||
357 | -> LogFun -- ^ invoked on log messages; | ||
358 | -> Maybe (NodeId dht) -- ^ use this NodeId, if not given a new one is generated. | ||
359 | -> IO (Node raw dht u ip) -- ^ a new DHT node running at given address. | ||
360 | newNode opts naddr logger mbid = do | ||
361 | s <- createInternalState | ||
362 | runInternalState initNode s | ||
363 | `onException` closeInternalState s | ||
364 | where | ||
365 | rpcOpts = KRPC.def { optQueryTimeout = seconds (optTimeout opts) } | ||
366 | nodeAddr = toSockAddr naddr | ||
367 | initNode = do | ||
368 | s <- getInternalState | ||
369 | (myId, infovar, getst) <- liftIO $ do | ||
370 | (i, ctx) <- initializeServerState (Proxy :: Proxy (dht raw)) mbid | ||
371 | var <- atomically (newTVar Nothing) | ||
372 | let getst dest = do | ||
373 | info <- atomically . readTVar $ var | ||
374 | return ( maybe i myNodeId info, ctx) | ||
375 | return (i, var, getst) | ||
376 | (_, m) <- allocate (newManager rpcOpts (logt logger) nodeAddr getst []) closeManager | ||
377 | liftIO $ do | ||
378 | dta <- initializeDHTData | ||
379 | node <- Node opts myId s m infovar | ||
380 | <$> newTVarIO S.empty | ||
381 | <*> pure dta | ||
382 | <*> pure logger | ||
383 | return node | ||
384 | |||
385 | -- | Some resources like listener thread may live for | ||
386 | -- some short period of time right after this DHT session closed. | ||
387 | closeNode :: Node raw dht u ip -> IO () | ||
388 | closeNode Node {..} = closeInternalState resources | ||
389 | |||
390 | -- | Run DHT operation on the given session. | ||
391 | runDHT :: Node raw dht u ip -> DHT raw dht u ip a -> IO a | ||
392 | runDHT node action = runReaderT (unDHT action) node | ||
393 | {-# INLINE runDHT #-} | ||
394 | |||
395 | {----------------------------------------------------------------------- | ||
396 | -- Routing | ||
397 | -----------------------------------------------------------------------} | ||
398 | |||
399 | -- /pick a random ID/ in the range of the bucket and perform a | ||
400 | -- find_nodes search on it. | ||
401 | |||
402 | |||
403 | {----------------------------------------------------------------------- | ||
404 | -- Routing table | ||
405 | -----------------------------------------------------------------------} | ||
406 | |||
407 | -- | This nodes externally routable address reported by remote peers. | ||
408 | routableAddress :: DHT raw dht u ip (Maybe SockAddr) | ||
409 | routableAddress = do | ||
410 | info <- asks routingInfo >>= liftIO . atomically . readTVar | ||
411 | return $ myAddress <$> info | ||
412 | |||
413 | -- | The current NodeId that the given remote node should know us by. | ||
414 | myNodeIdAccordingTo :: NodeAddr ip -> DHT raw dht u ip (NodeId dht) | ||
415 | myNodeIdAccordingTo _ = do | ||
416 | info <- asks routingInfo >>= liftIO . atomically . readTVar | ||
417 | maybe (asks tentativeNodeId) | ||
418 | (return . myNodeId) | ||
419 | info | ||
420 | |||
421 | myNodeIdAccordingTo1 :: DHT raw dht u ip ( NodeAddr ip -> IO (NodeId dht) ) | ||
422 | myNodeIdAccordingTo1 = do | ||
423 | var <- asks routingInfo | ||
424 | tid <- asks tentativeNodeId | ||
425 | return $ \ _ -> do | ||
426 | info <- atomically $ readTVar var | ||
427 | return $ maybe tid myNodeId info | ||
428 | |||
429 | -- | Get current routing table. Normally you don't need to use this | ||
430 | -- function, but it can be usefull for debugging and profiling purposes. | ||
431 | getTable :: Eq ip => DHT raw dht u ip (Table dht ip u) | ||
432 | getTable = do | ||
433 | Node { tentativeNodeId = myId | ||
434 | , routingInfo = var | ||
435 | , options = opts } <- ask | ||
436 | let nil = nullTable myId (optBucketCount opts) | ||
437 | liftIO (maybe nil R.myBuckets <$> atomically (readTVar var)) | ||
438 | |||
439 | getSwarms :: Ord ip => DHT raw KMessageOf u ip [ (InfoHash, Int, Maybe ByteString) ] | ||
440 | getSwarms = do | ||
441 | store <- asks (contactInfo . dhtData) >>= liftIO . atomically . readTVar | ||
442 | return $ P.knownSwarms store | ||
443 | |||
444 | savePeerStore :: (Ord ip, Address ip) => DHT raw KMessageOf u ip ByteString | ||
445 | savePeerStore = do | ||
446 | var <- asks (contactInfo . dhtData) | ||
447 | peers <- liftIO $ atomically $ readTVar var | ||
448 | return $ S.encode peers | ||
449 | |||
450 | mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw KMessageOf u ip () | ||
451 | mergeSavedPeers bs = do | ||
452 | var <- asks (contactInfo . dhtData) | ||
453 | case S.decode bs of | ||
454 | Right newbies -> liftIO $ atomically $ modifyTVar' var (<> newbies) | ||
455 | Left _ -> return () | ||
456 | |||
457 | |||
458 | allPeers :: Ord ip => InfoHash -> DHT raw KMessageOf u ip [ PeerAddr ip ] | ||
459 | allPeers ih = do | ||
460 | store <- asks (contactInfo . dhtData) >>= liftIO . atomically . readTVar | ||
461 | return $ P.lookup ih store | ||
462 | |||
463 | -- | Find a set of closest nodes from routing table of this node. (in | ||
464 | -- no particular order) | ||
465 | -- | ||
466 | -- This operation used for 'find_nodes' query. | ||
467 | -- | ||
468 | getClosest :: ( Eq ip | ||
469 | , Ord (NodeId dht) | ||
470 | , FiniteBits (NodeId dht) | ||
471 | , TableKey dht k ) => | ||
472 | k -> DHT raw dht u ip [NodeInfo dht ip u] | ||
473 | getClosest node = do | ||
474 | k <- asks (optK . options) | ||
475 | kclosest k node <$> getTable | ||
476 | |||
477 | getClosest1 :: ( Eq ip | ||
478 | , Ord (NodeId dht) | ||
479 | , FiniteBits (NodeId dht) | ||
480 | , TableKey dht k | ||
481 | ) => DHT raw dht u ip (k -> IO [NodeInfo dht ip u]) | ||
482 | getClosest1 = do | ||
483 | k <- asks (optK . options) | ||
484 | nobkts <- asks (optBucketCount . options) | ||
485 | myid <- asks tentativeNodeId | ||
486 | var <- asks routingInfo | ||
487 | return $ \node -> do nfo <- atomically $ readTVar var | ||
488 | let tbl = maybe (nullTable myid nobkts) R.myBuckets nfo | ||
489 | return $ kclosest k node tbl | ||
490 | |||
491 | {----------------------------------------------------------------------- | ||
492 | -- Peer storage | ||
493 | -----------------------------------------------------------------------} | ||
494 | |||
495 | refreshContacts :: Ord ip => TVar (PeerStore ip) -> IO () | ||
496 | refreshContacts var = | ||
497 | -- TODO limit dht peer store in size (probably by removing oldest peers) | ||
498 | return () | ||
499 | |||
500 | |||
501 | -- | Insert peer to peer store. Used to handle announce requests. | ||
502 | insertPeer :: Ord ip => TVar (PeerStore ip) -> InfoHash -> Maybe ByteString -> PeerAddr ip -> IO () | ||
503 | insertPeer var ih name addr = do | ||
504 | refreshContacts var | ||
505 | atomically $ modifyTVar' var (P.insertPeer ih name addr) | ||
506 | |||
507 | -- | Get peer set for specific swarm. | ||
508 | lookupPeers :: Ord ip => TVar (PeerStore ip) -> InfoHash -> IO [PeerAddr ip] | ||
509 | lookupPeers var ih = do | ||
510 | refreshContacts var | ||
511 | tm <- getTimestamp | ||
512 | atomically $ do | ||
513 | (ps,store') <- P.freshPeers ih tm <$> readTVar var | ||
514 | writeTVar var store' | ||
515 | return ps | ||
516 | |||
517 | getTimestamp :: IO Timestamp | ||
518 | getTimestamp = do | ||
519 | utcTime <- getCurrentTime | ||
520 | -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime))) | ||
521 | return $ utcTimeToPOSIXSeconds utcTime | ||
522 | |||
523 | #ifdef VERSION_bencoding | ||
524 | -- | Prepare result for 'get_peers' query. | ||
525 | -- | ||
526 | -- This operation use 'getClosest' as failback so it may block. | ||
527 | -- | ||
528 | getPeerList :: Ord ip => InfoHash -> DHT raw KMessageOf () ip (PeerList ip) | ||
529 | getPeerList ih = do | ||
530 | var <- asks (contactInfo . dhtData) | ||
531 | ps <- liftIO $ lookupPeers var ih | ||
532 | if L.null ps | ||
533 | then Left <$> getClosest ih | ||
534 | else return (Right ps) | ||
535 | |||
536 | getPeerList1 :: Ord ip => DHT raw KMessageOf () ip (InfoHash -> IO (PeerList ip)) | ||
537 | getPeerList1 = do | ||
538 | var <- asks (contactInfo . dhtData) | ||
539 | getclosest <- getClosest1 | ||
540 | return $ \ih -> do | ||
541 | ps <- lookupPeers var ih | ||
542 | if L.null ps | ||
543 | then Left <$> getclosest ih | ||
544 | else return (Right ps) | ||
545 | |||
546 | |||
547 | insertTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () | ||
548 | insertTopic ih p = do | ||
549 | var <- asks announceInfo | ||
550 | liftIO $ atomically $ modifyTVar' var (S.insert (ih, p)) | ||
551 | |||
552 | deleteTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () | ||
553 | deleteTopic ih p = do | ||
554 | var <- asks announceInfo | ||
555 | liftIO $ atomically $ modifyTVar' var (S.delete (ih, p)) | ||
556 | |||
557 | #endif | ||
558 | |||
559 | {----------------------------------------------------------------------- | ||
560 | -- Messaging | ||
561 | -----------------------------------------------------------------------} | ||
562 | |||
563 | -- | Failed queries are ignored. | ||
564 | queryParallel :: [DHT raw dht u ip a] -> DHT raw dht u ip [a] | ||
565 | queryParallel queries = do | ||
566 | -- TODO: use alpha | ||
567 | -- alpha <- asks (optAlpha . options) | ||
568 | cleanup <$> mapConcurrently try queries | ||
569 | where | ||
570 | cleanup :: [Either QueryFailure a] -> [a] | ||
571 | cleanup = mapMaybe (either (const Nothing) Just) | ||
diff --git a/src/Network/DHT.hs b/src/Network/DHT.hs deleted file mode 100644 index 285cf9ff..00000000 --- a/src/Network/DHT.hs +++ /dev/null | |||
@@ -1,125 +0,0 @@ | |||
1 | {-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-} | ||
2 | module Network.DHT | ||
3 | ( -- makeTableKeeper | ||
4 | -- , TableKeeper(..) | ||
5 | module Network.DHT -- for now | ||
6 | , module Network.DHT.Types | ||
7 | ) where | ||
8 | |||
9 | import Data.Bits | ||
10 | import Data.Maybe | ||
11 | import Data.Monoid | ||
12 | import Network.Address | ||
13 | import Network.DHT.Types | ||
14 | import Network.DatagramServer.Types | ||
15 | import Network.DHT.Routing | ||
16 | import Control.Concurrent.STM | ||
17 | #ifdef THREAD_DEBUG | ||
18 | import Control.Concurrent.Lifted.Instrument | ||
19 | #else | ||
20 | import GHC.Conc (labelThread) | ||
21 | import Control.Concurrent.Lifted | ||
22 | #endif | ||
23 | import Text.PrettyPrint as PP hiding ((<>), ($$)) | ||
24 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | ||
25 | |||
26 | import Control.Monad | ||
27 | import Data.Time.Clock (getCurrentTime) | ||
28 | import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) | ||
29 | |||
30 | data TableKeeper msg ip u = TableKeeper | ||
31 | { routingInfo :: TVar (Maybe (Info msg ip u)) | ||
32 | , grokNode :: NodeInfo msg ip u -> Maybe (ReflectedIP) -> IO () | ||
33 | , grokAddress :: Maybe SockAddr -> ReflectedIP -> IO () | ||
34 | } | ||
35 | |||
36 | makeTableKeeper :: forall msg ip u. | ||
37 | ( Address ip | ||
38 | , Show u | ||
39 | , Show (NodeId msg) | ||
40 | , Ord (NodeId msg) | ||
41 | , FiniteBits (NodeId msg) | ||
42 | ) => TableParameters msg ip u -> IO (TableKeeper msg ip u) | ||
43 | makeTableKeeper param@TableParameters{..} = do | ||
44 | error "TODO makeTableKeeper" -- kick off table-updating thread | ||
45 | ri <- atomically (newTVar Nothing) | ||
46 | let tk = TableKeeper{ routingInfo = ri | ||
47 | , grokNode = insertNode param tk | ||
48 | , grokAddress = error "todo" | ||
49 | } | ||
50 | return tk | ||
51 | |||
52 | atomicInsert :: ( Eq ip, Address ip, Ord (NodeId msg), FiniteBits (NodeId msg) | ||
53 | ) => TableParameters msg ip u -> TableKeeper msg ip u -> Timestamp -> Event msg ip u -> Maybe ReflectedIP -> STM (IO [CheckPing msg ip u]) | ||
54 | atomicInsert param@TableParameters{..} state tm arrival witnessed_ip = do | ||
55 | minfo <- readTVar (routingInfo state) | ||
56 | case minfo of | ||
57 | Just inf -> do | ||
58 | (ps,t') <- insert tm arrival $ myBuckets inf | ||
59 | writeTVar (routingInfo state) $ Just $ inf { myBuckets = t' } | ||
60 | return $ do | ||
61 | case witnessed_ip of | ||
62 | Just (ReflectedIP ip) | ||
63 | | ip /= myAddress inf | ||
64 | -> logMessage 'I' $ unwords | ||
65 | $ [ "Possible NAT?" | ||
66 | , show (toSockAddr $ nodeAddr $ foreignNode arrival) | ||
67 | , "reports my address:" | ||
68 | , show ip ] | ||
69 | -- TODO: Let routing table vote on my IP/NodeId. | ||
70 | _ -> return () | ||
71 | return ps | ||
72 | Nothing -> | ||
73 | let dropped = return $ do | ||
74 | -- Ignore non-witnessing nodes until somebody tells | ||
75 | -- us our ip address. | ||
76 | logMessage 'W' ("Dropped " ++ show (toSockAddr $ nodeAddr $ foreignNode arrival)) | ||
77 | return [] | ||
78 | in fromMaybe dropped $ do | ||
79 | ReflectedIP ip <- witnessed_ip | ||
80 | let nil = nullTable (adjustID ip arrival) maxBuckets | ||
81 | return $ do | ||
82 | (ps,t') <- insert tm arrival nil | ||
83 | let new_info = Info t' (adjustID ip arrival) ip | ||
84 | writeTVar (routingInfo state) $ Just new_info | ||
85 | return $ do | ||
86 | logMessage 'I' $ unwords | ||
87 | [ "External IP address:" | ||
88 | , show ip | ||
89 | , "(reported by" | ||
90 | , show (toSockAddr $ nodeAddr $ foreignNode arrival) | ||
91 | ++ ")" | ||
92 | ] | ||
93 | return ps | ||
94 | |||
95 | -- | This operation do not block but acquire exclusive access to | ||
96 | -- routing table. | ||
97 | insertNode :: forall msg ip u. | ||
98 | ( Address ip | ||
99 | , Show u | ||
100 | , Show (NodeId msg) | ||
101 | , Ord (NodeId msg) | ||
102 | , FiniteBits (NodeId msg) | ||
103 | ) => TableParameters msg ip u -> TableKeeper msg ip u -> NodeInfo msg ip u -> Maybe ReflectedIP -> IO () | ||
104 | insertNode param@TableParameters{..} state info witnessed_ip0 = do | ||
105 | tm <- utcTimeToPOSIXSeconds <$> getCurrentTime -- Network.DHT.Routing.TimeStamp = POSIXTime | ||
106 | let showTable = do | ||
107 | t <- atomically $ fmap myBuckets <$> readTVar (routingInfo state) | ||
108 | let logMsg = "Routing table: " <> pPrint t | ||
109 | logMessage 'D' (render logMsg) | ||
110 | let arrival = TryInsert info | ||
111 | logMessage 'D' $ show ( TryInsert (mapAddress fromAddr info) :: Event _ (Maybe IPv4) _ ) | ||
112 | ps <- join $ atomically $ atomicInsert param state tm arrival witnessed_ip0 | ||
113 | showTable | ||
114 | _ <- fork $ do | ||
115 | myThreadId >>= flip labelThread "DHT.insertNode.pingResults" | ||
116 | forM_ ps $ \(CheckPing ns)-> do | ||
117 | forM_ ns $ \n -> do | ||
118 | (b,mip) <- pingProbe n | ||
119 | let alive = PingResult n b | ||
120 | logMessage 'D' $ "PingResult "++show (nodeId n,b) | ||
121 | _ <- join $ atomically $ atomicInsert param state tm alive mip | ||
122 | showTable | ||
123 | return () | ||
124 | return () | ||
125 | |||
diff --git a/src/Network/DHT/Mainline.hs b/src/Network/DHT/Mainline.hs deleted file mode 100644 index bdf9e8b2..00000000 --- a/src/Network/DHT/Mainline.hs +++ /dev/null | |||
@@ -1,579 +0,0 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- This module provides message datatypes which is used for /Node to | ||
9 | -- Node/ communication. Bittorrent DHT is based on Kademlia | ||
10 | -- specification, but have a slightly different set of messages | ||
11 | -- which have been adopted for /peer/ discovery mechanism. Messages | ||
12 | -- are sent over "Network.KRPC" protocol, but normally you should | ||
13 | -- use "Network.BitTorrent.DHT.Session" to send and receive | ||
14 | -- messages. | ||
15 | -- | ||
16 | -- DHT queries are not /recursive/, they are /iterative/. This means | ||
17 | -- that /querying/ node . While original specification (namely BEP5) | ||
18 | -- do not impose any restrictions for /quered/ node behaviour, a | ||
19 | -- good DHT implementation should follow some rules to guarantee | ||
20 | -- that unlimit recursion will never happen. The following set of | ||
21 | -- restrictions: | ||
22 | -- | ||
23 | -- * 'Ping' query must not trigger any message. | ||
24 | -- | ||
25 | -- * 'FindNode' query /may/ trigger 'Ping' query to check if a | ||
26 | -- list of nodes to return is /good/. See | ||
27 | -- 'Network.DHT.Routing.Routing' for further explanation. | ||
28 | -- | ||
29 | -- * 'GetPeers' query may trigger 'Ping' query for the same reason. | ||
30 | -- | ||
31 | -- * 'Announce' query must trigger 'Ping' query for the same reason. | ||
32 | -- | ||
33 | -- It is easy to see that the most long RPC chain is: | ||
34 | -- | ||
35 | -- @ | ||
36 | -- | | | | ||
37 | -- Node_A | | | ||
38 | -- | FindNode or GetPeers or Announce | | | ||
39 | -- | ------------------------------------> Node_B | | ||
40 | -- | | Ping | | ||
41 | -- | | -----------> | | ||
42 | -- | | Node_C | ||
43 | -- | | Pong | | ||
44 | -- | NodeFound or GotPeers or Announced | <----------- | | ||
45 | -- | <------------------------------------- Node_B | | ||
46 | -- Node_A | | | ||
47 | -- | | | | ||
48 | -- @ | ||
49 | -- | ||
50 | -- where in some cases 'Node_C' is 'Node_A'. | ||
51 | -- | ||
52 | -- For more info see: | ||
53 | -- <http://www.bittorrent.org/beps/bep_0005.html#dht-queries> | ||
54 | -- | ||
55 | -- For Kamelia messages see original Kademlia paper: | ||
56 | -- <http://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf> | ||
57 | -- | ||
58 | {-# LANGUAGE CPP #-} | ||
59 | {-# LANGUAGE StandaloneDeriving #-} | ||
60 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
61 | {-# LANGUAGE DeriveDataTypeable #-} | ||
62 | {-# LANGUAGE FlexibleInstances #-} | ||
63 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
64 | {-# LANGUAGE UndecidableInstances #-} | ||
65 | {-# LANGUAGE ScopedTypeVariables #-} | ||
66 | {-# LANGUAGE TypeFamilies #-} | ||
67 | module Network.DHT.Mainline | ||
68 | ( -- * Envelopes | ||
69 | Query (..) | ||
70 | , Response (..) | ||
71 | |||
72 | -- * Queries | ||
73 | -- ** ping | ||
74 | , Ping (..) | ||
75 | |||
76 | -- ** find_node | ||
77 | , FindNode (..) | ||
78 | , NodeFound (..) | ||
79 | , bep42s | ||
80 | -- , bep42 | ||
81 | |||
82 | |||
83 | #ifdef VERSION_bencoding | ||
84 | -- ** get_peers | ||
85 | , PeerList | ||
86 | , GetPeers (..) | ||
87 | , GotPeers (..) | ||
88 | |||
89 | -- ** announce_peer | ||
90 | , Announce (..) | ||
91 | , Announced (..) | ||
92 | #endif | ||
93 | , DHTData(..) | ||
94 | , SessionTokens(..) | ||
95 | , grantToken | ||
96 | , checkToken | ||
97 | ) where | ||
98 | |||
99 | import Data.String | ||
100 | import Control.Applicative | ||
101 | import Data.Bool | ||
102 | #ifdef VERSION_bencoding | ||
103 | import Data.BEncode as BE | ||
104 | import Data.BEncode.BDict as BDict hiding (map) | ||
105 | #else | ||
106 | import qualified Network.DatagramServer.Tox as Tox | ||
107 | import Network.DatagramServer.Tox (NodeId) | ||
108 | import Data.Word | ||
109 | import Control.Monad | ||
110 | #endif | ||
111 | import Network.KRPC.Method | ||
112 | import Network.Address hiding (NodeId) | ||
113 | import Data.Bits | ||
114 | import Data.ByteString (ByteString) | ||
115 | import qualified Data.ByteString as BS | ||
116 | import Data.Digest.CRC32C | ||
117 | import Data.List as L | ||
118 | import Data.Monoid | ||
119 | import Data.Serialize as S | ||
120 | import Data.Typeable | ||
121 | import Data.Word | ||
122 | import Network | ||
123 | import Network.DatagramServer | ||
124 | import Network.DatagramServer.Mainline | ||
125 | import Data.Maybe | ||
126 | |||
127 | import Data.Torrent (InfoHash) | ||
128 | import Network.BitTorrent.DHT.Token as T | ||
129 | import Network.BitTorrent.DHT.ContactInfo | ||
130 | #ifdef VERSION_bencoding | ||
131 | import Network.DatagramServer () | ||
132 | #endif | ||
133 | import Network.DatagramServer.Types hiding (Query,Response) | ||
134 | import Network.DHT.Types | ||
135 | import Network.DHT.Routing | ||
136 | import Data.Time | ||
137 | import Control.Concurrent.STM | ||
138 | import System.Random | ||
139 | import Data.Hashable | ||
140 | |||
141 | |||
142 | {----------------------------------------------------------------------- | ||
143 | -- envelopes | ||
144 | -----------------------------------------------------------------------} | ||
145 | |||
146 | #ifndef VERSION_bencoding | ||
147 | type BKey = ByteString | ||
148 | #endif | ||
149 | |||
150 | node_id_key :: BKey | ||
151 | node_id_key = "id" | ||
152 | |||
153 | read_only_key :: BKey | ||
154 | read_only_key = "ro" | ||
155 | |||
156 | |||
157 | #ifdef VERSION_bencoding | ||
158 | instance BEncode a => BEncode (Query KMessageOf a) where | ||
159 | toBEncode Query {..} = toDict $ | ||
160 | BDict.union ( node_id_key .=! queringNodeId queryExtra | ||
161 | .: read_only_key .=? bool Nothing (Just (1 :: Integer)) (queryIsReadOnly queryExtra) | ||
162 | .: endDict) | ||
163 | (dict (toBEncode queryParams)) | ||
164 | where | ||
165 | dict (BDict d) = d | ||
166 | dict _ = error "impossible: instance BEncode (Query a)" | ||
167 | |||
168 | fromBEncode v = | ||
169 | Query <$> (MainlineQuery <$> fromDict (field (req node_id_key)) v | ||
170 | <*> fromDict (fromMaybe False <$>? read_only_key) v) | ||
171 | <*> fromBEncode v | ||
172 | #else | ||
173 | data Query a = Query a | ||
174 | #endif | ||
175 | |||
176 | #ifdef VERSION_bencoding | ||
177 | instance BEncode a => BEncode (Response KMessageOf a) where | ||
178 | toBEncode = toBEncode . toQuery | ||
179 | where | ||
180 | toQuery (Response (MainlineResponse nid) a) = Query (MainlineQuery nid False) a | ||
181 | |||
182 | fromBEncode b = fromQuery <$> fromBEncode b | ||
183 | where | ||
184 | fromQuery (Query (MainlineQuery nid _) a) = Response (MainlineResponse nid) a | ||
185 | #else | ||
186 | data Response KMessageOf a = Response KMessageOf a | ||
187 | #endif | ||
188 | |||
189 | {----------------------------------------------------------------------- | ||
190 | -- ping method | ||
191 | -----------------------------------------------------------------------} | ||
192 | |||
193 | -- / The most basic query is a ping. Ping query is used to check if a | ||
194 | -- quered node is still alive. | ||
195 | -- data Ping = Ping Tox.Nonce8 deriving (Show, Eq, Typeable) | ||
196 | |||
197 | #ifdef VERSION_bencoding | ||
198 | instance BEncode (Ping KMessageOf) where | ||
199 | toBEncode Ping = toDict endDict | ||
200 | fromBEncode _ = pure Ping | ||
201 | #else | ||
202 | instance Serialize (Query (Ping KMessageOf)) where | ||
203 | get = do | ||
204 | b <- get | ||
205 | when ( (b::Word8) /= 0) $ fail "Bad ping request" | ||
206 | nonce <- get | ||
207 | return $ Query (Ping nonce) | ||
208 | put (Query (Ping nonce)) = do | ||
209 | put (0 :: Word8) | ||
210 | put nonce | ||
211 | instance Serialize (Response Ping) where | ||
212 | get = do | ||
213 | b <- get | ||
214 | when ( (b::Word8) /= 1) $ fail "Bad ping response" | ||
215 | nonce <- get | ||
216 | return $ Response (Ping nonce) | ||
217 | put (Response (Ping nonce)) = do | ||
218 | put (1 :: Word8) | ||
219 | put nonce | ||
220 | #endif | ||
221 | |||
222 | -- | \"q\" = \"ping\" | ||
223 | instance KRPC KMessageOf (Query KMessageOf (Ping KMessageOf)) (Response KMessageOf (Ping KMessageOf)) where | ||
224 | method = "ping" | ||
225 | makeQueryExtra _ nid _ _ = return $ MainlineQuery nid False -- TODO: check for NAT issues. (BEP 43) | ||
226 | makeResponseExtra _ nid _ _ = return $ MainlineResponse nid | ||
227 | |||
228 | -- TODO KError Sender/Responder | ||
229 | messageSender (Q q) _ = queringNodeId $ queryExtra $ queryArgs q | ||
230 | messageResponder _ (R r) = queredNodeId $ responseExtra $ respVals r | ||
231 | |||
232 | {----------------------------------------------------------------------- | ||
233 | -- find_node method | ||
234 | -----------------------------------------------------------------------} | ||
235 | |||
236 | -- / Find node is used to find the contact information for a node | ||
237 | -- given its ID. | ||
238 | -- data FindNode KMessageOf ip = FindNode (NodeId Tox.Message) Tox.Nonce8 -- Tox: Get Nodes | ||
239 | -- deriving (Show, Eq, Typeable) | ||
240 | |||
241 | target_key :: BKey | ||
242 | target_key = "target" | ||
243 | |||
244 | #ifdef VERSION_bencoding | ||
245 | instance Typeable ip => BEncode (FindNode KMessageOf ip) where | ||
246 | toBEncode (FindNode nid) = toDict $ target_key .=! nid .: endDict | ||
247 | fromBEncode = fromDict $ FindNode <$>! target_key | ||
248 | #else | ||
249 | instance Serialize (Query KMessageOf (FindNode KMessageOf ip)) where | ||
250 | get = do | ||
251 | nid <- get | ||
252 | nonce <- get | ||
253 | return $ Query (FindNode nid nonce) | ||
254 | put (Query (FindNode nid nonce)) = do | ||
255 | put nid | ||
256 | put nonce | ||
257 | #endif | ||
258 | |||
259 | -- | When a node receives a 'FindNode' query, it should respond with a | ||
260 | -- the compact node info for the target node or the K (8) closest good | ||
261 | -- nodes in its own routing table. | ||
262 | -- | ||
263 | #ifdef VERSION_bencoding | ||
264 | -- newtype NodeFound KMessageOf ip = NodeFound [NodeInfo KMessageOf ip ()] deriving (Show, Eq, Typeable) | ||
265 | #else | ||
266 | data NodeFound KMessageOf ip = NodeFound [Tox.NodeFormat] Tox.Nonce8 deriving (Show, Eq, Typeable) | ||
267 | #endif | ||
268 | -- Tox: send_nodes | ||
269 | |||
270 | nodes_key :: BKey | ||
271 | nodes_key = "nodes" | ||
272 | |||
273 | -- Convert IPv4 address. Useful for using variadic IP type. | ||
274 | from4 :: forall dht u s. Address s => NodeInfo dht IPv4 u -> Either String (NodeInfo dht s u) | ||
275 | from4 n = maybe (Left "Error converting IPv4") Right | ||
276 | $ traverseAddress (fromAddr :: IPv4 -> Maybe s) n | ||
277 | |||
278 | #ifdef VERSION_bencoding | ||
279 | binary :: Serialize a => BKey -> BE.Get [a] | ||
280 | binary k = field (req k) >>= either (fail . format) return . | ||
281 | runGet (many get) | ||
282 | where | ||
283 | format str = "fail to deserialize " ++ show k ++ " field: " ++ str | ||
284 | |||
285 | instance Address ip => BEncode (NodeFound KMessageOf ip) where | ||
286 | toBEncode (NodeFound ns) = toDict $ | ||
287 | nodes_key .=! runPut (mapM_ put ns) | ||
288 | .: endDict | ||
289 | |||
290 | -- TODO: handle IPv6 by reading the "nodes6" key (see bep 32) | ||
291 | fromBEncode bval = NodeFound <$> (traverse from4 =<< fromDict (binary nodes_key) bval) | ||
292 | #else | ||
293 | instance Serialize (Response KMessageOf (NodeFound KMessageOf ip)) where | ||
294 | get = do | ||
295 | count <- get :: Get Word8 | ||
296 | nodes <- sequence $ replicate (fromIntegral count) get | ||
297 | nonce <- get :: Get Tox.Nonce8 | ||
298 | return $ Response $ NodeFound nodes nonce | ||
299 | |||
300 | put (Response (NodeFound nodes nonce)) = do | ||
301 | put (fromIntegral (length nodes) :: Word8) | ||
302 | mapM_ put nodes | ||
303 | put nonce | ||
304 | |||
305 | #endif | ||
306 | |||
307 | -- | \"q\" == \"find_node\" | ||
308 | instance (Address ip, Typeable ip) | ||
309 | => KRPC KMessageOf (Query KMessageOf (FindNode KMessageOf ip)) (Response KMessageOf (NodeFound KMessageOf ip)) where | ||
310 | method = "find_node" | ||
311 | makeQueryExtra _ nid _ _ = return $ MainlineQuery nid False -- TODO: check for NAT issues. (BEP 43) | ||
312 | makeResponseExtra _ nid _ _ = return $ MainlineResponse nid | ||
313 | |||
314 | -- TODO KError Sender/Responder | ||
315 | messageSender (Q q) _ = queringNodeId $ queryExtra $ queryArgs q | ||
316 | messageResponder _ (R r) = queredNodeId $ responseExtra $ respVals r | ||
317 | |||
318 | {----------------------------------------------------------------------- | ||
319 | -- get_peers method | ||
320 | -----------------------------------------------------------------------} | ||
321 | |||
322 | -- | Get peers associated with a torrent infohash. | ||
323 | newtype GetPeers ip = GetPeers InfoHash | ||
324 | deriving (Show, Eq, Typeable) | ||
325 | |||
326 | info_hash_key :: BKey | ||
327 | info_hash_key = "info_hash" | ||
328 | |||
329 | instance Typeable ip => BEncode (GetPeers ip) where | ||
330 | toBEncode (GetPeers ih) = toDict $ info_hash_key .=! ih .: endDict | ||
331 | fromBEncode = fromDict $ GetPeers <$>! info_hash_key | ||
332 | |||
333 | type PeerList ip = Either [NodeInfo KMessageOf ip ()] [PeerAddr ip] | ||
334 | |||
335 | data GotPeers ip = GotPeers | ||
336 | { -- | If the queried node has no peers for the infohash, returned | ||
337 | -- the K nodes in the queried nodes routing table closest to the | ||
338 | -- infohash supplied in the query. | ||
339 | peers :: PeerList ip | ||
340 | |||
341 | -- | The token value is a required argument for a future | ||
342 | -- announce_peer query. | ||
343 | , grantedToken :: Token | ||
344 | } deriving (Show, Eq, Typeable) | ||
345 | |||
346 | peers_key :: BKey | ||
347 | peers_key = "values" | ||
348 | |||
349 | token_key :: BKey | ||
350 | token_key = "token" | ||
351 | |||
352 | name_key :: BKey | ||
353 | name_key = "name" | ||
354 | |||
355 | instance (Typeable ip, Serialize ip) => BEncode (GotPeers ip) where | ||
356 | toBEncode GotPeers {..} = toDict $ | ||
357 | case peers of | ||
358 | Left ns -> | ||
359 | nodes_key .=! runPut (mapM_ put ns) | ||
360 | .: token_key .=! grantedToken | ||
361 | .: endDict | ||
362 | Right ps -> | ||
363 | token_key .=! grantedToken | ||
364 | .: peers_key .=! L.map S.encode ps | ||
365 | .: endDict | ||
366 | |||
367 | fromBEncode = fromDict $ do | ||
368 | mns <- optional (binary nodes_key) -- "nodes" | ||
369 | tok <- field (req token_key) -- "token" | ||
370 | mps <- optional (field (req peers_key) >>= decodePeers) -- "values" | ||
371 | case (Right <$> mps) <|> (Left <$> mns) of | ||
372 | Nothing -> fail "get_peers: neihter peers nor nodes key is valid" | ||
373 | Just xs -> pure $ GotPeers xs tok | ||
374 | where | ||
375 | decodePeers = either fail pure . mapM S.decode | ||
376 | |||
377 | -- | \"q" = \"get_peers\" | ||
378 | instance (Typeable ip, Serialize ip) => | ||
379 | KRPC KMessageOf (Query KMessageOf (GetPeers ip)) (Response KMessageOf (GotPeers ip)) where | ||
380 | method = "get_peers" | ||
381 | makeQueryExtra _ nid _ _ = return $ MainlineQuery nid False -- TODO: check for NAT issues. (BEP 43) | ||
382 | makeResponseExtra _ nid _ _ = return $ MainlineResponse nid | ||
383 | |||
384 | -- TODO KError Sender/Responder | ||
385 | messageSender (Q q) _ = queringNodeId $ queryExtra $ queryArgs q | ||
386 | messageResponder _ (R r) = queredNodeId $ responseExtra $ respVals r | ||
387 | |||
388 | |||
389 | {----------------------------------------------------------------------- | ||
390 | -- announce method | ||
391 | -----------------------------------------------------------------------} | ||
392 | |||
393 | -- | Announce that the peer, controlling the querying node, is | ||
394 | -- downloading a torrent on a port. | ||
395 | data Announce = Announce | ||
396 | { -- | If set, the 'port' field should be ignored and the source | ||
397 | -- port of the UDP packet should be used as the peer's port | ||
398 | -- instead. This is useful for peers behind a NAT that may not | ||
399 | -- know their external port, and supporting uTP, they accept | ||
400 | -- incoming connections on the same port as the DHT port. | ||
401 | impliedPort :: Bool | ||
402 | |||
403 | -- | infohash of the torrent; | ||
404 | , topic :: InfoHash | ||
405 | |||
406 | -- | some clients announce the friendly name of the torrent here. | ||
407 | , announcedName :: Maybe ByteString | ||
408 | |||
409 | -- | the port /this/ peer is listening; | ||
410 | , port :: PortNumber | ||
411 | |||
412 | -- TODO: optional boolean "seed" key | ||
413 | |||
414 | -- | received in response to a previous get_peers query. | ||
415 | , sessionToken :: Token | ||
416 | |||
417 | } deriving (Show, Eq, Typeable) | ||
418 | |||
419 | port_key :: BKey | ||
420 | port_key = "port" | ||
421 | |||
422 | implied_port_key :: BKey | ||
423 | implied_port_key = "implied_port" | ||
424 | |||
425 | instance BEncode Announce where | ||
426 | toBEncode Announce {..} = toDict $ | ||
427 | implied_port_key .=? flagField impliedPort | ||
428 | .: info_hash_key .=! topic | ||
429 | .: name_key .=? announcedName | ||
430 | .: port_key .=! port | ||
431 | .: token_key .=! sessionToken | ||
432 | .: endDict | ||
433 | where | ||
434 | flagField flag = if flag then Just (1 :: Int) else Nothing | ||
435 | |||
436 | fromBEncode = fromDict $ do | ||
437 | Announce <$> (boolField <$> optional (field (req implied_port_key))) | ||
438 | <*>! info_hash_key | ||
439 | <*>? name_key | ||
440 | <*>! port_key | ||
441 | <*>! token_key | ||
442 | where | ||
443 | boolField = maybe False (/= (0 :: Int)) | ||
444 | |||
445 | -- | The queried node must verify that the token was previously sent | ||
446 | -- to the same IP address as the querying node. Then the queried node | ||
447 | -- should store the IP address of the querying node and the supplied | ||
448 | -- port number under the infohash in its store of peer contact | ||
449 | -- information. | ||
450 | data Announced = Announced | ||
451 | deriving (Show, Eq, Typeable) | ||
452 | |||
453 | instance BEncode Announced where | ||
454 | toBEncode _ = toBEncode ( Ping :: Ping KMessageOf ) | ||
455 | fromBEncode _ = pure Announced | ||
456 | |||
457 | -- | \"q" = \"announce\" | ||
458 | instance KRPC KMessageOf (Query KMessageOf Announce) (Response KMessageOf Announced) where | ||
459 | method = "announce_peer" | ||
460 | makeQueryExtra _ nid _ _ = return $ MainlineQuery nid False -- TODO: check for NAT issues. (BEP 43) | ||
461 | makeResponseExtra _ nid _ _ = return $ MainlineResponse nid | ||
462 | |||
463 | -- TODO KError Sender/Responder | ||
464 | messageSender (Q q) _ = queringNodeId $ queryExtra $ queryArgs q | ||
465 | messageResponder _ (R r) = queredNodeId $ responseExtra $ respVals r | ||
466 | |||
467 | |||
468 | -- | Yields all 8 DHT neighborhoods available to you given a particular ip | ||
469 | -- address. | ||
470 | bep42s :: Address a => a -> NodeId KMessageOf -> [NodeId KMessageOf] | ||
471 | bep42s addr (NodeId r) = mapMaybe (bep42 addr) rs | ||
472 | where | ||
473 | rs = map (NodeId . change3bits r) [0..7] | ||
474 | |||
475 | -- change3bits :: ByteString -> Word8 -> ByteString | ||
476 | -- change3bits bs n = BS.snoc (BS.init bs) (BS.last bs .&. 0xF8 .|. n) | ||
477 | |||
478 | change3bits :: (Num b, Bits b) => b -> b -> b | ||
479 | change3bits bs n = (bs .&. complement 7) .|. n | ||
480 | |||
481 | -- | Modifies a purely random 'NodeId' to one that is related to a given | ||
482 | -- routable address in accordance with BEP 42. | ||
483 | bep42 :: Address a => a -> NodeId KMessageOf -> Maybe (NodeId KMessageOf) | ||
484 | bep42 addr (NodeId r) | ||
485 | | Just ip <- fmap S.encode (fromAddr addr :: Maybe IPv4) | ||
486 | <|> fmap S.encode (fromAddr addr :: Maybe IPv6) | ||
487 | = genBucketSample' retr (NodeId $ crc $ applyMask ip) (3,0x07,0) | ||
488 | | otherwise | ||
489 | = Nothing | ||
490 | where | ||
491 | ip4mask = "\x03\x0f\x3f\xff" :: ByteString | ||
492 | ip6mask = "\x01\x03\x07\x0f\x1f\x3f\x7f\xff" :: ByteString | ||
493 | nbhood_select = (fromIntegral r :: Word8) .&. 7 | ||
494 | retr n = pure $ BS.drop (nodeIdSize - n) $ S.encode r | ||
495 | crc = flip shiftL (finiteBitSize (NodeId undefined) - 32) . fromIntegral . crc32c . BS.pack | ||
496 | applyMask ip = case BS.zipWith (.&.) msk ip of | ||
497 | (b:bs) -> (b .|. shiftL nbhood_select 5) : bs | ||
498 | bs -> bs | ||
499 | where msk | BS.length ip == 4 = ip4mask | ||
500 | | otherwise = ip6mask | ||
501 | |||
502 | {----------------------------------------------------------------------- | ||
503 | -- Tokens policy | ||
504 | -----------------------------------------------------------------------} | ||
505 | |||
506 | data SessionTokens = SessionTokens | ||
507 | { tokenMap :: !TokenMap | ||
508 | , lastUpdate :: !UTCTime | ||
509 | , maxInterval :: !NominalDiffTime | ||
510 | } | ||
511 | |||
512 | nullSessionTokens :: IO SessionTokens | ||
513 | nullSessionTokens = SessionTokens | ||
514 | <$> (tokens <$> randomIO) | ||
515 | <*> getCurrentTime | ||
516 | <*> pure defaultUpdateInterval | ||
517 | |||
518 | -- TODO invalidate *twice* if needed | ||
519 | invalidateTokens :: UTCTime -> SessionTokens -> SessionTokens | ||
520 | invalidateTokens curTime ts @ SessionTokens {..} | ||
521 | | curTime `diffUTCTime` lastUpdate > maxInterval = SessionTokens | ||
522 | { tokenMap = update tokenMap | ||
523 | , lastUpdate = curTime | ||
524 | , maxInterval = maxInterval | ||
525 | } | ||
526 | | otherwise = ts | ||
527 | |||
528 | {----------------------------------------------------------------------- | ||
529 | -- Tokens | ||
530 | -----------------------------------------------------------------------} | ||
531 | |||
532 | tryUpdateSecret :: TVar SessionTokens -> IO () | ||
533 | tryUpdateSecret toks = do | ||
534 | curTime <- getCurrentTime | ||
535 | atomically $ modifyTVar' toks (invalidateTokens curTime) | ||
536 | |||
537 | grantToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> IO Token | ||
538 | grantToken sessionTokens addr = do | ||
539 | tryUpdateSecret sessionTokens | ||
540 | toks <- readTVarIO sessionTokens | ||
541 | return $ T.lookup addr $ tokenMap toks | ||
542 | |||
543 | -- | Throws 'HandlerError' if the token is invalid or already | ||
544 | -- expired. See 'TokenMap' for details. | ||
545 | checkToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> Token -> IO Bool | ||
546 | checkToken sessionTokens addr questionableToken = do | ||
547 | tryUpdateSecret sessionTokens | ||
548 | toks <- readTVarIO sessionTokens | ||
549 | return $ T.member addr questionableToken (tokenMap toks) | ||
550 | |||
551 | |||
552 | -------------------------- | ||
553 | |||
554 | |||
555 | instance Kademlia KMessageOf where | ||
556 | data DHTData KMessageOf ip = TorrentData | ||
557 | { contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; | ||
558 | , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. | ||
559 | } | ||
560 | |||
561 | |||
562 | dhtAdjustID _ fallback ip0 arrival | ||
563 | = fromMaybe fallback $ do | ||
564 | ip <- fromSockAddr ip0 -- :: Maybe ip | ||
565 | let _ = ip `asTypeOf` nodeAddr (foreignNode arrival) | ||
566 | listToMaybe | ||
567 | $ rank id (nodeId $ foreignNode arrival) | ||
568 | $ bep42s ip fallback | ||
569 | |||
570 | namePing _ = "ping" | ||
571 | nameFindNodes _ = "find-nodes" | ||
572 | |||
573 | initializeDHTData = TorrentData | ||
574 | <$> newTVarIO def | ||
575 | <*> (newTVarIO =<< nullSessionTokens) | ||
576 | |||
577 | deriving instance IsString (QueryMethod dht) => IsString (Method dht param result) | ||
578 | deriving instance BEncode (QueryMethod dht) => BEncode (Method dht param result) | ||
579 | |||
diff --git a/src/Network/DHT/Tox.hs b/src/Network/DHT/Tox.hs deleted file mode 100644 index d6fc866f..00000000 --- a/src/Network/DHT/Tox.hs +++ /dev/null | |||
@@ -1,112 +0,0 @@ | |||
1 | {-# LANGUAGE TypeFamilies #-} | ||
2 | {-# LANGUAGE FlexibleInstances, MultiParamTypeClasses #-} | ||
3 | module Network.DHT.Tox where | ||
4 | |||
5 | import Data.Serialize | ||
6 | import Data.Default | ||
7 | import Text.PrettyPrint.HughesPJClass | ||
8 | |||
9 | import Network.DHT.Types | ||
10 | import Network.DatagramServer.Types | ||
11 | import qualified Network.DatagramServer.Tox as Tox | ||
12 | import Network.KRPC.Method | ||
13 | import Data.Word | ||
14 | import Data.ByteString (ByteString) | ||
15 | import Data.IP | ||
16 | import Data.Bool | ||
17 | import Data.Maybe | ||
18 | import Control.Monad | ||
19 | import System.Random | ||
20 | |||
21 | instance Kademlia Tox.Message where | ||
22 | data DHTData Tox.Message ip = ToxData | ||
23 | namePing _ = Tox.Ping | ||
24 | nameFindNodes _ = Tox.GetNodes | ||
25 | initializeDHTData = return ToxData | ||
26 | |||
27 | instance Pretty (NodeId Tox.Message) where | ||
28 | pPrint (Tox.NodeId nid) = encodeHexDoc nid | ||
29 | |||
30 | instance Serialize (Query Tox.Message (Ping Tox.Message)) where | ||
31 | get = getToxPing False Network.DHT.Types.Query Tox.QueryNonce | ||
32 | put (Network.DHT.Types.Query extra Ping) = putToxPing False (Tox.qryNonce extra) | ||
33 | instance Serialize (Response Tox.Message (Ping Tox.Message)) where | ||
34 | get = getToxPing True Network.DHT.Types.Response Tox.ResponseNonce | ||
35 | put (Network.DHT.Types.Response extra Ping) = putToxPing True (Tox.rspNonce extra) | ||
36 | |||
37 | instance Serialize (Query Tox.Message (FindNode Tox.Message ip)) where | ||
38 | get = do | ||
39 | nid <- get | ||
40 | n8 <- get | ||
41 | return $ Network.DHT.Types.Query (Tox.QueryNonce n8) (FindNode nid) | ||
42 | put (Network.DHT.Types.Query (Tox.QueryNonce n8) (FindNode nid)) = do | ||
43 | put nid | ||
44 | put n8 | ||
45 | instance Serialize (Response Tox.Message (NodeFound Tox.Message IPv4)) where | ||
46 | get = do | ||
47 | num <- get :: Get Word8 | ||
48 | when (num > 4) $ fail "Too many nodes in Tox get-nodes reply" | ||
49 | ns0 <- sequence $ replicate (fromIntegral num) (nodeFormatToNodeInfo <$> get) | ||
50 | -- TODO: Allow tcp and ipv6. For now filtering to udp ip4... | ||
51 | let ns = flip mapMaybe ns0 $ \(NodeInfo nid addr u) -> do | ||
52 | guard $ not u | ||
53 | ip4 <- fromAddr addr | ||
54 | return $ NodeInfo nid ip4 () | ||
55 | n8 <- get | ||
56 | return $ Network.DHT.Types.Response (Tox.ResponseNonce n8) $ NodeFound ns | ||
57 | put (Network.DHT.Types.Response (Tox.ResponseNonce n8) (NodeFound ns)) = do | ||
58 | put ( fromIntegral (length ns) :: Word8 ) | ||
59 | forM_ ns $ \(NodeInfo nid ip4 ()) -> do | ||
60 | put Tox.NodeFormat { nodePublicKey = nid | ||
61 | , nodeIsTCP = False | ||
62 | , nodeIP = IPv4 (nodeHost ip4) | ||
63 | , nodePort = nodePort ip4 | ||
64 | } | ||
65 | put n8 | ||
66 | |||
67 | instance KRPC Tox.Message (Query Tox.Message (FindNode Tox.Message IPv4)) | ||
68 | (Response Tox.Message (NodeFound Tox.Message IPv4)) where | ||
69 | method = Method Tox.GetNodes | ||
70 | validateExchange = validateToxExchange | ||
71 | makeQueryExtra _ _ _ _ = Tox.QueryNonce <$> randomIO | ||
72 | makeResponseExtra _ _ q _ = return $ Tox.ResponseNonce $ Tox.qryNonce $ queryExtra q | ||
73 | messageSender q _ = Tox.msgClient q | ||
74 | messageResponder _ r = Tox.msgClient r | ||
75 | |||
76 | instance KRPC Tox.Message (Query Tox.Message (Ping Tox.Message)) | ||
77 | (Response Tox.Message (Ping Tox.Message)) where | ||
78 | method = Method Tox.Ping | ||
79 | validateExchange = validateToxExchange | ||
80 | makeQueryExtra _ _ _ _ = Tox.QueryNonce <$> randomIO | ||
81 | makeResponseExtra _ _ q _ = return $ Tox.ResponseNonce $ Tox.qryNonce $ queryExtra q | ||
82 | messageSender q _ = Tox.msgClient q | ||
83 | messageResponder _ r = Tox.msgClient r | ||
84 | |||
85 | instance DataHandlers ByteString Tox.Message | ||
86 | |||
87 | instance Default Bool where def = False | ||
88 | |||
89 | getToxPing isPong c n = do | ||
90 | q'r <- get :: Get Word8 | ||
91 | when (bool 0 1 isPong /= q'r) $ | ||
92 | fail "Tox ping/pong parse fail." | ||
93 | n8 <- get :: Get Tox.Nonce8 | ||
94 | return $ c (n n8) Ping | ||
95 | |||
96 | putToxPing isPong n8 = do | ||
97 | put (bool 0 1 isPong :: Word8) | ||
98 | put n8 | ||
99 | |||
100 | validateToxExchange q r = qnonce == rnonce | ||
101 | where | ||
102 | qnonce = Tox.qryNonce . queryExtra . Tox.msgPayload $ q | ||
103 | rnonce = Tox.rspNonce . responseExtra . Tox.msgPayload $ r | ||
104 | |||
105 | |||
106 | nodeFormatToNodeInfo nf = NodeInfo nid addr u | ||
107 | where | ||
108 | u = Tox.nodeIsTCP nf | ||
109 | addr = NodeAddr (Tox.nodeIP nf) (Tox.nodePort nf) | ||
110 | nid = Tox.nodePublicKey nf | ||
111 | |||
112 | -- instance Default Bool where def = False | ||
diff --git a/src/Network/DHT/Types.hs b/src/Network/DHT/Types.hs deleted file mode 100644 index 287d5680..00000000 --- a/src/Network/DHT/Types.hs +++ /dev/null | |||
@@ -1,176 +0,0 @@ | |||
1 | {-# LANGUAGE DefaultSignatures #-} | ||
2 | {-# LANGUAGE GADTs, MultiParamTypeClasses #-} | ||
3 | {-# LANGUAGE FunctionalDependencies #-} | ||
4 | {-# LANGUAGE TypeFamilies #-} | ||
5 | {-# LANGUAGE ScopedTypeVariables #-} | ||
6 | {-# LANGUAGE StandaloneDeriving #-} | ||
7 | {-# LANGUAGE FlexibleContexts #-} | ||
8 | {-# LANGUAGE DeriveGeneric #-} | ||
9 | module Network.DHT.Types | ||
10 | ( module Network.DHT.Types | ||
11 | , TableKey | ||
12 | , toNodeId | ||
13 | ) where | ||
14 | |||
15 | import Network.Socket (SockAddr) | ||
16 | import Network.DatagramServer.Types | ||
17 | import Network.DHT.Routing | ||
18 | import Data.Typeable | ||
19 | import GHC.Generics | ||
20 | import Data.Serialize | ||
21 | import Data.Hashable | ||
22 | import Data.String | ||
23 | import Data.Monoid | ||
24 | import Data.Char | ||
25 | |||
26 | data TableParameters msg ip u = TableParameters | ||
27 | { maxBuckets :: Int | ||
28 | , fallbackID :: NodeId msg | ||
29 | , pingProbe :: NodeInfo msg ip u -> IO (Bool, Maybe ReflectedIP) | ||
30 | , logMessage :: Char -> String -> IO () | ||
31 | , adjustID :: SockAddr -> Event msg ip u -> NodeId msg | ||
32 | } | ||
33 | |||
34 | -- | All queries have an \"id\" key and value containing the node ID | ||
35 | -- of the querying node. | ||
36 | data Query dht a = Query | ||
37 | { queryExtra :: QueryExtra dht -- ^ DHT-specific query headers | ||
38 | , queryParams :: a -- ^ query parameters. | ||
39 | } deriving (Typeable,Generic) | ||
40 | |||
41 | deriving instance (Eq (NodeId dht), Eq (QueryExtra dht), Eq a ) => Eq (Query dht a) | ||
42 | deriving instance (Show (NodeId dht), Show (QueryExtra dht), Show a ) => Show (Query dht a) | ||
43 | |||
44 | -- | All responses have an \"id\" key and value containing the node ID | ||
45 | -- of the responding node. | ||
46 | data Response dht a = Response | ||
47 | { responseExtra :: ResponseExtra dht | ||
48 | , responseVals :: a -- ^ query result. | ||
49 | } deriving (Typeable,Generic) | ||
50 | |||
51 | deriving instance (Eq (NodeId dht), Eq (ResponseExtra dht), Eq a ) => Eq (Response dht a) | ||
52 | deriving instance (Show (NodeId dht), Show (ResponseExtra dht), Show a ) => Show (Response dht a) | ||
53 | |||
54 | -- | The most basic query is a ping. Ping query is used to check if a | ||
55 | -- quered node is still alive. | ||
56 | data Ping ( dht :: * -> *) = Ping | ||
57 | deriving (Show, Eq, Typeable) | ||
58 | -- | Find node is used to find the contact information for a node | ||
59 | -- given its ID. | ||
60 | newtype FindNode dht ip = FindNode (NodeId dht) | ||
61 | deriving (Typeable) | ||
62 | newtype NodeFound dht ip = NodeFound [NodeInfo dht ip ()] | ||
63 | deriving (Typeable) | ||
64 | |||
65 | deriving instance Eq (NodeId dht) => Eq (FindNode dht ip) | ||
66 | deriving instance Eq (NodeId dht) => Eq (NodeFound dht ip) | ||
67 | deriving instance Show (NodeId dht) => Show (FindNode dht ip) | ||
68 | deriving instance ( Show (NodeId dht) | ||
69 | , Show ip | ||
70 | ) => Show (NodeFound dht ip) | ||
71 | |||
72 | pingMessage :: Proxy dht -> Ping dht | ||
73 | pingMessage _ = Ping | ||
74 | pongMessage :: Proxy dht -> Ping dht | ||
75 | pongMessage _ = Ping | ||
76 | findNodeMessage :: TableKey dht k => Proxy dht -> k -> FindNode dht ip | ||
77 | findNodeMessage _ k = FindNode (toNodeId k) | ||
78 | foundNodesMessage :: [NodeInfo dht ip ()] -> NodeFound dht ip | ||
79 | findWho (FindNode nid) = nid | ||
80 | foundNodes :: NodeFound dht ip -> [NodeInfo dht ip ()] | ||
81 | foundNodes (NodeFound ns) = ns | ||
82 | findWho :: FindNode dht ip -> NodeId dht | ||
83 | foundNodesMessage ns = NodeFound ns | ||
84 | |||
85 | class Kademlia dht where | ||
86 | data DHTData dht ip | ||
87 | dhtAdjustID :: Address ip => Proxy dht -> NodeId dht -> SockAddr -> Event dht ip u -> NodeId dht | ||
88 | dhtAdjustID _ nid _ _ = nid | ||
89 | namePing :: Proxy dht -> QueryMethod dht | ||
90 | nameFindNodes :: Proxy dht -> QueryMethod dht | ||
91 | initializeDHTData :: IO (DHTData dht ip) | ||
92 | data MethodHandler raw dht ip = | ||
93 | forall a b. ( SerializableTo raw (Response dht b) | ||
94 | , SerializableTo raw (Query dht a) | ||
95 | , KRPC dht (Query dht a) (Response dht b) | ||
96 | ) => MethodHandler (QueryMethod dht) (NodeAddr ip -> a -> IO b) | ||
97 | |||
98 | class DataHandlers raw dht where | ||
99 | dataHandlers :: | ||
100 | ( Ord ip , Hashable ip, Typeable ip, Serialize ip) => | ||
101 | (NodeId dht -> IO [NodeInfo dht ip ()]) | ||
102 | -> DHTData dht ip | ||
103 | -> [MethodHandler raw dht ip] | ||
104 | dataHandlers _ _ = [] | ||
105 | |||
106 | -- | Method datatype used to describe method name, parameters and | ||
107 | -- return values of procedure. Client use a method to /invoke/, server | ||
108 | -- /implements/ the method to make the actual work. | ||
109 | -- | ||
110 | -- We use the following fantom types to ensure type-safiety: | ||
111 | -- | ||
112 | -- * param: Type of method parameters. | ||
113 | -- | ||
114 | -- * result: Type of return value of the method. | ||
115 | -- | ||
116 | newtype Method dht param result = Method { methodName :: QueryMethod dht } | ||
117 | |||
118 | deriving instance Eq (QueryMethod dht) => Eq (Method dht param result) | ||
119 | deriving instance Ord (QueryMethod dht) => Ord (Method dht param result) | ||
120 | |||
121 | -- | Example: | ||
122 | -- | ||
123 | -- @show (Method \"concat\" :: [Int] Int) == \"concat :: [Int] -> Int\"@ | ||
124 | -- | ||
125 | instance (Show (QueryMethod dht), Typeable a, Typeable b) => Show (Method dht a b) where | ||
126 | showsPrec _ = showsMethod | ||
127 | |||
128 | showsMethod :: forall dht a b. ( Show (QueryMethod dht), Typeable a , Typeable b ) => Method dht a b -> ShowS | ||
129 | showsMethod (Method name) = | ||
130 | -- showString (BC.unpack name) <> | ||
131 | shows (show name) <> | ||
132 | showString " :: " <> | ||
133 | shows paramsTy <> | ||
134 | showString " -> " <> | ||
135 | shows valuesTy | ||
136 | where | ||
137 | impossible = error "KRPC.showsMethod: impossible" | ||
138 | paramsTy = typeOf (impossible :: a) | ||
139 | valuesTy = typeOf (impossible :: b) | ||
140 | |||
141 | -- | In order to perform or handle KRPC query you need to provide | ||
142 | -- corresponding 'KRPC' class. | ||
143 | -- | ||
144 | -- Example: | ||
145 | -- | ||
146 | -- @ | ||
147 | -- data Ping = Ping Text deriving BEncode | ||
148 | -- data Pong = Pong Text deriving BEncode | ||
149 | -- | ||
150 | -- instance 'KRPC' Ping Pong where | ||
151 | -- method = \"ping\" | ||
152 | -- @ | ||
153 | -- | ||
154 | class ( Typeable req, Typeable resp, Envelope dht) | ||
155 | => KRPC dht req resp | req -> resp, resp -> req where | ||
156 | |||
157 | -- | Method name. Default implementation uses lowercased @req@ | ||
158 | -- datatype name. | ||
159 | -- | ||
160 | method :: Method dht req resp | ||
161 | |||
162 | -- TODO add underscores | ||
163 | default method :: (IsString (QueryMethod dht), Typeable req) => Method dht req resp | ||
164 | method = Method $ fromString $ map toLower $ show $ typeOf hole | ||
165 | where | ||
166 | hole = error "krpc.method: impossible" :: req | ||
167 | |||
168 | |||
169 | validateExchange :: dht req -> dht resp -> Bool | ||
170 | validateExchange _ _ = True | ||
171 | |||
172 | makeQueryExtra :: DHTData dht ip -> NodeId dht -> Proxy req -> Proxy resp -> IO (QueryExtra dht) | ||
173 | makeResponseExtra :: DHTData dht ip -> NodeId dht -> req -> Proxy resp -> IO (ResponseExtra dht) | ||
174 | |||
175 | messageSender :: dht req -> Proxy resp -> NodeId dht | ||
176 | messageResponder :: Proxy req -> dht resp -> NodeId dht | ||
diff --git a/src/Network/DatagramServer.hs b/src/Network/DatagramServer.hs deleted file mode 100644 index 891417d4..00000000 --- a/src/Network/DatagramServer.hs +++ /dev/null | |||
@@ -1,608 +0,0 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013, 2014 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- This module provides safe remote procedure call. One important | ||
9 | -- point is exceptions and errors, so to be able handle them | ||
10 | -- properly we need to investigate a bit about how this all works. | ||
11 | -- Internally, in order to make method invokation KRPC makes the | ||
12 | -- following steps: | ||
13 | -- | ||
14 | -- * Caller serialize arguments to bencoded bytestrings; | ||
15 | -- | ||
16 | -- * Caller send bytestring data over UDP to the callee; | ||
17 | -- | ||
18 | -- * Callee receive and decode arguments to the method and method | ||
19 | -- name. If it can't decode then it send 'ProtocolError' back to the | ||
20 | -- caller; | ||
21 | -- | ||
22 | -- * Callee search for the @method name@ in the method table. | ||
23 | -- If it not present in the table then callee send 'MethodUnknown' | ||
24 | -- back to the caller; | ||
25 | -- | ||
26 | -- * Callee check if argument names match. If not it send | ||
27 | -- 'ProtocolError' back; | ||
28 | -- | ||
29 | -- * Callee make the actuall call to the plain old haskell | ||
30 | -- function. If the function throw exception then callee send | ||
31 | -- 'ServerError' back. | ||
32 | -- | ||
33 | -- * Callee serialize result of the function to bencoded bytestring. | ||
34 | -- | ||
35 | -- * Callee encode result to bencoded bytestring and send it back | ||
36 | -- to the caller. | ||
37 | -- | ||
38 | -- * Caller check if return values names match with the signature | ||
39 | -- it called in the first step. | ||
40 | -- | ||
41 | -- * Caller extracts results and finally return results of the | ||
42 | -- procedure call as ordinary haskell values. | ||
43 | -- | ||
44 | -- If every other error occurred then caller get the | ||
45 | -- 'GenericError'. All errors returned by callee are throwed as | ||
46 | -- ordinary haskell exceptions at caller side. Also note that both | ||
47 | -- caller and callee use plain UDP, so KRPC is unreliable. | ||
48 | -- | ||
49 | -- For async 'query' use @async@ package. | ||
50 | -- | ||
51 | -- For protocol details see "Network.DatagramServer.Mainline" module. | ||
52 | -- | ||
53 | {-# LANGUAGE CPP #-} | ||
54 | {-# LANGUAGE OverloadedStrings #-} | ||
55 | {-# LANGUAGE FlexibleInstances #-} | ||
56 | {-# LANGUAGE FlexibleContexts #-} | ||
57 | {-# LANGUAGE ScopedTypeVariables #-} | ||
58 | {-# LANGUAGE DefaultSignatures #-} | ||
59 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
60 | {-# LANGUAGE FunctionalDependencies #-} | ||
61 | {-# LANGUAGE DeriveDataTypeable #-} | ||
62 | {-# LANGUAGE TemplateHaskell #-} | ||
63 | {-# LANGUAGE KindSignatures #-} | ||
64 | module Network.DatagramServer | ||
65 | ( | ||
66 | -- ** Query | ||
67 | QueryFailure (..) | ||
68 | , query | ||
69 | , query' | ||
70 | , queryRaw | ||
71 | , getQueryCount | ||
72 | |||
73 | -- ** Handler | ||
74 | , HandlerFailure (..) | ||
75 | , Handler | ||
76 | , handler | ||
77 | |||
78 | -- * Manager | ||
79 | , Options (..) | ||
80 | , def | ||
81 | , Manager | ||
82 | , newManager | ||
83 | , closeManager | ||
84 | -- , withManager | ||
85 | , isActive | ||
86 | , listen | ||
87 | , Protocol(..) | ||
88 | |||
89 | -- * Re-exports | ||
90 | , ErrorCode (..) | ||
91 | , SockAddr (..) | ||
92 | ) where | ||
93 | |||
94 | import Data.Default.Class | ||
95 | import Network.Socket (SockAddr (..)) | ||
96 | |||
97 | import Control.Applicative | ||
98 | #ifdef THREAD_DEBUG | ||
99 | import Control.Concurrent.Lifted.Instrument | ||
100 | #else | ||
101 | import GHC.Conc (labelThread) | ||
102 | import Control.Concurrent.Lifted | ||
103 | #endif | ||
104 | import Control.Exception hiding (Handler) | ||
105 | import qualified Control.Exception.Lifted as E (Handler (..)) | ||
106 | import Control.Exception.Lifted as Lifted (catches, finally) | ||
107 | import Control.Monad | ||
108 | import Control.Monad.Logger | ||
109 | import Control.Monad.Reader | ||
110 | import Control.Monad.Trans.Control | ||
111 | import qualified Data.ByteString.Base16 as Base16 | ||
112 | import Data.ByteString as BS | ||
113 | import Data.ByteString.Char8 as BC | ||
114 | import Data.ByteString.Lazy as BL | ||
115 | import Data.Default.Class | ||
116 | import Data.IORef | ||
117 | import Data.List as L | ||
118 | import Data.Map as M | ||
119 | import Data.Monoid | ||
120 | import Data.Serialize as S | ||
121 | import Data.Text as T | ||
122 | import Data.Text.Encoding as T | ||
123 | import Data.Tuple | ||
124 | import Data.Typeable | ||
125 | import Network.DatagramServer.Types | ||
126 | import Network.Socket hiding (listen) | ||
127 | import Network.Socket.ByteString as BS | ||
128 | import System.IO.Error | ||
129 | import System.Timeout | ||
130 | import Network.KRPC.Method | ||
131 | |||
132 | |||
133 | {----------------------------------------------------------------------- | ||
134 | -- Options | ||
135 | -----------------------------------------------------------------------} | ||
136 | |||
137 | -- | RPC manager options. | ||
138 | data Options = Options | ||
139 | { -- | Initial 'TransactionId' incremented with each 'query'; | ||
140 | optSeedTransaction :: {-# UNPACK #-} !Int | ||
141 | |||
142 | -- | Time to wait for response from remote node, in seconds. | ||
143 | , optQueryTimeout :: {-# UNPACK #-} !Int | ||
144 | |||
145 | -- | Maximum number of bytes to receive. | ||
146 | , optMaxMsgSize :: {-# UNPACK #-} !Int | ||
147 | |||
148 | } deriving (Show, Eq) | ||
149 | |||
150 | defaultSeedTransaction :: Int | ||
151 | defaultSeedTransaction = 0 | ||
152 | |||
153 | defaultQueryTimeout :: Int | ||
154 | defaultQueryTimeout = 120 | ||
155 | |||
156 | defaultMaxMsgSize :: Int | ||
157 | defaultMaxMsgSize = 64 * 1024 | ||
158 | |||
159 | -- | Permissive defaults. | ||
160 | instance Default Options where | ||
161 | def = Options | ||
162 | { optSeedTransaction = defaultSeedTransaction | ||
163 | , optQueryTimeout = defaultQueryTimeout | ||
164 | , optMaxMsgSize = defaultMaxMsgSize | ||
165 | } | ||
166 | |||
167 | validateOptions :: Options -> IO () | ||
168 | validateOptions Options {..} | ||
169 | | optQueryTimeout < 1 | ||
170 | = throwIO (userError "krpc: non-positive query timeout") | ||
171 | | optMaxMsgSize < 1 | ||
172 | = throwIO (userError "krpc: non-positive buffer size") | ||
173 | | otherwise = return () | ||
174 | |||
175 | {----------------------------------------------------------------------- | ||
176 | -- Options | ||
177 | -----------------------------------------------------------------------} | ||
178 | |||
179 | type KResult msg raw = Either (KError (TransactionID msg)) (msg raw)-- Response | ||
180 | |||
181 | type TransactionCounter = IORef Int | ||
182 | type CallId msg = (TransactionID msg, SockAddr) | ||
183 | type CallRes msg raw = MVar (raw, KResult msg raw) -- (raw response, decoded response) | ||
184 | type PendingCalls msg raw = IORef (Map (CallId msg) (CallRes msg raw)) | ||
185 | |||
186 | type HandlerBody h msg v = SockAddr -> msg v -> h (Either String (msg v)) | ||
187 | |||
188 | -- | Handler is a function which will be invoked then some /remote/ | ||
189 | -- node querying /this/ node. | ||
190 | type Handler h msg v = (QueryMethod msg, HandlerBody h msg v) | ||
191 | |||
192 | -- | Keep track pending queries made by /this/ node and handle queries | ||
193 | -- made by /remote/ nodes. | ||
194 | data Manager raw msg = Manager | ||
195 | { sock :: !Socket | ||
196 | , options :: !Options | ||
197 | , listenerThread :: !(MVar ThreadId) | ||
198 | , transactionCounter :: {-# UNPACK #-} !TransactionCounter | ||
199 | , pendingCalls :: {-# UNPACK #-} !(PendingCalls msg raw) | ||
200 | -- , handlers :: [Handler h msg raw] -- TODO delete this, it's not used | ||
201 | , logMsg :: Char -> String -> T.Text -> IO () | ||
202 | , serverState :: PacketDestination msg -> IO (NodeId msg, CipherContext raw msg) | ||
203 | } | ||
204 | |||
205 | sockAddrFamily :: SockAddr -> Family | ||
206 | sockAddrFamily (SockAddrInet _ _ ) = AF_INET | ||
207 | sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | ||
208 | sockAddrFamily (SockAddrUnix _ ) = AF_UNIX | ||
209 | sockAddrFamily (SockAddrCan _ ) = AF_CAN | ||
210 | |||
211 | -- | Bind socket to the specified address. To enable query handling | ||
212 | -- run 'listen'. | ||
213 | newManager :: Options -- ^ various protocol options; | ||
214 | -> (Char -> String -> T.Text -> IO ()) -- ^ loging function | ||
215 | -> SockAddr -- ^ address to listen on; | ||
216 | -> ( PacketDestination msg -> IO (NodeId msg, CipherContext raw msg) ) | ||
217 | -> [Handler h msg raw] -- ^ handlers to run on incoming queries. | ||
218 | -> IO (Manager raw msg) -- ^ new rpc manager. | ||
219 | newManager opts @ Options {..} logmsg servAddr getst handlers = do | ||
220 | validateOptions opts | ||
221 | sock <- bindServ | ||
222 | tref <- newEmptyMVar | ||
223 | tran <- newIORef optSeedTransaction | ||
224 | calls <- newIORef M.empty | ||
225 | return $ Manager sock opts tref tran calls logmsg getst | ||
226 | where | ||
227 | bindServ = do | ||
228 | let family = sockAddrFamily servAddr | ||
229 | sock <- socket family Datagram defaultProtocol | ||
230 | when (family == AF_INET6) $ do | ||
231 | setSocketOption sock IPv6Only 0 | ||
232 | bindSocket sock servAddr | ||
233 | return sock | ||
234 | |||
235 | -- | Unblock all pending calls and close socket. | ||
236 | closeManager :: Manager raw msg -> IO () | ||
237 | closeManager Manager {..} = do | ||
238 | maybe (return ()) killThread =<< tryTakeMVar listenerThread | ||
239 | -- TODO unblock calls | ||
240 | close sock | ||
241 | |||
242 | -- | Check if the manager is still active. Manager becomes active | ||
243 | -- until 'closeManager' called. | ||
244 | isActive :: Manager raw msg -> IO Bool | ||
245 | isActive Manager {..} = liftIO $ isBound sock | ||
246 | {-# INLINE isActive #-} | ||
247 | |||
248 | #if 0 | ||
249 | -- | Normally you should use Control.Monad.Trans.Resource.allocate | ||
250 | -- function. | ||
251 | withManager :: Options -> SockAddr -> [Handler h msg raw] | ||
252 | -> (Manager raw msg -> IO a) -> IO a | ||
253 | withManager opts addr hs = bracket (newManager opts addr hs) closeManager | ||
254 | #endif | ||
255 | |||
256 | {----------------------------------------------------------------------- | ||
257 | -- Logging | ||
258 | -----------------------------------------------------------------------} | ||
259 | |||
260 | -- TODO prettify log messages | ||
261 | querySignature :: ( Show ( QueryMethod msg ) | ||
262 | , Serialize ( TransactionID msg ) ) | ||
263 | => QueryMethod msg -> TransactionID msg -> SockAddr -> Text | ||
264 | querySignature name transaction addr = T.concat | ||
265 | [ "&", T.pack (show name) | ||
266 | , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction) | ||
267 | , " @", T.pack (show addr) | ||
268 | ] | ||
269 | |||
270 | {----------------------------------------------------------------------- | ||
271 | -- Client | ||
272 | -----------------------------------------------------------------------} | ||
273 | -- we don't need to know about TransactionId while performing query, | ||
274 | -- so we introduce QueryFailure exceptions | ||
275 | |||
276 | -- | Used to signal 'query' errors. | ||
277 | data QueryFailure | ||
278 | = SendFailed -- ^ unable to send query; | ||
279 | | QueryFailed ErrorCode Text -- ^ remote node return error; | ||
280 | | TimeoutExpired -- ^ remote node not responding. | ||
281 | deriving (Show, Eq, Typeable) | ||
282 | |||
283 | instance Exception QueryFailure | ||
284 | |||
285 | sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m () | ||
286 | sendMessage sock addr a = do | ||
287 | liftIO $ sendManyTo sock [a] addr | ||
288 | |||
289 | genTransactionId :: Envelope msg => TransactionCounter -> IO (TransactionID msg) | ||
290 | genTransactionId ref = do | ||
291 | cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur) | ||
292 | uniqueTransactionId cur | ||
293 | |||
294 | -- | How many times 'query' call have been performed. | ||
295 | getQueryCount :: Manager raw msg -> IO Int | ||
296 | getQueryCount mgr@Manager{..} = do | ||
297 | curTrans <- readIORef transactionCounter | ||
298 | return $ curTrans - optSeedTransaction options | ||
299 | |||
300 | registerQuery :: Ord (TransactionID msg) => CallId msg -> PendingCalls msg raw -> IO (CallRes msg raw) | ||
301 | registerQuery cid ref = do | ||
302 | ares <- newEmptyMVar | ||
303 | atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ()) | ||
304 | return ares | ||
305 | |||
306 | -- simultaneous M.lookup and M.delete guarantees that we never get two | ||
307 | -- or more responses to the same query | ||
308 | unregisterQuery :: Ord (TransactionID msg) => CallId msg -> PendingCalls msg raw -> IO (Maybe (CallRes msg raw)) | ||
309 | unregisterQuery cid ref = do | ||
310 | atomicModifyIORef' ref $ swap . | ||
311 | M.updateLookupWithKey (const (const Nothing)) cid | ||
312 | |||
313 | |||
314 | -- (sendmsg EINVAL) | ||
315 | sendQuery :: Socket -> SockAddr -> BC.ByteString -> IO () | ||
316 | sendQuery sock addr q = handle sockError $ sendMessage sock addr q | ||
317 | where | ||
318 | sockError :: IOError -> IO () | ||
319 | sockError _ = throwIO SendFailed | ||
320 | |||
321 | -- | Enqueue query to the given node. | ||
322 | -- | ||
323 | -- This function should throw 'QueryFailure' exception if quered node | ||
324 | -- respond with @error@ message or the query timeout expires. | ||
325 | -- | ||
326 | query :: forall h a b raw msg. | ||
327 | ( SerializableTo raw b | ||
328 | , Show (QueryMethod msg) | ||
329 | , Ord (TransactionID msg) | ||
330 | , Serialize (TransactionID msg) | ||
331 | , SerializableTo raw a | ||
332 | , WireFormat raw msg | ||
333 | , KRPC msg a b | ||
334 | ) => Manager raw msg -> PacketDestination msg -> a -> IO b | ||
335 | query mgr addr params = queryK mgr addr params (\_ x _ _ -> x) | ||
336 | |||
337 | -- | Like 'query' but possibly returns your externally routable IP address. | ||
338 | query' :: forall h a b raw msg. | ||
339 | ( SerializableTo raw b | ||
340 | , Show (QueryMethod msg) | ||
341 | , Ord (TransactionID msg) | ||
342 | , Serialize (TransactionID msg) | ||
343 | , SerializableTo raw a , WireFormat raw msg | ||
344 | , KRPC msg a b | ||
345 | ) => Manager raw msg -> PacketDestination msg -> a -> IO (b , NodeId msg, Maybe ReflectedIP) | ||
346 | query' mgr addr params = queryK mgr addr params (\_ b nid ip -> (b,nid,ip)) | ||
347 | |||
348 | -- | Enqueue a query, but give us the complete BEncoded content sent by the | ||
349 | -- remote Node. This is useful for handling extensions that this library does | ||
350 | -- not otherwise support. | ||
351 | queryRaw :: forall h a b raw msg. | ||
352 | ( SerializableTo raw b | ||
353 | , Show (QueryMethod msg) | ||
354 | , Ord (TransactionID msg) | ||
355 | , Serialize (TransactionID msg) | ||
356 | , SerializableTo raw a | ||
357 | , WireFormat raw msg | ||
358 | , KRPC msg a b | ||
359 | ) => Manager raw msg -> PacketDestination msg -> a -> IO (b , raw) | ||
360 | queryRaw mgr addr params = queryK mgr addr params (\raw x _ _ -> (x,raw)) | ||
361 | |||
362 | queryK :: forall h a b x raw msg. | ||
363 | ( SerializableTo raw b | ||
364 | , SerializableTo raw a | ||
365 | , WireFormat raw msg | ||
366 | , Show (QueryMethod msg) | ||
367 | , Ord (TransactionID msg) | ||
368 | , Serialize (TransactionID msg) | ||
369 | , KRPC msg a b | ||
370 | ) => | ||
371 | Manager raw msg -> PacketDestination msg -> a -> (raw -> b -> NodeId msg -> Maybe ReflectedIP -> x) -> IO x | ||
372 | queryK mgr@Manager{..} dest params kont = do | ||
373 | tid <- liftIO $ genTransactionId transactionCounter | ||
374 | let addr = toSockAddr dest | ||
375 | Method meth = method :: Method msg a b | ||
376 | signature = querySignature meth tid addr | ||
377 | logMsg 'D' "query.sending" signature | ||
378 | -- [Debug#query.sending] &MessageType 0 #312020202020202020202020202020202020202020202020 @77.37.142.179:33445 | ||
379 | |||
380 | mres <- liftIO $ do | ||
381 | ares <- registerQuery (tid, addr) pendingCalls | ||
382 | |||
383 | (cli,ctx) <- serverState dest | ||
384 | q <- buildQuery cli addr meth tid params | ||
385 | let qb = encodePayload (q :: msg a) :: msg raw | ||
386 | qbs = encodeHeaders ctx qb dest | ||
387 | sendQuery sock addr qbs | ||
388 | `onException` unregisterQuery (tid, addr) pendingCalls | ||
389 | |||
390 | timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do | ||
391 | fix $ \loop -> do | ||
392 | (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult) | ||
393 | case res of | ||
394 | Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) | ||
395 | Right m -> case decodePayload m of | ||
396 | Right r -> case envelopeClass (r :: msg b) of | ||
397 | Response reflectedAddr | ||
398 | | validateExchange q r -> do | ||
399 | let remoteId = messageResponder (Proxy :: Proxy a) r | ||
400 | return $ kont raw (envelopePayload r) remoteId reflectedAddr | ||
401 | | otherwise -> loop | ||
402 | Error (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) -- XXX neccessary? | ||
403 | Query _ -> throwIO $ QueryFailed ProtocolError "BUG!! UNREACHABLE" | ||
404 | Left e -> throwIO $ QueryFailed ProtocolError (T.pack e) | ||
405 | |||
406 | case mres of | ||
407 | Just res -> do | ||
408 | logMsg 'D' "query.responded" $ signature | ||
409 | return res | ||
410 | |||
411 | Nothing -> do | ||
412 | _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls | ||
413 | logMsg 'W' "query.not_responding" $ signature <> " for " <> | ||
414 | T.pack (show (optQueryTimeout options)) <> " seconds" | ||
415 | throw $ TimeoutExpired | ||
416 | |||
417 | {----------------------------------------------------------------------- | ||
418 | -- Handlers | ||
419 | -----------------------------------------------------------------------} | ||
420 | -- we already throw: | ||
421 | -- | ||
422 | -- * ErrorCode(MethodUnknown) in the 'dispatchHandler'; | ||
423 | -- | ||
424 | -- * ErrorCode(ServerError) in the 'runHandler'; | ||
425 | -- | ||
426 | -- * ErrorCode(GenericError) in the 'runHandler' (those can be | ||
427 | -- async exception too) | ||
428 | -- | ||
429 | -- so HandlerFailure should cover *only* 'ProtocolError's. | ||
430 | |||
431 | -- | Used to signal protocol errors. | ||
432 | data HandlerFailure | ||
433 | = BadAddress -- ^ for e.g.: node calls herself; | ||
434 | | InvalidParameter Text -- ^ for e.g.: bad session token. | ||
435 | deriving (Show, Eq, Typeable) | ||
436 | |||
437 | instance Exception HandlerFailure | ||
438 | |||
439 | prettyHF :: HandlerFailure -> BS.ByteString | ||
440 | prettyHF BadAddress = T.encodeUtf8 "bad address" | ||
441 | prettyHF (InvalidParameter reason) = T.encodeUtf8 $ | ||
442 | "invalid parameter: " <> reason | ||
443 | |||
444 | prettyQF :: QueryFailure -> BS.ByteString | ||
445 | prettyQF e = T.encodeUtf8 $ "handler fail while performing query: " | ||
446 | <> T.pack (show e) | ||
447 | |||
448 | -- | Make handler from handler function. Any thrown exception will be | ||
449 | -- supressed and send over the wire back to the querying node. | ||
450 | -- | ||
451 | -- If the handler make some 'query' normally it /should/ handle | ||
452 | -- corresponding 'QueryFailure's. | ||
453 | -- | ||
454 | handler :: forall h a b msg raw. (Applicative h, Functor msg, WireFormat raw msg, SerializableTo raw a, SerializableTo raw b) | ||
455 | => (SockAddr -> h (NodeId msg)) -> QueryMethod msg -> (SockAddr -> msg a -> h b) -> Handler h msg raw | ||
456 | handler whoami name body = (name, wrapper) | ||
457 | where | ||
458 | wrapper :: SockAddr -> msg raw -> h (Either String (msg raw)) | ||
459 | wrapper addr args = | ||
460 | case decodePayload args of | ||
461 | Left e -> pure $ Left e | ||
462 | Right a -> do | ||
463 | (\me bs -> Right $ encodePayload $ buildReply me addr args bs) <$> whoami addr <*> body addr a | ||
464 | |||
465 | runHandler :: ( Envelope msg | ||
466 | , Show (QueryMethod msg) | ||
467 | , Serialize (TransactionID msg)) | ||
468 | => Manager raw msg -> QueryMethod msg -> HandlerBody IO msg raw -> SockAddr -> msg raw -> IO (KResult msg raw) | ||
469 | runHandler mgr@Manager{..} meth h addr m = Lifted.catches wrapper failbacks | ||
470 | where | ||
471 | signature = querySignature meth (envelopeTransaction m) addr | ||
472 | |||
473 | wrapper = do | ||
474 | logMsg 'D' "handler.quered" signature | ||
475 | result <- h addr m | ||
476 | |||
477 | case result of | ||
478 | Left msg -> do | ||
479 | logMsg 'D' "handler.bad_query" $ signature <> " !" <> T.pack msg | ||
480 | return $ Left $ KError ProtocolError (BC.pack msg) (envelopeTransaction m) | ||
481 | |||
482 | Right a -> do -- KQueryArgs | ||
483 | logMsg 'D' "handler.success" signature | ||
484 | return $ Right a | ||
485 | |||
486 | failbacks = | ||
487 | [ E.Handler $ \ (e :: HandlerFailure) -> do | ||
488 | logMsg 'D' "handler.HandlerFailure" signature | ||
489 | return $ Left $ KError ProtocolError (prettyHF e) (envelopeTransaction m) | ||
490 | |||
491 | |||
492 | -- may happen if handler makes query and fail | ||
493 | , E.Handler $ \ (e :: QueryFailure) -> do | ||
494 | logMsg 'D' "handler.QueryFailure" signature | ||
495 | return $ Left $ KError ServerError (prettyQF e) (envelopeTransaction m) | ||
496 | |||
497 | -- since handler thread exit after sendMessage we can safely | ||
498 | -- suppress async exception here | ||
499 | , E.Handler $ \ (e :: SomeException) -> do | ||
500 | logMsg 'D' "handler.SomeException" (signature <> T.pack (" "++show e)) | ||
501 | return $ Left $ KError GenericError (BC.pack (show e)) (envelopeTransaction m) | ||
502 | ] | ||
503 | |||
504 | dispatchHandler :: ( Eq (QueryMethod msg) | ||
505 | , Show (QueryMethod msg) | ||
506 | , Serialize (TransactionID msg) | ||
507 | , Envelope msg | ||
508 | ) => Manager raw msg -> [Handler IO msg raw] -> QueryMethod msg -> msg raw -> SockAddr -> IO (KResult msg raw) | ||
509 | dispatchHandler mgr handlers meth q addr = do | ||
510 | case L.lookup meth handlers of | ||
511 | Nothing -> return $ Left $ KError MethodUnknown ("Unknown method " <> BC.pack (show meth)) (envelopeTransaction q) | ||
512 | Just h -> runHandler mgr meth h addr q | ||
513 | |||
514 | {----------------------------------------------------------------------- | ||
515 | -- Listener | ||
516 | -----------------------------------------------------------------------} | ||
517 | |||
518 | -- TODO bound amount of parallel handler *threads*: | ||
519 | -- | ||
520 | -- peer A flooding with find_node | ||
521 | -- peer B trying to ping peer C | ||
522 | -- peer B fork too many threads | ||
523 | -- ... space leak | ||
524 | -- | ||
525 | handleQuery :: ( WireFormat raw msg | ||
526 | , Eq (QueryMethod msg) | ||
527 | , Show (QueryMethod msg) | ||
528 | , Serialize (TransactionID msg) | ||
529 | ) => Manager raw msg -> [Handler IO msg raw] -> QueryMethod msg -> raw -> msg raw -> SockAddr -> IO () | ||
530 | handleQuery mgr@Manager{..} hs meth raw q addr = void $ fork $ do | ||
531 | myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" | ||
532 | res <- dispatchHandler mgr hs meth q addr | ||
533 | (me,ctx) <- serverState (error "TODO TOX ToxCipherContext 2 or () for Mainline") | ||
534 | let res' = either buildError Just res | ||
535 | dest = makeAddress (Right q) addr | ||
536 | resbs = fmap (\raw -> encodeHeaders ctx raw dest) res' :: Maybe BS.ByteString | ||
537 | -- TODO: Generalize this debug print. | ||
538 | -- resbe = either toBEncode toBEncode res | ||
539 | -- .(logOther "q") \$ T.unlines | ||
540 | -- [ either (const "<unicode-fail>") id \$ T.decodeUtf8' (BL.toStrict $ showBEncode raw) | ||
541 | -- , "==>" | ||
542 | -- , either (const "<unicode-fail>") id \$ T.decodeUtf8' (BL.toStrict $ showBEncode resbe) | ||
543 | -- ] | ||
544 | maybe (return ()) (sendMessage sock addr) resbs | ||
545 | |||
546 | handleResponse :: ( Ord (TransactionID msg) | ||
547 | , Envelope msg | ||
548 | ) => Manager raw msg -> raw -> KResult msg raw -> SockAddr -> IO () | ||
549 | handleResponse mgr@Manager{..} raw result addr = do | ||
550 | liftIO $ do | ||
551 | let resultId = either errorId envelopeTransaction result | ||
552 | mcall <- unregisterQuery (resultId, addr) pendingCalls | ||
553 | case mcall of | ||
554 | Nothing -> return () | ||
555 | Just ares -> putMVar ares (raw,result) | ||
556 | |||
557 | data Protocol raw (msg :: * -> *) = Protocol { rawProxy :: !(Proxy raw) | ||
558 | , msgProxy :: !(Proxy msg) | ||
559 | } | ||
560 | |||
561 | listener :: forall raw msg. | ||
562 | ( WireFormat raw msg | ||
563 | , Ord (TransactionID msg) | ||
564 | , Eq (QueryMethod msg) | ||
565 | , Show (QueryMethod msg) | ||
566 | , Serialize (TransactionID msg) | ||
567 | ) => Manager raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO () | ||
568 | listener mgr@Manager{..} hs p = do | ||
569 | fix $ \again -> do | ||
570 | (me, ctx) <- serverState (error "TODO TOX ToxCipherContext or () for Mainline") | ||
571 | (bs, addr) <- liftIO $ do | ||
572 | handle exceptions $ BS.recvFrom sock (optMaxMsgSize options) | ||
573 | case parsePacket (msgProxy p) bs >>= \r -> (,) r <$> decodeHeaders ctx r of | ||
574 | Left e -> do | ||
575 | -- XXX: Send parse failure message? | ||
576 | -- liftIO \$ sendMessage sock addr $ encodeHeaders ctx (unknownMessage e) | ||
577 | logMsg 'W' "listener" (T.pack $ show e) | ||
578 | return () -- Without transaction id, error message isn't very useful. | ||
579 | Right (raw,m) -> | ||
580 | case envelopeClass m of | ||
581 | Query meth -> handleQuery mgr hs meth (raw::raw) m addr | ||
582 | Response _ -> handleResponse mgr raw (Right m) addr | ||
583 | Error e -> handleResponse mgr raw (Left e) addr | ||
584 | |||
585 | again | ||
586 | where | ||
587 | exceptions :: IOError -> IO (BS.ByteString, SockAddr) | ||
588 | exceptions e | ||
589 | -- packets with empty payload may trigger eof exception | ||
590 | | isEOFError e = return ("", SockAddrInet 0 0) | ||
591 | | otherwise = throwIO e | ||
592 | |||
593 | -- | Should be run before any 'query', otherwise they will never | ||
594 | -- succeed. | ||
595 | listen :: forall raw msg. | ||
596 | ( WireFormat raw msg | ||
597 | , Ord (TransactionID msg) | ||
598 | , Eq (QueryMethod msg) | ||
599 | , Show (QueryMethod msg) | ||
600 | , Serialize (TransactionID msg) | ||
601 | , Typeable msg | ||
602 | ) => Manager raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO () | ||
603 | listen mgr@Manager{..} hs p = do | ||
604 | tid <- fork $ do | ||
605 | myThreadId >>= liftIO . flip labelThread ("KRPC.listen." ++ (L.last $ L.words $ show $ typeOf (Proxy :: Proxy msg))) | ||
606 | listener mgr hs p `Lifted.finally` | ||
607 | liftIO (takeMVar listenerThread) | ||
608 | liftIO $ putMVar listenerThread tid | ||
diff --git a/src/Network/DatagramServer/Error.hs b/src/Network/DatagramServer/Error.hs deleted file mode 100644 index 77b132a7..00000000 --- a/src/Network/DatagramServer/Error.hs +++ /dev/null | |||
@@ -1,68 +0,0 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE DeriveDataTypeable #-} | ||
3 | module Network.DatagramServer.Error where | ||
4 | |||
5 | import Control.Exception.Lifted as Lifted | ||
6 | import Data.ByteString (ByteString) | ||
7 | import Data.ByteString.Char8 as Char8 | ||
8 | import Data.Data | ||
9 | import Data.Default | ||
10 | import Data.Typeable | ||
11 | |||
12 | {----------------------------------------------------------------------- | ||
13 | -- Error messages | ||
14 | -----------------------------------------------------------------------} | ||
15 | |||
16 | -- | Types of RPC errors. | ||
17 | data ErrorCode | ||
18 | -- | Some error doesn't fit in any other category. | ||
19 | = GenericError | ||
20 | |||
21 | -- | Occur when server fail to process procedure call. | ||
22 | | ServerError | ||
23 | |||
24 | -- | Malformed packet, invalid arguments or bad token. | ||
25 | | ProtocolError | ||
26 | |||
27 | -- | Occur when client trying to call method server don't know. | ||
28 | | MethodUnknown | ||
29 | deriving (Show, Read, Eq, Ord, Bounded, Typeable, Data) | ||
30 | |||
31 | -- | According to the table: | ||
32 | -- <http://bittorrent.org/beps/bep_0005.html#errors> | ||
33 | instance Enum ErrorCode where | ||
34 | fromEnum GenericError = 201 | ||
35 | fromEnum ServerError = 202 | ||
36 | fromEnum ProtocolError = 203 | ||
37 | fromEnum MethodUnknown = 204 | ||
38 | {-# INLINE fromEnum #-} | ||
39 | |||
40 | toEnum 201 = GenericError | ||
41 | toEnum 202 = ServerError | ||
42 | toEnum 203 = ProtocolError | ||
43 | toEnum 204 = MethodUnknown | ||
44 | toEnum _ = GenericError | ||
45 | {-# INLINE toEnum #-} | ||
46 | |||
47 | -- | Errors are sent when a query cannot be fulfilled. Error message | ||
48 | -- can be send only from server to client but not in the opposite | ||
49 | -- direction. | ||
50 | -- | ||
51 | data KError tid = KError | ||
52 | { errorCode :: !ErrorCode -- ^ the type of error; | ||
53 | , errorMessage :: !ByteString -- ^ human-readable text message; | ||
54 | , errorId :: !tid -- ^ match to the corresponding 'queryId'. | ||
55 | } deriving ( Show, Eq, Ord, Typeable, Data, Read ) | ||
56 | |||
57 | instance (Typeable tid, Show tid) => Exception (KError tid) | ||
58 | |||
59 | -- | Received 'queryArgs' or 'respVals' can not be decoded. | ||
60 | decodeError :: String -> tid -> KError tid | ||
61 | decodeError msg = KError ProtocolError (Char8.pack msg) | ||
62 | |||
63 | -- | A remote node has send some 'KMessage' this node is unable to | ||
64 | -- decode. | ||
65 | unknownMessage :: Default tid => String -> KError tid | ||
66 | unknownMessage msg = KError ProtocolError (Char8.pack msg) def | ||
67 | |||
68 | |||
diff --git a/src/Network/DatagramServer/Mainline.hs b/src/Network/DatagramServer/Mainline.hs deleted file mode 100644 index fea64ee6..00000000 --- a/src/Network/DatagramServer/Mainline.hs +++ /dev/null | |||
@@ -1,404 +0,0 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013, 2014 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- KRPC messages types used in communication. All messages are | ||
9 | -- encoded as bencode dictionary. | ||
10 | -- | ||
11 | -- Normally, you don't need to import this module. | ||
12 | -- | ||
13 | -- See <http://www.bittorrent.org/beps/bep_0005.html#krpc-protocol> | ||
14 | -- | ||
15 | {-# LANGUAGE CPP #-} | ||
16 | {-# LANGUAGE DefaultSignatures #-} | ||
17 | {-# LANGUAGE DeriveDataTypeable #-} | ||
18 | {-# LANGUAGE DeriveFunctor #-} | ||
19 | {-# LANGUAGE DeriveTraversable #-} | ||
20 | {-# LANGUAGE FlexibleContexts #-} | ||
21 | {-# LANGUAGE FlexibleInstances #-} | ||
22 | {-# LANGUAGE FunctionalDependencies #-} | ||
23 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
24 | {-# LANGUAGE OverloadedStrings #-} | ||
25 | {-# LANGUAGE TypeSynonymInstances #-} | ||
26 | {-# LANGUAGE TypeFamilies #-} | ||
27 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
28 | module Network.DatagramServer.Mainline | ||
29 | ( -- * Transaction | ||
30 | TransactionId | ||
31 | |||
32 | -- * Error | ||
33 | , ErrorCode (..) | ||
34 | , KError(..) | ||
35 | , decodeError | ||
36 | , unknownMessage | ||
37 | |||
38 | -- * Query | ||
39 | , KQueryOf(..) | ||
40 | , KQuery | ||
41 | , MethodName | ||
42 | |||
43 | -- * Response | ||
44 | , KResponseOf(..) | ||
45 | , KResponse | ||
46 | , ReflectedIP(..) | ||
47 | |||
48 | -- * Message | ||
49 | , KMessageOf (..) | ||
50 | , KMessage | ||
51 | , KQueryArgs | ||
52 | , QueryExtra(..) | ||
53 | , ResponseExtra(..) | ||
54 | , PacketDestination(..) | ||
55 | |||
56 | , NodeId(..) | ||
57 | , nodeIdSize | ||
58 | |||
59 | ) where | ||
60 | |||
61 | import Control.Applicative | ||
62 | import Control.Arrow | ||
63 | import Control.Exception.Lifted as Lifted | ||
64 | import Data.BEncode as BE | ||
65 | import Network.DatagramServer.Types | ||
66 | import Data.Bits | ||
67 | import Data.ByteString.Base16 as Base16 | ||
68 | import Data.ByteString (ByteString) | ||
69 | import qualified Data.ByteString as BS | ||
70 | import qualified Data.ByteString.Char8 as Char8 | ||
71 | import qualified Data.ByteString.Lazy as L | ||
72 | import Data.Default | ||
73 | import Data.LargeWord | ||
74 | import Data.Monoid | ||
75 | import qualified Data.Serialize as S | ||
76 | import Data.Serialize (Serialize, get, put, remaining, getBytes, putByteString) | ||
77 | import Data.String | ||
78 | import Data.Word | ||
79 | import Data.Typeable | ||
80 | import Network.Socket (SockAddr (..),PortNumber,HostAddress) | ||
81 | import Text.PrettyPrint as PP hiding ((<>)) | ||
82 | import Text.PrettyPrint.HughesPJClass hiding (($$), (<>)) | ||
83 | import Data.Hashable | ||
84 | |||
85 | |||
86 | -- | This transaction ID is generated by the querying node and is | ||
87 | -- echoed in the response, so responses may be correlated with | ||
88 | -- multiple queries to the same node. The transaction ID should be | ||
89 | -- encoded as a short string of binary numbers, typically 2 characters | ||
90 | -- are enough as they cover 2^16 outstanding queries. | ||
91 | type TransactionId = TransactionID KMessageOf | ||
92 | |||
93 | {----------------------------------------------------------------------- | ||
94 | -- Query messages | ||
95 | -----------------------------------------------------------------------} | ||
96 | |||
97 | type MethodName = ByteString | ||
98 | type KQueryArgs = BValue | ||
99 | |||
100 | -- | Query used to signal that caller want to make procedure call to | ||
101 | -- callee and pass arguments in. Therefore query may be only sent from | ||
102 | -- client to server but not in the opposite direction. | ||
103 | -- | ||
104 | data KQueryOf a = KQuery | ||
105 | { queryArgs :: !a -- ^ values to be passed to method; | ||
106 | , queryMethod :: !MethodName -- ^ method to call; | ||
107 | , queryId :: !TransactionId -- ^ one-time query token. | ||
108 | } deriving ( Show, Eq, Ord, Typeable, Read, Functor, Foldable, Traversable ) | ||
109 | |||
110 | type KQuery = KQueryOf KQueryArgs | ||
111 | |||
112 | -- | Queries, or KRPC message dictionaries with a \"y\" value of | ||
113 | -- \"q\", contain two additional keys; \"q\" and \"a\". Key \"q\" has | ||
114 | -- a string value containing the method name of the query. Key \"a\" | ||
115 | -- has a dictionary value containing named arguments to the query. | ||
116 | -- | ||
117 | -- Example Query packet: | ||
118 | -- | ||
119 | -- > { "t" : "aa", "y" : "q", "q" : "ping", "a" : { "msg" : "hi!" } } | ||
120 | -- | ||
121 | instance (Typeable a, BEncode a) => BEncode (KQueryOf a) where | ||
122 | toBEncode KQuery {..} = toDict $ | ||
123 | "a" .=! queryArgs | ||
124 | .: "q" .=! queryMethod | ||
125 | .: "t" .=! queryId | ||
126 | .: "y" .=! ("q" :: ByteString) | ||
127 | .: endDict | ||
128 | {-# INLINE toBEncode #-} | ||
129 | |||
130 | fromBEncode = fromDict $ do | ||
131 | lookAhead $ match "y" (BString "q") | ||
132 | KQuery <$>! "a" <*>! "q" <*>! "t" | ||
133 | {-# INLINE fromBEncode #-} | ||
134 | |||
135 | instance BEncode ReflectedIP where | ||
136 | toBEncode (ReflectedIP addr) = BString (encodeAddr addr) | ||
137 | fromBEncode (BString bs) = ReflectedIP <$> decodeAddr bs | ||
138 | fromBEncode _ = Left "ReflectedIP should be a bencoded string" | ||
139 | |||
140 | port16 :: Word16 -> PortNumber | ||
141 | port16 = fromIntegral | ||
142 | |||
143 | decodeAddr :: ByteString -> Either String SockAddr | ||
144 | decodeAddr bs | BS.length bs == 6 | ||
145 | = ( \(a,p) -> SockAddrInet <$> fmap port16 p <*> a ) | ||
146 | $ (S.runGet S.getWord32host *** S.decode ) | ||
147 | $ BS.splitAt 4 bs | ||
148 | decodeAddr bs | BS.length bs == 18 | ||
149 | = ( \(a,p) -> flip SockAddrInet6 0 <$> fmap port16 p <*> a <*> pure 0 ) | ||
150 | $ (S.decode *** S.decode ) | ||
151 | $ BS.splitAt 16 bs | ||
152 | decodeAddr _ = Left "incorrectly sized address and port" | ||
153 | |||
154 | encodeAddr :: SockAddr -> ByteString | ||
155 | encodeAddr (SockAddrInet port addr) | ||
156 | = S.runPut (S.putWord32host addr >> S.put (fromIntegral port :: Word16)) | ||
157 | encodeAddr (SockAddrInet6 port _ addr _) | ||
158 | = S.runPut (S.put addr >> S.put (fromIntegral port :: Word16)) | ||
159 | encodeAddr _ = BS.empty | ||
160 | |||
161 | {----------------------------------------------------------------------- | ||
162 | -- Response messages | ||
163 | -----------------------------------------------------------------------} | ||
164 | |||
165 | -- | Response messages are sent upon successful completion of a | ||
166 | -- query: | ||
167 | -- | ||
168 | -- * KResponse used to signal that callee successufully process a | ||
169 | -- procedure call and to return values from procedure. | ||
170 | -- | ||
171 | -- * KResponse should not be sent if error occurred during RPC, | ||
172 | -- 'KError' should be sent instead. | ||
173 | -- | ||
174 | -- * KResponse can be only sent from server to client. | ||
175 | -- | ||
176 | data KResponseOf a = KResponse | ||
177 | { respVals :: a -- ^ 'BDict' containing return values; | ||
178 | , respId :: TransactionId -- ^ match to the corresponding 'queryId'. | ||
179 | , respIP :: Maybe ReflectedIP | ||
180 | } deriving (Show, Eq, Ord, Typeable, Functor, Foldable, Traversable) | ||
181 | |||
182 | type KResponse = KResponseOf KQueryArgs | ||
183 | |||
184 | -- | Responses, or KRPC message dictionaries with a \"y\" value of | ||
185 | -- \"r\", contain one additional key \"r\". The value of \"r\" is a | ||
186 | -- dictionary containing named return values. | ||
187 | -- | ||
188 | -- Example Response packet: | ||
189 | -- | ||
190 | -- > { "t" : "aa", "y" : "r", "r" : { "msg" : "you've sent: hi!" } } | ||
191 | -- | ||
192 | instance (Typeable a, BEncode a) => BEncode (KResponseOf a) where | ||
193 | toBEncode KResponse {..} = toDict $ | ||
194 | "ip" .=? respIP | ||
195 | .: "r" .=! respVals | ||
196 | .: "t" .=! respId | ||
197 | .: "y" .=! ("r" :: ByteString) | ||
198 | .: endDict | ||
199 | {-# INLINE toBEncode #-} | ||
200 | |||
201 | fromBEncode = fromDict $ do | ||
202 | lookAhead $ match "y" (BString "r") | ||
203 | addr <- optional (field (req "ip")) | ||
204 | (\r t -> KResponse r t addr) <$>! "r" <*>! "t" | ||
205 | {-# INLINE fromBEncode #-} | ||
206 | |||
207 | {----------------------------------------------------------------------- | ||
208 | -- Summed messages | ||
209 | -----------------------------------------------------------------------} | ||
210 | |||
211 | -- | Generic KRPC message. | ||
212 | data KMessageOf a | ||
213 | = Q (KQueryOf a) | ||
214 | | R (KResponseOf a) | ||
215 | | E (KError TransactionId) | ||
216 | deriving (Show, Eq, Functor, Foldable, Traversable) | ||
217 | |||
218 | type KMessage = KMessageOf KQueryArgs | ||
219 | |||
220 | instance BEncode KMessage where | ||
221 | toBEncode (Q q) = toBEncode q | ||
222 | toBEncode (R r) = toBEncode r | ||
223 | toBEncode (E e) = toBEncode e | ||
224 | |||
225 | fromBEncode b = | ||
226 | Q <$> fromBEncode b | ||
227 | <|> R <$> fromBEncode b | ||
228 | <|> E <$> fromBEncode b | ||
229 | <|> decodingError "KMessage: unknown message or message tag" | ||
230 | |||
231 | nodeIdSize :: Int | ||
232 | nodeIdSize = finiteBitSize (undefined :: NodeId KMessageOf) `div` 8 | ||
233 | |||
234 | instance BEncode (NodeId KMessageOf) where | ||
235 | toBEncode (NodeId w) = toBEncode $ S.encode w | ||
236 | fromBEncode bval = fromBEncode bval >>= S.decode | ||
237 | |||
238 | -- instance BEncode NodeId where TODO | ||
239 | |||
240 | instance Serialize (NodeId KMessageOf) where | ||
241 | get = NodeId <$> get | ||
242 | {-# INLINE get #-} | ||
243 | put (NodeId bs) = put bs | ||
244 | {-# INLINE put #-} | ||
245 | |||
246 | -- | ASCII encoded. | ||
247 | instance IsString (NodeId KMessageOf) where | ||
248 | fromString str | ||
249 | | length str == nodeIdSize = NodeId (either error id $ S.decode (fromString str :: ByteString)) | ||
250 | | length str == 2 * nodeIdSize = NodeId (either error id $ S.decode (fst $ Base16.decode $ fromString str)) | ||
251 | | otherwise = error "fromString: invalid NodeId length" | ||
252 | {-# INLINE fromString #-} | ||
253 | |||
254 | -- | Meaningless node id, for testing purposes only. | ||
255 | instance Default (NodeId KMessageOf) where | ||
256 | def = NodeId 0 | ||
257 | |||
258 | -- | base16 encoded. | ||
259 | instance Pretty (NodeId KMessageOf) where pPrint (NodeId nid) = encodeHexDoc nid | ||
260 | |||
261 | |||
262 | instance Serialize (TransactionID KMessageOf) where | ||
263 | get = do | ||
264 | cnt <- remaining | ||
265 | TID <$> getBytes cnt | ||
266 | |||
267 | put (TID bs) = putByteString bs | ||
268 | |||
269 | |||
270 | instance Envelope KMessageOf where | ||
271 | type QueryMethod KMessageOf = ByteString | ||
272 | |||
273 | newtype TransactionID KMessageOf = TID ByteString | ||
274 | deriving (Eq,Ord,IsString,Show,Read,BEncode) | ||
275 | |||
276 | -- | Each node has a globally unique identifier known as the \"node | ||
277 | -- ID.\" | ||
278 | -- | ||
279 | -- Normally, /this/ node id should be saved between invocations | ||
280 | -- of the client software. | ||
281 | newtype NodeId KMessageOf = NodeId Word160 | ||
282 | deriving (Show, Eq, Ord, Typeable, Bits, FiniteBits) | ||
283 | |||
284 | data QueryExtra KMessageOf = MainlineQuery | ||
285 | { queringNodeId :: NodeId KMessageOf -- ^ node id of /quering/ node; | ||
286 | , queryIsReadOnly :: Bool -- ^ node is read-only as per BEP 43 | ||
287 | } | ||
288 | deriving (Show, Eq, Ord, Typeable) | ||
289 | |||
290 | newtype ResponseExtra KMessageOf = MainlineResponse | ||
291 | { queredNodeId :: NodeId KMessageOf -- ^ node id of /quered/ node | ||
292 | } | ||
293 | deriving (Show, Eq, Ord, Typeable) | ||
294 | |||
295 | newtype PacketDestination KMessageOf = MainlineNode SockAddr | ||
296 | deriving (Show, Eq, Ord, Typeable) | ||
297 | |||
298 | envelopePayload (Q q) = queryArgs q | ||
299 | envelopePayload (R r) = respVals r | ||
300 | envelopePayload (E _) = error "TODO: messagePayload for KError" | ||
301 | |||
302 | envelopeTransaction (Q q) = queryId q | ||
303 | envelopeTransaction (R r) = respId r | ||
304 | envelopeTransaction (E e) = errorId e | ||
305 | |||
306 | envelopeClass (Q q) = Query (queryMethod q) | ||
307 | envelopeClass (R r) = Response (respIP r) | ||
308 | envelopeClass (E e) = Error e | ||
309 | |||
310 | -- replyAddress :: envelope a -> SockAddr -> PacketDestination envelope | ||
311 | makeAddress _ addr = MainlineNode addr | ||
312 | |||
313 | buildReply self addr qry response = | ||
314 | (R (KResponse response (envelopeTransaction qry) (Just $ ReflectedIP addr))) | ||
315 | |||
316 | buildQuery self addr meth tid qry = return $ Q (KQuery qry meth tid) | ||
317 | |||
318 | uniqueTransactionId cnt = return $ TID $ Char8.pack (show cnt) | ||
319 | |||
320 | fromRoutableNode = not . queryIsReadOnly | ||
321 | |||
322 | instance Hashable (PacketDestination KMessageOf) where | ||
323 | hashWithSalt s (MainlineNode sockaddr) = hashWithSalt s (show sockaddr) | ||
324 | |||
325 | -- Serialize, Pretty) PacketDestination KMessageOf = MainlineNode SockAddr | ||
326 | instance Serialize (PacketDestination KMessageOf) where | ||
327 | put (MainlineNode addr) = putSockAddr addr | ||
328 | get = MainlineNode <$> getSockAddr | ||
329 | |||
330 | instance Pretty (PacketDestination KMessageOf) where | ||
331 | pPrint (MainlineNode addr) = PP.text $ show addr | ||
332 | |||
333 | instance Address (PacketDestination KMessageOf) where | ||
334 | toSockAddr (MainlineNode addr) = addr | ||
335 | fromSockAddr addr = Just $ MainlineNode addr | ||
336 | |||
337 | instance WireFormat BValue KMessageOf where | ||
338 | type SerializableTo BValue = BEncode | ||
339 | type CipherContext BValue KMessageOf = () | ||
340 | |||
341 | parsePacket _ = BE.decode | ||
342 | |||
343 | buildError = Just . E | ||
344 | |||
345 | decodeHeaders _ = BE.fromBEncode | ||
346 | decodePayload kmsg = mapM BE.fromBEncode kmsg | ||
347 | |||
348 | encodeHeaders _ kmsg _ = L.toStrict $ BE.encode kmsg | ||
349 | encodePayload msg = fmap BE.toBEncode msg | ||
350 | |||
351 | initializeServerState _ mbid = do | ||
352 | i <- maybe genNodeId return mbid | ||
353 | return (i, ()) | ||
354 | |||
355 | -- | KRPC 'compact list' compatible encoding: contact information for | ||
356 | -- nodes is encoded as a 26-byte string. Also known as "Compact node | ||
357 | -- info" the 20-byte Node ID in network byte order has the compact | ||
358 | -- IP-address/port info concatenated to the end. | ||
359 | instance Serialize a => Serialize (NodeInfo KMessageOf a ()) where | ||
360 | get = (\a b -> NodeInfo a b ()) <$> get <*> get | ||
361 | put NodeInfo {..} = put nodeId >> put nodeAddr | ||
362 | |||
363 | |||
364 | #ifdef VERSION_bencoding | ||
365 | instance BEncode ErrorCode where | ||
366 | toBEncode = toBEncode . fromEnum | ||
367 | {-# INLINE toBEncode #-} | ||
368 | |||
369 | fromBEncode b = toEnum <$> fromBEncode b | ||
370 | {-# INLINE fromBEncode #-} | ||
371 | #endif | ||
372 | |||
373 | -- | Errors, or KRPC message dictionaries with a \"y\" value of \"e\", | ||
374 | -- contain one additional key \"e\". The value of \"e\" is a | ||
375 | -- list. The first element is an integer representing the error | ||
376 | -- code. The second element is a string containing the error | ||
377 | -- message. | ||
378 | -- | ||
379 | -- Example Error Packet: | ||
380 | -- | ||
381 | -- > { "t": "aa", "y":"e", "e":[201, "A Generic Error Ocurred"]} | ||
382 | -- | ||
383 | -- or bencoded: | ||
384 | -- | ||
385 | -- > d1:eli201e23:A Generic Error Ocurrede1:t2:aa1:y1:ee | ||
386 | -- | ||
387 | #ifdef VERSION_bencoding | ||
388 | instance (Typeable tid, BEncode tid) => BEncode (KError tid) where | ||
389 | toBEncode KError {..} = toDict $ | ||
390 | "e" .=! (errorCode, errorMessage) | ||
391 | .: "t" .=! errorId | ||
392 | .: "y" .=! ("e" :: ByteString) | ||
393 | .: endDict | ||
394 | {-# INLINE toBEncode #-} | ||
395 | |||
396 | fromBEncode = fromDict $ do | ||
397 | lookAhead $ match "y" (BString "e") | ||
398 | (code, msg) <- field (req "e") | ||
399 | KError code msg <$>! "t" | ||
400 | {-# INLINE fromBEncode #-} | ||
401 | #endif | ||
402 | |||
403 | instance Read (NodeId KMessageOf) where readsPrec d s = map (\(w,xs) -> (NodeId w, xs)) $ decodeHex s | ||
404 | |||
diff --git a/src/Network/DatagramServer/Tox.hs b/src/Network/DatagramServer/Tox.hs deleted file mode 100644 index cf39e7e1..00000000 --- a/src/Network/DatagramServer/Tox.hs +++ /dev/null | |||
@@ -1,554 +0,0 @@ | |||
1 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
2 | {-# LANGUAGE StandaloneDeriving #-} | ||
3 | {-# LANGUAGE FlexibleInstances #-} | ||
4 | {-# LANGUAGE DeriveDataTypeable #-} | ||
5 | {-# LANGUAGE DeriveFunctor #-} | ||
6 | {-# LANGUAGE DeriveGeneric #-} | ||
7 | {-# LANGUAGE DeriveTraversable #-} | ||
8 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
9 | {-# LANGUAGE PatternSynonyms #-} | ||
10 | {-# LANGUAGE RecordWildCards #-} | ||
11 | {-# LANGUAGE TupleSections #-} | ||
12 | {-# LANGUAGE TypeFamilies #-} | ||
13 | {-# LANGUAGE UnboxedTuples #-} | ||
14 | {-# LANGUAGE TemplateHaskell #-} | ||
15 | {-# LANGUAGE RankNTypes #-} | ||
16 | module Network.DatagramServer.Tox where | ||
17 | |||
18 | import Data.Bits | ||
19 | import Data.ByteString (ByteString) | ||
20 | import Data.ByteArray as BA (ByteArrayAccess,length,withByteArray) | ||
21 | import qualified Data.Serialize as S | ||
22 | -- import qualified Data.ByteString.Lazy as L | ||
23 | import qualified Data.ByteString.Char8 as Char8 | ||
24 | -- import Data.Data (Data) | ||
25 | import Data.Word | ||
26 | import Data.LargeWord | ||
27 | import Data.IP | ||
28 | import Data.Serialize | ||
29 | import Network.Address | ||
30 | import GHC.Generics (Generic) | ||
31 | import Network.Socket | ||
32 | import Network.DatagramServer.Types | ||
33 | import qualified Network.DatagramServer.Types as Envelope (NodeId) | ||
34 | import Crypto.PubKey.ECC.Types | ||
35 | import Crypto.PubKey.Curve25519 | ||
36 | import Crypto.ECC.Class | ||
37 | import qualified Crypto.Cipher.Salsa as Salsa | ||
38 | import qualified Crypto.Cipher.XSalsa as XSalsa | ||
39 | import qualified Crypto.MAC.Poly1305 as Poly1305 | ||
40 | import Data.LargeWord | ||
41 | import Foreign.Ptr | ||
42 | import Foreign.Storable | ||
43 | import Foreign.Marshal.Alloc | ||
44 | import Data.Typeable | ||
45 | import StaticAssert | ||
46 | import Crypto.Error.Types | ||
47 | import qualified Crypto.Error as Cryptonite | ||
48 | import Data.Hashable | ||
49 | import Text.PrettyPrint as PP hiding ((<>)) | ||
50 | import Text.PrettyPrint.HughesPJClass hiding (($$), (<>)) | ||
51 | import qualified Data.ByteArray as BA | ||
52 | import Data.ByteArray ( Bytes, convert ) | ||
53 | import Data.Monoid | ||
54 | import System.Endian | ||
55 | import qualified Data.ByteString.Base16 as Base16 | ||
56 | import qualified Data.ByteString.Char8 as C8 | ||
57 | import qualified Data.ByteString.Char8 as C8 | ||
58 | |||
59 | |||
60 | type Key32 = Word256 -- 32 byte key | ||
61 | type Nonce8 = Word64 -- 8 bytes | ||
62 | type Nonce24 = Word192 -- 24 bytes | ||
63 | |||
64 | |||
65 | data NodeFormat = NodeFormat | ||
66 | { nodePublicKey :: NodeId Message -- 32 byte public key | ||
67 | , nodeIsTCP :: Bool -- This has no analog in mainline NodeInfo structure | ||
68 | , nodeIP :: IP -- IPv4 or IPv6 address | ||
69 | , nodePort :: PortNumber | ||
70 | } | ||
71 | deriving (Eq, Ord, Show) | ||
72 | |||
73 | encodeFamily :: (Family, SocketType) -> Word8 | ||
74 | encodeFamily (AF_INET , Datagram) = 2 | ||
75 | encodeFamily (AF_INET6 , Datagram) = 10 | ||
76 | encodeFamily (AF_INET , Stream ) = 130 | ||
77 | encodeFamily (AF_INET6 , Stream ) = 138 | ||
78 | encodeFamily _ = error "Unsupported protocol" | ||
79 | |||
80 | newtype MessageType = MessageType Word8 | ||
81 | deriving (Eq, Ord, Show, Read) | ||
82 | |||
83 | instance Serialize MessageType where | ||
84 | put (MessageType b) = put b | ||
85 | get = MessageType <$> get | ||
86 | |||
87 | pattern Ping = MessageType 0 | ||
88 | pattern Pong = MessageType 1 | ||
89 | pattern GetNodes = MessageType 2 | ||
90 | pattern SendNodes = MessageType 4 | ||
91 | {- | ||
92 | #define NET_PACKET_PING_REQUEST 0 /* Ping request packet ID. */ | ||
93 | #define NET_PACKET_PING_RESPONSE 1 /* Ping response packet ID. */ | ||
94 | #define NET_PACKET_GET_NODES 2 /* Get nodes request packet ID. */ | ||
95 | #define NET_PACKET_SEND_NODES_IPV6 4 /* Send nodes response packet ID for other addresses. */ | ||
96 | #define NET_PACKET_COOKIE_REQUEST 24 /* Cookie request packet */ | ||
97 | #define NET_PACKET_COOKIE_RESPONSE 25 /* Cookie response packet */ | ||
98 | #define NET_PACKET_CRYPTO_HS 26 /* Crypto handshake packet */ | ||
99 | #define NET_PACKET_CRYPTO_DATA 27 /* Crypto data packet */ | ||
100 | #define NET_PACKET_CRYPTO 32 /* Encrypted data packet ID. */ | ||
101 | #define NET_PACKET_LAN_DISCOVERY 33 /* LAN discovery packet ID. */ | ||
102 | |||
103 | /* See: docs/Prevent_Tracking.txt and onion.{c, h} */ | ||
104 | #define NET_PACKET_ONION_SEND_INITIAL 128 | ||
105 | #define NET_PACKET_ONION_SEND_1 129 | ||
106 | #define NET_PACKET_ONION_SEND_2 130 | ||
107 | |||
108 | #define NET_PACKET_ANNOUNCE_REQUEST 131 | ||
109 | #define NET_PACKET_ANNOUNCE_RESPONSE 132 | ||
110 | #define NET_PACKET_ONION_DATA_REQUEST 133 | ||
111 | #define NET_PACKET_ONION_DATA_RESPONSE 134 | ||
112 | |||
113 | #define NET_PACKET_ONION_RECV_3 140 | ||
114 | #define NET_PACKET_ONION_RECV_2 141 | ||
115 | #define NET_PACKET_ONION_RECV_1 142 | ||
116 | -} | ||
117 | |||
118 | |||
119 | -- | Use with 'PingPayload', 'GetNodesPayload', or 'SendNodesPayload' | ||
120 | data Message a = Message | ||
121 | { msgType :: MessageType | ||
122 | , msgClient :: NodeId Message | ||
123 | , msgNonce :: TransactionID Message | ||
124 | , msgPayload :: a | ||
125 | } | ||
126 | deriving (Eq, Show, Generic, Functor, Foldable, Traversable) | ||
127 | |||
128 | instance Show (NodeId Message) where | ||
129 | showsPrec d pubkey s = | ||
130 | "NodeId \"" ++ C8.unpack (Base16.encode $ convert pubkey) ++ '"':s | ||
131 | |||
132 | instance Show (TransactionID Message) where | ||
133 | showsPrec d nonce = mappend "TID " . quoted (mappend $ bin2hex nonce) | ||
134 | |||
135 | isQuery :: Message a -> Bool | ||
136 | isQuery (Message { msgType = SendNodes }) = False | ||
137 | isQuery (Message { msgType = MessageType typ }) | even typ = True | ||
138 | isQuery _ = False | ||
139 | |||
140 | isResponse :: Message a -> Bool | ||
141 | isResponse m = not (isQuery m) | ||
142 | |||
143 | isError :: Message a -> Bool | ||
144 | isError _ = False | ||
145 | |||
146 | data PingPayload = PingPayload | ||
147 | { isPong :: Bool | ||
148 | , pingId :: Nonce8 | ||
149 | } | ||
150 | |||
151 | data GetNodesPayload = GetNodesPayload | ||
152 | { nodesForWho :: NodeId Message | ||
153 | , nodesNonce :: Nonce8 | ||
154 | } | ||
155 | |||
156 | data SendNodesPayload = SendNodesPayload | ||
157 | |||
158 | -- From: docs/updates/DHT.md | ||
159 | -- | ||
160 | -- Node format: | ||
161 | -- [uint8_t family (2 == IPv4, 10 == IPv6, 130 == TCP IPv4, 138 == TCP IPv6)] | ||
162 | -- [ip (in network byte order), length=4 bytes if ipv4, 16 bytes if ipv6] | ||
163 | -- [port (in network byte order), length=2 bytes] | ||
164 | -- [char array (node_id), length=32 bytes] | ||
165 | -- | ||
166 | -- see also: DHT.h (pack_nodes() and unpack_nodes()) | ||
167 | instance Serialize NodeFormat where | ||
168 | |||
169 | get = do | ||
170 | typ <- get :: Get Word8 | ||
171 | (ip,istcp) <- | ||
172 | case typ :: Word8 of | ||
173 | 2 -> (,False) . IPv4 <$> get | ||
174 | 130 -> (,True) . IPv4 <$> get | ||
175 | 10 -> (,False) . IPv6 <$> get | ||
176 | 138 -> (,True) . IPv6 <$> get | ||
177 | _ -> fail "Unsupported type of Tox node_format structure" | ||
178 | port <- get | ||
179 | pubkey <- get | ||
180 | return $ NodeFormat { nodeIsTCP = istcp | ||
181 | , nodeIP = ip | ||
182 | , nodePort = port | ||
183 | , nodePublicKey = NodeId pubkey | ||
184 | } | ||
185 | |||
186 | put (NodeFormat{ nodePublicKey = NodeId pubkey, ..}) = do | ||
187 | put $ case (# nodeIP, nodeIsTCP #) of | ||
188 | (# IPv4 _, False #) -> 2 | ||
189 | (# IPv4 _, True #) -> 130 | ||
190 | (# IPv6 _, False #) -> 10 | ||
191 | (# IPv6 _, True #) -> 138 :: Word8 | ||
192 | put nodeIP | ||
193 | put nodePort | ||
194 | put pubkey | ||
195 | |||
196 | -- Note: the char array is a public key, the 32-bytes is provided by libsodium-dev | ||
197 | -- in /usr/include/sodium/crypto_box.h as the symbol crypto_box_PUBLICKEYBYTES | ||
198 | -- but toxcore/crypto_core.c will fail to compile if it is not 32. | ||
199 | |||
200 | |||
201 | -- Ping(Request and response): | ||
202 | -- | ||
203 | -- [byte with value: 00 for request, 01 for response] | ||
204 | -- [char array (client node_id), length=32 bytes] | ||
205 | -- [random 24 byte nonce] | ||
206 | -- [Encrypted with the nonce and private key of the sender: | ||
207 | -- [1 byte type (0 for request, 1 for response)] | ||
208 | -- [random 8 byte (ping_id)] | ||
209 | -- ] | ||
210 | -- | ||
211 | -- ping_id = a random integer, the response must contain the exact same number as the request | ||
212 | |||
213 | |||
214 | -- Get nodes (Request): | ||
215 | -- | ||
216 | -- [byte with value: 02] | ||
217 | -- [char array (client node_id), length=32 bytes] | ||
218 | -- [random 24 byte nonce] | ||
219 | -- [Encrypted with the nonce and private key of the sender: | ||
220 | -- [char array: requested_node_id (node_id of which we want the ip), length=32 bytes] | ||
221 | -- [Sendback data (must be sent back unmodified by in the response), length=8 bytes] | ||
222 | -- ] | ||
223 | -- | ||
224 | -- Valid replies: a send_nodes packet | ||
225 | |||
226 | -- Send_nodes (response (for all addresses)): | ||
227 | -- | ||
228 | -- [byte with value: 04] | ||
229 | -- [char array (client node_id), length=32 bytes] | ||
230 | -- [random 24 byte nonce] | ||
231 | -- [Encrypted with the nonce and private key of the sender: | ||
232 | -- [uint8_t number of nodes in this packet] | ||
233 | -- [Nodes in node format, length=?? * (number of nodes (maximum of 4 nodes)) bytes] | ||
234 | -- [Sendback data, length=8 bytes] | ||
235 | -- ] | ||
236 | |||
237 | data ToxCipherContext = ToxCipherContext | ||
238 | { dhtSecretKey :: SecretKey | ||
239 | } | ||
240 | |||
241 | data Ciphered = Ciphered { cipheredMAC :: Poly1305.Auth | ||
242 | , cipheredBytes :: ByteString } | ||
243 | deriving Eq | ||
244 | |||
245 | quoted shows s = '"':shows ('"':s) | ||
246 | |||
247 | bin2hex :: ByteArrayAccess bs => bs -> String | ||
248 | bin2hex = C8.unpack . Base16.encode . convert | ||
249 | |||
250 | instance Show Ciphered where | ||
251 | showsPrec d (Ciphered (Poly1305.Auth mac) bytes) = | ||
252 | mappend "Ciphered (Auth " | ||
253 | . quoted (mappend $ bin2hex mac) | ||
254 | . (") " ++) | ||
255 | . quoted (mappend $ bin2hex bytes) | ||
256 | |||
257 | getMessage :: Get (Message Ciphered) | ||
258 | getMessage = do | ||
259 | typ <- get | ||
260 | nid <- get | ||
261 | tid <- get | ||
262 | mac <- Poly1305.Auth . convert <$> getBytes 16 | ||
263 | cnt <- remaining | ||
264 | bs <- getBytes cnt | ||
265 | return Message { msgType = typ | ||
266 | , msgClient = nid | ||
267 | , msgNonce = tid | ||
268 | , msgPayload = Ciphered mac bs } | ||
269 | |||
270 | putMessage :: Message Ciphered -> Put | ||
271 | putMessage (Message {..}) = do | ||
272 | put msgType | ||
273 | put msgClient | ||
274 | put msgNonce | ||
275 | let Ciphered (Poly1305.Auth mac) bs = msgPayload | ||
276 | putByteString (convert mac) | ||
277 | putByteString bs | ||
278 | |||
279 | -- XXX: assumes ByteArray is little-endian | ||
280 | id2key :: NodeId Message -> PublicKey | ||
281 | id2key recipient = case publicKey recipient of | ||
282 | CryptoPassed key -> key | ||
283 | CryptoFailed e -> error ("id2key: "++show e) | ||
284 | |||
285 | -- XXX: S.decode is Big-endian | ||
286 | -- TODO: implement ByteArray instance, avoid S.decode | ||
287 | key2id :: PublicKey -> NodeId Message | ||
288 | key2id pk = case S.decode (BA.convert pk) of | ||
289 | Left _ -> error "key2id" | ||
290 | Right nid -> nid | ||
291 | |||
292 | |||
293 | zeros32 :: Bytes | ||
294 | zeros32 = BA.replicate 32 0 | ||
295 | |||
296 | zeros24 :: Bytes | ||
297 | zeros24 = BA.take 24 zeros32 | ||
298 | |||
299 | hsalsa20 k n = a <> b | ||
300 | where | ||
301 | Salsa.State st = XSalsa.initialize 20 k n | ||
302 | (_, as) = BA.splitAt 4 st | ||
303 | (a, xs) = BA.splitAt 16 as | ||
304 | (_, bs) = BA.splitAt 24 xs | ||
305 | (b, _ ) = BA.splitAt 16 bs | ||
306 | |||
307 | lookupSecret :: ToxCipherContext -> NodeId Message -> TransactionID Message -> (Poly1305.State, XSalsa.State) | ||
308 | lookupSecret ctx recipient nonce = (hash, crypt) | ||
309 | where | ||
310 | -- diffie helman | ||
311 | shared = ecdh (Proxy :: Proxy Curve_X25519) (dhtSecretKey ctx) (id2key recipient) -- ByteArrayAccess b => b | ||
312 | -- shared secret XSalsa key | ||
313 | k = hsalsa20 shared zeros24 | ||
314 | -- cipher state | ||
315 | st0 = XSalsa.initialize 20 k nonce | ||
316 | -- Poly1305 key | ||
317 | (rs, crypt) = XSalsa.combine st0 zeros32 | ||
318 | Cryptonite.CryptoPassed hash = Poly1305.initialize rs -- TODO: Pattern fail? | ||
319 | |||
320 | decipher :: ToxCipherContext -> Message Ciphered -> Either String (Message ByteString) | ||
321 | decipher ctx ciphered = mapM (decipherAndAuth hash crypt) ciphered | ||
322 | where | ||
323 | (hash, crypt) = lookupSecret ctx (msgClient ciphered) (msgNonce ciphered) | ||
324 | |||
325 | encipher :: ToxCipherContext -> NodeId Message -> Message ByteString -> Message Ciphered | ||
326 | encipher ctx recipient plain = encipherAndHash hash crypt <$> plain | ||
327 | where | ||
328 | (hash, crypt) = lookupSecret ctx recipient (msgNonce plain) | ||
329 | |||
330 | encipherAndHash :: Poly1305.State -> XSalsa.State -> ByteString -> Ciphered | ||
331 | encipherAndHash hash crypt m = Ciphered a c | ||
332 | where | ||
333 | c = fst . XSalsa.combine crypt $ m | ||
334 | a = Poly1305.finalize . Poly1305.update hash $ c | ||
335 | |||
336 | decipherAndAuth :: Poly1305.State -> XSalsa.State -> Ciphered -> Either String ByteString | ||
337 | decipherAndAuth hash crypt (Ciphered mac c) | ||
338 | {- | ||
339 | | C8.length m /= C8.length c = Left $ "Unequal lengths: "++show (C8.length m, C8.length c) | ||
340 | -- | C8.length c /= 40 = Left $ "Unexpected c length: " ++ show (C8.length c, bin2hex c) | ||
341 | | otherwise = Right m | ||
342 | -} | ||
343 | | (a == mac) = Right m | ||
344 | | otherwise = Left "decipherAndAuth: auth fail" | ||
345 | where | ||
346 | m = fst . XSalsa.combine crypt $ c | ||
347 | a = Poly1305.finalize . Poly1305.update hash $ c | ||
348 | |||
349 | |||
350 | -- see rfc7748 | ||
351 | -- | ||
352 | -- Crypto.ECC | ||
353 | -- Crypto.PubKey.Curve25519 | ||
354 | -- Crypto.Cipher.XSalsa | ||
355 | -- | ||
356 | curve25519 :: Curve | ||
357 | curve25519 = CurveFP (CurvePrime prime curvecommon) | ||
358 | where | ||
359 | prime = 2^255 - 19 -- (≅ 1 modulo 4) | ||
360 | |||
361 | sqrt_of_39420360 = 14781619447589544791020593568409986887264606134616475288964881837755586237401 | ||
362 | |||
363 | -- 1 * v^2 = u^3 + 486662*u^2 + u | ||
364 | |||
365 | curvecommon = CurveCommon | ||
366 | { ecc_a = 486662 | ||
367 | , ecc_b = 1 | ||
368 | , ecc_g = Point 9 sqrt_of_39420360 -- base point | ||
369 | , ecc_n = 2^252 + 0x14def9dea2f79cd65812631a5cf5d3ed -- order | ||
370 | , ecc_h = 8 -- cofactor | ||
371 | } | ||
372 | |||
373 | -- crypto_box uses xsalsa20 symmetric encryption and poly1305 authentication. | ||
374 | -- https://en.wikipedia.org/wiki/Poly1305 | ||
375 | |||
376 | instance Envelope Message where | ||
377 | newtype TransactionID Message = TID Nonce24 | ||
378 | deriving (Eq,Ord) -- Read | ||
379 | |||
380 | newtype NodeId Message = NodeId Word256 | ||
381 | deriving (Eq, Ord, Bits, FiniteBits) | ||
382 | |||
383 | type QueryMethod Message = MessageType | ||
384 | |||
385 | newtype QueryExtra Message = QueryNonce { qryNonce :: Nonce8 } | ||
386 | deriving (Eq, Ord, Show) | ||
387 | |||
388 | newtype ResponseExtra Message = ResponseNonce { rspNonce :: Nonce8 } | ||
389 | deriving (Eq, Ord, Show) | ||
390 | |||
391 | data PacketDestination Message = ToxAddr { toxID :: NodeId Message | ||
392 | , toxSockAddr :: SockAddr | ||
393 | } | ||
394 | deriving (Eq,Ord,Show) | ||
395 | |||
396 | envelopePayload = msgPayload | ||
397 | |||
398 | envelopeTransaction = msgNonce -- FIXME: should be decrypted nonce | ||
399 | |||
400 | envelopeClass Message { msgType = Ping } = Query Ping | ||
401 | envelopeClass Message { msgType = Pong } = Response Nothing | ||
402 | envelopeClass Message { msgType = GetNodes } = Query GetNodes | ||
403 | envelopeClass Message { msgType = SendNodes } = Response Nothing | ||
404 | |||
405 | makeAddress qry = ToxAddr (either id msgClient qry) | ||
406 | |||
407 | buildReply self addr qry payload = (fmap (const payload) qry) { msgClient = self } | ||
408 | |||
409 | -- buildQuery :: NodeId envelope -> SockAddr -> QueryMethod envelope -> TransactionID envelope -> a -> IO (envelope a) | ||
410 | buildQuery nid addr meth tid q = return $ Message | ||
411 | { msgType = meth | ||
412 | , msgClient = nid | ||
413 | , msgNonce = tid | ||
414 | , msgPayload = q | ||
415 | } | ||
416 | |||
417 | -- FIXME: Should generate encrypted nonces. | ||
418 | -- Should be unpredictable. | ||
419 | uniqueTransactionId cnt = do | ||
420 | return $ either (error "failed to create TransactionId") TID | ||
421 | $ S.decode $ Char8.pack (take 24 $ show cnt ++ repeat ' ') | ||
422 | |||
423 | |||
424 | {- | ||
425 | instance Serialize (TransactionID Message) where | ||
426 | get = do | ||
427 | lo <- getWord64le | ||
428 | mid <- getWord64le | ||
429 | hi <- getWord64le | ||
430 | return $ TID (LargeKey lo | ||
431 | (LargeKey mid hi)) | ||
432 | |||
433 | put (TID (LargeKey lo (LargeKey mid hi))) = do | ||
434 | putWord64le lo | ||
435 | putWord64le mid | ||
436 | putWord64le hi | ||
437 | |||
438 | instance Serialize (NodeId Message) where | ||
439 | get = do | ||
440 | lo <- getWord64le | ||
441 | mid <- getWord64le | ||
442 | hi <- getWord64le | ||
443 | highest <- getWord64le | ||
444 | return $ NodeId (LargeKey lo | ||
445 | (LargeKey mid | ||
446 | (LargeKey hi highest))) | ||
447 | put (NodeId (LargeKey lo (LargeKey mid (LargeKey hi highest)))) = do | ||
448 | putWord64le lo | ||
449 | putWord64le mid | ||
450 | putWord64le hi | ||
451 | putWord64le highest | ||
452 | |||
453 | -} | ||
454 | |||
455 | instance Serialize (TransactionID Message) where | ||
456 | get = do | ||
457 | hi <- getWord64be | ||
458 | mid <- getWord64be | ||
459 | lo <- getWord64be | ||
460 | return $ TID (LargeKey lo | ||
461 | (LargeKey mid hi)) | ||
462 | |||
463 | put (TID (LargeKey lo (LargeKey mid hi))) = do | ||
464 | putWord64be hi | ||
465 | putWord64be mid | ||
466 | putWord64be lo | ||
467 | |||
468 | instance Serialize (NodeId Message) where | ||
469 | get = do | ||
470 | highest <- getWord64be | ||
471 | hi <- getWord64be | ||
472 | mid <- getWord64be | ||
473 | lo <- getWord64be | ||
474 | return $ NodeId (LargeKey lo | ||
475 | (LargeKey mid | ||
476 | (LargeKey hi highest))) | ||
477 | put (NodeId (LargeKey lo (LargeKey mid (LargeKey hi highest)))) = do | ||
478 | putWord64be highest | ||
479 | putWord64be hi | ||
480 | putWord64be mid | ||
481 | putWord64be lo | ||
482 | |||
483 | |||
484 | staticAssert isLittleEndian -- assumed by 'withWord64Ptr' | ||
485 | |||
486 | with3Word64Ptr :: Nonce24 -> (Ptr Word64 -> IO a) -> IO a | ||
487 | with3Word64Ptr (LargeKey wlo (LargeKey wmid whi)) kont = | ||
488 | allocaBytes (sizeOf wlo * 3) $ \p -> do | ||
489 | pokeElemOff p 2 $ toBE64 wlo | ||
490 | pokeElemOff p 1 $ toBE64 wmid | ||
491 | pokeElemOff p 0 $ toBE64 whi | ||
492 | kont p | ||
493 | |||
494 | with4Word64Ptr :: Key32 -> (Ptr Word64 -> IO a) -> IO a | ||
495 | with4Word64Ptr (LargeKey wlo (LargeKey wmid (LargeKey whi whighest))) kont = | ||
496 | allocaBytes (sizeOf wlo * 4) $ \p -> do | ||
497 | pokeElemOff p 3 $ toBE64 wlo | ||
498 | pokeElemOff p 2 $ toBE64 wmid | ||
499 | pokeElemOff p 1 $ toBE64 whi | ||
500 | pokeElemOff p 0 $ toBE64 whighest | ||
501 | kont p | ||
502 | |||
503 | |||
504 | instance ByteArrayAccess (TransactionID Message) where | ||
505 | length _ = 24 | ||
506 | withByteArray (TID nonce) kont = with3Word64Ptr nonce (kont . castPtr) | ||
507 | |||
508 | instance ByteArrayAccess (NodeId Message) where | ||
509 | length _ = 32 | ||
510 | withByteArray (NodeId nonce) kont = with4Word64Ptr nonce (kont . castPtr) | ||
511 | |||
512 | |||
513 | instance Hashable (NodeId Message) where | ||
514 | hashWithSalt s (NodeId (LargeKey a (LargeKey b (LargeKey c d)))) = | ||
515 | hashWithSalt s (a,b,c,d) | ||
516 | |||
517 | instance Hashable (PacketDestination Message) where | ||
518 | hashWithSalt s (ToxAddr nid addr) = hashWithSalt s nid | ||
519 | |||
520 | instance Serialize (PacketDestination Message) where | ||
521 | put (ToxAddr (NodeId nid) addr) = put nid >> putSockAddr addr | ||
522 | get = ToxAddr <$> (NodeId <$> get) <*> getSockAddr | ||
523 | |||
524 | instance Pretty (PacketDestination Message) where | ||
525 | pPrint = PP.text . show | ||
526 | |||
527 | instance Address (PacketDestination Message) where | ||
528 | toSockAddr (ToxAddr _ addr) = addr | ||
529 | fromSockAddr _ = Nothing | ||
530 | |||
531 | instance WireFormat ByteString Message where | ||
532 | type SerializableTo ByteString = Serialize | ||
533 | type CipherContext ByteString Message = ToxCipherContext | ||
534 | |||
535 | decodePayload = mapM decode | ||
536 | encodePayload = fmap encode | ||
537 | |||
538 | decodeHeaders ctx bs = runGet getMessage bs >>= decipher ctx | ||
539 | encodeHeaders ctx msg recipient = runPut $ putMessage $ encipher ctx (toxID recipient) msg | ||
540 | |||
541 | initializeServerState _ _ = do | ||
542 | k <- generateSecretKey | ||
543 | {- | ||
544 | nid <- withByteArray (toPublic k) $ \p -> do | ||
545 | wlo <- peekElemOff p 0 | ||
546 | wmid <- peekElemOff p 1 | ||
547 | whi <- peekElemOff p 2 | ||
548 | whigest <- peekElemOff p 3 | ||
549 | return $ LargeKey wlo (LargeKey wmid (LargeKey whi whigest)) | ||
550 | -} | ||
551 | return (key2id $ toPublic k, ToxCipherContext k) | ||
552 | |||
553 | |||
554 | instance Read (NodeId Message) where readsPrec d s = map (\(w,xs) -> (NodeId w, xs)) $ decodeHex s | ||
diff --git a/src/Network/KRPC/Method.hs b/src/Network/KRPC/Method.hs deleted file mode 100644 index da69d14b..00000000 --- a/src/Network/KRPC/Method.hs +++ /dev/null | |||
@@ -1,40 +0,0 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013, 2014 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- Normally, you don't need to import this module. | ||
9 | -- | ||
10 | {-# LANGUAGE CPP #-} | ||
11 | {-# LANGUAGE DefaultSignatures #-} | ||
12 | {-# LANGUAGE FlexibleContexts #-} | ||
13 | {-# LANGUAGE FunctionalDependencies #-} | ||
14 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
15 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
16 | {-# LANGUAGE RankNTypes #-} | ||
17 | {-# LANGUAGE ScopedTypeVariables #-} | ||
18 | {-# LANGUAGE StandaloneDeriving #-} | ||
19 | {-# LANGUAGE TypeFamilies #-} | ||
20 | module Network.KRPC.Method | ||
21 | ( Method (..) | ||
22 | , KRPC (..) | ||
23 | ) where | ||
24 | |||
25 | #ifdef VERSION_bencoding | ||
26 | import Data.BEncode (BEncode) | ||
27 | #else | ||
28 | import Data.Serialize | ||
29 | #endif | ||
30 | import Data.ByteString.Char8 as BC | ||
31 | import Data.Char | ||
32 | import Data.Monoid | ||
33 | import Data.List as L | ||
34 | import Data.String | ||
35 | import Data.Typeable | ||
36 | import Network.DatagramServer.Mainline | ||
37 | import Network.DatagramServer.Types | ||
38 | import Network.DHT.Types | ||
39 | |||
40 | |||