summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Network/BitTorrent/DHT.hs362
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs750
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs571
-rw-r--r--src/Network/DHT.hs125
-rw-r--r--src/Network/DHT/Mainline.hs579
-rw-r--r--src/Network/DHT/Tox.hs112
-rw-r--r--src/Network/DHT/Types.hs176
-rw-r--r--src/Network/DatagramServer.hs608
-rw-r--r--src/Network/DatagramServer/Error.hs68
-rw-r--r--src/Network/DatagramServer/Mainline.hs404
-rw-r--r--src/Network/DatagramServer/Tox.hs554
-rw-r--r--src/Network/KRPC/Method.hs40
12 files changed, 0 insertions, 4349 deletions
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs
deleted file mode 100644
index 1a67c7c4..00000000
--- a/src/Network/BitTorrent/DHT.hs
+++ /dev/null
@@ -1,362 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- BitTorrent uses a \"distributed sloppy hash table\" (DHT) for
9-- storing peer contact information for \"trackerless\" torrents. In
10-- effect, each peer becomes a tracker.
11--
12-- Normally you don't need to import other DHT modules.
13--
14-- For more info see:
15-- <http://www.bittorrent.org/beps/bep_0005.html>
16--
17{-# LANGUAGE FlexibleInstances #-}
18{-# LANGUAGE FlexibleContexts #-}
19{-# LANGUAGE TemplateHaskell #-}
20{-# LANGUAGE TypeOperators #-}
21{-# LANGUAGE ScopedTypeVariables #-}
22{-# LANGUAGE CPP #-}
23module Network.BitTorrent.DHT
24 ( -- * Distributed Hash Table
25 DHT
26 , Options (..)
27 , fullLogging
28 , dht
29
30 -- * Bootstrapping
31 -- $bootstrapping-terms
32 , tNodes
33 , defaultBootstrapNodes
34 , resolveHostName
35 , bootstrap
36 , isBootstrapped
37
38 -- * Initialization
39 , snapshot
40
41 -- * Operations
42 -- , Network.BitTorrent.DHT.lookup
43 , Network.BitTorrent.DHT.insert
44 , Network.BitTorrent.DHT.delete
45
46 -- * Embedding
47 -- ** Session
48 , LogFun
49 , Node
50 , defaultHandlers
51 , newNode
52 , closeNode
53
54 -- ** Monad
55 -- , MonadDHT (..)
56 , runDHT
57 ) where
58
59import Control.Monad.Logger
60import Control.Monad.Reader
61import Control.Exception
62import qualified Data.ByteString as BS
63import Data.Conduit as C
64import qualified Data.Conduit.List as C
65import Data.Serialize
66import Network.Socket
67import Text.PrettyPrint.HughesPJClass as PP (pPrint,render)
68
69import Data.Torrent
70import Network.Address
71import Network.BitTorrent.DHT.Query
72import Network.BitTorrent.DHT.Session
73import Network.DHT.Routing as T hiding (null)
74import qualified Data.Text as Text
75import Data.Typeable
76import Data.Monoid
77import Network.DatagramServer.Mainline (KMessageOf)
78import qualified Network.DatagramServer as KRPC (listen, Protocol(..))
79import Network.DatagramServer.Types
80import Network.DHT.Types
81import Data.Bits
82import Data.Default
83import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
84import Network.KRPC.Method
85
86{-----------------------------------------------------------------------
87-- DHT types
88-----------------------------------------------------------------------}
89
90#if 0
91class MonadDHT m where
92 liftDHT :: DHT raw dht u IPv4 a -> m a
93
94instance MonadDHT (DHT raw dht u IPv4) where
95 liftDHT = id
96#endif
97
98-- | Convenience method. Pass this to 'dht' to enable full logging.
99fullLogging :: LogSource -> LogLevel -> Bool
100fullLogging _ _ = True
101
102-- | Run DHT on specified port. <add note about resources>
103dht ::
104 ( Ord ip
105 , Address ip
106 , Functor dht
107 , Ord (NodeId dht)
108 , FiniteBits (NodeId dht)
109 , Serialize (NodeId dht)
110 , Show (NodeId dht)
111 , SerializableTo raw (Response dht (Ping dht))
112 , SerializableTo raw (Query dht (Ping dht))
113 , SerializableTo raw (Response dht (NodeFound dht ip))
114 , SerializableTo raw (Query dht (FindNode dht ip))
115 , Ord (TransactionID dht)
116 , Serialize (TransactionID dht)
117 , Eq (QueryMethod dht)
118 , Show (QueryMethod dht)
119 , Pretty (NodeInfo dht ip u)
120 , Kademlia dht
121 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
122 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
123 , DataHandlers raw dht
124 , WireFormat raw dht
125 , Show u
126 , Default u
127 , Typeable dht
128 )
129 => Options -- ^ normally you need to use 'Data.Default.def';
130 -> NodeAddr ip -- ^ address to bind this node;
131 -> (LogSource -> LogLevel -> Bool) -- ^ use 'fullLogging' as a noisy default
132 -> DHT raw dht u ip a -- ^ actions to run: 'bootstrap', 'lookup', etc;
133 -> IO a -- ^ result.
134dht opts addr logfilter action = do
135 runStderrLoggingT $ filterLogger logfilter $ LoggingT $ \ logger -> do
136 bracket (newNode opts addr logger Nothing) closeNode $
137 \ node -> runDHT node $ do
138 hs <- defaultHandlers logger
139 m <- asks manager
140 liftIO $ KRPC.listen m hs (KRPC.Protocol Proxy Proxy)
141 action
142{-# INLINE dht #-}
143
144{-----------------------------------------------------------------------
145-- Bootstrapping
146-----------------------------------------------------------------------}
147-- $bootstrapping-terms
148--
149-- [@Bootstrapping@] DHT @bootstrapping@ is the process of filling
150-- routing 'Table' by /good/ nodes.
151--
152-- [@Bootstrapping time@] Bootstrapping process can take up to 5
153-- minutes. Bootstrapping should only happen at first application
154-- startup, if possible you should use 'snapshot' & 'restore'
155-- mechanism which must work faster.
156--
157-- [@Bootstrap nodes@] DHT @bootstrap node@ is either:
158--
159-- * a specialized high performance node maintained by bittorrent
160-- software authors\/maintainers, like those listed in
161-- 'defaultBootstrapNodes'. /Specialized/ means that those nodes
162-- may not support 'insert' queries and is running for the sake of
163-- bootstrapping only.
164--
165-- * an ordinary bittorrent client running DHT node. The list of
166-- such bootstrapping nodes usually obtained from
167-- 'Data.Torrent.tNodes' field or
168-- 'Network.BitTorrent.Exchange.Message.Port' messages.
169
170-- Do not include the following hosts in the default bootstrap nodes list:
171--
172-- * "dht.aelitis.com" and "dht6.azureusplatform.com" - since
173-- Azureus client have a different (and probably incompatible) DHT
174-- protocol implementation.
175--
176-- * "router.utorrent.com" since it is just an alias to
177-- "router.bittorrent.com".
178-- XXX: ignoring this advise as it resolves to a different
179-- ip address for me.
180
181-- | List of bootstrap nodes maintained by different bittorrent
182-- software authors.
183defaultBootstrapNodes :: [NodeAddr HostName]
184defaultBootstrapNodes =
185 [ NodeAddr "router.bittorrent.com" 6881 -- by BitTorrent Inc.
186
187 -- doesn't work at the moment (use git blame) of commit
188 , NodeAddr "dht.transmissionbt.com" 6881 -- by Transmission project
189
190 , NodeAddr "router.utorrent.com" 6881
191 ]
192
193-- TODO Multihomed hosts
194
195-- | Resolve either a numeric network address or a hostname to a
196-- numeric IP address of the node. Usually used to resolve
197-- 'defaultBootstrapNodes' or 'Data.Torrent.tNodes' lists.
198resolveHostName :: NodeAddr HostName -> IO (NodeAddr IPv4)
199resolveHostName NodeAddr {..} = do
200 let hints = defaultHints { addrFamily = AF_INET, addrSocketType = Datagram }
201 -- getAddrInfo throws exception on empty list, so the pattern matching never fail
202 info : _ <- getAddrInfo (Just hints) (Just nodeHost) (Just (show nodePort))
203 case fromSockAddr (addrAddress info) of
204 Nothing -> error "resolveNodeAddr: impossible"
205 Just addr -> return addr
206
207-- | One good node may be sufficient.
208--
209-- This operation do block, use
210-- 'Control.Concurrent.Async.Lifted.async' if needed.
211bootstrap :: forall raw dht u ip.
212 ( Ord ip
213 , Address ip
214 , Functor dht
215 , Ord (NodeId dht)
216 , FiniteBits (NodeId dht)
217 , Serialize (NodeId dht)
218 , Show (NodeId dht)
219 , Pretty (NodeId dht)
220 , SerializableTo raw (Response dht (Ping dht))
221 , SerializableTo raw (Query dht (Ping dht))
222 , SerializableTo raw (Response dht (NodeFound dht ip))
223 , SerializableTo raw (Query dht (FindNode dht ip))
224 , Ord (TransactionID dht)
225 , Serialize (TransactionID dht)
226 , Eq (QueryMethod dht)
227 , Show (QueryMethod dht)
228 , Pretty (NodeInfo dht ip u)
229 , Kademlia dht
230 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
231 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
232 , DataHandlers raw dht
233 , WireFormat raw dht
234 , Show u
235 , Default u
236 , Serialize u
237 ) => Maybe BS.ByteString -> [PacketDestination dht] -> DHT raw dht u ip ()
238bootstrap mbs startNodes = do
239 restored <-
240 case decode <$> mbs of
241 Just (Right tbl) -> return (T.toList tbl)
242 Just (Left e) -> do $(logWarnS) "restore" (Text.pack e)
243 return []
244 Nothing -> return []
245
246 $(logInfoS) "bootstrap" "Start node bootstrapping"
247 let searchAll aliveNodes = do
248 nid <- myNodeIdAccordingTo (error "FIXME")
249 ns <- bgsearch ioFindNodes nid
250 return ( ns :: [NodeInfo dht ip u] )
251 input_nodes <- (restored ++) . T.toList <$> getTable
252 -- Step 1: Use iterative searches to flesh out the table..
253 do let knowns = map (map $ fst) input_nodes
254 -- Below, we reverse the nodes since the table serialization puts the
255 -- nearest nodes last and we want to choose a similar node id to bootstrap
256 -- faster.
257 (alive_knowns,_) <- unzip <$> queryParallel (pingQ <$> reverse (concat knowns))
258 b <- isBootstrapped
259 -- If our cached nodes are alive and our IP address did not change, it's possible
260 -- we are already bootsrapped, so no need to do any searches.
261 when (not b) $ do
262 ns <- searchAll $ take 2 alive_knowns
263 -- We only use the supplied bootstrap nodes when we don't know of any
264 -- others to try.
265 when (null ns) $ do
266 -- TODO filter duplicated in startNodes list
267 -- TODO retransmissions for startNodes
268 (aliveNodes,_) <- unzip <$> queryParallel (coldPingQ <$> startNodes)
269 _ <- searchAll $ take 2 aliveNodes
270 return ()
271 -- Step 2: Repeatedly refresh incomplete buckets until the table is full.
272 maxbuckets <- asks $ optBucketCount . options
273 flip fix 0 $ \loop icnt -> do
274 tbl <- getTable
275 let unfull = filter ((/=defaultBucketSize) . snd)
276 us = zip
277 -- is_last = True for the last bucket
278 (True:repeat False)
279 -- Only non-full buckets unless it is the last one and the
280 -- maximum number of buckets has not been reached.
281 $ case reverse $ zip [0..] $ T.shape tbl of
282 p@(n,_):ps | n+1==maxbuckets -> unfull (p:ps)
283 p:ps -> p:unfull ps
284 [] -> []
285 forM_ us $ \(is_last,(index,_)) -> do
286 nid <- myNodeIdAccordingTo (error "FIXME")
287 sample <- liftIO $ genBucketSample nid (bucketRange index is_last)
288 $(logDebugS) "bootstrapping"
289 $ "BOOTSTRAP sample"
290 <> Text.pack (show (is_last,index,T.shape tbl))
291 <> " " <> Text.pack (render $ pPrint sample)
292 refreshNodes sample
293 $(logDebugS) "bootstrapping"
294 $ "BOOTSTRAP finished iteration "
295 <> Text.pack (show (icnt,T.shape tbl,us,defaultBucketSize))
296 when (not (null us) && icnt < div (3*maxbuckets) 2)
297 $ loop (succ icnt)
298 $(logInfoS) "bootstrap" "Node bootstrapping finished"
299
300-- | Check if this node is already bootstrapped.
301-- @bootstrap [good_node] >> isBootstrapped@@ should always return 'True'.
302--
303-- This operation do not block.
304--
305isBootstrapped :: Eq ip => DHT raw dht u ip Bool
306isBootstrapped = T.full <$> getTable
307
308{-----------------------------------------------------------------------
309-- Initialization
310-----------------------------------------------------------------------}
311
312-- | Serialize current DHT session to byte string.
313--
314-- This is blocking operation, use
315-- 'Control.Concurrent.Async.Lifted.async' if needed.
316snapshot :: ( Address ip
317 , Ord (NodeId dht)
318 , Serialize u
319 , Serialize (NodeId dht)
320 ) => DHT raw dht u ip BS.ByteString
321snapshot = do
322 tbl <- getTable
323 return $ encode tbl
324
325{-----------------------------------------------------------------------
326-- Operations
327-----------------------------------------------------------------------}
328
329#if 0
330
331-- | Get list of peers which downloading this torrent.
332--
333-- This operation is incremental and do block.
334--
335lookup :: Address ip => InfoHash -> DHT raw dht u ip `C.Source` [PeerAddr ip]
336lookup topic = do -- TODO retry getClosest if bucket is empty
337 closest <- lift $ getClosest topic
338 C.sourceList [closest] $= search topic (getPeersQ topic)
339
340#endif
341
342-- TODO do not republish if the topic is already in announceSet
343
344-- | Announce that /this/ peer may have some pieces of the specified
345-- torrent. DHT will reannounce this data periodically using
346-- 'optReannounce' interval.
347--
348-- This operation is synchronous and do block, use
349-- 'Control.Concurrent.Async.Lifted.async' if needed.
350--
351insert :: Address ip => InfoHash -> PortNumber -> DHT raw dht u ip ()
352insert ih p = do
353 publish ih p
354 insertTopic ih p
355
356-- | Stop announcing /this/ peer for the specified torrent.
357--
358-- This operation is atomic and may block for a while.
359--
360delete :: InfoHash -> PortNumber -> DHT raw dht u ip ()
361delete = deleteTopic
362{-# INLINE delete #-}
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
deleted file mode 100644
index 003bb5b9..00000000
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ /dev/null
@@ -1,750 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2014
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- This module provides functions to interact with other nodes.
9-- Normally, you don't need to import this module, use
10-- "Network.BitTorrent.DHT" instead.
11--
12{-# LANGUAGE CPP #-}
13{-# LANGUAGE FlexibleContexts #-}
14{-# LANGUAGE ScopedTypeVariables #-}
15{-# LANGUAGE TemplateHaskell #-}
16{-# LANGUAGE TupleSections #-}
17{-# LANGUAGE PartialTypeSignatures #-}
18{-# LANGUAGE GADTs #-}
19{-# LANGUAGE RankNTypes #-}
20{-# LANGUAGE MultiParamTypeClasses #-}
21module Network.BitTorrent.DHT.Query
22 ( -- * Handler
23 -- | To bind specific set of handlers you need to pass
24 -- handler list to the 'startNode' function.
25 pingH
26 , findNodeH
27 , getPeersH
28 , announceH
29 , defaultHandlers
30
31 -- * Query
32 -- ** Basic
33 -- | A basic query perform a single request expecting a
34 -- single response.
35 , Iteration
36 , pingQ
37 , coldPingQ
38 , findNodeQ
39 , getPeersQ
40 , announceQ
41
42 -- ** Iterative
43 -- | An iterative query perform multiple basic queries,
44 -- concatenate its responses, optionally yielding result and
45 -- continue to the next iteration.
46 , Search
47 -- , search
48 , publish
49 , ioFindNode
50 , ioFindNodes
51 , ioGetPeers
52 , isearch
53 , bgsearch
54
55 -- ** Routing table
56 , insertNode
57 , refreshNodes
58
59 -- ** Messaging
60 , queryNode
61 , queryNode'
62 , (<@>)
63 ) where
64
65import Data.Bits
66import Data.Default
67#ifdef THREAD_DEBUG
68import Control.Concurrent.Lifted.Instrument hiding (yield)
69#else
70import GHC.Conc (labelThread)
71import Control.Concurrent.Lifted hiding (yield)
72#endif
73import Control.Exception.Lifted hiding (Handler)
74import Control.Monad.Reader
75import Control.Monad.Logger
76import Data.Maybe
77import Data.Conduit
78import Data.Conduit.List as C hiding (mapMaybe, mapM_)
79import Data.Either
80import Data.List as L
81import Data.Monoid
82import Data.Text as T
83import qualified Data.Set as Set
84 ;import Data.Set (Set)
85import Network
86import Text.PrettyPrint as PP hiding ((<>), ($$))
87import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
88import Data.Time
89import Data.Time.Clock.POSIX
90import Data.Hashable (Hashable)
91import Data.Serialize
92import Data.Hashable
93
94import Network.DatagramServer as KRPC hiding (Options, def)
95import Network.KRPC.Method as KRPC
96import Network.DatagramServer.Mainline (ReflectedIP(..), QueryExtra(..), ResponseExtra(..))
97import Network.DatagramServer (QueryFailure(..))
98import Data.Torrent
99import qualified Network.DHT as DHT
100import Network.DHT.Mainline
101import Network.DHT.Routing as R
102import Network.BitTorrent.DHT.Session
103import Control.Concurrent.STM
104import qualified Network.BitTorrent.DHT.Search as Search
105#ifdef VERSION_bencoding
106import Data.BEncode (BValue)
107import Network.DatagramServer.Mainline (KMessageOf)
108#else
109import Data.ByteString (ByteString)
110import Network.DatagramServer.Tox
111#endif
112import Network.Address hiding (NodeId)
113import Network.DatagramServer.Types as RPC hiding (Query,Response)
114import Network.DHT.Types
115import Control.Monad.Trans.Control
116import Data.Typeable
117import Data.Serialize
118import System.IO.Unsafe (unsafeInterleaveIO)
119import Data.String
120
121
122{-----------------------------------------------------------------------
123-- Handlers
124-----------------------------------------------------------------------}
125
126{-
127nodeHandler :: ( Address ip
128 , KRPC dht (Query KMessageOf a) (Response KMessageOf b)
129 )
130 => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler
131-}
132nodeHandler :: forall raw dht addr u t q r.
133 (Address addr, WireFormat raw dht, Pretty (NodeInfo dht addr u),
134 Default u,
135 IsString t, Functor dht,
136 KRPC dht (Query dht q) (Response dht r),
137 SerializableTo raw (Response dht r),
138 SerializableTo raw (Query dht q),
139 Show (QueryMethod dht)) =>
140 (NodeInfo dht addr u -> Maybe ReflectedIP -> IO ())
141 -> (NodeAddr addr -> IO (NodeId dht))
142 -> (Char -> t -> Text -> IO ())
143 -> DHTData dht addr
144 -> QueryMethod dht
145 -> (NodeAddr addr -> q -> IO r)
146 -> Handler IO dht raw
147nodeHandler insertNode myNodeIdAccordingTo logm dta method action = handler (\sockaddr -> myNodeIdAccordingTo (error "todo")) method $ \ sockAddr msg -> do
148 let remoteId = messageSender (msg :: dht (Query dht q)) resptype
149 qextra = queryExtra qry
150 resptype = Proxy :: Proxy (Response dht r)
151 q = queryParams qry
152 qry = envelopePayload msg :: Query dht q
153 case fromSockAddr sockAddr of
154 Nothing -> throwIO BadAddress
155 Just naddr -> do
156 logm 'D' "nodeHandler" $ "Received query: " <> T.pack (show $ method)
157 me <- myNodeIdAccordingTo naddr
158 rextra <- liftIO $ makeResponseExtra dta me qry resptype
159 let ni = NodeInfo remoteId naddr def
160 -- Do not route read-only nodes. (bep 43)
161 if fromRoutableNode qextra
162 then insertNode ni Nothing >> return () -- TODO need to block. why?
163 else logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni)
164 Response
165 <$> pure rextra
166 <*> action naddr q
167
168-- | Default 'Ping' handler.
169pingH :: DHT.Kademlia dht => Proxy dht -> NodeAddr ip -> Ping dht -> IO (Ping dht)
170pingH dht _ _ = return (DHT.pongMessage dht)
171-- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True }
172
173-- | Default 'FindNode' handler.
174findNodeH :: Kademlia dht => (NodeId dht -> IO [NodeInfo dht ip u]) -> NodeAddr ip -> FindNode dht ip -> IO (NodeFound dht ip)
175findNodeH getclosest _ msg = foundNodesMessage . L.map (fmap (const ())) <$> getclosest (findWho msg)
176
177-- | Default 'GetPeers' handler.
178getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip)
179getPeersH getPeerList toks naddr (GetPeers ih) = do
180 ps <- getPeerList ih
181 tok <- grantToken toks naddr
182 return $ GotPeers ps tok
183
184-- | Default 'Announce' handler.
185announceH :: ( Ord ip, Hashable ip ) => TVar (PeerStore ip) -> TVar SessionTokens -> NodeAddr ip -> Announce -> IO Announced
186announceH peers toks naddr @ NodeAddr {..} (Announce {..}) = do
187 valid <- checkToken toks naddr sessionToken
188 unless valid $ do
189 throwIO $ InvalidParameter "token"
190
191 let annPort = if impliedPort then nodePort else port
192 peerAddr = PeerAddr Nothing nodeHost annPort
193 insertPeer peers topic announcedName peerAddr
194 return Announced
195
196-- | Includes all Kademlia-related handlers.
197kademliaHandlers :: forall raw dht u ip. (Eq ip, Ord ip, Address ip
198 , Ord (TransactionID dht)
199 , Ord (NodeId dht)
200 , Show u
201 , SerializableTo raw (Response dht (Ping dht))
202 , SerializableTo raw (Query dht (Ping dht))
203 , Show (QueryMethod dht)
204 , Show (NodeId dht)
205 , FiniteBits (NodeId dht)
206 , Default u
207 , Serialize (TransactionID dht)
208 , WireFormat raw dht
209 , Kademlia dht
210 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
211 , Functor dht
212 , Pretty (NodeInfo dht ip u)
213 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
214 , SerializableTo raw (Response dht (NodeFound dht ip))
215 , SerializableTo raw (Query dht (FindNode dht ip))
216 ) => LogFun -> DHT raw dht u ip [Handler IO dht raw]
217-- kademliaHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler]
218kademliaHandlers logger = do
219 groknode <- insertNode1
220 mynid <- myNodeIdAccordingTo1
221 dta <- asks dhtData
222 let handler :: ( KRPC dht (Query dht a) (Response dht b)
223 , SerializableTo raw (Response dht b)
224 , SerializableTo raw (Query dht a)
225 ) => QueryMethod dht -> (NodeAddr ip -> a -> IO b) -> Handler IO dht raw
226 handler = nodeHandler groknode mynid (logt logger) dta
227 dht = Proxy :: Proxy dht
228 getclosest <- getClosest1
229 return [ handler (namePing dht) $ pingH dht
230 , handler (nameFindNodes dht) $ findNodeH getclosest
231 ]
232
233instance DataHandlers BValue KMessageOf where
234 dataHandlers = bthandlers
235
236bthandlers ::
237 ( Ord ip , Hashable ip, Typeable ip, Serialize ip) =>
238 (NodeId KMessageOf -> IO [NodeInfo KMessageOf ip ()])
239 -> DHTData KMessageOf ip
240 -> [MethodHandler BValue KMessageOf ip]
241bthandlers getclosest dta =
242 [ MethodHandler "get_peers" $ getPeersH (getpeers dta) (sessionTokens dta)
243 , MethodHandler "announce_peer" $ announceH (contactInfo dta) (sessionTokens dta)
244 ]
245 where
246 getpeers dta ih = do
247 ps <- lookupPeers (contactInfo dta) ih
248 if L.null ps
249 then Left <$> getclosest (toNodeId ih)
250 else return (Right ps)
251
252
253-- | Includes all default query handlers.
254defaultHandlers :: forall raw dht u ip.
255 ( Ord (TransactionID dht)
256 , Ord (NodeId dht)
257 , Show u
258 , SerializableTo raw (Response dht (Ping dht))
259 , SerializableTo raw (Query dht (Ping dht))
260 , Show (QueryMethod dht)
261 , Show (NodeId dht)
262 , FiniteBits (NodeId dht)
263 , Default u
264 , Serialize (TransactionID dht)
265 , WireFormat raw dht
266 , Kademlia dht
267 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
268 , Functor dht
269 , Pretty (NodeInfo dht ip u)
270 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
271 , SerializableTo raw (Response dht (NodeFound dht ip))
272 , SerializableTo raw (Query dht (FindNode dht ip))
273 , Eq ip, Ord ip, Address ip, DataHandlers raw dht
274 ) => LogFun -> DHT raw dht u ip [Handler IO dht raw]
275defaultHandlers logger = do
276 groknode <- insertNode1
277 mynid <- myNodeIdAccordingTo1
278 dta <- asks dhtData
279 let handler :: MethodHandler raw dht ip -> Handler IO dht raw
280 handler (MethodHandler name action) = nodeHandler groknode mynid (logt logger) dta name action
281 getclosest <- getClosest1
282 hs <- kademliaHandlers logger
283 return $ hs ++ L.map handler (dataHandlers (fmap (fmap (fmap (const ()))) . getclosest) dta)
284
285{-----------------------------------------------------------------------
286-- Basic queries
287-----------------------------------------------------------------------}
288
289type Iteration raw dht u ip o = NodeInfo dht ip u -> DHT raw dht u ip (Either [NodeInfo dht ip u] [o ip])
290
291-- | The most basic query. May be used to check if the given node is
292-- alive or get its 'NodeId'.
293pingQ :: forall raw dht u ip.
294 ( DHT.Kademlia dht
295 , Address ip
296 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
297 , Default u
298 , Show u
299 , Ord (TransactionID dht)
300 , Serialize (TransactionID dht)
301 , WireFormat raw dht
302 , SerializableTo raw (Response dht (Ping dht))
303 , SerializableTo raw (Query dht (Ping dht))
304 , Ord (NodeId dht)
305 , FiniteBits (NodeId dht)
306 , Show (NodeId dht)
307 , Show (QueryMethod dht)
308 ) => NodeInfo dht ip u -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP)
309pingQ ni = do
310 let ping = DHT.pingMessage (Proxy :: Proxy dht)
311 (nid, pong, mip) <- queryNode' ni ping
312 let _ = pong `asTypeOf` ping
313 -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid}
314 return (NodeInfo nid (nodeAddr ni) def, mip)
315
316-- | The most basic query. May be used to check if the given node is
317-- alive or get its 'NodeId'.
318coldPingQ :: forall raw dht u ip.
319 ( DHT.Kademlia dht
320 , Address ip
321 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
322 , Default u
323 , Show u
324 , Ord (TransactionID dht)
325 , Serialize (TransactionID dht)
326 , WireFormat raw dht
327 , SerializableTo raw (Response dht (Ping dht))
328 , SerializableTo raw (Query dht (Ping dht))
329 , Ord (NodeId dht)
330 , FiniteBits (NodeId dht)
331 , Show (NodeId dht)
332 , Show (QueryMethod dht)
333 ) => PacketDestination dht -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP)
334coldPingQ dest = do
335 let ping = DHT.pingMessage (Proxy :: Proxy dht)
336 naddr <- maybe (throwIO $ QueryFailed ProtocolError "unable to construct NodeAddr from PacketDestination")
337 return
338 $ fromAddr dest
339 (nid, pong, mip) <- coldQueryNode' naddr dest ping
340 let _ = pong `asTypeOf` ping
341 -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid}
342 return (NodeInfo nid naddr def, mip)
343
344-- TODO [robustness] match range of returned node ids with the
345-- expected range and either filter bad nodes or discard response at
346-- all throwing an exception
347-- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo
348findNodeQ proxy key ni = do
349 closest <- fmap DHT.foundNodes $ DHT.findNodeMessage proxy key <@> ni
350 $(logInfoS) "findNodeQ" $ "NodeFound\n"
351 <> T.pack (L.unlines $ L.map ((' ' :) . show . pPrint) closest)
352 return $ Right closest
353
354#ifdef VERSION_bencoding
355getPeersQ :: Address ip => InfoHash -> Iteration BValue KMessageOf () ip PeerAddr
356getPeersQ topic ni = do
357 GotPeers {..} <- GetPeers topic <@> ni
358 let dist = distance (toNodeId topic) (nodeId ni)
359 $(logInfoS) "getPeersQ" $ T.pack
360 $ "distance: " <> render (pPrint dist) <> " , result: "
361 <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" }
362 return peers
363
364announceQ :: Address ip => InfoHash -> PortNumber -> Iteration BValue KMessageOf () ip NodeAddr
365announceQ ih p ni = do
366 GotPeers {..} <- GetPeers ih <@> ni
367 case peers of
368 Left ns
369 | False -> undefined -- TODO check if we can announce
370 | otherwise -> return (Left ns)
371 Right _ -> do -- TODO *probably* add to peer cache
372 Announced <- Announce False ih Nothing p grantedToken <@> ni
373 return (Right [nodeAddr ni])
374#endif
375
376{-----------------------------------------------------------------------
377-- Iterative queries
378-----------------------------------------------------------------------}
379
380
381ioGetPeers :: Address ip => InfoHash -> DHT BValue KMessageOf () ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip]))
382ioGetPeers ih = do
383 session <- ask
384 return $ \ni -> runDHT session $ do
385 r <- try $ getPeersQ ih ni
386 case r of
387 Right e -> return $ either (,[]) ([],) e
388 Left e -> let _ = e :: QueryFailure in return ([],[])
389
390ioFindNode :: ( DHT.Kademlia dht
391 , WireFormat raw dht
392 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
393 , Address ip
394 , Default u
395 , Show u
396 , Show (QueryMethod dht)
397 , TableKey dht infohash
398 , Eq (NodeId dht)
399 , Ord (NodeId dht)
400 , FiniteBits (NodeId dht)
401 , Show (NodeId dht)
402 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
403 , Ord (TransactionID dht)
404 , Serialize (TransactionID dht)
405 , SerializableTo raw (Response dht (NodeFound dht ip))
406 , SerializableTo raw (Query dht (FindNode dht ip))
407 , SerializableTo raw (Response dht (Ping dht))
408 , SerializableTo raw (Query dht (Ping dht))
409 ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u]))
410ioFindNode ih = do
411 session <- ask
412 return $ \ni -> runDHT session $ do
413 ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> ni
414 let ns' = L.map (fmap (const def)) ns
415 return $ L.partition (\n -> nodeId n /= toNodeId ih) ns'
416
417
418-- | Like ioFindNode, but considers all found nodes to be 'Right' results.
419ioFindNodes :: ( DHT.Kademlia dht
420 , WireFormat raw dht
421 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
422 , Address ip
423 , Default u
424 , Show u
425 , Show (QueryMethod dht)
426 , TableKey dht infohash
427 , Eq (NodeId dht)
428 , Ord (NodeId dht)
429 , FiniteBits (NodeId dht)
430 , Show (NodeId dht)
431 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
432 , Ord (TransactionID dht)
433 , Serialize (TransactionID dht)
434 , SerializableTo raw (Response dht (NodeFound dht ip))
435 , SerializableTo raw (Query dht (FindNode dht ip))
436 , SerializableTo raw (Response dht (Ping dht))
437 , SerializableTo raw (Query dht (Ping dht))
438 ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u]))
439ioFindNodes ih = do
440 session <- ask
441 return $ \ni -> runDHT session $ do
442 ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> ni
443 let ns' = L.map (fmap (const def)) ns
444 return ([], ns')
445
446isearch :: ( Ord r
447 , Ord ip
448 , Ord (NodeId dht)
449 , FiniteBits (NodeId dht)
450 , TableKey dht ih
451 , Show ih) =>
452 (ih -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r])))
453 -> ih
454 -> DHT raw dht u ip (ThreadId, Search.IterativeSearch dht u ip r)
455isearch f ih = do
456 qry <- f ih
457 ns <- kclosest 8 ih <$> getTable
458 liftIO $ do s <- Search.newSearch qry (toNodeId ih) ns
459 a <- fork $ do
460 tid <- myThreadId
461 labelThread tid ("search."++show ih)
462 Search.search s
463 -- atomically \$ readTVar (Search.searchResults s)
464 return (a, s)
465
466-- | Background search: fill a lazy list using a background thread.
467bgsearch f ih = do
468 (tid, s) <- isearch f ih
469 let again shown = do
470 (chk,fin) <- atomically $ do
471 r <- (Set.\\ shown) <$> readTVar (Search.searchResults s)
472 if not $ Set.null r
473 then (,) r <$> Search.searchIsFinished s
474 else Search.searchIsFinished s >>= check >> return (Set.empty,True)
475 let ps = Set.toList chk
476 if fin then return ps
477 else do
478 xs <- unsafeInterleaveIO $ again (shown `Set.union` chk)
479 return $ ps ++ xs
480 liftIO $ again Set.empty
481
482type Search raw dht u ip o = Conduit [NodeInfo dht ip u] (DHT raw dht u ip) [o dht ip u]
483
484#if 0
485
486-- TODO: use reorder and filter (Traversal option) leftovers
487-- search :: k -> IterationI ip o -> Search ip o
488search _ action = do
489 awaitForever $ \ batch -> unless (L.null batch) $ do
490 $(logWarnS) "search" "start query"
491 responses <- lift $ queryParallel (action <$> batch)
492 let (nodes, results) = partitionEithers responses
493 $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results)))
494 leftover $ L.concat nodes
495 let r = mapM_ yield results
496 _ = (action,r) :: (a -> DHT raw dht u ip (Either [a] o), ConduitM [a] o (DHT raw dht u ip) ())
497 r
498
499#endif
500
501publish = error "todo"
502-- publish :: Address ip => InfoHash -> PortNumber -> DHT BValue KMessageOf () ip ()
503-- publish ih p = do
504 -- nodes <- getClosest ih
505 -- r <- asks (optReplication . options)
506 -- _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r
507 -- return ()
508
509
510probeNode :: ( Default u
511 , Show u
512 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
513 , DHT.Kademlia dht
514 , Address ip
515 , Ord (TransactionID dht)
516 , Serialize (TransactionID dht)
517 , WireFormat raw dht
518 , SerializableTo raw (Response dht (Ping dht))
519 , SerializableTo raw (Query dht (Ping dht))
520 , Ord (NodeId dht)
521 , FiniteBits (NodeId dht)
522 , Show (NodeId dht)
523 , Show (QueryMethod dht)
524 ) => NodeInfo dht ip u -> DHT raw dht u ip (Bool , Maybe ReflectedIP)
525probeNode addr = do
526 $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint $ nodeAddr addr)))
527 result <- try $ pingQ addr
528 let _ = fmap (const ()) result :: Either QueryFailure ()
529 return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result
530
531
532refreshNodes :: forall raw dht u ip.
533 ( Address ip
534 , Ord (NodeId dht)
535 , Default u
536 , FiniteBits (NodeId dht)
537 , Pretty (NodeId dht)
538 , DHT.Kademlia dht
539 , Ord ip
540 , Ord (TransactionID dht)
541 , SerializableTo raw (Response dht (NodeFound dht ip))
542 , SerializableTo raw (Query dht (FindNode dht ip))
543 , SerializableTo raw (Response dht (Ping dht))
544 , SerializableTo raw (Query dht (Ping dht))
545 , Pretty (NodeInfo dht ip u)
546 , Show (NodeId dht)
547 , Show u
548 , Show (QueryMethod dht)
549 , Serialize (TransactionID dht)
550 , WireFormat raw dht
551 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
552 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
553 ) => NodeId dht -> DHT raw dht u ip () -- [NodeInfo KMessageOf ip ()]
554-- FIXME do not use getClosest sinse we should /refresh/ them
555refreshNodes nid = do
556 $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid)))
557 nodes <- getClosest nid
558 do
559 -- forM (L.take 1 nodes) \$ \ addr -> do
560 -- NodeFound ns <- FindNode nid <@> addr
561 -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) ()
562 -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) ()
563 -- nss <- sourceList [[addr]] \$= search nid (findNodeQ nid) $$ C.consume
564 -- nss <- sourceList [nodes] \$= search nid (findNodeQ (Proxy :: Proxy dht) nid) $$ C.consume
565 ns <- bgsearch ioFindNodes nid
566 $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length ns)) <> " nodes."
567 _ <- queryParallel $ flip L.map ns $ \n -> do
568 $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n))
569 pingQ n
570 -- pingQ takes care of inserting the node.
571 return ()
572 return () -- \$ L.concat nss
573
574logc :: Char -> String -> DHT raw dht u ip ()
575logc 'D' = $(logDebugS) "insertNode" . T.pack
576logc 'W' = $(logWarnS) "insertNode" . T.pack
577logc 'I' = $(logInfoS) "insertNode" . T.pack
578logc c = $(logInfoS) "insertNode" . T.pack . (c :) . (':' :)
579
580-- | This operation do not block but acquire exclusive access to
581-- routing table.
582insertNode :: forall raw dht u ip.
583 ( Address ip
584 , Ord (NodeId dht)
585 , FiniteBits (NodeId dht)
586 , Show (NodeId dht)
587 , Default u
588 , Show u
589 , DHT.Kademlia dht
590 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
591 , Ord (TransactionID dht)
592 , WireFormat raw dht
593 , Serialize (TransactionID dht)
594 , SerializableTo raw (Response dht (Ping dht))
595 , SerializableTo raw (Query dht (Ping dht))
596 , Ord (NodeId dht)
597 , Show (NodeId dht)
598 , Show (QueryMethod dht)
599 ) => NodeInfo dht ip u -> Maybe ReflectedIP -> DHT raw dht u ip ()
600insertNode info witnessed_ip0 = do
601 f <- insertNode1
602 liftIO $ f info witnessed_ip0
603
604insertNode1 :: forall raw dht u ip.
605 ( Address ip
606 , Default u
607 , Show u
608 , Ord (NodeId dht)
609 , FiniteBits (NodeId dht)
610 , Show (NodeId dht)
611 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
612 , DHT.Kademlia dht
613 , Ord (TransactionID dht)
614 , WireFormat raw dht
615 , Serialize (TransactionID dht)
616 , SerializableTo raw (Response dht (Ping dht))
617 , SerializableTo raw (Query dht (Ping dht))
618 , Ord (NodeId dht)
619 , Show (NodeId dht)
620 , Show (QueryMethod dht)
621 ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ())
622insertNode1 = do
623 bc <- optBucketCount <$> asks options
624 nid <- asks tentativeNodeId
625 logm0 <- embed_ (uncurry logc)
626 let logm c = logm0 . (c,)
627 dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state.
628 probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP)
629 let probe n = probe0 n >>= runDHT dht_node_state . restoreM
630 {-
631 changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive
632 ip <- fromSockAddr ip0 :: Maybe ip
633 listToMaybe
634 $ rank id (nodeId $ foreignNode arrival)
635 $ bep42s ip (DHT.fallbackID params) -- warning: recursive
636 -}
637 params = DHT.TableParameters
638 { maxBuckets = bc :: Int
639 , fallbackID = nid :: NodeId dht
640 , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht
641 , logMessage = logm :: Char -> String -> IO ()
642 , pingProbe = probe :: NodeInfo dht ip u -> IO (Bool, Maybe ReflectedIP)
643 }
644 tbl <- asks routingInfo
645 let state = DHT.TableKeeper
646 { routingInfo = tbl
647 , grokNode = DHT.insertNode params state
648 , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO ()
649 }
650 return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0
651
652-- | Throws exception if node is not responding.
653queryNode :: forall raw dht u a b ip.
654 ( Address ip
655 , KRPC dht (Query dht a) (Response dht b)
656 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
657 , Default u
658 , Show u
659 , DHT.Kademlia dht
660 , Ord (TransactionID dht)
661 , Serialize (TransactionID dht)
662 , WireFormat raw dht
663 , SerializableTo raw (Response dht b)
664 , SerializableTo raw (Query dht a)
665 , Ord (NodeId dht)
666 , FiniteBits (NodeId dht)
667 , Show (NodeId dht)
668 , Show (QueryMethod dht)
669 , SerializableTo raw (Response dht (Ping dht))
670 , SerializableTo raw (Query dht (Ping dht))
671 ) => NodeInfo dht ip u -> a -> DHT raw dht u ip (NodeId dht, b)
672queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q
673
674queryNode' :: forall raw dht u a b ip.
675 ( Address ip
676 , Default u
677 , Show u
678 , DHT.Kademlia dht
679 , KRPC dht (Query dht a) (Response dht b)
680 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
681 , Ord (TransactionID dht)
682 , Serialize (TransactionID dht)
683 , WireFormat raw dht
684 , SerializableTo raw (Response dht b)
685 , SerializableTo raw (Query dht a)
686 , Ord (NodeId dht)
687 , FiniteBits (NodeId dht)
688 , Show (NodeId dht)
689 , Show (QueryMethod dht)
690 , SerializableTo raw (Response dht (Ping dht))
691 , SerializableTo raw (Query dht (Ping dht))
692 ) => NodeInfo dht ip u -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP)
693queryNode' ni q = do
694 let addr = nodeAddr ni
695 dest = makeAddress (Left $ nodeId ni) (toSockAddr addr)
696 coldQueryNode' addr dest q
697
698coldQueryNode' :: forall raw dht u a b ip.
699 ( Address ip
700 , Default u
701 , Show u
702 , DHT.Kademlia dht
703 , KRPC dht (Query dht a) (Response dht b)
704 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
705 , Ord (TransactionID dht)
706 , Serialize (TransactionID dht)
707 , WireFormat raw dht
708 , SerializableTo raw (Response dht b)
709 , SerializableTo raw (Query dht a)
710 , Ord (NodeId dht)
711 , FiniteBits (NodeId dht)
712 , Show (NodeId dht)
713 , Show (QueryMethod dht)
714 , SerializableTo raw (Response dht (Ping dht))
715 , SerializableTo raw (Query dht (Ping dht))
716 ) => NodeAddr ip -> PacketDestination dht -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP)
717coldQueryNode' addr dest q = do
718 nid <- myNodeIdAccordingTo $ fromMaybe (error "TODO: coldQueryNode' myNodeIdAccordingTo") $ fromAddr dest
719 dta <- asks dhtData
720 qextra <- liftIO $ makeQueryExtra dta nid (Proxy :: Proxy (Query dht q)) (Proxy :: Proxy (Response dht b))
721 let read_only = False -- TODO: check for NAT issues. (BEP 43)
722 -- let KRPC.Method name = KRPC.method :: KRPC.Method dht (Query dht a) (Response dht b)
723 mgr <- asks manager
724 (Response rextra r, remoteId, witnessed_ip) <- liftIO $ query' mgr dest (Query qextra q)
725 -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip)
726 -- <> " by " <> T.pack (show (toSockAddr addr))
727 _ <- insertNode (NodeInfo remoteId addr def) witnessed_ip
728 return (remoteId, r, witnessed_ip)
729
730-- | Infix version of 'queryNode' function.
731(<@>) :: ( Address ip
732 , KRPC dht (Query dht a) (Response dht b)
733 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
734 , Default u
735 , Show u
736 , Show (QueryMethod dht)
737 , Ord (NodeId dht)
738 , FiniteBits (NodeId dht)
739 , Show (NodeId dht)
740 , Ord (TransactionID dht)
741 , Serialize (TransactionID dht)
742 , SerializableTo raw (Response dht b)
743 , SerializableTo raw (Query dht a)
744 , SerializableTo raw (Response dht (Ping dht))
745 , SerializableTo raw (Query dht (Ping dht))
746 , WireFormat raw dht
747 , Kademlia dht
748 ) => a -> NodeInfo dht ip u -> DHT raw dht u ip b
749q <@> addr = snd <$> queryNode addr q
750{-# INLINE (<@>) #-}
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
deleted file mode 100644
index b775e7d3..00000000
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ /dev/null
@@ -1,571 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013-2014
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- This module defines internal state of a node instance. You can
9-- have multiple nodes per application but usually you don't have
10-- to. Normally, you don't need to import this module, use
11-- "Network.BitTorrent.DHT" instead.
12--
13{-# LANGUAGE CPP #-}
14{-# LANGUAGE RecordWildCards #-}
15{-# LANGUAGE FlexibleContexts #-}
16{-# LANGUAGE FlexibleInstances #-}
17{-# LANGUAGE GeneralizedNewtypeDeriving #-}
18{-# LANGUAGE MultiParamTypeClasses #-}
19{-# LANGUAGE ScopedTypeVariables #-}
20{-# LANGUAGE TypeFamilies #-}
21{-# LANGUAGE TemplateHaskell #-}
22module Network.BitTorrent.DHT.Session
23 ( -- * Options
24 -- | Use @optFooBar def@ to get default 'Alpha' or 'K'.
25 Alpha
26 , K
27 , Options (..)
28
29 -- * Session
30 , Node
31 , options
32 , tentativeNodeId
33 , myNodeIdAccordingTo
34 , myNodeIdAccordingTo1
35 , routingInfo
36 , routableAddress
37 , getTimestamp
38 -- , SessionTokens
39 -- , sessionTokens
40 -- , contactInfo
41 , dhtData
42 , PeerStore
43 , manager
44
45 -- ** Initialization
46 , LogFun
47 , logt
48 , NodeHandler
49 , newNode
50 , closeNode
51
52 -- * DHT
53 -- | Use @asks options@ to get options passed to 'startNode'
54 -- or @asks thisNodeId@ to get id of locally running node.
55 , DHT
56 , runDHT
57
58 -- ** Tokens
59 -- , grantToken
60 -- , checkToken
61
62 -- ** Routing table
63 , getTable
64 , getClosest
65 , getClosest1
66
67#ifdef VERSION_bencoding
68 -- ** Peer storage
69 , insertPeer
70 , getPeerList
71 , getPeerList1
72 , lookupPeers
73 , insertTopic
74 , deleteTopic
75 , getSwarms
76 , savePeerStore
77 , mergeSavedPeers
78 , allPeers
79#endif
80
81 -- ** Messaging
82 , queryParallel
83 ) where
84
85import Prelude hiding (ioError)
86
87import Control.Concurrent.STM
88#ifdef THREAD_DEBUG
89import Control.Concurrent.Async.Lifted.Instrument
90#else
91import Control.Concurrent.Async.Lifted
92#endif
93import Control.Exception.Lifted hiding (Handler)
94import Control.Monad.Base
95import Control.Monad.Logger
96import Control.Monad.Reader
97import Control.Monad.Trans.Control
98import Control.Monad.Trans.Resource
99import Data.Typeable
100import Data.String
101import Data.Bits
102import Data.ByteString
103import Data.Conduit.Lazy
104import Data.Default
105import Data.Fixed
106import Data.Hashable
107import Data.List as L
108import Data.Maybe
109import Data.Monoid
110import Data.Set as S
111import Data.Time
112import Network (PortNumber)
113import System.Random (randomIO)
114import Data.Time.Clock.POSIX
115import Data.Text as Text
116import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
117import Data.Serialize as S
118import Network.DHT.Types
119import Network.DatagramServer.Types
120
121
122import Data.Torrent as Torrent
123import Network.DatagramServer as KRPC hiding (Options, def)
124import qualified Network.DatagramServer as KRPC (def)
125#ifdef VERSION_bencoding
126import Data.BEncode (BValue)
127import Network.DatagramServer.Mainline (KMessageOf)
128#else
129import Network.DatagramServer.Tox as Tox
130#endif
131import Network.Address
132import Network.BitTorrent.DHT.ContactInfo (PeerStore)
133import qualified Network.BitTorrent.DHT.ContactInfo as P
134import Network.DHT.Mainline
135import Network.DHT.Routing as R
136import Network.BitTorrent.DHT.Token as T
137import GHC.Stack as GHC
138
139{-----------------------------------------------------------------------
140-- Options
141-----------------------------------------------------------------------}
142
143-- | Node lookups can proceed asynchronously.
144type Alpha = Int
145
146-- NOTE: libtorrent uses 5, azureus uses 10
147-- | The quantity of simultaneous lookups is typically three.
148defaultAlpha :: Alpha
149defaultAlpha = 3
150
151-- TODO add replication loop
152
153-- TODO do not insert infohash -> peeraddr if infohash is too far from
154-- this node id
155{-
156data Order
157 = NearFirst
158 | FarFirst
159 | Random
160
161data Traversal
162 = Greedy -- ^ aggressive short-circuit traversal
163 | Exhaustive -- ^
164-}
165
166-- | Original Kamelia DHT uses term /publish/ for data replication
167-- process. BitTorrent DHT uses term /announce/ since the purpose of
168-- the DHT is peer discovery. Later in documentation, we use terms
169-- /publish/ and /announce/ interchangible.
170data Options = Options
171 { -- | The degree of parallelism in 'find_node' queries. More
172 -- parallism lead to faster bootstrapping and lookup operations,
173 -- but also increase resource usage.
174 --
175 -- Normally this parameter should not exceed 'optK'.
176 optAlpha :: {-# UNPACK #-} !Alpha
177
178 -- | /K/ parameter - number of nodes to return in 'find_node'
179 -- responses.
180 , optK :: {-# UNPACK #-} !K
181
182 -- | Number of buckets to maintain. This parameter depends on
183 -- amount of nodes in the DHT network.
184 , optBucketCount :: {-# UNPACK #-} !BucketCount
185
186 -- | RPC timeout.
187 , optTimeout :: !NominalDiffTime
188
189 -- | /R/ parameter - how many target nodes the 'announce' query
190 -- should affect.
191 --
192 -- A large replica set compensates for inconsistent routing and
193 -- reduces the need to frequently republish data for
194 -- persistence. This comes at an increased cost for
195 -- 'Network.BitTorrent.DHT.insert' in terms of time, nodes
196 -- contacted, and storage.
197 , optReplication :: {-# UNPACK #-} !NodeCount
198
199 -- | How often this node should republish (or reannounce) its
200 -- data.
201 --
202 -- Large replica set ('optReplication') should require
203 -- smaller reannounce intervals ('optReannounce').
204 , optReannounce :: !NominalDiffTime
205
206 -- | The time it takes for data to expire in the
207 -- network. Publisher of the data should republish (or
208 -- reannounce) data to keep it in the network.
209 --
210 -- The /data expired timeout/ should be more than 'optReannounce'
211 -- interval.
212 , optDataExpired :: !NominalDiffTime
213 } deriving (Show, Eq)
214
215-- | Optimal options for bittorrent client. For short-lifetime
216-- utilities you most likely need to tune 'optAlpha' and
217-- 'optBucketCount'.
218instance Default Options where
219 def = Options
220 { optAlpha = defaultAlpha
221 , optK = defaultK
222
223 -- see Fig.2 from "BitTorrent Mainline DHT Measurement" paper.
224 , optBucketCount = defaultBucketCount
225
226 -- see Fig.4 from "Profiling a Million User DHT" paper.
227 , optTimeout = 5 -- seconds
228 , optReplication = 20 -- nodes
229 , optReannounce = 15 * 60
230 , optDataExpired = 60 * 60
231 }
232
233seconds :: NominalDiffTime -> Int
234seconds dt = fromEnum (realToFrac dt :: Uni)
235{-----------------------------------------------------------------------
236-- Session
237-----------------------------------------------------------------------}
238
239-- | A set of torrents this peer intends to share.
240type AnnounceSet = Set (InfoHash, PortNumber)
241
242-- | Logger function.
243type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO ()
244
245-- | DHT session keep track state of /this/ node.
246data Node raw dht u ip = Node
247 { -- | Session configuration;
248 options :: !Options
249
250 -- | Pseudo-unique self-assigned session identifier. This value is
251 -- constant during DHT session and (optionally) between sessions.
252 , tentativeNodeId :: !(NodeId dht)
253
254 , resources :: !InternalState
255 , manager :: !(Manager raw dht) -- ^ RPC manager;
256 , routingInfo :: !(TVar (Maybe (R.Info dht ip u))) -- ^ search table;
257 , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node;
258 , dhtData :: DHTData dht ip
259 , loggerFun :: !LogFun
260 }
261
262-- | DHT keep track current session and proper resource allocation for
263-- safe multithreading.
264newtype DHT raw dht u ip a = DHT { unDHT :: ReaderT (Node raw dht u ip) IO a }
265 deriving ( Functor, Applicative, Monad, MonadIO
266 , MonadBase IO, MonadReader (Node raw dht u ip), MonadThrow
267 )
268
269#if MIN_VERSION_monad_control(1,0,0)
270newtype DHTStM raw dht u ip a = StM {
271 unSt :: StM (ReaderT (Node raw dht u ip) IO) a
272 }
273#endif
274
275instance MonadBaseControl IO (DHT raw dht u ip) where
276#if MIN_VERSION_monad_control(1,0,0)
277 type StM (DHT raw dht u ip) a = DHTStM raw dht u ip a
278#else
279 newtype StM (DHT raw dht u ip) a = StM {
280 unSt :: StM (ReaderT (Node raw dht u ip) IO) a
281 }
282#endif
283 liftBaseWith cc = DHT $ liftBaseWith $ \ cc' ->
284 cc $ \ (DHT m) -> StM <$> cc' m
285 {-# INLINE liftBaseWith #-}
286
287 restoreM = DHT . restoreM . unSt
288 {-# INLINE restoreM #-}
289
290-- | Check is it is possible to run 'queryNode' or handle pending
291-- query from remote node.
292instance MonadActive (DHT raw dht u ip) where
293 monadActive = getManager >>= liftIO . isActive
294 {-# INLINE monadActive #-}
295
296-- | All allocated resources will be closed at 'closeNode'.
297instance MonadResource (DHT raw dht u ip) where
298 liftResourceT m = do
299 s <- asks resources
300 liftIO $ runInternalState m s
301
302-- instance MonadKRPC (DHT raw dht u ip) (DHT raw dht u ip) BValue KMessageOf where
303
304getManager :: DHT raw dht u ip (Manager raw dht)
305getManager = asks manager
306
307instance MonadLogger (DHT raw dht u ip) where
308 monadLoggerLog loc src lvl msg = do
309 logger <- asks loggerFun
310 liftIO $ logger loc src lvl (toLogStr msg)
311
312#ifdef VERSION_bencoding
313type NodeHandler = Handler IO KMessageOf BValue
314#else
315type NodeHandler ip = Handler (DHT raw dht u ip) Tox.Message ByteString
316#endif
317
318logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO ()
319logt lf c m txt = lf (locFromCS callStack) (fromString m) (lvl c) (fromString $ Text.unpack txt)
320 where
321 lvl 'D' = LevelDebug
322 lvl 'I' = LevelInfo
323 lvl 'W' = LevelWarn
324 lvl 'E' = LevelError
325 lvl ch = LevelOther $ Text.cons ch Text.empty
326
327mkLoggerLoc :: GHC.SrcLoc -> Loc
328mkLoggerLoc loc =
329 Loc { loc_filename = GHC.srcLocFile loc
330 , loc_package = GHC.srcLocPackage loc
331 , loc_module = GHC.srcLocModule loc
332 , loc_start = ( GHC.srcLocStartLine loc
333 , GHC.srcLocStartCol loc)
334 , loc_end = ( GHC.srcLocEndLine loc
335 , GHC.srcLocEndCol loc)
336 }
337
338locFromCS :: GHC.CallStack -> Loc
339locFromCS cs = case getCallStack cs of
340 ((_, loc):_) -> mkLoggerLoc loc
341 _ -> Loc "<unknown>" "<unknown>" "<unknown>" (0,0) (0,0)
342
343
344-- | Run DHT session. You /must/ properly close session using
345-- 'closeNode' function, otherwise socket or other scarce resources may
346-- leak.
347newNode :: forall raw dht ip u.
348 ( Address ip
349 , FiniteBits (NodeId dht)
350 , Serialize (NodeId dht)
351 , Kademlia dht
352 , WireFormat raw dht
353 )
354 => -- [NodeHandler] -- ^ handlers to run on accepted queries;
355 Options -- ^ various dht options;
356 -> NodeAddr ip -- ^ node address to bind;
357 -> LogFun -- ^ invoked on log messages;
358 -> Maybe (NodeId dht) -- ^ use this NodeId, if not given a new one is generated.
359 -> IO (Node raw dht u ip) -- ^ a new DHT node running at given address.
360newNode opts naddr logger mbid = do
361 s <- createInternalState
362 runInternalState initNode s
363 `onException` closeInternalState s
364 where
365 rpcOpts = KRPC.def { optQueryTimeout = seconds (optTimeout opts) }
366 nodeAddr = toSockAddr naddr
367 initNode = do
368 s <- getInternalState
369 (myId, infovar, getst) <- liftIO $ do
370 (i, ctx) <- initializeServerState (Proxy :: Proxy (dht raw)) mbid
371 var <- atomically (newTVar Nothing)
372 let getst dest = do
373 info <- atomically . readTVar $ var
374 return ( maybe i myNodeId info, ctx)
375 return (i, var, getst)
376 (_, m) <- allocate (newManager rpcOpts (logt logger) nodeAddr getst []) closeManager
377 liftIO $ do
378 dta <- initializeDHTData
379 node <- Node opts myId s m infovar
380 <$> newTVarIO S.empty
381 <*> pure dta
382 <*> pure logger
383 return node
384
385-- | Some resources like listener thread may live for
386-- some short period of time right after this DHT session closed.
387closeNode :: Node raw dht u ip -> IO ()
388closeNode Node {..} = closeInternalState resources
389
390-- | Run DHT operation on the given session.
391runDHT :: Node raw dht u ip -> DHT raw dht u ip a -> IO a
392runDHT node action = runReaderT (unDHT action) node
393{-# INLINE runDHT #-}
394
395{-----------------------------------------------------------------------
396-- Routing
397-----------------------------------------------------------------------}
398
399-- /pick a random ID/ in the range of the bucket and perform a
400-- find_nodes search on it.
401
402
403{-----------------------------------------------------------------------
404-- Routing table
405-----------------------------------------------------------------------}
406
407-- | This nodes externally routable address reported by remote peers.
408routableAddress :: DHT raw dht u ip (Maybe SockAddr)
409routableAddress = do
410 info <- asks routingInfo >>= liftIO . atomically . readTVar
411 return $ myAddress <$> info
412
413-- | The current NodeId that the given remote node should know us by.
414myNodeIdAccordingTo :: NodeAddr ip -> DHT raw dht u ip (NodeId dht)
415myNodeIdAccordingTo _ = do
416 info <- asks routingInfo >>= liftIO . atomically . readTVar
417 maybe (asks tentativeNodeId)
418 (return . myNodeId)
419 info
420
421myNodeIdAccordingTo1 :: DHT raw dht u ip ( NodeAddr ip -> IO (NodeId dht) )
422myNodeIdAccordingTo1 = do
423 var <- asks routingInfo
424 tid <- asks tentativeNodeId
425 return $ \ _ -> do
426 info <- atomically $ readTVar var
427 return $ maybe tid myNodeId info
428
429-- | Get current routing table. Normally you don't need to use this
430-- function, but it can be usefull for debugging and profiling purposes.
431getTable :: Eq ip => DHT raw dht u ip (Table dht ip u)
432getTable = do
433 Node { tentativeNodeId = myId
434 , routingInfo = var
435 , options = opts } <- ask
436 let nil = nullTable myId (optBucketCount opts)
437 liftIO (maybe nil R.myBuckets <$> atomically (readTVar var))
438
439getSwarms :: Ord ip => DHT raw KMessageOf u ip [ (InfoHash, Int, Maybe ByteString) ]
440getSwarms = do
441 store <- asks (contactInfo . dhtData) >>= liftIO . atomically . readTVar
442 return $ P.knownSwarms store
443
444savePeerStore :: (Ord ip, Address ip) => DHT raw KMessageOf u ip ByteString
445savePeerStore = do
446 var <- asks (contactInfo . dhtData)
447 peers <- liftIO $ atomically $ readTVar var
448 return $ S.encode peers
449
450mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw KMessageOf u ip ()
451mergeSavedPeers bs = do
452 var <- asks (contactInfo . dhtData)
453 case S.decode bs of
454 Right newbies -> liftIO $ atomically $ modifyTVar' var (<> newbies)
455 Left _ -> return ()
456
457
458allPeers :: Ord ip => InfoHash -> DHT raw KMessageOf u ip [ PeerAddr ip ]
459allPeers ih = do
460 store <- asks (contactInfo . dhtData) >>= liftIO . atomically . readTVar
461 return $ P.lookup ih store
462
463-- | Find a set of closest nodes from routing table of this node. (in
464-- no particular order)
465--
466-- This operation used for 'find_nodes' query.
467--
468getClosest :: ( Eq ip
469 , Ord (NodeId dht)
470 , FiniteBits (NodeId dht)
471 , TableKey dht k ) =>
472 k -> DHT raw dht u ip [NodeInfo dht ip u]
473getClosest node = do
474 k <- asks (optK . options)
475 kclosest k node <$> getTable
476
477getClosest1 :: ( Eq ip
478 , Ord (NodeId dht)
479 , FiniteBits (NodeId dht)
480 , TableKey dht k
481 ) => DHT raw dht u ip (k -> IO [NodeInfo dht ip u])
482getClosest1 = do
483 k <- asks (optK . options)
484 nobkts <- asks (optBucketCount . options)
485 myid <- asks tentativeNodeId
486 var <- asks routingInfo
487 return $ \node -> do nfo <- atomically $ readTVar var
488 let tbl = maybe (nullTable myid nobkts) R.myBuckets nfo
489 return $ kclosest k node tbl
490
491{-----------------------------------------------------------------------
492-- Peer storage
493-----------------------------------------------------------------------}
494
495refreshContacts :: Ord ip => TVar (PeerStore ip) -> IO ()
496refreshContacts var =
497 -- TODO limit dht peer store in size (probably by removing oldest peers)
498 return ()
499
500
501-- | Insert peer to peer store. Used to handle announce requests.
502insertPeer :: Ord ip => TVar (PeerStore ip) -> InfoHash -> Maybe ByteString -> PeerAddr ip -> IO ()
503insertPeer var ih name addr = do
504 refreshContacts var
505 atomically $ modifyTVar' var (P.insertPeer ih name addr)
506
507-- | Get peer set for specific swarm.
508lookupPeers :: Ord ip => TVar (PeerStore ip) -> InfoHash -> IO [PeerAddr ip]
509lookupPeers var ih = do
510 refreshContacts var
511 tm <- getTimestamp
512 atomically $ do
513 (ps,store') <- P.freshPeers ih tm <$> readTVar var
514 writeTVar var store'
515 return ps
516
517getTimestamp :: IO Timestamp
518getTimestamp = do
519 utcTime <- getCurrentTime
520 -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime)))
521 return $ utcTimeToPOSIXSeconds utcTime
522
523#ifdef VERSION_bencoding
524-- | Prepare result for 'get_peers' query.
525--
526-- This operation use 'getClosest' as failback so it may block.
527--
528getPeerList :: Ord ip => InfoHash -> DHT raw KMessageOf () ip (PeerList ip)
529getPeerList ih = do
530 var <- asks (contactInfo . dhtData)
531 ps <- liftIO $ lookupPeers var ih
532 if L.null ps
533 then Left <$> getClosest ih
534 else return (Right ps)
535
536getPeerList1 :: Ord ip => DHT raw KMessageOf () ip (InfoHash -> IO (PeerList ip))
537getPeerList1 = do
538 var <- asks (contactInfo . dhtData)
539 getclosest <- getClosest1
540 return $ \ih -> do
541 ps <- lookupPeers var ih
542 if L.null ps
543 then Left <$> getclosest ih
544 else return (Right ps)
545
546
547insertTopic :: InfoHash -> PortNumber -> DHT raw dht u ip ()
548insertTopic ih p = do
549 var <- asks announceInfo
550 liftIO $ atomically $ modifyTVar' var (S.insert (ih, p))
551
552deleteTopic :: InfoHash -> PortNumber -> DHT raw dht u ip ()
553deleteTopic ih p = do
554 var <- asks announceInfo
555 liftIO $ atomically $ modifyTVar' var (S.delete (ih, p))
556
557#endif
558
559{-----------------------------------------------------------------------
560-- Messaging
561-----------------------------------------------------------------------}
562
563-- | Failed queries are ignored.
564queryParallel :: [DHT raw dht u ip a] -> DHT raw dht u ip [a]
565queryParallel queries = do
566 -- TODO: use alpha
567 -- alpha <- asks (optAlpha . options)
568 cleanup <$> mapConcurrently try queries
569 where
570 cleanup :: [Either QueryFailure a] -> [a]
571 cleanup = mapMaybe (either (const Nothing) Just)
diff --git a/src/Network/DHT.hs b/src/Network/DHT.hs
deleted file mode 100644
index 285cf9ff..00000000
--- a/src/Network/DHT.hs
+++ /dev/null
@@ -1,125 +0,0 @@
1{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
2module Network.DHT
3 ( -- makeTableKeeper
4 -- , TableKeeper(..)
5 module Network.DHT -- for now
6 , module Network.DHT.Types
7 ) where
8
9import Data.Bits
10import Data.Maybe
11import Data.Monoid
12import Network.Address
13import Network.DHT.Types
14import Network.DatagramServer.Types
15import Network.DHT.Routing
16import Control.Concurrent.STM
17#ifdef THREAD_DEBUG
18import Control.Concurrent.Lifted.Instrument
19#else
20import GHC.Conc (labelThread)
21import Control.Concurrent.Lifted
22#endif
23import Text.PrettyPrint as PP hiding ((<>), ($$))
24import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
25
26import Control.Monad
27import Data.Time.Clock (getCurrentTime)
28import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
29
30data TableKeeper msg ip u = TableKeeper
31 { routingInfo :: TVar (Maybe (Info msg ip u))
32 , grokNode :: NodeInfo msg ip u -> Maybe (ReflectedIP) -> IO ()
33 , grokAddress :: Maybe SockAddr -> ReflectedIP -> IO ()
34 }
35
36makeTableKeeper :: forall msg ip u.
37 ( Address ip
38 , Show u
39 , Show (NodeId msg)
40 , Ord (NodeId msg)
41 , FiniteBits (NodeId msg)
42 ) => TableParameters msg ip u -> IO (TableKeeper msg ip u)
43makeTableKeeper param@TableParameters{..} = do
44 error "TODO makeTableKeeper" -- kick off table-updating thread
45 ri <- atomically (newTVar Nothing)
46 let tk = TableKeeper{ routingInfo = ri
47 , grokNode = insertNode param tk
48 , grokAddress = error "todo"
49 }
50 return tk
51
52atomicInsert :: ( Eq ip, Address ip, Ord (NodeId msg), FiniteBits (NodeId msg)
53 ) => TableParameters msg ip u -> TableKeeper msg ip u -> Timestamp -> Event msg ip u -> Maybe ReflectedIP -> STM (IO [CheckPing msg ip u])
54atomicInsert param@TableParameters{..} state tm arrival witnessed_ip = do
55 minfo <- readTVar (routingInfo state)
56 case minfo of
57 Just inf -> do
58 (ps,t') <- insert tm arrival $ myBuckets inf
59 writeTVar (routingInfo state) $ Just $ inf { myBuckets = t' }
60 return $ do
61 case witnessed_ip of
62 Just (ReflectedIP ip)
63 | ip /= myAddress inf
64 -> logMessage 'I' $ unwords
65 $ [ "Possible NAT?"
66 , show (toSockAddr $ nodeAddr $ foreignNode arrival)
67 , "reports my address:"
68 , show ip ]
69 -- TODO: Let routing table vote on my IP/NodeId.
70 _ -> return ()
71 return ps
72 Nothing ->
73 let dropped = return $ do
74 -- Ignore non-witnessing nodes until somebody tells
75 -- us our ip address.
76 logMessage 'W' ("Dropped " ++ show (toSockAddr $ nodeAddr $ foreignNode arrival))
77 return []
78 in fromMaybe dropped $ do
79 ReflectedIP ip <- witnessed_ip
80 let nil = nullTable (adjustID ip arrival) maxBuckets
81 return $ do
82 (ps,t') <- insert tm arrival nil
83 let new_info = Info t' (adjustID ip arrival) ip
84 writeTVar (routingInfo state) $ Just new_info
85 return $ do
86 logMessage 'I' $ unwords
87 [ "External IP address:"
88 , show ip
89 , "(reported by"
90 , show (toSockAddr $ nodeAddr $ foreignNode arrival)
91 ++ ")"
92 ]
93 return ps
94
95-- | This operation do not block but acquire exclusive access to
96-- routing table.
97insertNode :: forall msg ip u.
98 ( Address ip
99 , Show u
100 , Show (NodeId msg)
101 , Ord (NodeId msg)
102 , FiniteBits (NodeId msg)
103 ) => TableParameters msg ip u -> TableKeeper msg ip u -> NodeInfo msg ip u -> Maybe ReflectedIP -> IO ()
104insertNode param@TableParameters{..} state info witnessed_ip0 = do
105 tm <- utcTimeToPOSIXSeconds <$> getCurrentTime -- Network.DHT.Routing.TimeStamp = POSIXTime
106 let showTable = do
107 t <- atomically $ fmap myBuckets <$> readTVar (routingInfo state)
108 let logMsg = "Routing table: " <> pPrint t
109 logMessage 'D' (render logMsg)
110 let arrival = TryInsert info
111 logMessage 'D' $ show ( TryInsert (mapAddress fromAddr info) :: Event _ (Maybe IPv4) _ )
112 ps <- join $ atomically $ atomicInsert param state tm arrival witnessed_ip0
113 showTable
114 _ <- fork $ do
115 myThreadId >>= flip labelThread "DHT.insertNode.pingResults"
116 forM_ ps $ \(CheckPing ns)-> do
117 forM_ ns $ \n -> do
118 (b,mip) <- pingProbe n
119 let alive = PingResult n b
120 logMessage 'D' $ "PingResult "++show (nodeId n,b)
121 _ <- join $ atomically $ atomicInsert param state tm alive mip
122 showTable
123 return ()
124 return ()
125
diff --git a/src/Network/DHT/Mainline.hs b/src/Network/DHT/Mainline.hs
deleted file mode 100644
index bdf9e8b2..00000000
--- a/src/Network/DHT/Mainline.hs
+++ /dev/null
@@ -1,579 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- This module provides message datatypes which is used for /Node to
9-- Node/ communication. Bittorrent DHT is based on Kademlia
10-- specification, but have a slightly different set of messages
11-- which have been adopted for /peer/ discovery mechanism. Messages
12-- are sent over "Network.KRPC" protocol, but normally you should
13-- use "Network.BitTorrent.DHT.Session" to send and receive
14-- messages.
15--
16-- DHT queries are not /recursive/, they are /iterative/. This means
17-- that /querying/ node . While original specification (namely BEP5)
18-- do not impose any restrictions for /quered/ node behaviour, a
19-- good DHT implementation should follow some rules to guarantee
20-- that unlimit recursion will never happen. The following set of
21-- restrictions:
22--
23-- * 'Ping' query must not trigger any message.
24--
25-- * 'FindNode' query /may/ trigger 'Ping' query to check if a
26-- list of nodes to return is /good/. See
27-- 'Network.DHT.Routing.Routing' for further explanation.
28--
29-- * 'GetPeers' query may trigger 'Ping' query for the same reason.
30--
31-- * 'Announce' query must trigger 'Ping' query for the same reason.
32--
33-- It is easy to see that the most long RPC chain is:
34--
35-- @
36-- | | |
37-- Node_A | |
38-- | FindNode or GetPeers or Announce | |
39-- | ------------------------------------> Node_B |
40-- | | Ping |
41-- | | -----------> |
42-- | | Node_C
43-- | | Pong |
44-- | NodeFound or GotPeers or Announced | <----------- |
45-- | <------------------------------------- Node_B |
46-- Node_A | |
47-- | | |
48-- @
49--
50-- where in some cases 'Node_C' is 'Node_A'.
51--
52-- For more info see:
53-- <http://www.bittorrent.org/beps/bep_0005.html#dht-queries>
54--
55-- For Kamelia messages see original Kademlia paper:
56-- <http://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf>
57--
58{-# LANGUAGE CPP #-}
59{-# LANGUAGE StandaloneDeriving #-}
60{-# LANGUAGE GeneralizedNewtypeDeriving #-}
61{-# LANGUAGE DeriveDataTypeable #-}
62{-# LANGUAGE FlexibleInstances #-}
63{-# LANGUAGE MultiParamTypeClasses #-}
64{-# LANGUAGE UndecidableInstances #-}
65{-# LANGUAGE ScopedTypeVariables #-}
66{-# LANGUAGE TypeFamilies #-}
67module Network.DHT.Mainline
68 ( -- * Envelopes
69 Query (..)
70 , Response (..)
71
72 -- * Queries
73 -- ** ping
74 , Ping (..)
75
76 -- ** find_node
77 , FindNode (..)
78 , NodeFound (..)
79 , bep42s
80 -- , bep42
81
82
83#ifdef VERSION_bencoding
84 -- ** get_peers
85 , PeerList
86 , GetPeers (..)
87 , GotPeers (..)
88
89 -- ** announce_peer
90 , Announce (..)
91 , Announced (..)
92#endif
93 , DHTData(..)
94 , SessionTokens(..)
95 , grantToken
96 , checkToken
97 ) where
98
99import Data.String
100import Control.Applicative
101import Data.Bool
102#ifdef VERSION_bencoding
103import Data.BEncode as BE
104import Data.BEncode.BDict as BDict hiding (map)
105#else
106import qualified Network.DatagramServer.Tox as Tox
107import Network.DatagramServer.Tox (NodeId)
108import Data.Word
109import Control.Monad
110#endif
111import Network.KRPC.Method
112import Network.Address hiding (NodeId)
113import Data.Bits
114import Data.ByteString (ByteString)
115import qualified Data.ByteString as BS
116import Data.Digest.CRC32C
117import Data.List as L
118import Data.Monoid
119import Data.Serialize as S
120import Data.Typeable
121import Data.Word
122import Network
123import Network.DatagramServer
124import Network.DatagramServer.Mainline
125import Data.Maybe
126
127import Data.Torrent (InfoHash)
128import Network.BitTorrent.DHT.Token as T
129import Network.BitTorrent.DHT.ContactInfo
130#ifdef VERSION_bencoding
131import Network.DatagramServer ()
132#endif
133import Network.DatagramServer.Types hiding (Query,Response)
134import Network.DHT.Types
135import Network.DHT.Routing
136import Data.Time
137import Control.Concurrent.STM
138import System.Random
139import Data.Hashable
140
141
142{-----------------------------------------------------------------------
143-- envelopes
144-----------------------------------------------------------------------}
145
146#ifndef VERSION_bencoding
147type BKey = ByteString
148#endif
149
150node_id_key :: BKey
151node_id_key = "id"
152
153read_only_key :: BKey
154read_only_key = "ro"
155
156
157#ifdef VERSION_bencoding
158instance BEncode a => BEncode (Query KMessageOf a) where
159 toBEncode Query {..} = toDict $
160 BDict.union ( node_id_key .=! queringNodeId queryExtra
161 .: read_only_key .=? bool Nothing (Just (1 :: Integer)) (queryIsReadOnly queryExtra)
162 .: endDict)
163 (dict (toBEncode queryParams))
164 where
165 dict (BDict d) = d
166 dict _ = error "impossible: instance BEncode (Query a)"
167
168 fromBEncode v =
169 Query <$> (MainlineQuery <$> fromDict (field (req node_id_key)) v
170 <*> fromDict (fromMaybe False <$>? read_only_key) v)
171 <*> fromBEncode v
172#else
173data Query a = Query a
174#endif
175
176#ifdef VERSION_bencoding
177instance BEncode a => BEncode (Response KMessageOf a) where
178 toBEncode = toBEncode . toQuery
179 where
180 toQuery (Response (MainlineResponse nid) a) = Query (MainlineQuery nid False) a
181
182 fromBEncode b = fromQuery <$> fromBEncode b
183 where
184 fromQuery (Query (MainlineQuery nid _) a) = Response (MainlineResponse nid) a
185#else
186data Response KMessageOf a = Response KMessageOf a
187#endif
188
189{-----------------------------------------------------------------------
190-- ping method
191-----------------------------------------------------------------------}
192
193-- / The most basic query is a ping. Ping query is used to check if a
194-- quered node is still alive.
195-- data Ping = Ping Tox.Nonce8 deriving (Show, Eq, Typeable)
196
197#ifdef VERSION_bencoding
198instance BEncode (Ping KMessageOf) where
199 toBEncode Ping = toDict endDict
200 fromBEncode _ = pure Ping
201#else
202instance Serialize (Query (Ping KMessageOf)) where
203 get = do
204 b <- get
205 when ( (b::Word8) /= 0) $ fail "Bad ping request"
206 nonce <- get
207 return $ Query (Ping nonce)
208 put (Query (Ping nonce)) = do
209 put (0 :: Word8)
210 put nonce
211instance Serialize (Response Ping) where
212 get = do
213 b <- get
214 when ( (b::Word8) /= 1) $ fail "Bad ping response"
215 nonce <- get
216 return $ Response (Ping nonce)
217 put (Response (Ping nonce)) = do
218 put (1 :: Word8)
219 put nonce
220#endif
221
222-- | \"q\" = \"ping\"
223instance KRPC KMessageOf (Query KMessageOf (Ping KMessageOf)) (Response KMessageOf (Ping KMessageOf)) where
224 method = "ping"
225 makeQueryExtra _ nid _ _ = return $ MainlineQuery nid False -- TODO: check for NAT issues. (BEP 43)
226 makeResponseExtra _ nid _ _ = return $ MainlineResponse nid
227
228 -- TODO KError Sender/Responder
229 messageSender (Q q) _ = queringNodeId $ queryExtra $ queryArgs q
230 messageResponder _ (R r) = queredNodeId $ responseExtra $ respVals r
231
232{-----------------------------------------------------------------------
233-- find_node method
234-----------------------------------------------------------------------}
235
236-- / Find node is used to find the contact information for a node
237-- given its ID.
238-- data FindNode KMessageOf ip = FindNode (NodeId Tox.Message) Tox.Nonce8 -- Tox: Get Nodes
239 -- deriving (Show, Eq, Typeable)
240
241target_key :: BKey
242target_key = "target"
243
244#ifdef VERSION_bencoding
245instance Typeable ip => BEncode (FindNode KMessageOf ip) where
246 toBEncode (FindNode nid) = toDict $ target_key .=! nid .: endDict
247 fromBEncode = fromDict $ FindNode <$>! target_key
248#else
249instance Serialize (Query KMessageOf (FindNode KMessageOf ip)) where
250 get = do
251 nid <- get
252 nonce <- get
253 return $ Query (FindNode nid nonce)
254 put (Query (FindNode nid nonce)) = do
255 put nid
256 put nonce
257#endif
258
259-- | When a node receives a 'FindNode' query, it should respond with a
260-- the compact node info for the target node or the K (8) closest good
261-- nodes in its own routing table.
262--
263#ifdef VERSION_bencoding
264-- newtype NodeFound KMessageOf ip = NodeFound [NodeInfo KMessageOf ip ()] deriving (Show, Eq, Typeable)
265#else
266data NodeFound KMessageOf ip = NodeFound [Tox.NodeFormat] Tox.Nonce8 deriving (Show, Eq, Typeable)
267#endif
268-- Tox: send_nodes
269
270nodes_key :: BKey
271nodes_key = "nodes"
272
273-- Convert IPv4 address. Useful for using variadic IP type.
274from4 :: forall dht u s. Address s => NodeInfo dht IPv4 u -> Either String (NodeInfo dht s u)
275from4 n = maybe (Left "Error converting IPv4") Right
276 $ traverseAddress (fromAddr :: IPv4 -> Maybe s) n
277
278#ifdef VERSION_bencoding
279binary :: Serialize a => BKey -> BE.Get [a]
280binary k = field (req k) >>= either (fail . format) return .
281 runGet (many get)
282 where
283 format str = "fail to deserialize " ++ show k ++ " field: " ++ str
284
285instance Address ip => BEncode (NodeFound KMessageOf ip) where
286 toBEncode (NodeFound ns) = toDict $
287 nodes_key .=! runPut (mapM_ put ns)
288 .: endDict
289
290 -- TODO: handle IPv6 by reading the "nodes6" key (see bep 32)
291 fromBEncode bval = NodeFound <$> (traverse from4 =<< fromDict (binary nodes_key) bval)
292#else
293instance Serialize (Response KMessageOf (NodeFound KMessageOf ip)) where
294 get = do
295 count <- get :: Get Word8
296 nodes <- sequence $ replicate (fromIntegral count) get
297 nonce <- get :: Get Tox.Nonce8
298 return $ Response $ NodeFound nodes nonce
299
300 put (Response (NodeFound nodes nonce)) = do
301 put (fromIntegral (length nodes) :: Word8)
302 mapM_ put nodes
303 put nonce
304
305#endif
306
307-- | \"q\" == \"find_node\"
308instance (Address ip, Typeable ip)
309 => KRPC KMessageOf (Query KMessageOf (FindNode KMessageOf ip)) (Response KMessageOf (NodeFound KMessageOf ip)) where
310 method = "find_node"
311 makeQueryExtra _ nid _ _ = return $ MainlineQuery nid False -- TODO: check for NAT issues. (BEP 43)
312 makeResponseExtra _ nid _ _ = return $ MainlineResponse nid
313
314 -- TODO KError Sender/Responder
315 messageSender (Q q) _ = queringNodeId $ queryExtra $ queryArgs q
316 messageResponder _ (R r) = queredNodeId $ responseExtra $ respVals r
317
318{-----------------------------------------------------------------------
319-- get_peers method
320-----------------------------------------------------------------------}
321
322-- | Get peers associated with a torrent infohash.
323newtype GetPeers ip = GetPeers InfoHash
324 deriving (Show, Eq, Typeable)
325
326info_hash_key :: BKey
327info_hash_key = "info_hash"
328
329instance Typeable ip => BEncode (GetPeers ip) where
330 toBEncode (GetPeers ih) = toDict $ info_hash_key .=! ih .: endDict
331 fromBEncode = fromDict $ GetPeers <$>! info_hash_key
332
333type PeerList ip = Either [NodeInfo KMessageOf ip ()] [PeerAddr ip]
334
335data GotPeers ip = GotPeers
336 { -- | If the queried node has no peers for the infohash, returned
337 -- the K nodes in the queried nodes routing table closest to the
338 -- infohash supplied in the query.
339 peers :: PeerList ip
340
341 -- | The token value is a required argument for a future
342 -- announce_peer query.
343 , grantedToken :: Token
344 } deriving (Show, Eq, Typeable)
345
346peers_key :: BKey
347peers_key = "values"
348
349token_key :: BKey
350token_key = "token"
351
352name_key :: BKey
353name_key = "name"
354
355instance (Typeable ip, Serialize ip) => BEncode (GotPeers ip) where
356 toBEncode GotPeers {..} = toDict $
357 case peers of
358 Left ns ->
359 nodes_key .=! runPut (mapM_ put ns)
360 .: token_key .=! grantedToken
361 .: endDict
362 Right ps ->
363 token_key .=! grantedToken
364 .: peers_key .=! L.map S.encode ps
365 .: endDict
366
367 fromBEncode = fromDict $ do
368 mns <- optional (binary nodes_key) -- "nodes"
369 tok <- field (req token_key) -- "token"
370 mps <- optional (field (req peers_key) >>= decodePeers) -- "values"
371 case (Right <$> mps) <|> (Left <$> mns) of
372 Nothing -> fail "get_peers: neihter peers nor nodes key is valid"
373 Just xs -> pure $ GotPeers xs tok
374 where
375 decodePeers = either fail pure . mapM S.decode
376
377-- | \"q" = \"get_peers\"
378instance (Typeable ip, Serialize ip) =>
379 KRPC KMessageOf (Query KMessageOf (GetPeers ip)) (Response KMessageOf (GotPeers ip)) where
380 method = "get_peers"
381 makeQueryExtra _ nid _ _ = return $ MainlineQuery nid False -- TODO: check for NAT issues. (BEP 43)
382 makeResponseExtra _ nid _ _ = return $ MainlineResponse nid
383
384 -- TODO KError Sender/Responder
385 messageSender (Q q) _ = queringNodeId $ queryExtra $ queryArgs q
386 messageResponder _ (R r) = queredNodeId $ responseExtra $ respVals r
387
388
389{-----------------------------------------------------------------------
390-- announce method
391-----------------------------------------------------------------------}
392
393-- | Announce that the peer, controlling the querying node, is
394-- downloading a torrent on a port.
395data Announce = Announce
396 { -- | If set, the 'port' field should be ignored and the source
397 -- port of the UDP packet should be used as the peer's port
398 -- instead. This is useful for peers behind a NAT that may not
399 -- know their external port, and supporting uTP, they accept
400 -- incoming connections on the same port as the DHT port.
401 impliedPort :: Bool
402
403 -- | infohash of the torrent;
404 , topic :: InfoHash
405
406 -- | some clients announce the friendly name of the torrent here.
407 , announcedName :: Maybe ByteString
408
409 -- | the port /this/ peer is listening;
410 , port :: PortNumber
411
412 -- TODO: optional boolean "seed" key
413
414 -- | received in response to a previous get_peers query.
415 , sessionToken :: Token
416
417 } deriving (Show, Eq, Typeable)
418
419port_key :: BKey
420port_key = "port"
421
422implied_port_key :: BKey
423implied_port_key = "implied_port"
424
425instance BEncode Announce where
426 toBEncode Announce {..} = toDict $
427 implied_port_key .=? flagField impliedPort
428 .: info_hash_key .=! topic
429 .: name_key .=? announcedName
430 .: port_key .=! port
431 .: token_key .=! sessionToken
432 .: endDict
433 where
434 flagField flag = if flag then Just (1 :: Int) else Nothing
435
436 fromBEncode = fromDict $ do
437 Announce <$> (boolField <$> optional (field (req implied_port_key)))
438 <*>! info_hash_key
439 <*>? name_key
440 <*>! port_key
441 <*>! token_key
442 where
443 boolField = maybe False (/= (0 :: Int))
444
445-- | The queried node must verify that the token was previously sent
446-- to the same IP address as the querying node. Then the queried node
447-- should store the IP address of the querying node and the supplied
448-- port number under the infohash in its store of peer contact
449-- information.
450data Announced = Announced
451 deriving (Show, Eq, Typeable)
452
453instance BEncode Announced where
454 toBEncode _ = toBEncode ( Ping :: Ping KMessageOf )
455 fromBEncode _ = pure Announced
456
457-- | \"q" = \"announce\"
458instance KRPC KMessageOf (Query KMessageOf Announce) (Response KMessageOf Announced) where
459 method = "announce_peer"
460 makeQueryExtra _ nid _ _ = return $ MainlineQuery nid False -- TODO: check for NAT issues. (BEP 43)
461 makeResponseExtra _ nid _ _ = return $ MainlineResponse nid
462
463 -- TODO KError Sender/Responder
464 messageSender (Q q) _ = queringNodeId $ queryExtra $ queryArgs q
465 messageResponder _ (R r) = queredNodeId $ responseExtra $ respVals r
466
467
468-- | Yields all 8 DHT neighborhoods available to you given a particular ip
469-- address.
470bep42s :: Address a => a -> NodeId KMessageOf -> [NodeId KMessageOf]
471bep42s addr (NodeId r) = mapMaybe (bep42 addr) rs
472 where
473 rs = map (NodeId . change3bits r) [0..7]
474
475-- change3bits :: ByteString -> Word8 -> ByteString
476-- change3bits bs n = BS.snoc (BS.init bs) (BS.last bs .&. 0xF8 .|. n)
477
478change3bits :: (Num b, Bits b) => b -> b -> b
479change3bits bs n = (bs .&. complement 7) .|. n
480
481-- | Modifies a purely random 'NodeId' to one that is related to a given
482-- routable address in accordance with BEP 42.
483bep42 :: Address a => a -> NodeId KMessageOf -> Maybe (NodeId KMessageOf)
484bep42 addr (NodeId r)
485 | Just ip <- fmap S.encode (fromAddr addr :: Maybe IPv4)
486 <|> fmap S.encode (fromAddr addr :: Maybe IPv6)
487 = genBucketSample' retr (NodeId $ crc $ applyMask ip) (3,0x07,0)
488 | otherwise
489 = Nothing
490 where
491 ip4mask = "\x03\x0f\x3f\xff" :: ByteString
492 ip6mask = "\x01\x03\x07\x0f\x1f\x3f\x7f\xff" :: ByteString
493 nbhood_select = (fromIntegral r :: Word8) .&. 7
494 retr n = pure $ BS.drop (nodeIdSize - n) $ S.encode r
495 crc = flip shiftL (finiteBitSize (NodeId undefined) - 32) . fromIntegral . crc32c . BS.pack
496 applyMask ip = case BS.zipWith (.&.) msk ip of
497 (b:bs) -> (b .|. shiftL nbhood_select 5) : bs
498 bs -> bs
499 where msk | BS.length ip == 4 = ip4mask
500 | otherwise = ip6mask
501
502{-----------------------------------------------------------------------
503-- Tokens policy
504-----------------------------------------------------------------------}
505
506data SessionTokens = SessionTokens
507 { tokenMap :: !TokenMap
508 , lastUpdate :: !UTCTime
509 , maxInterval :: !NominalDiffTime
510 }
511
512nullSessionTokens :: IO SessionTokens
513nullSessionTokens = SessionTokens
514 <$> (tokens <$> randomIO)
515 <*> getCurrentTime
516 <*> pure defaultUpdateInterval
517
518-- TODO invalidate *twice* if needed
519invalidateTokens :: UTCTime -> SessionTokens -> SessionTokens
520invalidateTokens curTime ts @ SessionTokens {..}
521 | curTime `diffUTCTime` lastUpdate > maxInterval = SessionTokens
522 { tokenMap = update tokenMap
523 , lastUpdate = curTime
524 , maxInterval = maxInterval
525 }
526 | otherwise = ts
527
528{-----------------------------------------------------------------------
529-- Tokens
530-----------------------------------------------------------------------}
531
532tryUpdateSecret :: TVar SessionTokens -> IO ()
533tryUpdateSecret toks = do
534 curTime <- getCurrentTime
535 atomically $ modifyTVar' toks (invalidateTokens curTime)
536
537grantToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> IO Token
538grantToken sessionTokens addr = do
539 tryUpdateSecret sessionTokens
540 toks <- readTVarIO sessionTokens
541 return $ T.lookup addr $ tokenMap toks
542
543-- | Throws 'HandlerError' if the token is invalid or already
544-- expired. See 'TokenMap' for details.
545checkToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> Token -> IO Bool
546checkToken sessionTokens addr questionableToken = do
547 tryUpdateSecret sessionTokens
548 toks <- readTVarIO sessionTokens
549 return $ T.member addr questionableToken (tokenMap toks)
550
551
552--------------------------
553
554
555instance Kademlia KMessageOf where
556 data DHTData KMessageOf ip = TorrentData
557 { contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes;
558 , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs.
559 }
560
561
562 dhtAdjustID _ fallback ip0 arrival
563 = fromMaybe fallback $ do
564 ip <- fromSockAddr ip0 -- :: Maybe ip
565 let _ = ip `asTypeOf` nodeAddr (foreignNode arrival)
566 listToMaybe
567 $ rank id (nodeId $ foreignNode arrival)
568 $ bep42s ip fallback
569
570 namePing _ = "ping"
571 nameFindNodes _ = "find-nodes"
572
573 initializeDHTData = TorrentData
574 <$> newTVarIO def
575 <*> (newTVarIO =<< nullSessionTokens)
576
577deriving instance IsString (QueryMethod dht) => IsString (Method dht param result)
578deriving instance BEncode (QueryMethod dht) => BEncode (Method dht param result)
579
diff --git a/src/Network/DHT/Tox.hs b/src/Network/DHT/Tox.hs
deleted file mode 100644
index d6fc866f..00000000
--- a/src/Network/DHT/Tox.hs
+++ /dev/null
@@ -1,112 +0,0 @@
1{-# LANGUAGE TypeFamilies #-}
2{-# LANGUAGE FlexibleInstances, MultiParamTypeClasses #-}
3module Network.DHT.Tox where
4
5import Data.Serialize
6import Data.Default
7import Text.PrettyPrint.HughesPJClass
8
9import Network.DHT.Types
10import Network.DatagramServer.Types
11import qualified Network.DatagramServer.Tox as Tox
12import Network.KRPC.Method
13import Data.Word
14import Data.ByteString (ByteString)
15import Data.IP
16import Data.Bool
17import Data.Maybe
18import Control.Monad
19import System.Random
20
21instance Kademlia Tox.Message where
22 data DHTData Tox.Message ip = ToxData
23 namePing _ = Tox.Ping
24 nameFindNodes _ = Tox.GetNodes
25 initializeDHTData = return ToxData
26
27instance Pretty (NodeId Tox.Message) where
28 pPrint (Tox.NodeId nid) = encodeHexDoc nid
29
30instance Serialize (Query Tox.Message (Ping Tox.Message)) where
31 get = getToxPing False Network.DHT.Types.Query Tox.QueryNonce
32 put (Network.DHT.Types.Query extra Ping) = putToxPing False (Tox.qryNonce extra)
33instance Serialize (Response Tox.Message (Ping Tox.Message)) where
34 get = getToxPing True Network.DHT.Types.Response Tox.ResponseNonce
35 put (Network.DHT.Types.Response extra Ping) = putToxPing True (Tox.rspNonce extra)
36
37instance Serialize (Query Tox.Message (FindNode Tox.Message ip)) where
38 get = do
39 nid <- get
40 n8 <- get
41 return $ Network.DHT.Types.Query (Tox.QueryNonce n8) (FindNode nid)
42 put (Network.DHT.Types.Query (Tox.QueryNonce n8) (FindNode nid)) = do
43 put nid
44 put n8
45instance Serialize (Response Tox.Message (NodeFound Tox.Message IPv4)) where
46 get = do
47 num <- get :: Get Word8
48 when (num > 4) $ fail "Too many nodes in Tox get-nodes reply"
49 ns0 <- sequence $ replicate (fromIntegral num) (nodeFormatToNodeInfo <$> get)
50 -- TODO: Allow tcp and ipv6. For now filtering to udp ip4...
51 let ns = flip mapMaybe ns0 $ \(NodeInfo nid addr u) -> do
52 guard $ not u
53 ip4 <- fromAddr addr
54 return $ NodeInfo nid ip4 ()
55 n8 <- get
56 return $ Network.DHT.Types.Response (Tox.ResponseNonce n8) $ NodeFound ns
57 put (Network.DHT.Types.Response (Tox.ResponseNonce n8) (NodeFound ns)) = do
58 put ( fromIntegral (length ns) :: Word8 )
59 forM_ ns $ \(NodeInfo nid ip4 ()) -> do
60 put Tox.NodeFormat { nodePublicKey = nid
61 , nodeIsTCP = False
62 , nodeIP = IPv4 (nodeHost ip4)
63 , nodePort = nodePort ip4
64 }
65 put n8
66
67instance KRPC Tox.Message (Query Tox.Message (FindNode Tox.Message IPv4))
68 (Response Tox.Message (NodeFound Tox.Message IPv4)) where
69 method = Method Tox.GetNodes
70 validateExchange = validateToxExchange
71 makeQueryExtra _ _ _ _ = Tox.QueryNonce <$> randomIO
72 makeResponseExtra _ _ q _ = return $ Tox.ResponseNonce $ Tox.qryNonce $ queryExtra q
73 messageSender q _ = Tox.msgClient q
74 messageResponder _ r = Tox.msgClient r
75
76instance KRPC Tox.Message (Query Tox.Message (Ping Tox.Message))
77 (Response Tox.Message (Ping Tox.Message)) where
78 method = Method Tox.Ping
79 validateExchange = validateToxExchange
80 makeQueryExtra _ _ _ _ = Tox.QueryNonce <$> randomIO
81 makeResponseExtra _ _ q _ = return $ Tox.ResponseNonce $ Tox.qryNonce $ queryExtra q
82 messageSender q _ = Tox.msgClient q
83 messageResponder _ r = Tox.msgClient r
84
85instance DataHandlers ByteString Tox.Message
86
87instance Default Bool where def = False
88
89getToxPing isPong c n = do
90 q'r <- get :: Get Word8
91 when (bool 0 1 isPong /= q'r) $
92 fail "Tox ping/pong parse fail."
93 n8 <- get :: Get Tox.Nonce8
94 return $ c (n n8) Ping
95
96putToxPing isPong n8 = do
97 put (bool 0 1 isPong :: Word8)
98 put n8
99
100validateToxExchange q r = qnonce == rnonce
101 where
102 qnonce = Tox.qryNonce . queryExtra . Tox.msgPayload $ q
103 rnonce = Tox.rspNonce . responseExtra . Tox.msgPayload $ r
104
105
106nodeFormatToNodeInfo nf = NodeInfo nid addr u
107 where
108 u = Tox.nodeIsTCP nf
109 addr = NodeAddr (Tox.nodeIP nf) (Tox.nodePort nf)
110 nid = Tox.nodePublicKey nf
111
112-- instance Default Bool where def = False
diff --git a/src/Network/DHT/Types.hs b/src/Network/DHT/Types.hs
deleted file mode 100644
index 287d5680..00000000
--- a/src/Network/DHT/Types.hs
+++ /dev/null
@@ -1,176 +0,0 @@
1{-# LANGUAGE DefaultSignatures #-}
2{-# LANGUAGE GADTs, MultiParamTypeClasses #-}
3{-# LANGUAGE FunctionalDependencies #-}
4{-# LANGUAGE TypeFamilies #-}
5{-# LANGUAGE ScopedTypeVariables #-}
6{-# LANGUAGE StandaloneDeriving #-}
7{-# LANGUAGE FlexibleContexts #-}
8{-# LANGUAGE DeriveGeneric #-}
9module Network.DHT.Types
10 ( module Network.DHT.Types
11 , TableKey
12 , toNodeId
13 ) where
14
15import Network.Socket (SockAddr)
16import Network.DatagramServer.Types
17import Network.DHT.Routing
18import Data.Typeable
19import GHC.Generics
20import Data.Serialize
21import Data.Hashable
22import Data.String
23import Data.Monoid
24import Data.Char
25
26data TableParameters msg ip u = TableParameters
27 { maxBuckets :: Int
28 , fallbackID :: NodeId msg
29 , pingProbe :: NodeInfo msg ip u -> IO (Bool, Maybe ReflectedIP)
30 , logMessage :: Char -> String -> IO ()
31 , adjustID :: SockAddr -> Event msg ip u -> NodeId msg
32 }
33
34-- | All queries have an \"id\" key and value containing the node ID
35-- of the querying node.
36data Query dht a = Query
37 { queryExtra :: QueryExtra dht -- ^ DHT-specific query headers
38 , queryParams :: a -- ^ query parameters.
39 } deriving (Typeable,Generic)
40
41deriving instance (Eq (NodeId dht), Eq (QueryExtra dht), Eq a ) => Eq (Query dht a)
42deriving instance (Show (NodeId dht), Show (QueryExtra dht), Show a ) => Show (Query dht a)
43
44-- | All responses have an \"id\" key and value containing the node ID
45-- of the responding node.
46data Response dht a = Response
47 { responseExtra :: ResponseExtra dht
48 , responseVals :: a -- ^ query result.
49 } deriving (Typeable,Generic)
50
51deriving instance (Eq (NodeId dht), Eq (ResponseExtra dht), Eq a ) => Eq (Response dht a)
52deriving instance (Show (NodeId dht), Show (ResponseExtra dht), Show a ) => Show (Response dht a)
53
54-- | The most basic query is a ping. Ping query is used to check if a
55-- quered node is still alive.
56data Ping ( dht :: * -> *) = Ping
57 deriving (Show, Eq, Typeable)
58-- | Find node is used to find the contact information for a node
59-- given its ID.
60newtype FindNode dht ip = FindNode (NodeId dht)
61 deriving (Typeable)
62newtype NodeFound dht ip = NodeFound [NodeInfo dht ip ()]
63 deriving (Typeable)
64
65deriving instance Eq (NodeId dht) => Eq (FindNode dht ip)
66deriving instance Eq (NodeId dht) => Eq (NodeFound dht ip)
67deriving instance Show (NodeId dht) => Show (FindNode dht ip)
68deriving instance ( Show (NodeId dht)
69 , Show ip
70 ) => Show (NodeFound dht ip)
71
72pingMessage :: Proxy dht -> Ping dht
73pingMessage _ = Ping
74pongMessage :: Proxy dht -> Ping dht
75pongMessage _ = Ping
76findNodeMessage :: TableKey dht k => Proxy dht -> k -> FindNode dht ip
77findNodeMessage _ k = FindNode (toNodeId k)
78foundNodesMessage :: [NodeInfo dht ip ()] -> NodeFound dht ip
79findWho (FindNode nid) = nid
80foundNodes :: NodeFound dht ip -> [NodeInfo dht ip ()]
81foundNodes (NodeFound ns) = ns
82findWho :: FindNode dht ip -> NodeId dht
83foundNodesMessage ns = NodeFound ns
84
85class Kademlia dht where
86 data DHTData dht ip
87 dhtAdjustID :: Address ip => Proxy dht -> NodeId dht -> SockAddr -> Event dht ip u -> NodeId dht
88 dhtAdjustID _ nid _ _ = nid
89 namePing :: Proxy dht -> QueryMethod dht
90 nameFindNodes :: Proxy dht -> QueryMethod dht
91 initializeDHTData :: IO (DHTData dht ip)
92data MethodHandler raw dht ip =
93 forall a b. ( SerializableTo raw (Response dht b)
94 , SerializableTo raw (Query dht a)
95 , KRPC dht (Query dht a) (Response dht b)
96 ) => MethodHandler (QueryMethod dht) (NodeAddr ip -> a -> IO b)
97
98class DataHandlers raw dht where
99 dataHandlers ::
100 ( Ord ip , Hashable ip, Typeable ip, Serialize ip) =>
101 (NodeId dht -> IO [NodeInfo dht ip ()])
102 -> DHTData dht ip
103 -> [MethodHandler raw dht ip]
104 dataHandlers _ _ = []
105
106-- | Method datatype used to describe method name, parameters and
107-- return values of procedure. Client use a method to /invoke/, server
108-- /implements/ the method to make the actual work.
109--
110-- We use the following fantom types to ensure type-safiety:
111--
112-- * param: Type of method parameters.
113--
114-- * result: Type of return value of the method.
115--
116newtype Method dht param result = Method { methodName :: QueryMethod dht }
117
118deriving instance Eq (QueryMethod dht) => Eq (Method dht param result)
119deriving instance Ord (QueryMethod dht) => Ord (Method dht param result)
120
121-- | Example:
122--
123-- @show (Method \"concat\" :: [Int] Int) == \"concat :: [Int] -> Int\"@
124--
125instance (Show (QueryMethod dht), Typeable a, Typeable b) => Show (Method dht a b) where
126 showsPrec _ = showsMethod
127
128showsMethod :: forall dht a b. ( Show (QueryMethod dht), Typeable a , Typeable b ) => Method dht a b -> ShowS
129showsMethod (Method name) =
130 -- showString (BC.unpack name) <>
131 shows (show name) <>
132 showString " :: " <>
133 shows paramsTy <>
134 showString " -> " <>
135 shows valuesTy
136 where
137 impossible = error "KRPC.showsMethod: impossible"
138 paramsTy = typeOf (impossible :: a)
139 valuesTy = typeOf (impossible :: b)
140
141-- | In order to perform or handle KRPC query you need to provide
142-- corresponding 'KRPC' class.
143--
144-- Example:
145--
146-- @
147-- data Ping = Ping Text deriving BEncode
148-- data Pong = Pong Text deriving BEncode
149--
150-- instance 'KRPC' Ping Pong where
151-- method = \"ping\"
152-- @
153--
154class ( Typeable req, Typeable resp, Envelope dht)
155 => KRPC dht req resp | req -> resp, resp -> req where
156
157 -- | Method name. Default implementation uses lowercased @req@
158 -- datatype name.
159 --
160 method :: Method dht req resp
161
162 -- TODO add underscores
163 default method :: (IsString (QueryMethod dht), Typeable req) => Method dht req resp
164 method = Method $ fromString $ map toLower $ show $ typeOf hole
165 where
166 hole = error "krpc.method: impossible" :: req
167
168
169 validateExchange :: dht req -> dht resp -> Bool
170 validateExchange _ _ = True
171
172 makeQueryExtra :: DHTData dht ip -> NodeId dht -> Proxy req -> Proxy resp -> IO (QueryExtra dht)
173 makeResponseExtra :: DHTData dht ip -> NodeId dht -> req -> Proxy resp -> IO (ResponseExtra dht)
174
175 messageSender :: dht req -> Proxy resp -> NodeId dht
176 messageResponder :: Proxy req -> dht resp -> NodeId dht
diff --git a/src/Network/DatagramServer.hs b/src/Network/DatagramServer.hs
deleted file mode 100644
index 891417d4..00000000
--- a/src/Network/DatagramServer.hs
+++ /dev/null
@@ -1,608 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013, 2014
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- This module provides safe remote procedure call. One important
9-- point is exceptions and errors, so to be able handle them
10-- properly we need to investigate a bit about how this all works.
11-- Internally, in order to make method invokation KRPC makes the
12-- following steps:
13--
14-- * Caller serialize arguments to bencoded bytestrings;
15--
16-- * Caller send bytestring data over UDP to the callee;
17--
18-- * Callee receive and decode arguments to the method and method
19-- name. If it can't decode then it send 'ProtocolError' back to the
20-- caller;
21--
22-- * Callee search for the @method name@ in the method table.
23-- If it not present in the table then callee send 'MethodUnknown'
24-- back to the caller;
25--
26-- * Callee check if argument names match. If not it send
27-- 'ProtocolError' back;
28--
29-- * Callee make the actuall call to the plain old haskell
30-- function. If the function throw exception then callee send
31-- 'ServerError' back.
32--
33-- * Callee serialize result of the function to bencoded bytestring.
34--
35-- * Callee encode result to bencoded bytestring and send it back
36-- to the caller.
37--
38-- * Caller check if return values names match with the signature
39-- it called in the first step.
40--
41-- * Caller extracts results and finally return results of the
42-- procedure call as ordinary haskell values.
43--
44-- If every other error occurred then caller get the
45-- 'GenericError'. All errors returned by callee are throwed as
46-- ordinary haskell exceptions at caller side. Also note that both
47-- caller and callee use plain UDP, so KRPC is unreliable.
48--
49-- For async 'query' use @async@ package.
50--
51-- For protocol details see "Network.DatagramServer.Mainline" module.
52--
53{-# LANGUAGE CPP #-}
54{-# LANGUAGE OverloadedStrings #-}
55{-# LANGUAGE FlexibleInstances #-}
56{-# LANGUAGE FlexibleContexts #-}
57{-# LANGUAGE ScopedTypeVariables #-}
58{-# LANGUAGE DefaultSignatures #-}
59{-# LANGUAGE MultiParamTypeClasses #-}
60{-# LANGUAGE FunctionalDependencies #-}
61{-# LANGUAGE DeriveDataTypeable #-}
62{-# LANGUAGE TemplateHaskell #-}
63{-# LANGUAGE KindSignatures #-}
64module Network.DatagramServer
65 (
66 -- ** Query
67 QueryFailure (..)
68 , query
69 , query'
70 , queryRaw
71 , getQueryCount
72
73 -- ** Handler
74 , HandlerFailure (..)
75 , Handler
76 , handler
77
78 -- * Manager
79 , Options (..)
80 , def
81 , Manager
82 , newManager
83 , closeManager
84 -- , withManager
85 , isActive
86 , listen
87 , Protocol(..)
88
89 -- * Re-exports
90 , ErrorCode (..)
91 , SockAddr (..)
92 ) where
93
94import Data.Default.Class
95import Network.Socket (SockAddr (..))
96
97import Control.Applicative
98#ifdef THREAD_DEBUG
99import Control.Concurrent.Lifted.Instrument
100#else
101import GHC.Conc (labelThread)
102import Control.Concurrent.Lifted
103#endif
104import Control.Exception hiding (Handler)
105import qualified Control.Exception.Lifted as E (Handler (..))
106import Control.Exception.Lifted as Lifted (catches, finally)
107import Control.Monad
108import Control.Monad.Logger
109import Control.Monad.Reader
110import Control.Monad.Trans.Control
111import qualified Data.ByteString.Base16 as Base16
112import Data.ByteString as BS
113import Data.ByteString.Char8 as BC
114import Data.ByteString.Lazy as BL
115import Data.Default.Class
116import Data.IORef
117import Data.List as L
118import Data.Map as M
119import Data.Monoid
120import Data.Serialize as S
121import Data.Text as T
122import Data.Text.Encoding as T
123import Data.Tuple
124import Data.Typeable
125import Network.DatagramServer.Types
126import Network.Socket hiding (listen)
127import Network.Socket.ByteString as BS
128import System.IO.Error
129import System.Timeout
130import Network.KRPC.Method
131
132
133{-----------------------------------------------------------------------
134-- Options
135-----------------------------------------------------------------------}
136
137-- | RPC manager options.
138data Options = Options
139 { -- | Initial 'TransactionId' incremented with each 'query';
140 optSeedTransaction :: {-# UNPACK #-} !Int
141
142 -- | Time to wait for response from remote node, in seconds.
143 , optQueryTimeout :: {-# UNPACK #-} !Int
144
145 -- | Maximum number of bytes to receive.
146 , optMaxMsgSize :: {-# UNPACK #-} !Int
147
148 } deriving (Show, Eq)
149
150defaultSeedTransaction :: Int
151defaultSeedTransaction = 0
152
153defaultQueryTimeout :: Int
154defaultQueryTimeout = 120
155
156defaultMaxMsgSize :: Int
157defaultMaxMsgSize = 64 * 1024
158
159-- | Permissive defaults.
160instance Default Options where
161 def = Options
162 { optSeedTransaction = defaultSeedTransaction
163 , optQueryTimeout = defaultQueryTimeout
164 , optMaxMsgSize = defaultMaxMsgSize
165 }
166
167validateOptions :: Options -> IO ()
168validateOptions Options {..}
169 | optQueryTimeout < 1
170 = throwIO (userError "krpc: non-positive query timeout")
171 | optMaxMsgSize < 1
172 = throwIO (userError "krpc: non-positive buffer size")
173 | otherwise = return ()
174
175{-----------------------------------------------------------------------
176-- Options
177-----------------------------------------------------------------------}
178
179type KResult msg raw = Either (KError (TransactionID msg)) (msg raw)-- Response
180
181type TransactionCounter = IORef Int
182type CallId msg = (TransactionID msg, SockAddr)
183type CallRes msg raw = MVar (raw, KResult msg raw) -- (raw response, decoded response)
184type PendingCalls msg raw = IORef (Map (CallId msg) (CallRes msg raw))
185
186type HandlerBody h msg v = SockAddr -> msg v -> h (Either String (msg v))
187
188-- | Handler is a function which will be invoked then some /remote/
189-- node querying /this/ node.
190type Handler h msg v = (QueryMethod msg, HandlerBody h msg v)
191
192-- | Keep track pending queries made by /this/ node and handle queries
193-- made by /remote/ nodes.
194data Manager raw msg = Manager
195 { sock :: !Socket
196 , options :: !Options
197 , listenerThread :: !(MVar ThreadId)
198 , transactionCounter :: {-# UNPACK #-} !TransactionCounter
199 , pendingCalls :: {-# UNPACK #-} !(PendingCalls msg raw)
200 -- , handlers :: [Handler h msg raw] -- TODO delete this, it's not used
201 , logMsg :: Char -> String -> T.Text -> IO ()
202 , serverState :: PacketDestination msg -> IO (NodeId msg, CipherContext raw msg)
203 }
204
205sockAddrFamily :: SockAddr -> Family
206sockAddrFamily (SockAddrInet _ _ ) = AF_INET
207sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6
208sockAddrFamily (SockAddrUnix _ ) = AF_UNIX
209sockAddrFamily (SockAddrCan _ ) = AF_CAN
210
211-- | Bind socket to the specified address. To enable query handling
212-- run 'listen'.
213newManager :: Options -- ^ various protocol options;
214 -> (Char -> String -> T.Text -> IO ()) -- ^ loging function
215 -> SockAddr -- ^ address to listen on;
216 -> ( PacketDestination msg -> IO (NodeId msg, CipherContext raw msg) )
217 -> [Handler h msg raw] -- ^ handlers to run on incoming queries.
218 -> IO (Manager raw msg) -- ^ new rpc manager.
219newManager opts @ Options {..} logmsg servAddr getst handlers = do
220 validateOptions opts
221 sock <- bindServ
222 tref <- newEmptyMVar
223 tran <- newIORef optSeedTransaction
224 calls <- newIORef M.empty
225 return $ Manager sock opts tref tran calls logmsg getst
226 where
227 bindServ = do
228 let family = sockAddrFamily servAddr
229 sock <- socket family Datagram defaultProtocol
230 when (family == AF_INET6) $ do
231 setSocketOption sock IPv6Only 0
232 bindSocket sock servAddr
233 return sock
234
235-- | Unblock all pending calls and close socket.
236closeManager :: Manager raw msg -> IO ()
237closeManager Manager {..} = do
238 maybe (return ()) killThread =<< tryTakeMVar listenerThread
239 -- TODO unblock calls
240 close sock
241
242-- | Check if the manager is still active. Manager becomes active
243-- until 'closeManager' called.
244isActive :: Manager raw msg -> IO Bool
245isActive Manager {..} = liftIO $ isBound sock
246{-# INLINE isActive #-}
247
248#if 0
249-- | Normally you should use Control.Monad.Trans.Resource.allocate
250-- function.
251withManager :: Options -> SockAddr -> [Handler h msg raw]
252 -> (Manager raw msg -> IO a) -> IO a
253withManager opts addr hs = bracket (newManager opts addr hs) closeManager
254#endif
255
256{-----------------------------------------------------------------------
257-- Logging
258-----------------------------------------------------------------------}
259
260-- TODO prettify log messages
261querySignature :: ( Show ( QueryMethod msg )
262 , Serialize ( TransactionID msg ) )
263 => QueryMethod msg -> TransactionID msg -> SockAddr -> Text
264querySignature name transaction addr = T.concat
265 [ "&", T.pack (show name)
266 , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction)
267 , " @", T.pack (show addr)
268 ]
269
270{-----------------------------------------------------------------------
271-- Client
272-----------------------------------------------------------------------}
273-- we don't need to know about TransactionId while performing query,
274-- so we introduce QueryFailure exceptions
275
276-- | Used to signal 'query' errors.
277data QueryFailure
278 = SendFailed -- ^ unable to send query;
279 | QueryFailed ErrorCode Text -- ^ remote node return error;
280 | TimeoutExpired -- ^ remote node not responding.
281 deriving (Show, Eq, Typeable)
282
283instance Exception QueryFailure
284
285sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m ()
286sendMessage sock addr a = do
287 liftIO $ sendManyTo sock [a] addr
288
289genTransactionId :: Envelope msg => TransactionCounter -> IO (TransactionID msg)
290genTransactionId ref = do
291 cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur)
292 uniqueTransactionId cur
293
294-- | How many times 'query' call have been performed.
295getQueryCount :: Manager raw msg -> IO Int
296getQueryCount mgr@Manager{..} = do
297 curTrans <- readIORef transactionCounter
298 return $ curTrans - optSeedTransaction options
299
300registerQuery :: Ord (TransactionID msg) => CallId msg -> PendingCalls msg raw -> IO (CallRes msg raw)
301registerQuery cid ref = do
302 ares <- newEmptyMVar
303 atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ())
304 return ares
305
306-- simultaneous M.lookup and M.delete guarantees that we never get two
307-- or more responses to the same query
308unregisterQuery :: Ord (TransactionID msg) => CallId msg -> PendingCalls msg raw -> IO (Maybe (CallRes msg raw))
309unregisterQuery cid ref = do
310 atomicModifyIORef' ref $ swap .
311 M.updateLookupWithKey (const (const Nothing)) cid
312
313
314-- (sendmsg EINVAL)
315sendQuery :: Socket -> SockAddr -> BC.ByteString -> IO ()
316sendQuery sock addr q = handle sockError $ sendMessage sock addr q
317 where
318 sockError :: IOError -> IO ()
319 sockError _ = throwIO SendFailed
320
321-- | Enqueue query to the given node.
322--
323-- This function should throw 'QueryFailure' exception if quered node
324-- respond with @error@ message or the query timeout expires.
325--
326query :: forall h a b raw msg.
327 ( SerializableTo raw b
328 , Show (QueryMethod msg)
329 , Ord (TransactionID msg)
330 , Serialize (TransactionID msg)
331 , SerializableTo raw a
332 , WireFormat raw msg
333 , KRPC msg a b
334 ) => Manager raw msg -> PacketDestination msg -> a -> IO b
335query mgr addr params = queryK mgr addr params (\_ x _ _ -> x)
336
337-- | Like 'query' but possibly returns your externally routable IP address.
338query' :: forall h a b raw msg.
339 ( SerializableTo raw b
340 , Show (QueryMethod msg)
341 , Ord (TransactionID msg)
342 , Serialize (TransactionID msg)
343 , SerializableTo raw a , WireFormat raw msg
344 , KRPC msg a b
345 ) => Manager raw msg -> PacketDestination msg -> a -> IO (b , NodeId msg, Maybe ReflectedIP)
346query' mgr addr params = queryK mgr addr params (\_ b nid ip -> (b,nid,ip))
347
348-- | Enqueue a query, but give us the complete BEncoded content sent by the
349-- remote Node. This is useful for handling extensions that this library does
350-- not otherwise support.
351queryRaw :: forall h a b raw msg.
352 ( SerializableTo raw b
353 , Show (QueryMethod msg)
354 , Ord (TransactionID msg)
355 , Serialize (TransactionID msg)
356 , SerializableTo raw a
357 , WireFormat raw msg
358 , KRPC msg a b
359 ) => Manager raw msg -> PacketDestination msg -> a -> IO (b , raw)
360queryRaw mgr addr params = queryK mgr addr params (\raw x _ _ -> (x,raw))
361
362queryK :: forall h a b x raw msg.
363 ( SerializableTo raw b
364 , SerializableTo raw a
365 , WireFormat raw msg
366 , Show (QueryMethod msg)
367 , Ord (TransactionID msg)
368 , Serialize (TransactionID msg)
369 , KRPC msg a b
370 ) =>
371 Manager raw msg -> PacketDestination msg -> a -> (raw -> b -> NodeId msg -> Maybe ReflectedIP -> x) -> IO x
372queryK mgr@Manager{..} dest params kont = do
373 tid <- liftIO $ genTransactionId transactionCounter
374 let addr = toSockAddr dest
375 Method meth = method :: Method msg a b
376 signature = querySignature meth tid addr
377 logMsg 'D' "query.sending" signature
378 -- [Debug#query.sending] &MessageType 0 #312020202020202020202020202020202020202020202020 @77.37.142.179:33445
379
380 mres <- liftIO $ do
381 ares <- registerQuery (tid, addr) pendingCalls
382
383 (cli,ctx) <- serverState dest
384 q <- buildQuery cli addr meth tid params
385 let qb = encodePayload (q :: msg a) :: msg raw
386 qbs = encodeHeaders ctx qb dest
387 sendQuery sock addr qbs
388 `onException` unregisterQuery (tid, addr) pendingCalls
389
390 timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do
391 fix $ \loop -> do
392 (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult)
393 case res of
394 Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m)
395 Right m -> case decodePayload m of
396 Right r -> case envelopeClass (r :: msg b) of
397 Response reflectedAddr
398 | validateExchange q r -> do
399 let remoteId = messageResponder (Proxy :: Proxy a) r
400 return $ kont raw (envelopePayload r) remoteId reflectedAddr
401 | otherwise -> loop
402 Error (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) -- XXX neccessary?
403 Query _ -> throwIO $ QueryFailed ProtocolError "BUG!! UNREACHABLE"
404 Left e -> throwIO $ QueryFailed ProtocolError (T.pack e)
405
406 case mres of
407 Just res -> do
408 logMsg 'D' "query.responded" $ signature
409 return res
410
411 Nothing -> do
412 _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls
413 logMsg 'W' "query.not_responding" $ signature <> " for " <>
414 T.pack (show (optQueryTimeout options)) <> " seconds"
415 throw $ TimeoutExpired
416
417{-----------------------------------------------------------------------
418-- Handlers
419-----------------------------------------------------------------------}
420-- we already throw:
421--
422-- * ErrorCode(MethodUnknown) in the 'dispatchHandler';
423--
424-- * ErrorCode(ServerError) in the 'runHandler';
425--
426-- * ErrorCode(GenericError) in the 'runHandler' (those can be
427-- async exception too)
428--
429-- so HandlerFailure should cover *only* 'ProtocolError's.
430
431-- | Used to signal protocol errors.
432data HandlerFailure
433 = BadAddress -- ^ for e.g.: node calls herself;
434 | InvalidParameter Text -- ^ for e.g.: bad session token.
435 deriving (Show, Eq, Typeable)
436
437instance Exception HandlerFailure
438
439prettyHF :: HandlerFailure -> BS.ByteString
440prettyHF BadAddress = T.encodeUtf8 "bad address"
441prettyHF (InvalidParameter reason) = T.encodeUtf8 $
442 "invalid parameter: " <> reason
443
444prettyQF :: QueryFailure -> BS.ByteString
445prettyQF e = T.encodeUtf8 $ "handler fail while performing query: "
446 <> T.pack (show e)
447
448-- | Make handler from handler function. Any thrown exception will be
449-- supressed and send over the wire back to the querying node.
450--
451-- If the handler make some 'query' normally it /should/ handle
452-- corresponding 'QueryFailure's.
453--
454handler :: forall h a b msg raw. (Applicative h, Functor msg, WireFormat raw msg, SerializableTo raw a, SerializableTo raw b)
455 => (SockAddr -> h (NodeId msg)) -> QueryMethod msg -> (SockAddr -> msg a -> h b) -> Handler h msg raw
456handler whoami name body = (name, wrapper)
457 where
458 wrapper :: SockAddr -> msg raw -> h (Either String (msg raw))
459 wrapper addr args =
460 case decodePayload args of
461 Left e -> pure $ Left e
462 Right a -> do
463 (\me bs -> Right $ encodePayload $ buildReply me addr args bs) <$> whoami addr <*> body addr a
464
465runHandler :: ( Envelope msg
466 , Show (QueryMethod msg)
467 , Serialize (TransactionID msg))
468 => Manager raw msg -> QueryMethod msg -> HandlerBody IO msg raw -> SockAddr -> msg raw -> IO (KResult msg raw)
469runHandler mgr@Manager{..} meth h addr m = Lifted.catches wrapper failbacks
470 where
471 signature = querySignature meth (envelopeTransaction m) addr
472
473 wrapper = do
474 logMsg 'D' "handler.quered" signature
475 result <- h addr m
476
477 case result of
478 Left msg -> do
479 logMsg 'D' "handler.bad_query" $ signature <> " !" <> T.pack msg
480 return $ Left $ KError ProtocolError (BC.pack msg) (envelopeTransaction m)
481
482 Right a -> do -- KQueryArgs
483 logMsg 'D' "handler.success" signature
484 return $ Right a
485
486 failbacks =
487 [ E.Handler $ \ (e :: HandlerFailure) -> do
488 logMsg 'D' "handler.HandlerFailure" signature
489 return $ Left $ KError ProtocolError (prettyHF e) (envelopeTransaction m)
490
491
492 -- may happen if handler makes query and fail
493 , E.Handler $ \ (e :: QueryFailure) -> do
494 logMsg 'D' "handler.QueryFailure" signature
495 return $ Left $ KError ServerError (prettyQF e) (envelopeTransaction m)
496
497 -- since handler thread exit after sendMessage we can safely
498 -- suppress async exception here
499 , E.Handler $ \ (e :: SomeException) -> do
500 logMsg 'D' "handler.SomeException" (signature <> T.pack (" "++show e))
501 return $ Left $ KError GenericError (BC.pack (show e)) (envelopeTransaction m)
502 ]
503
504dispatchHandler :: ( Eq (QueryMethod msg)
505 , Show (QueryMethod msg)
506 , Serialize (TransactionID msg)
507 , Envelope msg
508 ) => Manager raw msg -> [Handler IO msg raw] -> QueryMethod msg -> msg raw -> SockAddr -> IO (KResult msg raw)
509dispatchHandler mgr handlers meth q addr = do
510 case L.lookup meth handlers of
511 Nothing -> return $ Left $ KError MethodUnknown ("Unknown method " <> BC.pack (show meth)) (envelopeTransaction q)
512 Just h -> runHandler mgr meth h addr q
513
514{-----------------------------------------------------------------------
515-- Listener
516-----------------------------------------------------------------------}
517
518-- TODO bound amount of parallel handler *threads*:
519--
520-- peer A flooding with find_node
521-- peer B trying to ping peer C
522-- peer B fork too many threads
523-- ... space leak
524--
525handleQuery :: ( WireFormat raw msg
526 , Eq (QueryMethod msg)
527 , Show (QueryMethod msg)
528 , Serialize (TransactionID msg)
529 ) => Manager raw msg -> [Handler IO msg raw] -> QueryMethod msg -> raw -> msg raw -> SockAddr -> IO ()
530handleQuery mgr@Manager{..} hs meth raw q addr = void $ fork $ do
531 myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery"
532 res <- dispatchHandler mgr hs meth q addr
533 (me,ctx) <- serverState (error "TODO TOX ToxCipherContext 2 or () for Mainline")
534 let res' = either buildError Just res
535 dest = makeAddress (Right q) addr
536 resbs = fmap (\raw -> encodeHeaders ctx raw dest) res' :: Maybe BS.ByteString
537 -- TODO: Generalize this debug print.
538 -- resbe = either toBEncode toBEncode res
539 -- .(logOther "q") \$ T.unlines
540 -- [ either (const "<unicode-fail>") id \$ T.decodeUtf8' (BL.toStrict $ showBEncode raw)
541 -- , "==>"
542 -- , either (const "<unicode-fail>") id \$ T.decodeUtf8' (BL.toStrict $ showBEncode resbe)
543 -- ]
544 maybe (return ()) (sendMessage sock addr) resbs
545
546handleResponse :: ( Ord (TransactionID msg)
547 , Envelope msg
548 ) => Manager raw msg -> raw -> KResult msg raw -> SockAddr -> IO ()
549handleResponse mgr@Manager{..} raw result addr = do
550 liftIO $ do
551 let resultId = either errorId envelopeTransaction result
552 mcall <- unregisterQuery (resultId, addr) pendingCalls
553 case mcall of
554 Nothing -> return ()
555 Just ares -> putMVar ares (raw,result)
556
557data Protocol raw (msg :: * -> *) = Protocol { rawProxy :: !(Proxy raw)
558 , msgProxy :: !(Proxy msg)
559 }
560
561listener :: forall raw msg.
562 ( WireFormat raw msg
563 , Ord (TransactionID msg)
564 , Eq (QueryMethod msg)
565 , Show (QueryMethod msg)
566 , Serialize (TransactionID msg)
567 ) => Manager raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO ()
568listener mgr@Manager{..} hs p = do
569 fix $ \again -> do
570 (me, ctx) <- serverState (error "TODO TOX ToxCipherContext or () for Mainline")
571 (bs, addr) <- liftIO $ do
572 handle exceptions $ BS.recvFrom sock (optMaxMsgSize options)
573 case parsePacket (msgProxy p) bs >>= \r -> (,) r <$> decodeHeaders ctx r of
574 Left e -> do
575 -- XXX: Send parse failure message?
576 -- liftIO \$ sendMessage sock addr $ encodeHeaders ctx (unknownMessage e)
577 logMsg 'W' "listener" (T.pack $ show e)
578 return () -- Without transaction id, error message isn't very useful.
579 Right (raw,m) ->
580 case envelopeClass m of
581 Query meth -> handleQuery mgr hs meth (raw::raw) m addr
582 Response _ -> handleResponse mgr raw (Right m) addr
583 Error e -> handleResponse mgr raw (Left e) addr
584
585 again
586 where
587 exceptions :: IOError -> IO (BS.ByteString, SockAddr)
588 exceptions e
589 -- packets with empty payload may trigger eof exception
590 | isEOFError e = return ("", SockAddrInet 0 0)
591 | otherwise = throwIO e
592
593-- | Should be run before any 'query', otherwise they will never
594-- succeed.
595listen :: forall raw msg.
596 ( WireFormat raw msg
597 , Ord (TransactionID msg)
598 , Eq (QueryMethod msg)
599 , Show (QueryMethod msg)
600 , Serialize (TransactionID msg)
601 , Typeable msg
602 ) => Manager raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO ()
603listen mgr@Manager{..} hs p = do
604 tid <- fork $ do
605 myThreadId >>= liftIO . flip labelThread ("KRPC.listen." ++ (L.last $ L.words $ show $ typeOf (Proxy :: Proxy msg)))
606 listener mgr hs p `Lifted.finally`
607 liftIO (takeMVar listenerThread)
608 liftIO $ putMVar listenerThread tid
diff --git a/src/Network/DatagramServer/Error.hs b/src/Network/DatagramServer/Error.hs
deleted file mode 100644
index 77b132a7..00000000
--- a/src/Network/DatagramServer/Error.hs
+++ /dev/null
@@ -1,68 +0,0 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE DeriveDataTypeable #-}
3module Network.DatagramServer.Error where
4
5import Control.Exception.Lifted as Lifted
6import Data.ByteString (ByteString)
7import Data.ByteString.Char8 as Char8
8import Data.Data
9import Data.Default
10import Data.Typeable
11
12{-----------------------------------------------------------------------
13-- Error messages
14-----------------------------------------------------------------------}
15
16-- | Types of RPC errors.
17data ErrorCode
18 -- | Some error doesn't fit in any other category.
19 = GenericError
20
21 -- | Occur when server fail to process procedure call.
22 | ServerError
23
24 -- | Malformed packet, invalid arguments or bad token.
25 | ProtocolError
26
27 -- | Occur when client trying to call method server don't know.
28 | MethodUnknown
29 deriving (Show, Read, Eq, Ord, Bounded, Typeable, Data)
30
31-- | According to the table:
32-- <http://bittorrent.org/beps/bep_0005.html#errors>
33instance Enum ErrorCode where
34 fromEnum GenericError = 201
35 fromEnum ServerError = 202
36 fromEnum ProtocolError = 203
37 fromEnum MethodUnknown = 204
38 {-# INLINE fromEnum #-}
39
40 toEnum 201 = GenericError
41 toEnum 202 = ServerError
42 toEnum 203 = ProtocolError
43 toEnum 204 = MethodUnknown
44 toEnum _ = GenericError
45 {-# INLINE toEnum #-}
46
47-- | Errors are sent when a query cannot be fulfilled. Error message
48-- can be send only from server to client but not in the opposite
49-- direction.
50--
51data KError tid = KError
52 { errorCode :: !ErrorCode -- ^ the type of error;
53 , errorMessage :: !ByteString -- ^ human-readable text message;
54 , errorId :: !tid -- ^ match to the corresponding 'queryId'.
55 } deriving ( Show, Eq, Ord, Typeable, Data, Read )
56
57instance (Typeable tid, Show tid) => Exception (KError tid)
58
59-- | Received 'queryArgs' or 'respVals' can not be decoded.
60decodeError :: String -> tid -> KError tid
61decodeError msg = KError ProtocolError (Char8.pack msg)
62
63-- | A remote node has send some 'KMessage' this node is unable to
64-- decode.
65unknownMessage :: Default tid => String -> KError tid
66unknownMessage msg = KError ProtocolError (Char8.pack msg) def
67
68
diff --git a/src/Network/DatagramServer/Mainline.hs b/src/Network/DatagramServer/Mainline.hs
deleted file mode 100644
index fea64ee6..00000000
--- a/src/Network/DatagramServer/Mainline.hs
+++ /dev/null
@@ -1,404 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013, 2014
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- KRPC messages types used in communication. All messages are
9-- encoded as bencode dictionary.
10--
11-- Normally, you don't need to import this module.
12--
13-- See <http://www.bittorrent.org/beps/bep_0005.html#krpc-protocol>
14--
15{-# LANGUAGE CPP #-}
16{-# LANGUAGE DefaultSignatures #-}
17{-# LANGUAGE DeriveDataTypeable #-}
18{-# LANGUAGE DeriveFunctor #-}
19{-# LANGUAGE DeriveTraversable #-}
20{-# LANGUAGE FlexibleContexts #-}
21{-# LANGUAGE FlexibleInstances #-}
22{-# LANGUAGE FunctionalDependencies #-}
23{-# LANGUAGE MultiParamTypeClasses #-}
24{-# LANGUAGE OverloadedStrings #-}
25{-# LANGUAGE TypeSynonymInstances #-}
26{-# LANGUAGE TypeFamilies #-}
27{-# LANGUAGE GeneralizedNewtypeDeriving #-}
28module Network.DatagramServer.Mainline
29 ( -- * Transaction
30 TransactionId
31
32 -- * Error
33 , ErrorCode (..)
34 , KError(..)
35 , decodeError
36 , unknownMessage
37
38 -- * Query
39 , KQueryOf(..)
40 , KQuery
41 , MethodName
42
43 -- * Response
44 , KResponseOf(..)
45 , KResponse
46 , ReflectedIP(..)
47
48 -- * Message
49 , KMessageOf (..)
50 , KMessage
51 , KQueryArgs
52 , QueryExtra(..)
53 , ResponseExtra(..)
54 , PacketDestination(..)
55
56 , NodeId(..)
57 , nodeIdSize
58
59 ) where
60
61import Control.Applicative
62import Control.Arrow
63import Control.Exception.Lifted as Lifted
64import Data.BEncode as BE
65import Network.DatagramServer.Types
66import Data.Bits
67import Data.ByteString.Base16 as Base16
68import Data.ByteString (ByteString)
69import qualified Data.ByteString as BS
70import qualified Data.ByteString.Char8 as Char8
71import qualified Data.ByteString.Lazy as L
72import Data.Default
73import Data.LargeWord
74import Data.Monoid
75import qualified Data.Serialize as S
76import Data.Serialize (Serialize, get, put, remaining, getBytes, putByteString)
77import Data.String
78import Data.Word
79import Data.Typeable
80import Network.Socket (SockAddr (..),PortNumber,HostAddress)
81import Text.PrettyPrint as PP hiding ((<>))
82import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
83import Data.Hashable
84
85
86-- | This transaction ID is generated by the querying node and is
87-- echoed in the response, so responses may be correlated with
88-- multiple queries to the same node. The transaction ID should be
89-- encoded as a short string of binary numbers, typically 2 characters
90-- are enough as they cover 2^16 outstanding queries.
91type TransactionId = TransactionID KMessageOf
92
93{-----------------------------------------------------------------------
94-- Query messages
95-----------------------------------------------------------------------}
96
97type MethodName = ByteString
98type KQueryArgs = BValue
99
100-- | Query used to signal that caller want to make procedure call to
101-- callee and pass arguments in. Therefore query may be only sent from
102-- client to server but not in the opposite direction.
103--
104data KQueryOf a = KQuery
105 { queryArgs :: !a -- ^ values to be passed to method;
106 , queryMethod :: !MethodName -- ^ method to call;
107 , queryId :: !TransactionId -- ^ one-time query token.
108 } deriving ( Show, Eq, Ord, Typeable, Read, Functor, Foldable, Traversable )
109
110type KQuery = KQueryOf KQueryArgs
111
112-- | Queries, or KRPC message dictionaries with a \"y\" value of
113-- \"q\", contain two additional keys; \"q\" and \"a\". Key \"q\" has
114-- a string value containing the method name of the query. Key \"a\"
115-- has a dictionary value containing named arguments to the query.
116--
117-- Example Query packet:
118--
119-- > { "t" : "aa", "y" : "q", "q" : "ping", "a" : { "msg" : "hi!" } }
120--
121instance (Typeable a, BEncode a) => BEncode (KQueryOf a) where
122 toBEncode KQuery {..} = toDict $
123 "a" .=! queryArgs
124 .: "q" .=! queryMethod
125 .: "t" .=! queryId
126 .: "y" .=! ("q" :: ByteString)
127 .: endDict
128 {-# INLINE toBEncode #-}
129
130 fromBEncode = fromDict $ do
131 lookAhead $ match "y" (BString "q")
132 KQuery <$>! "a" <*>! "q" <*>! "t"
133 {-# INLINE fromBEncode #-}
134
135instance BEncode ReflectedIP where
136 toBEncode (ReflectedIP addr) = BString (encodeAddr addr)
137 fromBEncode (BString bs) = ReflectedIP <$> decodeAddr bs
138 fromBEncode _ = Left "ReflectedIP should be a bencoded string"
139
140port16 :: Word16 -> PortNumber
141port16 = fromIntegral
142
143decodeAddr :: ByteString -> Either String SockAddr
144decodeAddr bs | BS.length bs == 6
145 = ( \(a,p) -> SockAddrInet <$> fmap port16 p <*> a )
146 $ (S.runGet S.getWord32host *** S.decode )
147 $ BS.splitAt 4 bs
148decodeAddr bs | BS.length bs == 18
149 = ( \(a,p) -> flip SockAddrInet6 0 <$> fmap port16 p <*> a <*> pure 0 )
150 $ (S.decode *** S.decode )
151 $ BS.splitAt 16 bs
152decodeAddr _ = Left "incorrectly sized address and port"
153
154encodeAddr :: SockAddr -> ByteString
155encodeAddr (SockAddrInet port addr)
156 = S.runPut (S.putWord32host addr >> S.put (fromIntegral port :: Word16))
157encodeAddr (SockAddrInet6 port _ addr _)
158 = S.runPut (S.put addr >> S.put (fromIntegral port :: Word16))
159encodeAddr _ = BS.empty
160
161{-----------------------------------------------------------------------
162-- Response messages
163-----------------------------------------------------------------------}
164
165-- | Response messages are sent upon successful completion of a
166-- query:
167--
168-- * KResponse used to signal that callee successufully process a
169-- procedure call and to return values from procedure.
170--
171-- * KResponse should not be sent if error occurred during RPC,
172-- 'KError' should be sent instead.
173--
174-- * KResponse can be only sent from server to client.
175--
176data KResponseOf a = KResponse
177 { respVals :: a -- ^ 'BDict' containing return values;
178 , respId :: TransactionId -- ^ match to the corresponding 'queryId'.
179 , respIP :: Maybe ReflectedIP
180 } deriving (Show, Eq, Ord, Typeable, Functor, Foldable, Traversable)
181
182type KResponse = KResponseOf KQueryArgs
183
184-- | Responses, or KRPC message dictionaries with a \"y\" value of
185-- \"r\", contain one additional key \"r\". The value of \"r\" is a
186-- dictionary containing named return values.
187--
188-- Example Response packet:
189--
190-- > { "t" : "aa", "y" : "r", "r" : { "msg" : "you've sent: hi!" } }
191--
192instance (Typeable a, BEncode a) => BEncode (KResponseOf a) where
193 toBEncode KResponse {..} = toDict $
194 "ip" .=? respIP
195 .: "r" .=! respVals
196 .: "t" .=! respId
197 .: "y" .=! ("r" :: ByteString)
198 .: endDict
199 {-# INLINE toBEncode #-}
200
201 fromBEncode = fromDict $ do
202 lookAhead $ match "y" (BString "r")
203 addr <- optional (field (req "ip"))
204 (\r t -> KResponse r t addr) <$>! "r" <*>! "t"
205 {-# INLINE fromBEncode #-}
206
207{-----------------------------------------------------------------------
208-- Summed messages
209-----------------------------------------------------------------------}
210
211-- | Generic KRPC message.
212data KMessageOf a
213 = Q (KQueryOf a)
214 | R (KResponseOf a)
215 | E (KError TransactionId)
216 deriving (Show, Eq, Functor, Foldable, Traversable)
217
218type KMessage = KMessageOf KQueryArgs
219
220instance BEncode KMessage where
221 toBEncode (Q q) = toBEncode q
222 toBEncode (R r) = toBEncode r
223 toBEncode (E e) = toBEncode e
224
225 fromBEncode b =
226 Q <$> fromBEncode b
227 <|> R <$> fromBEncode b
228 <|> E <$> fromBEncode b
229 <|> decodingError "KMessage: unknown message or message tag"
230
231nodeIdSize :: Int
232nodeIdSize = finiteBitSize (undefined :: NodeId KMessageOf) `div` 8
233
234instance BEncode (NodeId KMessageOf) where
235 toBEncode (NodeId w) = toBEncode $ S.encode w
236 fromBEncode bval = fromBEncode bval >>= S.decode
237
238-- instance BEncode NodeId where TODO
239
240instance Serialize (NodeId KMessageOf) where
241 get = NodeId <$> get
242 {-# INLINE get #-}
243 put (NodeId bs) = put bs
244 {-# INLINE put #-}
245
246-- | ASCII encoded.
247instance IsString (NodeId KMessageOf) where
248 fromString str
249 | length str == nodeIdSize = NodeId (either error id $ S.decode (fromString str :: ByteString))
250 | length str == 2 * nodeIdSize = NodeId (either error id $ S.decode (fst $ Base16.decode $ fromString str))
251 | otherwise = error "fromString: invalid NodeId length"
252 {-# INLINE fromString #-}
253
254-- | Meaningless node id, for testing purposes only.
255instance Default (NodeId KMessageOf) where
256 def = NodeId 0
257
258-- | base16 encoded.
259instance Pretty (NodeId KMessageOf) where pPrint (NodeId nid) = encodeHexDoc nid
260
261
262instance Serialize (TransactionID KMessageOf) where
263 get = do
264 cnt <- remaining
265 TID <$> getBytes cnt
266
267 put (TID bs) = putByteString bs
268
269
270instance Envelope KMessageOf where
271 type QueryMethod KMessageOf = ByteString
272
273 newtype TransactionID KMessageOf = TID ByteString
274 deriving (Eq,Ord,IsString,Show,Read,BEncode)
275
276 -- | Each node has a globally unique identifier known as the \"node
277 -- ID.\"
278 --
279 -- Normally, /this/ node id should be saved between invocations
280 -- of the client software.
281 newtype NodeId KMessageOf = NodeId Word160
282 deriving (Show, Eq, Ord, Typeable, Bits, FiniteBits)
283
284 data QueryExtra KMessageOf = MainlineQuery
285 { queringNodeId :: NodeId KMessageOf -- ^ node id of /quering/ node;
286 , queryIsReadOnly :: Bool -- ^ node is read-only as per BEP 43
287 }
288 deriving (Show, Eq, Ord, Typeable)
289
290 newtype ResponseExtra KMessageOf = MainlineResponse
291 { queredNodeId :: NodeId KMessageOf -- ^ node id of /quered/ node
292 }
293 deriving (Show, Eq, Ord, Typeable)
294
295 newtype PacketDestination KMessageOf = MainlineNode SockAddr
296 deriving (Show, Eq, Ord, Typeable)
297
298 envelopePayload (Q q) = queryArgs q
299 envelopePayload (R r) = respVals r
300 envelopePayload (E _) = error "TODO: messagePayload for KError"
301
302 envelopeTransaction (Q q) = queryId q
303 envelopeTransaction (R r) = respId r
304 envelopeTransaction (E e) = errorId e
305
306 envelopeClass (Q q) = Query (queryMethod q)
307 envelopeClass (R r) = Response (respIP r)
308 envelopeClass (E e) = Error e
309
310 -- replyAddress :: envelope a -> SockAddr -> PacketDestination envelope
311 makeAddress _ addr = MainlineNode addr
312
313 buildReply self addr qry response =
314 (R (KResponse response (envelopeTransaction qry) (Just $ ReflectedIP addr)))
315
316 buildQuery self addr meth tid qry = return $ Q (KQuery qry meth tid)
317
318 uniqueTransactionId cnt = return $ TID $ Char8.pack (show cnt)
319
320 fromRoutableNode = not . queryIsReadOnly
321
322instance Hashable (PacketDestination KMessageOf) where
323 hashWithSalt s (MainlineNode sockaddr) = hashWithSalt s (show sockaddr)
324
325-- Serialize, Pretty) PacketDestination KMessageOf = MainlineNode SockAddr
326instance Serialize (PacketDestination KMessageOf) where
327 put (MainlineNode addr) = putSockAddr addr
328 get = MainlineNode <$> getSockAddr
329
330instance Pretty (PacketDestination KMessageOf) where
331 pPrint (MainlineNode addr) = PP.text $ show addr
332
333instance Address (PacketDestination KMessageOf) where
334 toSockAddr (MainlineNode addr) = addr
335 fromSockAddr addr = Just $ MainlineNode addr
336
337instance WireFormat BValue KMessageOf where
338 type SerializableTo BValue = BEncode
339 type CipherContext BValue KMessageOf = ()
340
341 parsePacket _ = BE.decode
342
343 buildError = Just . E
344
345 decodeHeaders _ = BE.fromBEncode
346 decodePayload kmsg = mapM BE.fromBEncode kmsg
347
348 encodeHeaders _ kmsg _ = L.toStrict $ BE.encode kmsg
349 encodePayload msg = fmap BE.toBEncode msg
350
351 initializeServerState _ mbid = do
352 i <- maybe genNodeId return mbid
353 return (i, ())
354
355-- | KRPC 'compact list' compatible encoding: contact information for
356-- nodes is encoded as a 26-byte string. Also known as "Compact node
357-- info" the 20-byte Node ID in network byte order has the compact
358-- IP-address/port info concatenated to the end.
359instance Serialize a => Serialize (NodeInfo KMessageOf a ()) where
360 get = (\a b -> NodeInfo a b ()) <$> get <*> get
361 put NodeInfo {..} = put nodeId >> put nodeAddr
362
363
364#ifdef VERSION_bencoding
365instance BEncode ErrorCode where
366 toBEncode = toBEncode . fromEnum
367 {-# INLINE toBEncode #-}
368
369 fromBEncode b = toEnum <$> fromBEncode b
370 {-# INLINE fromBEncode #-}
371#endif
372
373-- | Errors, or KRPC message dictionaries with a \"y\" value of \"e\",
374-- contain one additional key \"e\". The value of \"e\" is a
375-- list. The first element is an integer representing the error
376-- code. The second element is a string containing the error
377-- message.
378--
379-- Example Error Packet:
380--
381-- > { "t": "aa", "y":"e", "e":[201, "A Generic Error Ocurred"]}
382--
383-- or bencoded:
384--
385-- > d1:eli201e23:A Generic Error Ocurrede1:t2:aa1:y1:ee
386--
387#ifdef VERSION_bencoding
388instance (Typeable tid, BEncode tid) => BEncode (KError tid) where
389 toBEncode KError {..} = toDict $
390 "e" .=! (errorCode, errorMessage)
391 .: "t" .=! errorId
392 .: "y" .=! ("e" :: ByteString)
393 .: endDict
394 {-# INLINE toBEncode #-}
395
396 fromBEncode = fromDict $ do
397 lookAhead $ match "y" (BString "e")
398 (code, msg) <- field (req "e")
399 KError code msg <$>! "t"
400 {-# INLINE fromBEncode #-}
401#endif
402
403instance Read (NodeId KMessageOf) where readsPrec d s = map (\(w,xs) -> (NodeId w, xs)) $ decodeHex s
404
diff --git a/src/Network/DatagramServer/Tox.hs b/src/Network/DatagramServer/Tox.hs
deleted file mode 100644
index cf39e7e1..00000000
--- a/src/Network/DatagramServer/Tox.hs
+++ /dev/null
@@ -1,554 +0,0 @@
1{-# LANGUAGE GeneralizedNewtypeDeriving #-}
2{-# LANGUAGE StandaloneDeriving #-}
3{-# LANGUAGE FlexibleInstances #-}
4{-# LANGUAGE DeriveDataTypeable #-}
5{-# LANGUAGE DeriveFunctor #-}
6{-# LANGUAGE DeriveGeneric #-}
7{-# LANGUAGE DeriveTraversable #-}
8{-# LANGUAGE MultiParamTypeClasses #-}
9{-# LANGUAGE PatternSynonyms #-}
10{-# LANGUAGE RecordWildCards #-}
11{-# LANGUAGE TupleSections #-}
12{-# LANGUAGE TypeFamilies #-}
13{-# LANGUAGE UnboxedTuples #-}
14{-# LANGUAGE TemplateHaskell #-}
15{-# LANGUAGE RankNTypes #-}
16module Network.DatagramServer.Tox where
17
18import Data.Bits
19import Data.ByteString (ByteString)
20import Data.ByteArray as BA (ByteArrayAccess,length,withByteArray)
21import qualified Data.Serialize as S
22-- import qualified Data.ByteString.Lazy as L
23import qualified Data.ByteString.Char8 as Char8
24-- import Data.Data (Data)
25import Data.Word
26import Data.LargeWord
27import Data.IP
28import Data.Serialize
29import Network.Address
30import GHC.Generics (Generic)
31import Network.Socket
32import Network.DatagramServer.Types
33import qualified Network.DatagramServer.Types as Envelope (NodeId)
34import Crypto.PubKey.ECC.Types
35import Crypto.PubKey.Curve25519
36import Crypto.ECC.Class
37import qualified Crypto.Cipher.Salsa as Salsa
38import qualified Crypto.Cipher.XSalsa as XSalsa
39import qualified Crypto.MAC.Poly1305 as Poly1305
40import Data.LargeWord
41import Foreign.Ptr
42import Foreign.Storable
43import Foreign.Marshal.Alloc
44import Data.Typeable
45import StaticAssert
46import Crypto.Error.Types
47import qualified Crypto.Error as Cryptonite
48import Data.Hashable
49import Text.PrettyPrint as PP hiding ((<>))
50import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
51import qualified Data.ByteArray as BA
52import Data.ByteArray ( Bytes, convert )
53import Data.Monoid
54import System.Endian
55import qualified Data.ByteString.Base16 as Base16
56import qualified Data.ByteString.Char8 as C8
57import qualified Data.ByteString.Char8 as C8
58
59
60type Key32 = Word256 -- 32 byte key
61type Nonce8 = Word64 -- 8 bytes
62type Nonce24 = Word192 -- 24 bytes
63
64
65data NodeFormat = NodeFormat
66 { nodePublicKey :: NodeId Message -- 32 byte public key
67 , nodeIsTCP :: Bool -- This has no analog in mainline NodeInfo structure
68 , nodeIP :: IP -- IPv4 or IPv6 address
69 , nodePort :: PortNumber
70 }
71 deriving (Eq, Ord, Show)
72
73encodeFamily :: (Family, SocketType) -> Word8
74encodeFamily (AF_INET , Datagram) = 2
75encodeFamily (AF_INET6 , Datagram) = 10
76encodeFamily (AF_INET , Stream ) = 130
77encodeFamily (AF_INET6 , Stream ) = 138
78encodeFamily _ = error "Unsupported protocol"
79
80newtype MessageType = MessageType Word8
81 deriving (Eq, Ord, Show, Read)
82
83instance Serialize MessageType where
84 put (MessageType b) = put b
85 get = MessageType <$> get
86
87pattern Ping = MessageType 0
88pattern Pong = MessageType 1
89pattern GetNodes = MessageType 2
90pattern SendNodes = MessageType 4
91{-
92 #define NET_PACKET_PING_REQUEST 0 /* Ping request packet ID. */
93 #define NET_PACKET_PING_RESPONSE 1 /* Ping response packet ID. */
94 #define NET_PACKET_GET_NODES 2 /* Get nodes request packet ID. */
95 #define NET_PACKET_SEND_NODES_IPV6 4 /* Send nodes response packet ID for other addresses. */
96 #define NET_PACKET_COOKIE_REQUEST 24 /* Cookie request packet */
97 #define NET_PACKET_COOKIE_RESPONSE 25 /* Cookie response packet */
98 #define NET_PACKET_CRYPTO_HS 26 /* Crypto handshake packet */
99 #define NET_PACKET_CRYPTO_DATA 27 /* Crypto data packet */
100 #define NET_PACKET_CRYPTO 32 /* Encrypted data packet ID. */
101 #define NET_PACKET_LAN_DISCOVERY 33 /* LAN discovery packet ID. */
102
103 /* See: docs/Prevent_Tracking.txt and onion.{c, h} */
104 #define NET_PACKET_ONION_SEND_INITIAL 128
105 #define NET_PACKET_ONION_SEND_1 129
106 #define NET_PACKET_ONION_SEND_2 130
107
108 #define NET_PACKET_ANNOUNCE_REQUEST 131
109 #define NET_PACKET_ANNOUNCE_RESPONSE 132
110 #define NET_PACKET_ONION_DATA_REQUEST 133
111 #define NET_PACKET_ONION_DATA_RESPONSE 134
112
113 #define NET_PACKET_ONION_RECV_3 140
114 #define NET_PACKET_ONION_RECV_2 141
115 #define NET_PACKET_ONION_RECV_1 142
116 -}
117
118
119-- | Use with 'PingPayload', 'GetNodesPayload', or 'SendNodesPayload'
120data Message a = Message
121 { msgType :: MessageType
122 , msgClient :: NodeId Message
123 , msgNonce :: TransactionID Message
124 , msgPayload :: a
125 }
126 deriving (Eq, Show, Generic, Functor, Foldable, Traversable)
127
128instance Show (NodeId Message) where
129 showsPrec d pubkey s =
130 "NodeId \"" ++ C8.unpack (Base16.encode $ convert pubkey) ++ '"':s
131
132instance Show (TransactionID Message) where
133 showsPrec d nonce = mappend "TID " . quoted (mappend $ bin2hex nonce)
134
135isQuery :: Message a -> Bool
136isQuery (Message { msgType = SendNodes }) = False
137isQuery (Message { msgType = MessageType typ }) | even typ = True
138isQuery _ = False
139
140isResponse :: Message a -> Bool
141isResponse m = not (isQuery m)
142
143isError :: Message a -> Bool
144isError _ = False
145
146data PingPayload = PingPayload
147 { isPong :: Bool
148 , pingId :: Nonce8
149 }
150
151data GetNodesPayload = GetNodesPayload
152 { nodesForWho :: NodeId Message
153 , nodesNonce :: Nonce8
154 }
155
156data SendNodesPayload = SendNodesPayload
157
158-- From: docs/updates/DHT.md
159--
160-- Node format:
161-- [uint8_t family (2 == IPv4, 10 == IPv6, 130 == TCP IPv4, 138 == TCP IPv6)]
162-- [ip (in network byte order), length=4 bytes if ipv4, 16 bytes if ipv6]
163-- [port (in network byte order), length=2 bytes]
164-- [char array (node_id), length=32 bytes]
165--
166-- see also: DHT.h (pack_nodes() and unpack_nodes())
167instance Serialize NodeFormat where
168
169 get = do
170 typ <- get :: Get Word8
171 (ip,istcp) <-
172 case typ :: Word8 of
173 2 -> (,False) . IPv4 <$> get
174 130 -> (,True) . IPv4 <$> get
175 10 -> (,False) . IPv6 <$> get
176 138 -> (,True) . IPv6 <$> get
177 _ -> fail "Unsupported type of Tox node_format structure"
178 port <- get
179 pubkey <- get
180 return $ NodeFormat { nodeIsTCP = istcp
181 , nodeIP = ip
182 , nodePort = port
183 , nodePublicKey = NodeId pubkey
184 }
185
186 put (NodeFormat{ nodePublicKey = NodeId pubkey, ..}) = do
187 put $ case (# nodeIP, nodeIsTCP #) of
188 (# IPv4 _, False #) -> 2
189 (# IPv4 _, True #) -> 130
190 (# IPv6 _, False #) -> 10
191 (# IPv6 _, True #) -> 138 :: Word8
192 put nodeIP
193 put nodePort
194 put pubkey
195
196-- Note: the char array is a public key, the 32-bytes is provided by libsodium-dev
197-- in /usr/include/sodium/crypto_box.h as the symbol crypto_box_PUBLICKEYBYTES
198-- but toxcore/crypto_core.c will fail to compile if it is not 32.
199
200
201-- Ping(Request and response):
202--
203-- [byte with value: 00 for request, 01 for response]
204-- [char array (client node_id), length=32 bytes]
205-- [random 24 byte nonce]
206-- [Encrypted with the nonce and private key of the sender:
207-- [1 byte type (0 for request, 1 for response)]
208-- [random 8 byte (ping_id)]
209-- ]
210--
211-- ping_id = a random integer, the response must contain the exact same number as the request
212
213
214-- Get nodes (Request):
215--
216-- [byte with value: 02]
217-- [char array (client node_id), length=32 bytes]
218-- [random 24 byte nonce]
219-- [Encrypted with the nonce and private key of the sender:
220-- [char array: requested_node_id (node_id of which we want the ip), length=32 bytes]
221-- [Sendback data (must be sent back unmodified by in the response), length=8 bytes]
222-- ]
223--
224-- Valid replies: a send_nodes packet
225
226-- Send_nodes (response (for all addresses)):
227--
228-- [byte with value: 04]
229-- [char array (client node_id), length=32 bytes]
230-- [random 24 byte nonce]
231-- [Encrypted with the nonce and private key of the sender:
232-- [uint8_t number of nodes in this packet]
233-- [Nodes in node format, length=?? * (number of nodes (maximum of 4 nodes)) bytes]
234-- [Sendback data, length=8 bytes]
235-- ]
236
237data ToxCipherContext = ToxCipherContext
238 { dhtSecretKey :: SecretKey
239 }
240
241data Ciphered = Ciphered { cipheredMAC :: Poly1305.Auth
242 , cipheredBytes :: ByteString }
243 deriving Eq
244
245quoted shows s = '"':shows ('"':s)
246
247bin2hex :: ByteArrayAccess bs => bs -> String
248bin2hex = C8.unpack . Base16.encode . convert
249
250instance Show Ciphered where
251 showsPrec d (Ciphered (Poly1305.Auth mac) bytes) =
252 mappend "Ciphered (Auth "
253 . quoted (mappend $ bin2hex mac)
254 . (") " ++)
255 . quoted (mappend $ bin2hex bytes)
256
257getMessage :: Get (Message Ciphered)
258getMessage = do
259 typ <- get
260 nid <- get
261 tid <- get
262 mac <- Poly1305.Auth . convert <$> getBytes 16
263 cnt <- remaining
264 bs <- getBytes cnt
265 return Message { msgType = typ
266 , msgClient = nid
267 , msgNonce = tid
268 , msgPayload = Ciphered mac bs }
269
270putMessage :: Message Ciphered -> Put
271putMessage (Message {..}) = do
272 put msgType
273 put msgClient
274 put msgNonce
275 let Ciphered (Poly1305.Auth mac) bs = msgPayload
276 putByteString (convert mac)
277 putByteString bs
278
279-- XXX: assumes ByteArray is little-endian
280id2key :: NodeId Message -> PublicKey
281id2key recipient = case publicKey recipient of
282 CryptoPassed key -> key
283 CryptoFailed e -> error ("id2key: "++show e)
284
285-- XXX: S.decode is Big-endian
286-- TODO: implement ByteArray instance, avoid S.decode
287key2id :: PublicKey -> NodeId Message
288key2id pk = case S.decode (BA.convert pk) of
289 Left _ -> error "key2id"
290 Right nid -> nid
291
292
293zeros32 :: Bytes
294zeros32 = BA.replicate 32 0
295
296zeros24 :: Bytes
297zeros24 = BA.take 24 zeros32
298
299hsalsa20 k n = a <> b
300 where
301 Salsa.State st = XSalsa.initialize 20 k n
302 (_, as) = BA.splitAt 4 st
303 (a, xs) = BA.splitAt 16 as
304 (_, bs) = BA.splitAt 24 xs
305 (b, _ ) = BA.splitAt 16 bs
306
307lookupSecret :: ToxCipherContext -> NodeId Message -> TransactionID Message -> (Poly1305.State, XSalsa.State)
308lookupSecret ctx recipient nonce = (hash, crypt)
309 where
310 -- diffie helman
311 shared = ecdh (Proxy :: Proxy Curve_X25519) (dhtSecretKey ctx) (id2key recipient) -- ByteArrayAccess b => b
312 -- shared secret XSalsa key
313 k = hsalsa20 shared zeros24
314 -- cipher state
315 st0 = XSalsa.initialize 20 k nonce
316 -- Poly1305 key
317 (rs, crypt) = XSalsa.combine st0 zeros32
318 Cryptonite.CryptoPassed hash = Poly1305.initialize rs -- TODO: Pattern fail?
319
320decipher :: ToxCipherContext -> Message Ciphered -> Either String (Message ByteString)
321decipher ctx ciphered = mapM (decipherAndAuth hash crypt) ciphered
322 where
323 (hash, crypt) = lookupSecret ctx (msgClient ciphered) (msgNonce ciphered)
324
325encipher :: ToxCipherContext -> NodeId Message -> Message ByteString -> Message Ciphered
326encipher ctx recipient plain = encipherAndHash hash crypt <$> plain
327 where
328 (hash, crypt) = lookupSecret ctx recipient (msgNonce plain)
329
330encipherAndHash :: Poly1305.State -> XSalsa.State -> ByteString -> Ciphered
331encipherAndHash hash crypt m = Ciphered a c
332 where
333 c = fst . XSalsa.combine crypt $ m
334 a = Poly1305.finalize . Poly1305.update hash $ c
335
336decipherAndAuth :: Poly1305.State -> XSalsa.State -> Ciphered -> Either String ByteString
337decipherAndAuth hash crypt (Ciphered mac c)
338 {-
339 | C8.length m /= C8.length c = Left $ "Unequal lengths: "++show (C8.length m, C8.length c)
340 -- | C8.length c /= 40 = Left $ "Unexpected c length: " ++ show (C8.length c, bin2hex c)
341 | otherwise = Right m
342 -}
343 | (a == mac) = Right m
344 | otherwise = Left "decipherAndAuth: auth fail"
345 where
346 m = fst . XSalsa.combine crypt $ c
347 a = Poly1305.finalize . Poly1305.update hash $ c
348
349
350-- see rfc7748
351--
352-- Crypto.ECC
353-- Crypto.PubKey.Curve25519
354-- Crypto.Cipher.XSalsa
355--
356curve25519 :: Curve
357curve25519 = CurveFP (CurvePrime prime curvecommon)
358 where
359 prime = 2^255 - 19 -- (≅ 1 modulo 4)
360
361 sqrt_of_39420360 = 14781619447589544791020593568409986887264606134616475288964881837755586237401
362
363 -- 1 * v^2 = u^3 + 486662*u^2 + u
364
365 curvecommon = CurveCommon
366 { ecc_a = 486662
367 , ecc_b = 1
368 , ecc_g = Point 9 sqrt_of_39420360 -- base point
369 , ecc_n = 2^252 + 0x14def9dea2f79cd65812631a5cf5d3ed -- order
370 , ecc_h = 8 -- cofactor
371 }
372
373-- crypto_box uses xsalsa20 symmetric encryption and poly1305 authentication.
374-- https://en.wikipedia.org/wiki/Poly1305
375
376instance Envelope Message where
377 newtype TransactionID Message = TID Nonce24
378 deriving (Eq,Ord) -- Read
379
380 newtype NodeId Message = NodeId Word256
381 deriving (Eq, Ord, Bits, FiniteBits)
382
383 type QueryMethod Message = MessageType
384
385 newtype QueryExtra Message = QueryNonce { qryNonce :: Nonce8 }
386 deriving (Eq, Ord, Show)
387
388 newtype ResponseExtra Message = ResponseNonce { rspNonce :: Nonce8 }
389 deriving (Eq, Ord, Show)
390
391 data PacketDestination Message = ToxAddr { toxID :: NodeId Message
392 , toxSockAddr :: SockAddr
393 }
394 deriving (Eq,Ord,Show)
395
396 envelopePayload = msgPayload
397
398 envelopeTransaction = msgNonce -- FIXME: should be decrypted nonce
399
400 envelopeClass Message { msgType = Ping } = Query Ping
401 envelopeClass Message { msgType = Pong } = Response Nothing
402 envelopeClass Message { msgType = GetNodes } = Query GetNodes
403 envelopeClass Message { msgType = SendNodes } = Response Nothing
404
405 makeAddress qry = ToxAddr (either id msgClient qry)
406
407 buildReply self addr qry payload = (fmap (const payload) qry) { msgClient = self }
408
409 -- buildQuery :: NodeId envelope -> SockAddr -> QueryMethod envelope -> TransactionID envelope -> a -> IO (envelope a)
410 buildQuery nid addr meth tid q = return $ Message
411 { msgType = meth
412 , msgClient = nid
413 , msgNonce = tid
414 , msgPayload = q
415 }
416
417 -- FIXME: Should generate encrypted nonces.
418 -- Should be unpredictable.
419 uniqueTransactionId cnt = do
420 return $ either (error "failed to create TransactionId") TID
421 $ S.decode $ Char8.pack (take 24 $ show cnt ++ repeat ' ')
422
423
424{-
425instance Serialize (TransactionID Message) where
426 get = do
427 lo <- getWord64le
428 mid <- getWord64le
429 hi <- getWord64le
430 return $ TID (LargeKey lo
431 (LargeKey mid hi))
432
433 put (TID (LargeKey lo (LargeKey mid hi))) = do
434 putWord64le lo
435 putWord64le mid
436 putWord64le hi
437
438instance Serialize (NodeId Message) where
439 get = do
440 lo <- getWord64le
441 mid <- getWord64le
442 hi <- getWord64le
443 highest <- getWord64le
444 return $ NodeId (LargeKey lo
445 (LargeKey mid
446 (LargeKey hi highest)))
447 put (NodeId (LargeKey lo (LargeKey mid (LargeKey hi highest)))) = do
448 putWord64le lo
449 putWord64le mid
450 putWord64le hi
451 putWord64le highest
452
453-}
454
455instance Serialize (TransactionID Message) where
456 get = do
457 hi <- getWord64be
458 mid <- getWord64be
459 lo <- getWord64be
460 return $ TID (LargeKey lo
461 (LargeKey mid hi))
462
463 put (TID (LargeKey lo (LargeKey mid hi))) = do
464 putWord64be hi
465 putWord64be mid
466 putWord64be lo
467
468instance Serialize (NodeId Message) where
469 get = do
470 highest <- getWord64be
471 hi <- getWord64be
472 mid <- getWord64be
473 lo <- getWord64be
474 return $ NodeId (LargeKey lo
475 (LargeKey mid
476 (LargeKey hi highest)))
477 put (NodeId (LargeKey lo (LargeKey mid (LargeKey hi highest)))) = do
478 putWord64be highest
479 putWord64be hi
480 putWord64be mid
481 putWord64be lo
482
483
484staticAssert isLittleEndian -- assumed by 'withWord64Ptr'
485
486with3Word64Ptr :: Nonce24 -> (Ptr Word64 -> IO a) -> IO a
487with3Word64Ptr (LargeKey wlo (LargeKey wmid whi)) kont =
488 allocaBytes (sizeOf wlo * 3) $ \p -> do
489 pokeElemOff p 2 $ toBE64 wlo
490 pokeElemOff p 1 $ toBE64 wmid
491 pokeElemOff p 0 $ toBE64 whi
492 kont p
493
494with4Word64Ptr :: Key32 -> (Ptr Word64 -> IO a) -> IO a
495with4Word64Ptr (LargeKey wlo (LargeKey wmid (LargeKey whi whighest))) kont =
496 allocaBytes (sizeOf wlo * 4) $ \p -> do
497 pokeElemOff p 3 $ toBE64 wlo
498 pokeElemOff p 2 $ toBE64 wmid
499 pokeElemOff p 1 $ toBE64 whi
500 pokeElemOff p 0 $ toBE64 whighest
501 kont p
502
503
504instance ByteArrayAccess (TransactionID Message) where
505 length _ = 24
506 withByteArray (TID nonce) kont = with3Word64Ptr nonce (kont . castPtr)
507
508instance ByteArrayAccess (NodeId Message) where
509 length _ = 32
510 withByteArray (NodeId nonce) kont = with4Word64Ptr nonce (kont . castPtr)
511
512
513instance Hashable (NodeId Message) where
514 hashWithSalt s (NodeId (LargeKey a (LargeKey b (LargeKey c d)))) =
515 hashWithSalt s (a,b,c,d)
516
517instance Hashable (PacketDestination Message) where
518 hashWithSalt s (ToxAddr nid addr) = hashWithSalt s nid
519
520instance Serialize (PacketDestination Message) where
521 put (ToxAddr (NodeId nid) addr) = put nid >> putSockAddr addr
522 get = ToxAddr <$> (NodeId <$> get) <*> getSockAddr
523
524instance Pretty (PacketDestination Message) where
525 pPrint = PP.text . show
526
527instance Address (PacketDestination Message) where
528 toSockAddr (ToxAddr _ addr) = addr
529 fromSockAddr _ = Nothing
530
531instance WireFormat ByteString Message where
532 type SerializableTo ByteString = Serialize
533 type CipherContext ByteString Message = ToxCipherContext
534
535 decodePayload = mapM decode
536 encodePayload = fmap encode
537
538 decodeHeaders ctx bs = runGet getMessage bs >>= decipher ctx
539 encodeHeaders ctx msg recipient = runPut $ putMessage $ encipher ctx (toxID recipient) msg
540
541 initializeServerState _ _ = do
542 k <- generateSecretKey
543 {-
544 nid <- withByteArray (toPublic k) $ \p -> do
545 wlo <- peekElemOff p 0
546 wmid <- peekElemOff p 1
547 whi <- peekElemOff p 2
548 whigest <- peekElemOff p 3
549 return $ LargeKey wlo (LargeKey wmid (LargeKey whi whigest))
550 -}
551 return (key2id $ toPublic k, ToxCipherContext k)
552
553
554instance Read (NodeId Message) where readsPrec d s = map (\(w,xs) -> (NodeId w, xs)) $ decodeHex s
diff --git a/src/Network/KRPC/Method.hs b/src/Network/KRPC/Method.hs
deleted file mode 100644
index da69d14b..00000000
--- a/src/Network/KRPC/Method.hs
+++ /dev/null
@@ -1,40 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013, 2014
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- Normally, you don't need to import this module.
9--
10{-# LANGUAGE CPP #-}
11{-# LANGUAGE DefaultSignatures #-}
12{-# LANGUAGE FlexibleContexts #-}
13{-# LANGUAGE FunctionalDependencies #-}
14{-# LANGUAGE GeneralizedNewtypeDeriving #-}
15{-# LANGUAGE MultiParamTypeClasses #-}
16{-# LANGUAGE RankNTypes #-}
17{-# LANGUAGE ScopedTypeVariables #-}
18{-# LANGUAGE StandaloneDeriving #-}
19{-# LANGUAGE TypeFamilies #-}
20module Network.KRPC.Method
21 ( Method (..)
22 , KRPC (..)
23 ) where
24
25#ifdef VERSION_bencoding
26import Data.BEncode (BEncode)
27#else
28import Data.Serialize
29#endif
30import Data.ByteString.Char8 as BC
31import Data.Char
32import Data.Monoid
33import Data.List as L
34import Data.String
35import Data.Typeable
36import Network.DatagramServer.Mainline
37import Network.DatagramServer.Types
38import Network.DHT.Types
39
40