1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
|
-- |
-- Copyright : (c) Sam Truzjan 2013
-- License : BSD3
-- Maintainer : pxqr.sta@gmail.com
-- Stability : experimental
-- Portability : portable
--
-- BitTorrent uses a \"distributed sloppy hash table\" (DHT) for
-- storing peer contact information for \"trackerless\" torrents. In
-- effect, each peer becomes a tracker.
--
-- Normally you don't need to import other DHT modules.
--
-- For more info see:
-- <http://www.bittorrent.org/beps/bep_0005.html>
--
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Network.BitTorrent.DHT
( -- * Distributed Hash Table
DHT
, Options (..)
, fullLogging
, dht
-- * Bootstrapping
-- $bootstrapping-terms
, tNodes
, defaultBootstrapNodes
, resolveHostName
, bootstrap
, isBootstrapped
-- * Initialization
, snapshot
-- * Operations
, Network.BitTorrent.DHT.lookup
, Network.BitTorrent.DHT.insert
, Network.BitTorrent.DHT.delete
-- * Embedding
-- ** Session
, LogFun
, Node
, defaultHandlers
, newNode
, closeNode
-- ** Monad
, MonadDHT (..)
, runDHT
) where
import Control.Monad.Logger
import Control.Monad.Reader
import Control.Exception
import qualified Data.ByteString as BS
import Data.Conduit as C
import qualified Data.Conduit.List as C
import Data.Serialize
import Network.Socket
import Text.PrettyPrint.HughesPJClass as PP (pPrint,render)
import Data.Torrent
import Network.BitTorrent.Address
import Network.BitTorrent.DHT.Query
import Network.BitTorrent.DHT.Session
import Network.BitTorrent.DHT.Routing as T hiding (null)
import qualified Data.Text as Text
import Data.Monoid
import Network.KRPC.Message (KMessageOf)
{-----------------------------------------------------------------------
-- DHT types
-----------------------------------------------------------------------}
-- | Monads into which a 'DHT' computation over 'IPv4' can be embedded.
class MonadDHT m where
  -- | Embed a 'DHT' action over 'IPv4' into @m@.
  liftDHT :: DHT IPv4 a -> m a

-- | 'DHT' over 'IPv4' embeds into itself: the action is returned unchanged.
instance MonadDHT (DHT IPv4) where
  liftDHT action = action
-- | Log filter that accepts every message, whatever its source or
-- level. Pass it to 'dht' to enable full logging.
fullLogging :: LogSource -> LogLevel -> Bool
fullLogging _source _level = True
-- | Start a DHT node, run the given action against it and close the
-- node afterwards.
--
-- The node is created with 'newNode' using 'defaultHandlers' and is
-- released with 'closeNode' through 'bracket', so it is closed even
-- if the action throws. Log messages pass through the supplied filter
-- before being handed to 'runStderrLoggingT'.
dht :: (Ord ip, Address ip)
    => Options     -- ^ node options; normally 'Data.Default.def';
    -> NodeAddr ip -- ^ address to bind this node;
    -> (LogSource -> LogLevel -> Bool) -- ^ log filter; 'fullLogging' keeps everything;
    -> DHT ip a    -- ^ actions to run: 'bootstrap', 'lookup', etc;
    -> IO a        -- ^ result of the action.
dht opts addr logfilter action =
  runStderrLoggingT . filterLogger logfilter . LoggingT $ \logger ->
    bracket
      (newNode defaultHandlers opts addr logger Nothing)
      closeNode
      (\node -> runDHT node action)
{-# INLINE dht #-}
{-----------------------------------------------------------------------
-- Bootstrapping
-----------------------------------------------------------------------}
-- $bootstrapping-terms
--
-- [@Bootstrapping@] DHT @bootstrapping@ is the process of filling
-- the routing 'Table' with /good/ nodes.
--
-- [@Bootstrapping time@] The bootstrapping process can take up to 5
-- minutes. Bootstrapping should only happen at the first application
-- startup; if possible you should use the 'snapshot' & 'restore'
-- mechanism, which should work faster.
--
-- [@Bootstrap nodes@] DHT @bootstrap node@ is either:
--
-- * a specialized high performance node maintained by bittorrent
-- software authors\/maintainers, like those listed in
-- 'defaultBootstrapNodes'. /Specialized/ means that those nodes
-- may not support 'insert' queries and are running for the sake of
-- bootstrapping only.
--
-- * an ordinary bittorrent client running a DHT node. The list of
-- such bootstrapping nodes is usually obtained from the
-- 'Data.Torrent.tNodes' field or
-- 'Network.BitTorrent.Exchange.Message.Port' messages.
-- Do not include the following hosts in the default bootstrap nodes list:
--
-- * "dht.aelitis.com" and "dht6.azureusplatform.com" - since
-- the Azureus client has a different (and probably incompatible) DHT
-- protocol implementation.
--
-- * "router.utorrent.com" since it is just an alias to
-- "router.bittorrent.com".
-- XXX: ignoring this advice as it resolves to a different
-- ip address for me.
-- | Bootstrap nodes maintained by different bittorrent software
-- authors. Every entry uses port 6881.
defaultBootstrapNodes :: [NodeAddr HostName]
defaultBootstrapNodes =
  [ NodeAddr host 6881
  | host <-
      [ "router.bittorrent.com"  -- by BitTorrent Inc.
        -- doesn't work at the moment (use git blame) of commit
      , "dht.transmissionbt.com" -- by Transmission project
      , "router.utorrent.com"
      ]
  ]
-- TODO Multihomed hosts
-- | Resolve either a numeric network address or a hostname to a
-- numeric IPv4 address of the node. Usually used to resolve
-- 'defaultBootstrapNodes' or 'Data.Torrent.tNodes' lists.
--
-- The lookup is restricted to @AF_INET@ datagram addresses and only
-- the first address returned by the resolver is used. Resolver
-- failures propagate as the exception thrown by 'getAddrInfo'.
resolveHostName :: NodeAddr HostName -> IO (NodeAddr IPv4)
resolveHostName NodeAddr {..} = do
  let hints = defaultHints { addrFamily = AF_INET, addrSocketType = Datagram }
  infos <- getAddrInfo (Just hints) (Just nodeHost) (Just (show nodePort))
  case infos of
    -- getAddrInfo is expected to throw rather than return an empty
    -- list; handle it explicitly anyway so the match stays total and
    -- the failure names the host that could not be resolved.
    [] -> ioError $ userError $
      "resolveHostName: no address found for " ++ nodeHost
    info : _ -> case fromSockAddr (addrAddress info) of
      Just addr -> return addr
      -- The hints restrict the lookup to AF_INET, so the conversion
      -- to an IPv4 node address should never fail.
      Nothing -> error
        "resolveHostName: impossible: AF_INET lookup returned a non-IPv4 address"
-- | Fill the routing table, optionally seeded from a serialized table.
--
-- The first argument is an optional serialized routing table
-- (presumably one produced by 'snapshot'); if it fails to decode, a
-- warning is logged and it is ignored. The second argument is a list
-- of nodes that are only contacted when searching from the already
-- known nodes yields nothing.
--
-- One good node may be sufficient.
--
-- This operation does block, use
-- 'Control.Concurrent.Async.Lifted.async' if needed.
bootstrap :: forall ip. Address ip => Maybe BS.ByteString -> [NodeAddr ip] -> DHT ip ()
bootstrap mbs startNodes = do
-- Decode the saved table (if any) into a list of its entries; a decode
-- failure is logged and treated as "nothing restored".
restored <-
case decode <$> mbs of
Just (Right tbl) -> return (T.toList tbl)
Just (Left e) -> do $(logWarnS) "restore" (Text.pack e)
return []
Nothing -> return []
$(logInfoS) "bootstrap" "Start node bootstrapping"
-- searchAll runs a find-node search for our own node id, seeded with
-- the given nodes, and collects everything the search yields.
let searchAll aliveNodes = do
-- NOTE(review): the argument is a placeholder that throws if it is
-- ever evaluated.
nid <- myNodeIdAccordingTo (error "FIXME")
nss <- C.sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume
return ( nss :: [[NodeInfo KMessageOf ip ()]] )
-- Known nodes: the restored entries followed by those of the live table.
input_nodes <- (restored ++) . T.toList <$> getTable
-- Step 1: Use iterative searches to flesh out the table..
do let knowns = map (map $ nodeAddr . fst) input_nodes
-- Below, we reverse the nodes since the table serialization puts the
-- nearest nodes last and we want to choose a similar node id to bootstrap
-- faster.
(alive_knowns,_) <- unzip <$> queryParallel (pingQ <$> reverse (concat knowns))
b <- isBootstrapped
-- If our cached nodes are alive and our IP address did not change, it's possible
-- we are already bootstrapped, so no need to do any searches.
when (not b) $ do
-- Seed the search with at most two of the nodes that answered the ping.
nss <- searchAll $ take 2 alive_knowns
-- We only use the supplied bootstrap nodes when we don't know of any
-- others to try.
when (null nss) $ do
-- TODO filter duplicated in startNodes list
-- TODO retransmissions for startNodes
(aliveNodes,_) <- unzip <$> queryParallel (pingQ <$> startNodes)
_ <- searchAll $ take 2 aliveNodes
return ()
-- Step 2: Repeatedly refresh incomplete buckets until the table is full.
maxbuckets <- asks $ optBucketCount . options
-- icnt counts the refresh iterations, starting from 0.
flip fix 0 $ \loop icnt -> do
tbl <- getTable
-- unfull keeps the (index, size) pairs whose size differs from
-- defaultBucketSize; us pairs each selected bucket with its is_last flag.
let unfull = filter ((/=defaultBucketSize) . snd)
us = zip
-- is_last = True for the last bucket
(True:repeat False)
-- Only non-full buckets unless it is the last one and the
-- maximum number of buckets has not been reached.
$ case reverse $ zip [0..] $ T.shape tbl of
p@(n,_):ps | n+1==maxbuckets -> unfull (p:ps)
p:ps -> p:unfull ps
[] -> []
-- Refresh every selected bucket using a sample generated for its range.
forM_ us $ \(is_last,(index,_)) -> do
nid <- myNodeIdAccordingTo (error "FIXME")
sample <- liftIO $ genBucketSample nid (bucketRange index is_last)
$(logDebugS) "bootstrapping"
$ "BOOTSTRAP sample"
<> Text.pack (show (is_last,index,T.shape tbl))
<> " " <> Text.pack (render $ pPrint sample)
refreshNodes sample
$(logDebugS) "bootstrapping"
$ "BOOTSTRAP finished iteration "
<> Text.pack (show (icnt,T.shape tbl,us,defaultBucketSize))
-- Iterate again only while some bucket was selected for refresh and the
-- iteration counter is still below 3*maxbuckets/2.
when (not (null us) && icnt < div (3*maxbuckets) 2)
$ loop (succ icnt)
$(logInfoS) "bootstrap" "Node bootstrapping finished"
-- | Tell whether this node is already bootstrapped, i.e. whether the
-- current routing table is reported as full by 'T.full'.
-- @bootstrap Nothing [good_node] >> isBootstrapped@ is intended to
-- return 'True'.
--
-- This operation does not block.
--
isBootstrapped :: Eq ip => DHT ip Bool
isBootstrapped = fmap T.full getTable
{-----------------------------------------------------------------------
-- Initialization
-----------------------------------------------------------------------}
-- | Serialize the current routing table of this DHT session to a
-- byte string.
--
-- This is a blocking operation, use
-- 'Control.Concurrent.Async.Lifted.async' if needed.
snapshot :: Address ip => DHT ip BS.ByteString
snapshot = encode <$> getTable
{-----------------------------------------------------------------------
-- Operations
-----------------------------------------------------------------------}
-- | Stream the peers that are downloading the given torrent.
--
-- The search starts from the nodes returned by 'getClosest' for the
-- topic and queries them with 'getPeersQ'.
--
-- This operation is incremental and does block.
--
lookup :: Address ip => InfoHash -> DHT ip `C.Source` [PeerAddr ip]
lookup topic =
  -- TODO retry getClosest if bucket is empty
  lift (getClosest topic) >>= \nearest ->
    C.sourceList [nearest] $= search topic (getPeersQ topic)
-- TODO do not republish if the topic is already in announceSet

-- | Announce that /this/ peer may have some pieces of the specified
-- torrent: the topic is first published with 'publish' and then
-- recorded with 'insertTopic'. DHT will reannounce this data
-- periodically using 'optReannounce' interval.
--
-- This operation is synchronous and does block, use
-- 'Control.Concurrent.Async.Lifted.async' if needed.
--
insert :: Address ip => InfoHash -> PortNumber -> DHT ip ()
insert ih p = publish ih p >> insertTopic ih p
-- | Stop announcing /this/ peer for the specified torrent by
-- delegating to 'deleteTopic'.
--
-- This operation is atomic and may block for a while.
--
delete :: InfoHash -> PortNumber -> DHT ip ()
delete ih p = deleteTopic ih p
{-# INLINE delete #-}
|