1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- License : BSD3
4-- Maintainer :
5-- Stability : experimental
6-- Portability : portable
8-- BitTorrent uses a \"distributed sloppy hash table\" (DHT) for
9-- storing peer contact information for \"trackerless\" torrents. In
10-- effect, each peer becomes a tracker.
12-- Normally you don't need to import other DHT modules.
14-- For more info see:
15-- <>
17{-# LANGUAGE FlexibleInstances #-}
18{-# LANGUAGE FlexibleContexts #-}
19{-# LANGUAGE TemplateHaskell #-}
20{-# LANGUAGE TypeOperators #-}
21{-# LANGUAGE ScopedTypeVariables #-}
22{-# LANGUAGE CPP #-}
23module Network.BitTorrent.DHT
24 ( -- * Distributed Hash Table
25 DHT
26 , Options (..)
27 , fullLogging
28 , dht
30 -- * Bootstrapping
31 -- $bootstrapping-terms
32 , tNodes
33 , defaultBootstrapNodes
34 , resolveHostName
35 , bootstrap
36 , isBootstrapped
38 -- * Initialization
39 , snapshot
41 -- * Operations
42 -- , Network.BitTorrent.DHT.lookup
43 , Network.BitTorrent.DHT.insert
44 , Network.BitTorrent.DHT.delete
46 -- * Embedding
47 -- ** Session
48 , LogFun
49 , Node
50 , defaultHandlers
51 , newNode
52 , closeNode
54 -- ** Monad
55 -- , MonadDHT (..)
56 , runDHT
57 ) where
59import Control.Monad.Logger
60import Control.Monad.Reader
61import Control.Exception
62import qualified Data.ByteString as BS
63import Data.Conduit as C
64import qualified Data.Conduit.List as C
65import Data.Serialize
66import Network.Socket
67import Text.PrettyPrint.HughesPJClass as PP (pPrint,render)
69import Data.Torrent
70import Network.Address
71import Network.BitTorrent.DHT.Query
72import Network.BitTorrent.DHT.Session
73import Network.DHT.Routing as T hiding (null)
74import qualified Data.Text as Text
75import Data.Typeable
76import Data.Monoid
77import Network.DatagramServer.Mainline (KMessageOf)
78import qualified Network.DatagramServer as KRPC (listen, Protocol(..))
79import Network.DatagramServer.Types
80import Network.DHT.Types
81import Data.Bits
82import Data.Default
83import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
84import Network.KRPC.Method
87-- DHT types
90#if 0
91class MonadDHT m where
92 liftDHT :: DHT raw dht u IPv4 a -> m a
94instance MonadDHT (DHT raw dht u IPv4) where
95 liftDHT = id
98-- | Convenience method. Pass this to 'dht' to enable full logging.
99fullLogging :: LogSource -> LogLevel -> Bool
100fullLogging _ _ = True
102-- | Run DHT on specified port. <add note about resources>
103dht ::
104 ( Ord ip
105 , Address ip
106 , Functor dht
107 , Ord (NodeId dht)
108 , FiniteBits (NodeId dht)
109 , Serialize (NodeId dht)
110 , Show (NodeId dht)
111 , SerializableTo raw (Response dht (Ping dht))
112 , SerializableTo raw (Query dht (Ping dht))
113 , SerializableTo raw (Response dht (NodeFound dht ip))
114 , SerializableTo raw (Query dht (FindNode dht ip))
115 , Ord (TransactionID dht)
116 , Serialize (TransactionID dht)
117 , Eq (QueryMethod dht)
118 , Show (QueryMethod dht)
119 , Pretty (NodeInfo dht ip u)
120 , Kademlia dht
121 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
122 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
123 , DataHandlers raw dht
124 , WireFormat raw dht
125 , Show u
126 , Default u
127 , Typeable dht
128 )
129 => Options -- ^ normally you need to use 'Data.Default.def';
130 -> NodeAddr ip -- ^ address to bind this node;
131 -> (LogSource -> LogLevel -> Bool) -- ^ use 'fullLogging' as a noisy default
132 -> DHT raw dht u ip a -- ^ actions to run: 'bootstrap', 'lookup', etc;
133 -> IO a -- ^ result.
134dht opts addr logfilter action = do
135 runStderrLoggingT $ filterLogger logfilter $ LoggingT $ \ logger -> do
136 bracket (newNode opts addr logger Nothing) closeNode $
137 \ node -> runDHT node $ do
138 hs <- defaultHandlers logger
139 m <- asks manager
140 liftIO $ KRPC.listen m hs (KRPC.Protocol Proxy Proxy)
141 action
142{-# INLINE dht #-}
145-- Bootstrapping
147-- $bootstrapping-terms
149-- [@Bootstrapping@] DHT @bootstrapping@ is the process of filling
150-- routing 'Table' by /good/ nodes.
152-- [@Bootstrapping time@] Bootstrapping process can take up to 5
153-- minutes. Bootstrapping should only happen at first application
154-- startup, if possible you should use 'snapshot' & 'restore'
155-- mechanism which must work faster.
157-- [@Bootstrap nodes@] DHT @bootstrap node@ is either:
159-- * a specialized high performance node maintained by bittorrent
160-- software authors\/maintainers, like those listed in
161-- 'defaultBootstrapNodes'. /Specialized/ means that those nodes
162-- may not support 'insert' queries and is running for the sake of
163-- bootstrapping only.
165-- * an ordinary bittorrent client running DHT node. The list of
166-- such bootstrapping nodes usually obtained from
167-- 'Data.Torrent.tNodes' field or
168-- 'Network.BitTorrent.Exchange.Message.Port' messages.
170-- Do not include the following hosts in the default bootstrap nodes list:
172-- * "" and "" - since
173-- Azureus client have a different (and probably incompatible) DHT
174-- protocol implementation.
176-- * "" since it is just an alias to
177-- "".
178-- XXX: ignoring this advise as it resolves to a different
179-- ip address for me.
181-- | List of bootstrap nodes maintained by different bittorrent
182-- software authors.
183defaultBootstrapNodes :: [NodeAddr HostName]
184defaultBootstrapNodes =
185 [ NodeAddr "" 6881 -- by BitTorrent Inc.
187 -- doesn't work at the moment (use git blame) of commit
188 , NodeAddr "" 6881 -- by Transmission project
190 , NodeAddr "" 6881
191 ]
193-- TODO Multihomed hosts
195-- | Resolve either a numeric network address or a hostname to a
196-- numeric IP address of the node. Usually used to resolve
197-- 'defaultBootstrapNodes' or 'Data.Torrent.tNodes' lists.
198resolveHostName :: NodeAddr HostName -> IO (NodeAddr IPv4)
199resolveHostName NodeAddr {..} = do
200 let hints = defaultHints { addrFamily = AF_INET, addrSocketType = Datagram }
201 -- getAddrInfo throws exception on empty list, so the pattern matching never fail
202 info : _ <- getAddrInfo (Just hints) (Just nodeHost) (Just (show nodePort))
203 case fromSockAddr (addrAddress info) of
204 Nothing -> error "resolveNodeAddr: impossible"
205 Just addr -> return addr
207-- | One good node may be sufficient.
209-- This operation do block, use
210-- 'Control.Concurrent.Async.Lifted.async' if needed.
211bootstrap :: forall raw dht u ip.
212 ( Ord ip
213 , Address ip
214 , Functor dht
215 , Ord (NodeId dht)
216 , FiniteBits (NodeId dht)
217 , Serialize (NodeId dht)
218 , Show (NodeId dht)
219 , Pretty (NodeId dht)
220 , SerializableTo raw (Response dht (Ping dht))
221 , SerializableTo raw (Query dht (Ping dht))
222 , SerializableTo raw (Response dht (NodeFound dht ip))
223 , SerializableTo raw (Query dht (FindNode dht ip))
224 , Ord (TransactionID dht)
225 , Serialize (TransactionID dht)
226 , Eq (QueryMethod dht)
227 , Show (QueryMethod dht)
228 , Pretty (NodeInfo dht ip u)
229 , Kademlia dht
230 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
231 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
232 , DataHandlers raw dht
233 , WireFormat raw dht
234 , Show u
235 , Default u
236 , Serialize u
237 ) => Maybe BS.ByteString -> [PacketDestination dht] -> DHT raw dht u ip ()
238bootstrap mbs startNodes = do
239 restored <-
240 case decode <$> mbs of
241 Just (Right tbl) -> return (T.toList tbl)
242 Just (Left e) -> do $(logWarnS) "restore" (Text.pack e)
243 return []
244 Nothing -> return []
246 $(logInfoS) "bootstrap" "Start node bootstrapping"
247 let searchAll aliveNodes = do
248 nid <- myNodeIdAccordingTo (error "FIXME")
249 ns <- bgsearch ioFindNodes nid
250 return ( ns :: [NodeInfo dht ip u] )
251 input_nodes <- (restored ++) . T.toList <$> getTable
252 -- Step 1: Use iterative searches to flesh out the table..
253 do let knowns = map (map $ fst) input_nodes
254 -- Below, we reverse the nodes since the table serialization puts the
255 -- nearest nodes last and we want to choose a similar node id to bootstrap
256 -- faster.
257 (alive_knowns,_) <- unzip <$> queryParallel (pingQ <$> reverse (concat knowns))
258 b <- isBootstrapped
259 -- If our cached nodes are alive and our IP address did not change, it's possible
260 -- we are already bootsrapped, so no need to do any searches.
261 when (not b) $ do
262 ns <- searchAll $ take 2 alive_knowns
263 -- We only use the supplied bootstrap nodes when we don't know of any
264 -- others to try.
265 when (null ns) $ do
266 -- TODO filter duplicated in startNodes list
267 -- TODO retransmissions for startNodes
268 (aliveNodes,_) <- unzip <$> queryParallel (coldPingQ <$> startNodes)
269 _ <- searchAll $ take 2 aliveNodes
270 return ()
271 -- Step 2: Repeatedly refresh incomplete buckets until the table is full.
272 maxbuckets <- asks $ optBucketCount . options
273 flip fix 0 $ \loop icnt -> do
274 tbl <- getTable
275 let unfull = filter ((/=defaultBucketSize) . snd)
276 us = zip
277 -- is_last = True for the last bucket
278 (True:repeat False)
279 -- Only non-full buckets unless it is the last one and the
280 -- maximum number of buckets has not been reached.
281 $ case reverse $ zip [0..] $ T.shape tbl of
282 p@(n,_):ps | n+1==maxbuckets -> unfull (p:ps)
283 p:ps -> p:unfull ps
284 [] -> []
285 forM_ us $ \(is_last,(index,_)) -> do
286 nid <- myNodeIdAccordingTo (error "FIXME")
287 sample <- liftIO $ genBucketSample nid (bucketRange index is_last)
288 $(logDebugS) "bootstrapping"
289 $ "BOOTSTRAP sample"
290 <> Text.pack (show (is_last,index,T.shape tbl))
291 <> " " <> Text.pack (render $ pPrint sample)
292 refreshNodes sample
293 $(logDebugS) "bootstrapping"
294 $ "BOOTSTRAP finished iteration "
295 <> Text.pack (show (icnt,T.shape tbl,us,defaultBucketSize))
296 when (not (null us) && icnt < div (3*maxbuckets) 2)
297 $ loop (succ icnt)
298 $(logInfoS) "bootstrap" "Node bootstrapping finished"
300-- | Check if this node is already bootstrapped.
301-- @bootstrap [good_node] >> isBootstrapped@@ should always return 'True'.
303-- This operation do not block.
305isBootstrapped :: Eq ip => DHT raw dht u ip Bool
306isBootstrapped = T.full <$> getTable
309-- Initialization
312-- | Serialize current DHT session to byte string.
314-- This is blocking operation, use
315-- 'Control.Concurrent.Async.Lifted.async' if needed.
316snapshot :: ( Address ip
317 , Ord (NodeId dht)
318 , Serialize u
319 , Serialize (NodeId dht)
320 ) => DHT raw dht u ip BS.ByteString
321snapshot = do
322 tbl <- getTable
323 return $ encode tbl
326-- Operations
329#if 0
331-- | Get list of peers which downloading this torrent.
333-- This operation is incremental and do block.
335lookup :: Address ip => InfoHash -> DHT raw dht u ip `C.Source` [PeerAddr ip]
336lookup topic = do -- TODO retry getClosest if bucket is empty
337 closest <- lift $ getClosest topic
338 C.sourceList [closest] $= search topic (getPeersQ topic)
342-- TODO do not republish if the topic is already in announceSet
344-- | Announce that /this/ peer may have some pieces of the specified
345-- torrent. DHT will reannounce this data periodically using
346-- 'optReannounce' interval.
348-- This operation is synchronous and do block, use
349-- 'Control.Concurrent.Async.Lifted.async' if needed.
351insert :: Address ip => InfoHash -> PortNumber -> DHT raw dht u ip ()
352insert ih p = do
353 publish ih p
354 insertTopic ih p
356-- | Stop announcing /this/ peer for the specified torrent.
358-- This operation is atomic and may block for a while.
360delete :: InfoHash -> PortNumber -> DHT raw dht u ip ()
361delete = deleteTopic
362{-# INLINE delete #-}
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
deleted file mode 100644
index 003bb5b9..00000000
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ /dev/null
@@ -1,750 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2014
3-- License : BSD3
4-- Maintainer :
5-- Stability : experimental
6-- Portability : portable
8-- This module provides functions to interact with other nodes.
9-- Normally, you don't need to import this module, use
10-- "Network.BitTorrent.DHT" instead.
12{-# LANGUAGE CPP #-}
13{-# LANGUAGE FlexibleContexts #-}
14{-# LANGUAGE ScopedTypeVariables #-}
15{-# LANGUAGE TemplateHaskell #-}
16{-# LANGUAGE TupleSections #-}
17{-# LANGUAGE PartialTypeSignatures #-}
18{-# LANGUAGE GADTs #-}
19{-# LANGUAGE RankNTypes #-}
20{-# LANGUAGE MultiParamTypeClasses #-}
21module Network.BitTorrent.DHT.Query
22 ( -- * Handler
23 -- | To bind specific set of handlers you need to pass
24 -- handler list to the 'startNode' function.
25 pingH
26 , findNodeH
27 , getPeersH
28 , announceH
29 , defaultHandlers
31 -- * Query
32 -- ** Basic
33 -- | A basic query perform a single request expecting a
34 -- single response.
35 , Iteration
36 , pingQ
37 , coldPingQ
38 , findNodeQ
39 , getPeersQ
40 , announceQ
42 -- ** Iterative
43 -- | An iterative query perform multiple basic queries,
44 -- concatenate its responses, optionally yielding result and
45 -- continue to the next iteration.
46 , Search
47 -- , search
48 , publish
49 , ioFindNode
50 , ioFindNodes
51 , ioGetPeers
52 , isearch
53 , bgsearch
55 -- ** Routing table
56 , insertNode
57 , refreshNodes
59 -- ** Messaging
60 , queryNode
61 , queryNode'
62 , (<@>)
63 ) where
65import Data.Bits
66import Data.Default
68import Control.Concurrent.Lifted.Instrument hiding (yield)
70import GHC.Conc (labelThread)
71import Control.Concurrent.Lifted hiding (yield)
73import Control.Exception.Lifted hiding (Handler)
74import Control.Monad.Reader
75import Control.Monad.Logger
76import Data.Maybe
77import Data.Conduit
78import Data.Conduit.List as C hiding (mapMaybe, mapM_)
79import Data.Either
80import Data.List as L
81import Data.Monoid
82import Data.Text as T
83import qualified Data.Set as Set
84 ;import Data.Set (Set)
85import Network
86import Text.PrettyPrint as PP hiding ((<>), ($$))
87import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
88import Data.Time
89import Data.Time.Clock.POSIX
90import Data.Hashable (Hashable)
91import Data.Serialize
92import Data.Hashable
94import Network.DatagramServer as KRPC hiding (Options, def)
95import Network.KRPC.Method as KRPC
96import Network.DatagramServer.Mainline (ReflectedIP(..), QueryExtra(..), ResponseExtra(..))
97import Network.DatagramServer (QueryFailure(..))
98import Data.Torrent
99import qualified Network.DHT as DHT
100import Network.DHT.Mainline
101import Network.DHT.Routing as R
102import Network.BitTorrent.DHT.Session
103import Control.Concurrent.STM
104import qualified Network.BitTorrent.DHT.Search as Search
105#ifdef VERSION_bencoding
106import Data.BEncode (BValue)
107import Network.DatagramServer.Mainline (KMessageOf)
109import Data.ByteString (ByteString)
110import Network.DatagramServer.Tox
112import Network.Address hiding (NodeId)
113import Network.DatagramServer.Types as RPC hiding (Query,Response)
114import Network.DHT.Types
115import Control.Monad.Trans.Control
116import Data.Typeable
117import Data.Serialize
118import System.IO.Unsafe (unsafeInterleaveIO)
119import Data.String
123-- Handlers
127nodeHandler :: ( Address ip
128 , KRPC dht (Query KMessageOf a) (Response KMessageOf b)
129 )
130 => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler
132nodeHandler :: forall raw dht addr u t q r.
133 (Address addr, WireFormat raw dht, Pretty (NodeInfo dht addr u),
134 Default u,
135 IsString t, Functor dht,
136 KRPC dht (Query dht q) (Response dht r),
137 SerializableTo raw (Response dht r),
138 SerializableTo raw (Query dht q),
139 Show (QueryMethod dht)) =>
140 (NodeInfo dht addr u -> Maybe ReflectedIP -> IO ())
141 -> (NodeAddr addr -> IO (NodeId dht))
142 -> (Char -> t -> Text -> IO ())
143 -> DHTData dht addr
144 -> QueryMethod dht
145 -> (NodeAddr addr -> q -> IO r)
146 -> Handler IO dht raw
147nodeHandler insertNode myNodeIdAccordingTo logm dta method action = handler (\sockaddr -> myNodeIdAccordingTo (error "todo")) method $ \ sockAddr msg -> do
148 let remoteId = messageSender (msg :: dht (Query dht q)) resptype
149 qextra = queryExtra qry
150 resptype = Proxy :: Proxy (Response dht r)
151 q = queryParams qry
152 qry = envelopePayload msg :: Query dht q
153 case fromSockAddr sockAddr of
154 Nothing -> throwIO BadAddress
155 Just naddr -> do
156 logm 'D' "nodeHandler" $ "Received query: " <> T.pack (show $ method)
157 me <- myNodeIdAccordingTo naddr
158 rextra <- liftIO $ makeResponseExtra dta me qry resptype
159 let ni = NodeInfo remoteId naddr def
160 -- Do not route read-only nodes. (bep 43)
161 if fromRoutableNode qextra
162 then insertNode ni Nothing >> return () -- TODO need to block. why?
163 else logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni)
164 Response
165 <$> pure rextra
166 <*> action naddr q
168-- | Default 'Ping' handler.
169pingH :: DHT.Kademlia dht => Proxy dht -> NodeAddr ip -> Ping dht -> IO (Ping dht)
170pingH dht _ _ = return (DHT.pongMessage dht)
171-- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True }
173-- | Default 'FindNode' handler.
174findNodeH :: Kademlia dht => (NodeId dht -> IO [NodeInfo dht ip u]) -> NodeAddr ip -> FindNode dht ip -> IO (NodeFound dht ip)
175findNodeH getclosest _ msg = foundNodesMessage . (fmap (const ())) <$> getclosest (findWho msg)
177-- | Default 'GetPeers' handler.
178getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip)
179getPeersH getPeerList toks naddr (GetPeers ih) = do
180 ps <- getPeerList ih
181 tok <- grantToken toks naddr
182 return $ GotPeers ps tok
184-- | Default 'Announce' handler.
185announceH :: ( Ord ip, Hashable ip ) => TVar (PeerStore ip) -> TVar SessionTokens -> NodeAddr ip -> Announce -> IO Announced
186announceH peers toks naddr @ NodeAddr {..} (Announce {..}) = do
187 valid <- checkToken toks naddr sessionToken
188 unless valid $ do
189 throwIO $ InvalidParameter "token"
191 let annPort = if impliedPort then nodePort else port
192 peerAddr = PeerAddr Nothing nodeHost annPort
193 insertPeer peers topic announcedName peerAddr
194 return Announced
196-- | Includes all Kademlia-related handlers.
197kademliaHandlers :: forall raw dht u ip. (Eq ip, Ord ip, Address ip
198 , Ord (TransactionID dht)
199 , Ord (NodeId dht)
200 , Show u
201 , SerializableTo raw (Response dht (Ping dht))
202 , SerializableTo raw (Query dht (Ping dht))
203 , Show (QueryMethod dht)
204 , Show (NodeId dht)
205 , FiniteBits (NodeId dht)
206 , Default u
207 , Serialize (TransactionID dht)
208 , WireFormat raw dht
209 , Kademlia dht
210 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
211 , Functor dht
212 , Pretty (NodeInfo dht ip u)
213 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
214 , SerializableTo raw (Response dht (NodeFound dht ip))
215 , SerializableTo raw (Query dht (FindNode dht ip))
216 ) => LogFun -> DHT raw dht u ip [Handler IO dht raw]
217-- kademliaHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler]
218kademliaHandlers logger = do
219 groknode <- insertNode1
220 mynid <- myNodeIdAccordingTo1
221 dta <- asks dhtData
222 let handler :: ( KRPC dht (Query dht a) (Response dht b)
223 , SerializableTo raw (Response dht b)
224 , SerializableTo raw (Query dht a)
225 ) => QueryMethod dht -> (NodeAddr ip -> a -> IO b) -> Handler IO dht raw
226 handler = nodeHandler groknode mynid (logt logger) dta
227 dht = Proxy :: Proxy dht
228 getclosest <- getClosest1
229 return [ handler (namePing dht) $ pingH dht
230 , handler (nameFindNodes dht) $ findNodeH getclosest
231 ]
233instance DataHandlers BValue KMessageOf where
234 dataHandlers = bthandlers
236bthandlers ::
237 ( Ord ip , Hashable ip, Typeable ip, Serialize ip) =>
238 (NodeId KMessageOf -> IO [NodeInfo KMessageOf ip ()])
239 -> DHTData KMessageOf ip
240 -> [MethodHandler BValue KMessageOf ip]
241bthandlers getclosest dta =
242 [ MethodHandler "get_peers" $ getPeersH (getpeers dta) (sessionTokens dta)
243 , MethodHandler "announce_peer" $ announceH (contactInfo dta) (sessionTokens dta)
244 ]
245 where
246 getpeers dta ih = do
247 ps <- lookupPeers (contactInfo dta) ih
248 if L.null ps
249 then Left <$> getclosest (toNodeId ih)
250 else return (Right ps)
253-- | Includes all default query handlers.
254defaultHandlers :: forall raw dht u ip.
255 ( Ord (TransactionID dht)
256 , Ord (NodeId dht)
257 , Show u
258 , SerializableTo raw (Response dht (Ping dht))
259 , SerializableTo raw (Query dht (Ping dht))
260 , Show (QueryMethod dht)
261 , Show (NodeId dht)
262 , FiniteBits (NodeId dht)
263 , Default u
264 , Serialize (TransactionID dht)
265 , WireFormat raw dht
266 , Kademlia dht
267 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
268 , Functor dht
269 , Pretty (NodeInfo dht ip u)
270 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
271 , SerializableTo raw (Response dht (NodeFound dht ip))
272 , SerializableTo raw (Query dht (FindNode dht ip))
273 , Eq ip, Ord ip, Address ip, DataHandlers raw dht
274 ) => LogFun -> DHT raw dht u ip [Handler IO dht raw]
275defaultHandlers logger = do
276 groknode <- insertNode1
277 mynid <- myNodeIdAccordingTo1
278 dta <- asks dhtData
279 let handler :: MethodHandler raw dht ip -> Handler IO dht raw
280 handler (MethodHandler name action) = nodeHandler groknode mynid (logt logger) dta name action
281 getclosest <- getClosest1
282 hs <- kademliaHandlers logger
283 return $ hs ++ handler (dataHandlers (fmap (fmap (fmap (const ()))) . getclosest) dta)
286-- Basic queries
289type Iteration raw dht u ip o = NodeInfo dht ip u -> DHT raw dht u ip (Either [NodeInfo dht ip u] [o ip])
291-- | The most basic query. May be used to check if the given node is
292-- alive or get its 'NodeId'.
293pingQ :: forall raw dht u ip.
294 ( DHT.Kademlia dht
295 , Address ip
296 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
297 , Default u
298 , Show u
299 , Ord (TransactionID dht)
300 , Serialize (TransactionID dht)
301 , WireFormat raw dht
302 , SerializableTo raw (Response dht (Ping dht))
303 , SerializableTo raw (Query dht (Ping dht))
304 , Ord (NodeId dht)
305 , FiniteBits (NodeId dht)
306 , Show (NodeId dht)
307 , Show (QueryMethod dht)
308 ) => NodeInfo dht ip u -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP)
309pingQ ni = do
310 let ping = DHT.pingMessage (Proxy :: Proxy dht)
311 (nid, pong, mip) <- queryNode' ni ping
312 let _ = pong `asTypeOf` ping
313 -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid}
314 return (NodeInfo nid (nodeAddr ni) def, mip)
316-- | The most basic query. May be used to check if the given node is
317-- alive or get its 'NodeId'.
318coldPingQ :: forall raw dht u ip.
319 ( DHT.Kademlia dht
320 , Address ip
321 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
322 , Default u
323 , Show u
324 , Ord (TransactionID dht)
325 , Serialize (TransactionID dht)
326 , WireFormat raw dht
327 , SerializableTo raw (Response dht (Ping dht))
328 , SerializableTo raw (Query dht (Ping dht))
329 , Ord (NodeId dht)
330 , FiniteBits (NodeId dht)
331 , Show (NodeId dht)
332 , Show (QueryMethod dht)
333 ) => PacketDestination dht -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP)
334coldPingQ dest = do
335 let ping = DHT.pingMessage (Proxy :: Proxy dht)
336 naddr <- maybe (throwIO $ QueryFailed ProtocolError "unable to construct NodeAddr from PacketDestination")
337 return
338 $ fromAddr dest
339 (nid, pong, mip) <- coldQueryNode' naddr dest ping
340 let _ = pong `asTypeOf` ping
341 -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid}
342 return (NodeInfo nid naddr def, mip)
344-- TODO [robustness] match range of returned node ids with the
345-- expected range and either filter bad nodes or discard response at
346-- all throwing an exception
347-- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo
348findNodeQ proxy key ni = do
349 closest <- fmap DHT.foundNodes $ DHT.findNodeMessage proxy key <@> ni
350 $(logInfoS) "findNodeQ" $ "NodeFound\n"
351 <> T.pack (L.unlines $ ((' ' :) . show . pPrint) closest)
352 return $ Right closest
354#ifdef VERSION_bencoding
355getPeersQ :: Address ip => InfoHash -> Iteration BValue KMessageOf () ip PeerAddr
356getPeersQ topic ni = do
357 GotPeers {..} <- GetPeers topic <@> ni
358 let dist = distance (toNodeId topic) (nodeId ni)
359 $(logInfoS) "getPeersQ" $ T.pack
360 $ "distance: " <> render (pPrint dist) <> " , result: "
361 <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" }
362 return peers
364announceQ :: Address ip => InfoHash -> PortNumber -> Iteration BValue KMessageOf () ip NodeAddr
365announceQ ih p ni = do
366 GotPeers {..} <- GetPeers ih <@> ni
367 case peers of
368 Left ns
369 | False -> undefined -- TODO check if we can announce
370 | otherwise -> return (Left ns)
371 Right _ -> do -- TODO *probably* add to peer cache
372 Announced <- Announce False ih Nothing p grantedToken <@> ni
373 return (Right [nodeAddr ni])
377-- Iterative queries
381ioGetPeers :: Address ip => InfoHash -> DHT BValue KMessageOf () ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip]))
382ioGetPeers ih = do
383 session <- ask
384 return $ \ni -> runDHT session $ do
385 r <- try $ getPeersQ ih ni
386 case r of
387 Right e -> return $ either (,[]) ([],) e
388 Left e -> let _ = e :: QueryFailure in return ([],[])
390ioFindNode :: ( DHT.Kademlia dht
391 , WireFormat raw dht
392 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
393 , Address ip
394 , Default u
395 , Show u
396 , Show (QueryMethod dht)
397 , TableKey dht infohash
398 , Eq (NodeId dht)
399 , Ord (NodeId dht)
400 , FiniteBits (NodeId dht)
401 , Show (NodeId dht)
402 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
403 , Ord (TransactionID dht)
404 , Serialize (TransactionID dht)
405 , SerializableTo raw (Response dht (NodeFound dht ip))
406 , SerializableTo raw (Query dht (FindNode dht ip))
407 , SerializableTo raw (Response dht (Ping dht))
408 , SerializableTo raw (Query dht (Ping dht))
409 ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u]))
410ioFindNode ih = do
411 session <- ask
412 return $ \ni -> runDHT session $ do
413 ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> ni
414 let ns' = (fmap (const def)) ns
415 return $ L.partition (\n -> nodeId n /= toNodeId ih) ns'
418-- | Like ioFindNode, but considers all found nodes to be 'Right' results.
419ioFindNodes :: ( DHT.Kademlia dht
420 , WireFormat raw dht
421 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
422 , Address ip
423 , Default u
424 , Show u
425 , Show (QueryMethod dht)
426 , TableKey dht infohash
427 , Eq (NodeId dht)
428 , Ord (NodeId dht)
429 , FiniteBits (NodeId dht)
430 , Show (NodeId dht)
431 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
432 , Ord (TransactionID dht)
433 , Serialize (TransactionID dht)
434 , SerializableTo raw (Response dht (NodeFound dht ip))
435 , SerializableTo raw (Query dht (FindNode dht ip))
436 , SerializableTo raw (Response dht (Ping dht))
437 , SerializableTo raw (Query dht (Ping dht))
438 ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u]))
439ioFindNodes ih = do
440 session <- ask
441 return $ \ni -> runDHT session $ do
442 ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> ni
443 let ns' = (fmap (const def)) ns
444 return ([], ns')
446isearch :: ( Ord r
447 , Ord ip
448 , Ord (NodeId dht)
449 , FiniteBits (NodeId dht)
450 , TableKey dht ih
451 , Show ih) =>
452 (ih -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r])))
453 -> ih
454 -> DHT raw dht u ip (ThreadId, Search.IterativeSearch dht u ip r)
455isearch f ih = do
456 qry <- f ih
457 ns <- kclosest 8 ih <$> getTable
458 liftIO $ do s <- Search.newSearch qry (toNodeId ih) ns
459 a <- fork $ do
460 tid <- myThreadId
461 labelThread tid ("search."++show ih)
462 s
463 -- atomically \$ readTVar (Search.searchResults s)
464 return (a, s)
466-- | Background search: fill a lazy list using a background thread.
467bgsearch f ih = do
468 (tid, s) <- isearch f ih
469 let again shown = do
470 (chk,fin) <- atomically $ do
471 r <- (Set.\\ shown) <$> readTVar (Search.searchResults s)
472 if not $ Set.null r
473 then (,) r <$> Search.searchIsFinished s
474 else Search.searchIsFinished s >>= check >> return (Set.empty,True)
475 let ps = Set.toList chk
476 if fin then return ps
477 else do
478 xs <- unsafeInterleaveIO $ again (shown `Set.union` chk)
479 return $ ps ++ xs
480 liftIO $ again Set.empty
482type Search raw dht u ip o = Conduit [NodeInfo dht ip u] (DHT raw dht u ip) [o dht ip u]
484#if 0
486-- TODO: use reorder and filter (Traversal option) leftovers
487-- search :: k -> IterationI ip o -> Search ip o
488search _ action = do
489 awaitForever $ \ batch -> unless (L.null batch) $ do
490 $(logWarnS) "search" "start query"
491 responses <- lift $ queryParallel (action <$> batch)
492 let (nodes, results) = partitionEithers responses
493 $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results)))
494 leftover $ L.concat nodes
495 let r = mapM_ yield results
496 _ = (action,r) :: (a -> DHT raw dht u ip (Either [a] o), ConduitM [a] o (DHT raw dht u ip) ())
497 r
501publish = error "todo"
502-- publish :: Address ip => InfoHash -> PortNumber -> DHT BValue KMessageOf () ip ()
503-- publish ih p = do
504 -- nodes <- getClosest ih
505 -- r <- asks (optReplication . options)
506 -- _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r
507 -- return ()
510probeNode :: ( Default u
511 , Show u
512 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
513 , DHT.Kademlia dht
514 , Address ip
515 , Ord (TransactionID dht)
516 , Serialize (TransactionID dht)
517 , WireFormat raw dht
518 , SerializableTo raw (Response dht (Ping dht))
519 , SerializableTo raw (Query dht (Ping dht))
520 , Ord (NodeId dht)
521 , FiniteBits (NodeId dht)
522 , Show (NodeId dht)
523 , Show (QueryMethod dht)
524 ) => NodeInfo dht ip u -> DHT raw dht u ip (Bool , Maybe ReflectedIP)
525probeNode addr = do
526 $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint $ nodeAddr addr)))
527 result <- try $ pingQ addr
528 let _ = fmap (const ()) result :: Either QueryFailure ()
529 return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result
532refreshNodes :: forall raw dht u ip.
533 ( Address ip
534 , Ord (NodeId dht)
535 , Default u
536 , FiniteBits (NodeId dht)
537 , Pretty (NodeId dht)
538 , DHT.Kademlia dht
539 , Ord ip
540 , Ord (TransactionID dht)
541 , SerializableTo raw (Response dht (NodeFound dht ip))
542 , SerializableTo raw (Query dht (FindNode dht ip))
543 , SerializableTo raw (Response dht (Ping dht))
544 , SerializableTo raw (Query dht (Ping dht))
545 , Pretty (NodeInfo dht ip u)
546 , Show (NodeId dht)
547 , Show u
548 , Show (QueryMethod dht)
549 , Serialize (TransactionID dht)
550 , WireFormat raw dht
551 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
552 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
553 ) => NodeId dht -> DHT raw dht u ip () -- [NodeInfo KMessageOf ip ()]
554-- FIXME do not use getClosest sinse we should /refresh/ them
555refreshNodes nid = do
556 $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid)))
557 nodes <- getClosest nid
558 do
559 -- forM (L.take 1 nodes) \$ \ addr -> do
560 -- NodeFound ns <- FindNode nid <@> addr
561 -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) ()
562 -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) ()
563 -- nss <- sourceList [[addr]] \$= search nid (findNodeQ nid) $$ C.consume
564 -- nss <- sourceList [nodes] \$= search nid (findNodeQ (Proxy :: Proxy dht) nid) $$ C.consume
565 ns <- bgsearch ioFindNodes nid
566 $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length ns)) <> " nodes."
567 _ <- queryParallel $ flip ns $ \n -> do
568 $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n))
569 pingQ n
570 -- pingQ takes care of inserting the node.
571 return ()
572 return () -- \$ L.concat nss
574logc :: Char -> String -> DHT raw dht u ip ()
575logc 'D' = $(logDebugS) "insertNode" . T.pack
576logc 'W' = $(logWarnS) "insertNode" . T.pack
577logc 'I' = $(logInfoS) "insertNode" . T.pack
578logc c = $(logInfoS) "insertNode" . T.pack . (c :) . (':' :)
580-- | This operation do not block but acquire exclusive access to
581-- routing table.
582insertNode :: forall raw dht u ip.
583 ( Address ip
584 , Ord (NodeId dht)
585 , FiniteBits (NodeId dht)
586 , Show (NodeId dht)
587 , Default u
588 , Show u
589 , DHT.Kademlia dht
590 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
591 , Ord (TransactionID dht)
592 , WireFormat raw dht
593 , Serialize (TransactionID dht)
594 , SerializableTo raw (Response dht (Ping dht))
595 , SerializableTo raw (Query dht (Ping dht))
596 , Ord (NodeId dht)
597 , Show (NodeId dht)
598 , Show (QueryMethod dht)
599 ) => NodeInfo dht ip u -> Maybe ReflectedIP -> DHT raw dht u ip ()
600insertNode info witnessed_ip0 = do
601 f <- insertNode1
602 liftIO $ f info witnessed_ip0
604insertNode1 :: forall raw dht u ip.
605 ( Address ip
606 , Default u
607 , Show u
608 , Ord (NodeId dht)
609 , FiniteBits (NodeId dht)
610 , Show (NodeId dht)
611 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
612 , DHT.Kademlia dht
613 , Ord (TransactionID dht)
614 , WireFormat raw dht
615 , Serialize (TransactionID dht)
616 , SerializableTo raw (Response dht (Ping dht))
617 , SerializableTo raw (Query dht (Ping dht))
618 , Ord (NodeId dht)
619 , Show (NodeId dht)
620 , Show (QueryMethod dht)
621 ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ())
622insertNode1 = do
623 bc <- optBucketCount <$> asks options
624 nid <- asks tentativeNodeId
625 logm0 <- embed_ (uncurry logc)
626 let logm c = logm0 . (c,)
627 dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state.
628 probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP)
629 let probe n = probe0 n >>= runDHT dht_node_state . restoreM
630 {-
631 changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive
632 ip <- fromSockAddr ip0 :: Maybe ip
633 listToMaybe
634 $ rank id (nodeId $ foreignNode arrival)
635 $ bep42s ip (DHT.fallbackID params) -- warning: recursive
636 -}
637 params = DHT.TableParameters
638 { maxBuckets = bc :: Int
639 , fallbackID = nid :: NodeId dht
640 , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht
641 , logMessage = logm :: Char -> String -> IO ()
642 , pingProbe = probe :: NodeInfo dht ip u -> IO (Bool, Maybe ReflectedIP)
643 }
644 tbl <- asks routingInfo
645 let state = DHT.TableKeeper
646 { routingInfo = tbl
647 , grokNode = DHT.insertNode params state
648 , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO ()
649 }
650 return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0
652-- | Throws exception if node is not responding.
653queryNode :: forall raw dht u a b ip.
654 ( Address ip
655 , KRPC dht (Query dht a) (Response dht b)
656 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
657 , Default u
658 , Show u
659 , DHT.Kademlia dht
660 , Ord (TransactionID dht)
661 , Serialize (TransactionID dht)
662 , WireFormat raw dht
663 , SerializableTo raw (Response dht b)
664 , SerializableTo raw (Query dht a)
665 , Ord (NodeId dht)
666 , FiniteBits (NodeId dht)
667 , Show (NodeId dht)
668 , Show (QueryMethod dht)
669 , SerializableTo raw (Response dht (Ping dht))
670 , SerializableTo raw (Query dht (Ping dht))
671 ) => NodeInfo dht ip u -> a -> DHT raw dht u ip (NodeId dht, b)
672queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q
674queryNode' :: forall raw dht u a b ip.
675 ( Address ip
676 , Default u
677 , Show u
678 , DHT.Kademlia dht
679 , KRPC dht (Query dht a) (Response dht b)
680 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
681 , Ord (TransactionID dht)
682 , Serialize (TransactionID dht)
683 , WireFormat raw dht
684 , SerializableTo raw (Response dht b)
685 , SerializableTo raw (Query dht a)
686 , Ord (NodeId dht)
687 , FiniteBits (NodeId dht)
688 , Show (NodeId dht)
689 , Show (QueryMethod dht)
690 , SerializableTo raw (Response dht (Ping dht))
691 , SerializableTo raw (Query dht (Ping dht))
692 ) => NodeInfo dht ip u -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP)
693queryNode' ni q = do
694 let addr = nodeAddr ni
695 dest = makeAddress (Left $ nodeId ni) (toSockAddr addr)
696 coldQueryNode' addr dest q
698coldQueryNode' :: forall raw dht u a b ip.
699 ( Address ip
700 , Default u
701 , Show u
702 , DHT.Kademlia dht
703 , KRPC dht (Query dht a) (Response dht b)
704 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
705 , Ord (TransactionID dht)
706 , Serialize (TransactionID dht)
707 , WireFormat raw dht
708 , SerializableTo raw (Response dht b)
709 , SerializableTo raw (Query dht a)
710 , Ord (NodeId dht)
711 , FiniteBits (NodeId dht)
712 , Show (NodeId dht)
713 , Show (QueryMethod dht)
714 , SerializableTo raw (Response dht (Ping dht))
715 , SerializableTo raw (Query dht (Ping dht))
716 ) => NodeAddr ip -> PacketDestination dht -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP)
717coldQueryNode' addr dest q = do
718 nid <- myNodeIdAccordingTo $ fromMaybe (error "TODO: coldQueryNode' myNodeIdAccordingTo") $ fromAddr dest
719 dta <- asks dhtData
720 qextra <- liftIO $ makeQueryExtra dta nid (Proxy :: Proxy (Query dht q)) (Proxy :: Proxy (Response dht b))
721 let read_only = False -- TODO: check for NAT issues. (BEP 43)
722 -- let KRPC.Method name = KRPC.method :: KRPC.Method dht (Query dht a) (Response dht b)
723 mgr <- asks manager
724 (Response rextra r, remoteId, witnessed_ip) <- liftIO $ query' mgr dest (Query qextra q)
725 -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip)
726 -- <> " by " <> T.pack (show (toSockAddr addr))
727 _ <- insertNode (NodeInfo remoteId addr def) witnessed_ip
728 return (remoteId, r, witnessed_ip)
730-- | Infix version of 'queryNode' function.
731(<@>) :: ( Address ip
732 , KRPC dht (Query dht a) (Response dht b)
733 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
734 , Default u
735 , Show u
736 , Show (QueryMethod dht)
737 , Ord (NodeId dht)
738 , FiniteBits (NodeId dht)
739 , Show (NodeId dht)
740 , Ord (TransactionID dht)
741 , Serialize (TransactionID dht)
742 , SerializableTo raw (Response dht b)
743 , SerializableTo raw (Query dht a)
744 , SerializableTo raw (Response dht (Ping dht))
745 , SerializableTo raw (Query dht (Ping dht))
746 , WireFormat raw dht
747 , Kademlia dht
748 ) => a -> NodeInfo dht ip u -> DHT raw dht u ip b
749q <@> addr = snd <$> queryNode addr q
750{-# INLINE (<@>) #-}
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
deleted file mode 100644
index b775e7d3..00000000
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ /dev/null
@@ -1,571 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013-2014
3-- License : BSD3
4-- Maintainer :
5-- Stability : experimental
6-- Portability : portable
8-- This module defines internal state of a node instance. You can
9-- have multiple nodes per application but usually you don't have
10-- to. Normally, you don't need to import this module, use
11-- "Network.BitTorrent.DHT" instead.
13{-# LANGUAGE CPP #-}
14{-# LANGUAGE RecordWildCards #-}
15{-# LANGUAGE FlexibleContexts #-}
16{-# LANGUAGE FlexibleInstances #-}
17{-# LANGUAGE GeneralizedNewtypeDeriving #-}
18{-# LANGUAGE MultiParamTypeClasses #-}
19{-# LANGUAGE ScopedTypeVariables #-}
20{-# LANGUAGE TypeFamilies #-}
21{-# LANGUAGE TemplateHaskell #-}
22module Network.BitTorrent.DHT.Session
23 ( -- * Options
24 -- | Use @optFooBar def@ to get default 'Alpha' or 'K'.
25 Alpha
26 , K
27 , Options (..)
29 -- * Session
30 , Node
31 , options
32 , tentativeNodeId
33 , myNodeIdAccordingTo
34 , myNodeIdAccordingTo1
35 , routingInfo
36 , routableAddress
37 , getTimestamp
38 -- , SessionTokens
39 -- , sessionTokens
40 -- , contactInfo
41 , dhtData
42 , PeerStore
43 , manager
45 -- ** Initialization
46 , LogFun
47 , logt
48 , NodeHandler
49 , newNode
50 , closeNode
52 -- * DHT
53 -- | Use @asks options@ to get options passed to 'startNode'
54 -- or @asks thisNodeId@ to get id of locally running node.
55 , DHT
56 , runDHT
58 -- ** Tokens
59 -- , grantToken
60 -- , checkToken
62 -- ** Routing table
63 , getTable
64 , getClosest
65 , getClosest1
67#ifdef VERSION_bencoding
68 -- ** Peer storage
69 , insertPeer
70 , getPeerList
71 , getPeerList1
72 , lookupPeers
73 , insertTopic
74 , deleteTopic
75 , getSwarms
76 , savePeerStore
77 , mergeSavedPeers
78 , allPeers
81 -- ** Messaging
82 , queryParallel
83 ) where
85import Prelude hiding (ioError)
87import Control.Concurrent.STM
89import Control.Concurrent.Async.Lifted.Instrument
91import Control.Concurrent.Async.Lifted
93import Control.Exception.Lifted hiding (Handler)
94import Control.Monad.Base
95import Control.Monad.Logger
96import Control.Monad.Reader
97import Control.Monad.Trans.Control
98import Control.Monad.Trans.Resource
99import Data.Typeable
100import Data.String
101import Data.Bits
102import Data.ByteString
103import Data.Conduit.Lazy
104import Data.Default
105import Data.Fixed
106import Data.Hashable
107import Data.List as L
108import Data.Maybe
109import Data.Monoid
110import Data.Set as S
111import Data.Time
112import Network (PortNumber)
113import System.Random (randomIO)
114import Data.Time.Clock.POSIX
115import Data.Text as Text
116import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
117import Data.Serialize as S
118import Network.DHT.Types
119import Network.DatagramServer.Types
122import Data.Torrent as Torrent
123import Network.DatagramServer as KRPC hiding (Options, def)
124import qualified Network.DatagramServer as KRPC (def)
125#ifdef VERSION_bencoding
126import Data.BEncode (BValue)
127import Network.DatagramServer.Mainline (KMessageOf)
129import Network.DatagramServer.Tox as Tox
131import Network.Address
132import Network.BitTorrent.DHT.ContactInfo (PeerStore)
133import qualified Network.BitTorrent.DHT.ContactInfo as P
134import Network.DHT.Mainline
135import Network.DHT.Routing as R
136import Network.BitTorrent.DHT.Token as T
137import GHC.Stack as GHC
140-- Options
143-- | Node lookups can proceed asynchronously.
144type Alpha = Int
146-- NOTE: libtorrent uses 5, azureus uses 10
147-- | The quantity of simultaneous lookups is typically three.
148defaultAlpha :: Alpha
149defaultAlpha = 3
151-- TODO add replication loop
153-- TODO do not insert infohash -> peeraddr if infohash is too far from
154-- this node id
156data Order
157 = NearFirst
158 | FarFirst
159 | Random
161data Traversal
162 = Greedy -- ^ aggressive short-circuit traversal
163 | Exhaustive -- ^
166-- | Original Kamelia DHT uses term /publish/ for data replication
167-- process. BitTorrent DHT uses term /announce/ since the purpose of
168-- the DHT is peer discovery. Later in documentation, we use terms
169-- /publish/ and /announce/ interchangible.
170data Options = Options
171 { -- | The degree of parallelism in 'find_node' queries. More
172 -- parallism lead to faster bootstrapping and lookup operations,
173 -- but also increase resource usage.
174 --
175 -- Normally this parameter should not exceed 'optK'.
176 optAlpha :: {-# UNPACK #-} !Alpha
178 -- | /K/ parameter - number of nodes to return in 'find_node'
179 -- responses.
180 , optK :: {-# UNPACK #-} !K
182 -- | Number of buckets to maintain. This parameter depends on
183 -- amount of nodes in the DHT network.
184 , optBucketCount :: {-# UNPACK #-} !BucketCount
186 -- | RPC timeout.
187 , optTimeout :: !NominalDiffTime
189 -- | /R/ parameter - how many target nodes the 'announce' query
190 -- should affect.
191 --
192 -- A large replica set compensates for inconsistent routing and
193 -- reduces the need to frequently republish data for
194 -- persistence. This comes at an increased cost for
195 -- 'Network.BitTorrent.DHT.insert' in terms of time, nodes
196 -- contacted, and storage.
197 , optReplication :: {-# UNPACK #-} !NodeCount
199 -- | How often this node should republish (or reannounce) its
200 -- data.
201 --
202 -- Large replica set ('optReplication') should require
203 -- smaller reannounce intervals ('optReannounce').
204 , optReannounce :: !NominalDiffTime
206 -- | The time it takes for data to expire in the
207 -- network. Publisher of the data should republish (or
208 -- reannounce) data to keep it in the network.
209 --
210 -- The /data expired timeout/ should be more than 'optReannounce'
211 -- interval.
212 , optDataExpired :: !NominalDiffTime
213 } deriving (Show, Eq)
215-- | Optimal options for bittorrent client. For short-lifetime
216-- utilities you most likely need to tune 'optAlpha' and
217-- 'optBucketCount'.
218instance Default Options where
219 def = Options
220 { optAlpha = defaultAlpha
221 , optK = defaultK
223 -- see Fig.2 from "BitTorrent Mainline DHT Measurement" paper.
224 , optBucketCount = defaultBucketCount
226 -- see Fig.4 from "Profiling a Million User DHT" paper.
227 , optTimeout = 5 -- seconds
228 , optReplication = 20 -- nodes
229 , optReannounce = 15 * 60
230 , optDataExpired = 60 * 60
231 }
233seconds :: NominalDiffTime -> Int
234seconds dt = fromEnum (realToFrac dt :: Uni)
236-- Session
239-- | A set of torrents this peer intends to share.
240type AnnounceSet = Set (InfoHash, PortNumber)
242-- | Logger function.
243type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO ()
245-- | DHT session keep track state of /this/ node.
246data Node raw dht u ip = Node
247 { -- | Session configuration;
248 options :: !Options
250 -- | Pseudo-unique self-assigned session identifier. This value is
251 -- constant during DHT session and (optionally) between sessions.
252 , tentativeNodeId :: !(NodeId dht)
254 , resources :: !InternalState
255 , manager :: !(Manager raw dht) -- ^ RPC manager;
256 , routingInfo :: !(TVar (Maybe (R.Info dht ip u))) -- ^ search table;
257 , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node;
258 , dhtData :: DHTData dht ip
259 , loggerFun :: !LogFun
260 }
262-- | DHT keep track current session and proper resource allocation for
263-- safe multithreading.
264newtype DHT raw dht u ip a = DHT { unDHT :: ReaderT (Node raw dht u ip) IO a }
265 deriving ( Functor, Applicative, Monad, MonadIO
266 , MonadBase IO, MonadReader (Node raw dht u ip), MonadThrow
267 )
269#if MIN_VERSION_monad_control(1,0,0)
270newtype DHTStM raw dht u ip a = StM {
271 unSt :: StM (ReaderT (Node raw dht u ip) IO) a
272 }
275instance MonadBaseControl IO (DHT raw dht u ip) where
276#if MIN_VERSION_monad_control(1,0,0)
277 type StM (DHT raw dht u ip) a = DHTStM raw dht u ip a
279 newtype StM (DHT raw dht u ip) a = StM {
280 unSt :: StM (ReaderT (Node raw dht u ip) IO) a
281 }
283 liftBaseWith cc = DHT $ liftBaseWith $ \ cc' ->
284 cc $ \ (DHT m) -> StM <$> cc' m
285 {-# INLINE liftBaseWith #-}
287 restoreM = DHT . restoreM . unSt
288 {-# INLINE restoreM #-}
290-- | Check is it is possible to run 'queryNode' or handle pending
291-- query from remote node.
292instance MonadActive (DHT raw dht u ip) where
293 monadActive = getManager >>= liftIO . isActive
294 {-# INLINE monadActive #-}
296-- | All allocated resources will be closed at 'closeNode'.
297instance MonadResource (DHT raw dht u ip) where
298 liftResourceT m = do
299 s <- asks resources
300 liftIO $ runInternalState m s
302-- instance MonadKRPC (DHT raw dht u ip) (DHT raw dht u ip) BValue KMessageOf where
304getManager :: DHT raw dht u ip (Manager raw dht)
305getManager = asks manager
307instance MonadLogger (DHT raw dht u ip) where
308 monadLoggerLog loc src lvl msg = do
309 logger <- asks loggerFun
310 liftIO $ logger loc src lvl (toLogStr msg)
312#ifdef VERSION_bencoding
313type NodeHandler = Handler IO KMessageOf BValue
315type NodeHandler ip = Handler (DHT raw dht u ip) Tox.Message ByteString
318logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO ()
319logt lf c m txt = lf (locFromCS callStack) (fromString m) (lvl c) (fromString $ Text.unpack txt)
320 where
321 lvl 'D' = LevelDebug
322 lvl 'I' = LevelInfo
323 lvl 'W' = LevelWarn
324 lvl 'E' = LevelError
325 lvl ch = LevelOther $ Text.cons ch Text.empty
327mkLoggerLoc :: GHC.SrcLoc -> Loc
328mkLoggerLoc loc =
329 Loc { loc_filename = GHC.srcLocFile loc
330 , loc_package = GHC.srcLocPackage loc
331 , loc_module = GHC.srcLocModule loc
332 , loc_start = ( GHC.srcLocStartLine loc
333 , GHC.srcLocStartCol loc)
334 , loc_end = ( GHC.srcLocEndLine loc
335 , GHC.srcLocEndCol loc)
336 }
338locFromCS :: GHC.CallStack -> Loc
339locFromCS cs = case getCallStack cs of
340 ((_, loc):_) -> mkLoggerLoc loc
341 _ -> Loc "<unknown>" "<unknown>" "<unknown>" (0,0) (0,0)
344-- | Run DHT session. You /must/ properly close session using
345-- 'closeNode' function, otherwise socket or other scarce resources may
346-- leak.
347newNode :: forall raw dht ip u.
348 ( Address ip
349 , FiniteBits (NodeId dht)
350 , Serialize (NodeId dht)
351 , Kademlia dht
352 , WireFormat raw dht
353 )
354 => -- [NodeHandler] -- ^ handlers to run on accepted queries;
355 Options -- ^ various dht options;
356 -> NodeAddr ip -- ^ node address to bind;
357 -> LogFun -- ^ invoked on log messages;
358 -> Maybe (NodeId dht) -- ^ use this NodeId, if not given a new one is generated.
359 -> IO (Node raw dht u ip) -- ^ a new DHT node running at given address.
360newNode opts naddr logger mbid = do
361 s <- createInternalState
362 runInternalState initNode s
363 `onException` closeInternalState s
364 where
365 rpcOpts = KRPC.def { optQueryTimeout = seconds (optTimeout opts) }
366 nodeAddr = toSockAddr naddr
367 initNode = do
368 s <- getInternalState
369 (myId, infovar, getst) <- liftIO $ do
370 (i, ctx) <- initializeServerState (Proxy :: Proxy (dht raw)) mbid
371 var <- atomically (newTVar Nothing)
372 let getst dest = do
373 info <- atomically . readTVar $ var
374 return ( maybe i myNodeId info, ctx)
375 return (i, var, getst)
376 (_, m) <- allocate (newManager rpcOpts (logt logger) nodeAddr getst []) closeManager
377 liftIO $ do
378 dta <- initializeDHTData
379 node <- Node opts myId s m infovar
380 <$> newTVarIO S.empty
381 <*> pure dta
382 <*> pure logger
383 return node
385-- | Some resources like listener thread may live for
386-- some short period of time right after this DHT session closed.
387closeNode :: Node raw dht u ip -> IO ()
388closeNode Node {..} = closeInternalState resources
390-- | Run DHT operation on the given session.
391runDHT :: Node raw dht u ip -> DHT raw dht u ip a -> IO a
392runDHT node action = runReaderT (unDHT action) node
393{-# INLINE runDHT #-}
396-- Routing
399-- /pick a random ID/ in the range of the bucket and perform a
400-- find_nodes search on it.
404-- Routing table
407-- | This nodes externally routable address reported by remote peers.
408routableAddress :: DHT raw dht u ip (Maybe SockAddr)
409routableAddress = do
410 info <- asks routingInfo >>= liftIO . atomically . readTVar
411 return $ myAddress <$> info
413-- | The current NodeId that the given remote node should know us by.
414myNodeIdAccordingTo :: NodeAddr ip -> DHT raw dht u ip (NodeId dht)
415myNodeIdAccordingTo _ = do
416 info <- asks routingInfo >>= liftIO . atomically . readTVar
417 maybe (asks tentativeNodeId)
418 (return . myNodeId)
419 info
421myNodeIdAccordingTo1 :: DHT raw dht u ip ( NodeAddr ip -> IO (NodeId dht) )
422myNodeIdAccordingTo1 = do
423 var <- asks routingInfo
424 tid <- asks tentativeNodeId
425 return $ \ _ -> do
426 info <- atomically $ readTVar var
427 return $ maybe tid myNodeId info
429-- | Get current routing table. Normally you don't need to use this
430-- function, but it can be usefull for debugging and profiling purposes.
431getTable :: Eq ip => DHT raw dht u ip (Table dht ip u)
432getTable = do
433 Node { tentativeNodeId = myId
434 , routingInfo = var
435 , options = opts } <- ask
436 let nil = nullTable myId (optBucketCount opts)
437 liftIO (maybe nil R.myBuckets <$> atomically (readTVar var))
439getSwarms :: Ord ip => DHT raw KMessageOf u ip [ (InfoHash, Int, Maybe ByteString) ]
440getSwarms = do
441 store <- asks (contactInfo . dhtData) >>= liftIO . atomically . readTVar
442 return $ P.knownSwarms store
444savePeerStore :: (Ord ip, Address ip) => DHT raw KMessageOf u ip ByteString
445savePeerStore = do
446 var <- asks (contactInfo . dhtData)
447 peers <- liftIO $ atomically $ readTVar var
448 return $ S.encode peers
450mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw KMessageOf u ip ()
451mergeSavedPeers bs = do
452 var <- asks (contactInfo . dhtData)
453 case S.decode bs of
454 Right newbies -> liftIO $ atomically $ modifyTVar' var (<> newbies)
455 Left _ -> return ()
458allPeers :: Ord ip => InfoHash -> DHT raw KMessageOf u ip [ PeerAddr ip ]
459allPeers ih = do
460 store <- asks (contactInfo . dhtData) >>= liftIO . atomically . readTVar
461 return $ P.lookup ih store
463-- | Find a set of closest nodes from routing table of this node. (in
464-- no particular order)
466-- This operation used for 'find_nodes' query.
468getClosest :: ( Eq ip
469 , Ord (NodeId dht)
470 , FiniteBits (NodeId dht)
471 , TableKey dht k ) =>
472 k -> DHT raw dht u ip [NodeInfo dht ip u]
473getClosest node = do
474 k <- asks (optK . options)
475 kclosest k node <$> getTable
477getClosest1 :: ( Eq ip
478 , Ord (NodeId dht)
479 , FiniteBits (NodeId dht)
480 , TableKey dht k
481 ) => DHT raw dht u ip (k -> IO [NodeInfo dht ip u])
482getClosest1 = do
483 k <- asks (optK . options)
484 nobkts <- asks (optBucketCount . options)
485 myid <- asks tentativeNodeId
486 var <- asks routingInfo
487 return $ \node -> do nfo <- atomically $ readTVar var
488 let tbl = maybe (nullTable myid nobkts) R.myBuckets nfo
489 return $ kclosest k node tbl
492-- Peer storage
495refreshContacts :: Ord ip => TVar (PeerStore ip) -> IO ()
496refreshContacts var =
497 -- TODO limit dht peer store in size (probably by removing oldest peers)
498 return ()
501-- | Insert peer to peer store. Used to handle announce requests.
502insertPeer :: Ord ip => TVar (PeerStore ip) -> InfoHash -> Maybe ByteString -> PeerAddr ip -> IO ()
503insertPeer var ih name addr = do
504 refreshContacts var
505 atomically $ modifyTVar' var (P.insertPeer ih name addr)
507-- | Get peer set for specific swarm.
508lookupPeers :: Ord ip => TVar (PeerStore ip) -> InfoHash -> IO [PeerAddr ip]
509lookupPeers var ih = do
510 refreshContacts var
511 tm <- getTimestamp
512 atomically $ do
513 (ps,store') <- P.freshPeers ih tm <$> readTVar var
514 writeTVar var store'
515 return ps
517getTimestamp :: IO Timestamp
518getTimestamp = do
519 utcTime <- getCurrentTime
520 -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime)))
521 return $ utcTimeToPOSIXSeconds utcTime
523#ifdef VERSION_bencoding
524-- | Prepare result for 'get_peers' query.
526-- This operation use 'getClosest' as failback so it may block.
528getPeerList :: Ord ip => InfoHash -> DHT raw KMessageOf () ip (PeerList ip)
529getPeerList ih = do
530 var <- asks (contactInfo . dhtData)
531 ps <- liftIO $ lookupPeers var ih
532 if L.null ps
533 then Left <$> getClosest ih
534 else return (Right ps)
536getPeerList1 :: Ord ip => DHT raw KMessageOf () ip (InfoHash -> IO (PeerList ip))
537getPeerList1 = do
538 var <- asks (contactInfo . dhtData)
539 getclosest <- getClosest1
540 return $ \ih -> do
541 ps <- lookupPeers var ih
542 if L.null ps
543 then Left <$> getclosest ih
544 else return (Right ps)
547insertTopic :: InfoHash -> PortNumber -> DHT raw dht u ip ()
548insertTopic ih p = do
549 var <- asks announceInfo
550 liftIO $ atomically $ modifyTVar' var (S.insert (ih, p))
552deleteTopic :: InfoHash -> PortNumber -> DHT raw dht u ip ()
553deleteTopic ih p = do
554 var <- asks announceInfo
555 liftIO $ atomically $ modifyTVar' var (S.delete (ih, p))
560-- Messaging
563-- | Failed queries are ignored.
564queryParallel :: [DHT raw dht u ip a] -> DHT raw dht u ip [a]
565queryParallel queries = do
566 -- TODO: use alpha
567 -- alpha <- asks (optAlpha . options)
568 cleanup <$> mapConcurrently try queries
569 where
570 cleanup :: [Either QueryFailure a] -> [a]
571 cleanup = mapMaybe (either (const Nothing) Just)
diff --git a/src/Network/DHT.hs b/src/Network/DHT.hs
deleted file mode 100644
index 285cf9ff..00000000
--- a/src/Network/DHT.hs
+++ /dev/null
@@ -1,125 +0,0 @@
1{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
2module Network.DHT
3 ( -- makeTableKeeper
4 -- , TableKeeper(..)
5 module Network.DHT -- for now
6 , module Network.DHT.Types
7 ) where
9import Data.Bits
10import Data.Maybe
11import Data.Monoid
12import Network.Address
13import Network.DHT.Types
14import Network.DatagramServer.Types
15import Network.DHT.Routing
16import Control.Concurrent.STM
18import Control.Concurrent.Lifted.Instrument
20import GHC.Conc (labelThread)
21import Control.Concurrent.Lifted
23import Text.PrettyPrint as PP hiding ((<>), ($$))
24import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
26import Control.Monad
27import Data.Time.Clock (getCurrentTime)
28import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
30data TableKeeper msg ip u = TableKeeper
31 { routingInfo :: TVar (Maybe (Info msg ip u))
32 , grokNode :: NodeInfo msg ip u -> Maybe (ReflectedIP) -> IO ()
33 , grokAddress :: Maybe SockAddr -> ReflectedIP -> IO ()
34 }
36makeTableKeeper :: forall msg ip u.
37 ( Address ip
38 , Show u
39 , Show (NodeId msg)
40 , Ord (NodeId msg)
41 , FiniteBits (NodeId msg)
42 ) => TableParameters msg ip u -> IO (TableKeeper msg ip u)
43makeTableKeeper param@TableParameters{..} = do
44 error "TODO makeTableKeeper" -- kick off table-updating thread
45 ri <- atomically (newTVar Nothing)
46 let tk = TableKeeper{ routingInfo = ri
47 , grokNode = insertNode param tk
48 , grokAddress = error "todo"
49 }
50 return tk
52atomicInsert :: ( Eq ip, Address ip, Ord (NodeId msg), FiniteBits (NodeId msg)
53 ) => TableParameters msg ip u -> TableKeeper msg ip u -> Timestamp -> Event msg ip u -> Maybe ReflectedIP -> STM (IO [CheckPing msg ip u])
54atomicInsert param@TableParameters{..} state tm arrival witnessed_ip = do
55 minfo <- readTVar (routingInfo state)
56 case minfo of
57 Just inf -> do
58 (ps,t') <- insert tm arrival $ myBuckets inf
59 writeTVar (routingInfo state) $ Just $ inf { myBuckets = t' }
60 return $ do
61 case witnessed_ip of
62 Just (ReflectedIP ip)
63 | ip /= myAddress inf
64 -> logMessage 'I' $ unwords
65 $ [ "Possible NAT?"
66 , show (toSockAddr $ nodeAddr $ foreignNode arrival)
67 , "reports my address:"
68 , show ip ]
69 -- TODO: Let routing table vote on my IP/NodeId.
70 _ -> return ()
71 return ps
72 Nothing ->
73 let dropped = return $ do
74 -- Ignore non-witnessing nodes until somebody tells
75 -- us our ip address.
76 logMessage 'W' ("Dropped " ++ show (toSockAddr $ nodeAddr $ foreignNode arrival))
77 return []
78 in fromMaybe dropped $ do
79 ReflectedIP ip <- witnessed_ip
80 let nil = nullTable (adjustID ip arrival) maxBuckets
81 return $ do
82 (ps,t') <- insert tm arrival nil
83 let new_info = Info t' (adjustID ip arrival) ip
84 writeTVar (routingInfo state) $ Just new_info
85 return $ do
86 logMessage 'I' $ unwords
87 [ "External IP address:"
88 , show ip
89 , "(reported by"
90 , show (toSockAddr $ nodeAddr $ foreignNode arrival)
91 ++ ")"
92 ]
93 return ps
95-- | This operation do not block but acquire exclusive access to
96-- routing table.
97insertNode :: forall msg ip u.
98 ( Address ip
99 , Show u
100 , Show (NodeId msg)
101 , Ord (NodeId msg)
102 , FiniteBits (NodeId msg)
103 ) => TableParameters msg ip u -> TableKeeper msg ip u -> NodeInfo msg ip u -> Maybe ReflectedIP -> IO ()
104insertNode param@TableParameters{..} state info witnessed_ip0 = do
105 tm <- utcTimeToPOSIXSeconds <$> getCurrentTime -- Network.DHT.Routing.TimeStamp = POSIXTime
106 let showTable = do
107 t <- atomically $ fmap myBuckets <$> readTVar (routingInfo state)
108 let logMsg = "Routing table: " <> pPrint t
109 logMessage 'D' (render logMsg)
110 let arrival = TryInsert info
111 logMessage 'D' $ show ( TryInsert (mapAddress fromAddr info) :: Event _ (Maybe IPv4) _ )
112 ps <- join $ atomically $ atomicInsert param state tm arrival witnessed_ip0
113 showTable
114 _ <- fork $ do
115 myThreadId >>= flip labelThread "DHT.insertNode.pingResults"
116 forM_ ps $ \(CheckPing ns)-> do
117 forM_ ns $ \n -> do
118 (b,mip) <- pingProbe n
119 let alive = PingResult n b
120 logMessage 'D' $ "PingResult "++show (nodeId n,b)
121 _ <- join $ atomically $ atomicInsert param state tm alive mip
122 showTable
123 return ()
124 return ()
diff --git a/src/Network/DHT/Mainline.hs b/src/Network/DHT/Mainline.hs
deleted file mode 100644
index bdf9e8b2..00000000
--- a/src/Network/DHT/Mainline.hs
+++ /dev/null
@@ -1,579 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- License : BSD3
4-- Maintainer :
5-- Stability : experimental
6-- Portability : portable
8-- This module provides message datatypes which is used for /Node to
9-- Node/ communication. Bittorrent DHT is based on Kademlia
10-- specification, but have a slightly different set of messages
11-- which have been adopted for /peer/ discovery mechanism. Messages
12-- are sent over "Network.KRPC" protocol, but normally you should
13-- use "Network.BitTorrent.DHT.Session" to send and receive
14-- messages.
16-- DHT queries are not /recursive/, they are /iterative/. This means
17-- that /querying/ node . While original specification (namely BEP5)
18-- do not impose any restrictions for /quered/ node behaviour, a
19-- good DHT implementation should follow some rules to guarantee
20-- that unlimit recursion will never happen. The following set of
21-- restrictions:
23-- * 'Ping' query must not trigger any message.
25-- * 'FindNode' query /may/ trigger 'Ping' query to check if a
26-- list of nodes to return is /good/. See
27-- 'Network.DHT.Routing.Routing' for further explanation.
29-- * 'GetPeers' query may trigger 'Ping' query for the same reason.
31-- * 'Announce' query must trigger 'Ping' query for the same reason.
33-- It is easy to see that the most long RPC chain is:
35-- @
36-- | | |
37-- Node_A | |
38-- | FindNode or GetPeers or Announce | |
39-- | ------------------------------------> Node_B |
40-- | | Ping |
41-- | | -----------> |
42-- | | Node_C
43-- | | Pong |
44-- | NodeFound or GotPeers or Announced | <----------- |
45-- | <------------------------------------- Node_B |
46-- Node_A | |
47-- | | |
48-- @
50-- where in some cases 'Node_C' is 'Node_A'.
52-- For more info see:
53-- <>
55-- For Kamelia messages see original Kademlia paper:
56-- <>
58{-# LANGUAGE CPP #-}
59{-# LANGUAGE StandaloneDeriving #-}
60{-# LANGUAGE GeneralizedNewtypeDeriving #-}
61{-# LANGUAGE DeriveDataTypeable #-}
62{-# LANGUAGE FlexibleInstances #-}
63{-# LANGUAGE MultiParamTypeClasses #-}
64{-# LANGUAGE UndecidableInstances #-}
65{-# LANGUAGE ScopedTypeVariables #-}
66{-# LANGUAGE TypeFamilies #-}
67module Network.DHT.Mainline
68 ( -- * Envelopes
69 Query (..)
70 , Response (..)
72 -- * Queries
73 -- ** ping
74 , Ping (..)
76 -- ** find_node
77 , FindNode (..)
78 , NodeFound (..)
79 , bep42s
80 -- , bep42
83#ifdef VERSION_bencoding
84 -- ** get_peers
85 , PeerList
86 , GetPeers (..)
87 , GotPeers (..)
89 -- ** announce_peer
90 , Announce (..)
91 , Announced (..)
93 , DHTData(..)
94 , SessionTokens(..)
95 , grantToken
96 , checkToken
97 ) where
99import Data.String
100import Control.Applicative
101import Data.Bool
102#ifdef VERSION_bencoding
103import Data.BEncode as BE
104import Data.BEncode.BDict as BDict hiding (map)
106import qualified Network.DatagramServer.Tox as Tox
107import Network.DatagramServer.Tox (NodeId)
108import Data.Word
109import Control.Monad
111import Network.KRPC.Method
112import Network.Address hiding (NodeId)
113import Data.Bits
114import Data.ByteString (ByteString)
115import qualified Data.ByteString as BS
116import Data.Digest.CRC32C
117import Data.List as L
118import Data.Monoid
119import Data.Serialize as S
120import Data.Typeable
121import Data.Word
122import Network
123import Network.DatagramServer
124import Network.DatagramServer.Mainline
125import Data.Maybe
127import Data.Torrent (InfoHash)
128import Network.BitTorrent.DHT.Token as T
129import Network.BitTorrent.DHT.ContactInfo
130#ifdef VERSION_bencoding
131import Network.DatagramServer ()
133import Network.DatagramServer.Types hiding (Query,Response)
134import Network.DHT.Types
135import Network.DHT.Routing
136import Data.Time
137import Control.Concurrent.STM
138import System.Random
139import Data.Hashable
143-- envelopes
146#ifndef VERSION_bencoding
147type BKey = ByteString
150node_id_key :: BKey
151node_id_key = "id"
153read_only_key :: BKey
154read_only_key = "ro"
157#ifdef VERSION_bencoding
158instance BEncode a => BEncode (Query KMessageOf a) where
159 toBEncode Query {..} = toDict $
160 BDict.union ( node_id_key .=! queringNodeId queryExtra
161 .: read_only_key .=? bool Nothing (Just (1 :: Integer)) (queryIsReadOnly queryExtra)
162 .: endDict)
163 (dict (toBEncode queryParams))
164 where
165 dict (BDict d) = d
166 dict _ = error "impossible: instance BEncode (Query a)"
168 fromBEncode v =
169 Query <$> (MainlineQuery <$> fromDict (field (req node_id_key)) v
170 <*> fromDict (fromMaybe False <$>? read_only_key) v)
171 <*> fromBEncode v
173data Query a = Query a
176#ifdef VERSION_bencoding
177instance BEncode a => BEncode (Response KMessageOf a) where
178 toBEncode = toBEncode . toQuery
179 where
180 toQuery (Response (MainlineResponse nid) a) = Query (MainlineQuery nid False) a
182 fromBEncode b = fromQuery <$> fromBEncode b
183 where
184 fromQuery (Query (MainlineQuery nid _) a) = Response (MainlineResponse nid) a
186data Response KMessageOf a = Response KMessageOf a
190-- ping method
193-- / The most basic query is a ping. Ping query is used to check if a
194-- quered node is still alive.
195-- data Ping = Ping Tox.Nonce8 deriving (Show, Eq, Typeable)
197#ifdef VERSION_bencoding
198instance BEncode (Ping KMessageOf) where
199 toBEncode Ping = toDict endDict
200 fromBEncode _ = pure Ping
202instance Serialize (Query (Ping KMessageOf)) where
203 get = do
204 b <- get
205 when ( (b::Word8) /= 0) $ fail "Bad ping request"
206 nonce <- get
207 return $ Query (Ping nonce)
208 put (Query (Ping nonce)) = do
209 put (0 :: Word8)
210 put nonce
211instance Serialize (Response Ping) where
212 get = do
213 b <- get
214 when ( (b::Word8) /= 1) $ fail "Bad ping response"
215 nonce <- get
216 return $ Response (Ping nonce)
217 put (Response (Ping nonce)) = do
218 put (1 :: Word8)
219 put nonce
222-- | \"q\" = \"ping\"
223instance KRPC KMessageOf (Query KMessageOf (Ping KMessageOf)) (Response KMessageOf (Ping KMessageOf)) where
224 method = "ping"
225 makeQueryExtra _ nid _ _ = return $ MainlineQuery nid False -- TODO: check for NAT issues. (BEP 43)
226 makeResponseExtra _ nid _ _ = return $ MainlineResponse nid
228 -- TODO KError Sender/Responder
229 messageSender (Q q) _ = queringNodeId $ queryExtra $ queryArgs q
230 messageResponder _ (R r) = queredNodeId $ responseExtra $ respVals r
233-- find_node method
236-- / Find node is used to find the contact information for a node
237-- given its ID.
238-- data FindNode KMessageOf ip = FindNode (NodeId Tox.Message) Tox.Nonce8 -- Tox: Get Nodes
239 -- deriving (Show, Eq, Typeable)
241target_key :: BKey
242target_key = "target"
244#ifdef VERSION_bencoding
245instance Typeable ip => BEncode (FindNode KMessageOf ip) where
246 toBEncode (FindNode nid) = toDict $ target_key .=! nid .: endDict
247 fromBEncode = fromDict $ FindNode <$>! target_key
249instance Serialize (Query KMessageOf (FindNode KMessageOf ip)) where
250 get = do
251 nid <- get
252 nonce <- get
253 return $ Query (FindNode nid nonce)
254 put (Query (FindNode nid nonce)) = do
255 put nid
256 put nonce
259-- | When a node receives a 'FindNode' query, it should respond with a
260-- the compact node info for the target node or the K (8) closest good
261-- nodes in its own routing table.
263#ifdef VERSION_bencoding
264-- newtype NodeFound KMessageOf ip = NodeFound [NodeInfo KMessageOf ip ()] deriving (Show, Eq, Typeable)
266data NodeFound KMessageOf ip = NodeFound [Tox.NodeFormat] Tox.Nonce8 deriving (Show, Eq, Typeable)
268-- Tox: send_nodes
270nodes_key :: BKey
271nodes_key = "nodes"
273-- Convert IPv4 address. Useful for using variadic IP type.
274from4 :: forall dht u s. Address s => NodeInfo dht IPv4 u -> Either String (NodeInfo dht s u)
275from4 n = maybe (Left "Error converting IPv4") Right
276 $ traverseAddress (fromAddr :: IPv4 -> Maybe s) n
278#ifdef VERSION_bencoding
279binary :: Serialize a => BKey -> BE.Get [a]
280binary k = field (req k) >>= either (fail . format) return .
281 runGet (many get)
282 where
283 format str = "fail to deserialize " ++ show k ++ " field: " ++ str
285instance Address ip => BEncode (NodeFound KMessageOf ip) where
286 toBEncode (NodeFound ns) = toDict $
287 nodes_key .=! runPut (mapM_ put ns)
288 .: endDict
290 -- TODO: handle IPv6 by reading the "nodes6" key (see bep 32)
291 fromBEncode bval = NodeFound <$> (traverse from4 =<< fromDict (binary nodes_key) bval)
293instance Serialize (Response KMessageOf (NodeFound KMessageOf ip)) where
294 get = do
295 count <- get :: Get Word8
296 nodes <- sequence $ replicate (fromIntegral count) get
