summaryrefslogtreecommitdiff
path: root/src/Network
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-01-05 12:56:08 -0500
committerjoe <joe@jerkface.net>2017-01-05 13:44:31 -0500
commitc02531ba4555e205d59672e6cd9b65a6c8931506 (patch)
treee11476cb58e5b268945a9b5690d779e79db47b67 /src/Network
parent5e2f43d967aa2d07368b7d5552f65a69b3979ab5 (diff)
More aggressive bootstrap algorithm.
Diffstat (limited to 'src/Network')
-rw-r--r--src/Network/BitTorrent/DHT.hs66
1 files changed, 52 insertions, 14 deletions
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs
index dbedf801..a5b4612f 100644
--- a/src/Network/BitTorrent/DHT.hs
+++ b/src/Network/BitTorrent/DHT.hs
@@ -57,17 +57,21 @@ import Control.Applicative
57import Control.Monad.Logger 57import Control.Monad.Logger
58import Control.Monad.Reader 58import Control.Monad.Reader
59import Control.Exception 59import Control.Exception
60import Data.ByteString as BS 60import qualified Data.ByteString as BS
61import Data.Conduit as C 61import Data.Conduit as C
62import Data.Conduit.List as C 62import qualified Data.Conduit.List as C
63import Data.Serialize 63import Data.Serialize
64import Network.Socket 64import Network.Socket
65import Text.PrettyPrint.HughesPJClass as PP (pPrint,render)
65 66
66import Data.Torrent 67import Data.Torrent
67import Network.BitTorrent.Address 68import Network.BitTorrent.Address
68import Network.BitTorrent.DHT.Query 69import Network.BitTorrent.DHT.Query
69import Network.BitTorrent.DHT.Session 70import Network.BitTorrent.DHT.Session
70import Network.BitTorrent.DHT.Routing as T 71import Network.BitTorrent.DHT.Routing as T hiding (null)
72import qualified Data.Text as Text
73import Data.Monoid
74
71 75
72{----------------------------------------------------------------------- 76{-----------------------------------------------------------------------
73-- DHT types 77-- DHT types
@@ -156,15 +160,50 @@ resolveHostName NodeAddr {..} = do
156-- 160--
157-- This operation do block, use 161-- This operation do block, use
158-- 'Control.Concurrent.Async.Lifted.async' if needed. 162-- 'Control.Concurrent.Async.Lifted.async' if needed.
159--
160bootstrap :: Address ip => [NodeAddr ip] -> DHT ip () 163bootstrap :: Address ip => [NodeAddr ip] -> DHT ip ()
161bootstrap startNodes = do 164bootstrap startNodes = do
162 $(logInfoS) "bootstrap" "Start node bootstrapping" 165 $(logInfoS) "bootstrap" "Start node bootstrapping"
163 nid <- asks thisNodeId 166 nid <- asks thisNodeId
164 -- TODO filter duplicated in startNodes list 167 let searchAll aliveNodes
165 -- TODO retransmissions for startNodes 168 = C.sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume
166 aliveNodes <- queryParallel (pingQ <$> startNodes) 169 -- Step 1: Use iterative searches to flesh out the table..
167 _ <- sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume 170 do knowns <- map (map $ nodeAddr . fst) . T.toList <$> getTable
171 alive_knowns <- queryParallel (pingQ <$> concat knowns)
172 nss <- searchAll alive_knowns
173 -- We only use the supplied bootstrap nodes when we don't know of any
174 -- others to try.
175 when (null nss) $ do
176 -- TODO filter duplicated in startNodes list
177 -- TODO retransmissions for startNodes
178 aliveNodes <- queryParallel (pingQ <$> startNodes)
179 _ <- searchAll aliveNodes
180 return ()
181 -- Step 2: Repeatedly refresh incomplete buckets until the table is full.
182 maxbuckets <- asks $ optBucketCount . options
183 flip fix 0 $ \loop icnt -> do
184 tbl <- getTable
185 let unfull = filter ((/=defaultBucketSize) . snd)
186 us = zip
187 -- is_last = True for the last bucket
188 (True:repeat False)
189 -- Only non-full buckets unless it is the last one and the
190 -- maximum number of buckets has not been reached.
191 $ case reverse $ zip [0..] $ T.shape tbl of
192 p@(n,_):ps | n+1==maxbuckets -> unfull (p:ps)
193 p:ps -> p:unfull ps
194 [] -> []
195 forM_ us $ \(is_last,(index,_)) -> do
196 sample <- liftIO $ genBucketSample nid (bucketRange index is_last)
197 $(logDebugS) "bootstrapping"
198 $ "BOOTSTRAP sample"
199 <> Text.pack (show (is_last,index,T.shape tbl))
200 <> " " <> Text.pack (render $ pPrint sample)
201 refreshNodes sample
202 $(logDebugS) "bootstrapping"
203 $ "BOOTSTRAP finished iteration "
204 <> Text.pack (show (icnt,T.shape tbl,us,defaultBucketSize))
205 when (not (null us) && icnt < div (3*maxbuckets) 2)
206 $ loop (succ icnt)
168 $(logInfoS) "bootstrap" "Node bootstrapping finished" 207 $(logInfoS) "bootstrap" "Node bootstrapping finished"
169 208
170-- | Check if this node is already bootstrapped. 209-- | Check if this node is already bootstrapped.
@@ -183,18 +222,17 @@ isBootstrapped = T.full <$> getTable
183-- 222--
184-- This is blocking operation, use 223-- This is blocking operation, use
185-- 'Control.Concurrent.Async.Lifted.async' if needed. 224-- 'Control.Concurrent.Async.Lifted.async' if needed.
186restore :: Address ip => ByteString -> DHT ip () 225restore :: Address ip => BS.ByteString -> DHT ip ()
187restore bs = do 226restore bs = do
188 tblvar <- asks routingTable
189 case decode bs of 227 case decode bs of
190 Right tbl -> restoreTable tbl 228 Right tbl -> restoreTable tbl
191 Left _ -> return () 229 Left e -> $(logWarnS) "restore" (Text.pack e)
192 230
193-- | Serialize current DHT session to byte string. 231-- | Serialize current DHT session to byte string.
194-- 232--
195-- This is blocking operation, use 233-- This is blocking operation, use
196-- 'Control.Concurrent.Async.Lifted.async' if needed. 234-- 'Control.Concurrent.Async.Lifted.async' if needed.
197snapshot :: Address ip => DHT ip ByteString 235snapshot :: Address ip => DHT ip BS.ByteString
198snapshot = do 236snapshot = do
199 tbl <- getTable 237 tbl <- getTable
200 return $ encode tbl 238 return $ encode tbl
@@ -207,10 +245,10 @@ snapshot = do
207-- 245--
208-- This operation is incremental and do block. 246-- This operation is incremental and do block.
209-- 247--
210lookup :: Address ip => InfoHash -> DHT ip `Source` [PeerAddr ip] 248lookup :: Address ip => InfoHash -> DHT ip `C.Source` [PeerAddr ip]
211lookup topic = do -- TODO retry getClosest if bucket is empty 249lookup topic = do -- TODO retry getClosest if bucket is empty
212 closest <- lift $ getClosest topic 250 closest <- lift $ getClosest topic
213 sourceList [closest] $= search topic (getPeersQ topic) 251 C.sourceList [closest] $= search topic (getPeersQ topic)
214 252
215-- TODO do not republish if the topic is already in announceSet 253-- TODO do not republish if the topic is already in announceSet
216 254