diff options
Diffstat (limited to 'src/Network/BitTorrent')
-rw-r--r-- | src/Network/BitTorrent/DHT.hs | 66 |
1 files changed, 52 insertions, 14 deletions
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index dbedf801..a5b4612f 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs | |||
@@ -57,17 +57,21 @@ import Control.Applicative | |||
57 | import Control.Monad.Logger | 57 | import Control.Monad.Logger |
58 | import Control.Monad.Reader | 58 | import Control.Monad.Reader |
59 | import Control.Exception | 59 | import Control.Exception |
60 | import Data.ByteString as BS | 60 | import qualified Data.ByteString as BS |
61 | import Data.Conduit as C | 61 | import Data.Conduit as C |
62 | import Data.Conduit.List as C | 62 | import qualified Data.Conduit.List as C |
63 | import Data.Serialize | 63 | import Data.Serialize |
64 | import Network.Socket | 64 | import Network.Socket |
65 | import Text.PrettyPrint.HughesPJClass as PP (pPrint,render) | ||
65 | 66 | ||
66 | import Data.Torrent | 67 | import Data.Torrent |
67 | import Network.BitTorrent.Address | 68 | import Network.BitTorrent.Address |
68 | import Network.BitTorrent.DHT.Query | 69 | import Network.BitTorrent.DHT.Query |
69 | import Network.BitTorrent.DHT.Session | 70 | import Network.BitTorrent.DHT.Session |
70 | import Network.BitTorrent.DHT.Routing as T | 71 | import Network.BitTorrent.DHT.Routing as T hiding (null) |
72 | import qualified Data.Text as Text | ||
73 | import Data.Monoid | ||
74 | |||
71 | 75 | ||
72 | {----------------------------------------------------------------------- | 76 | {----------------------------------------------------------------------- |
73 | -- DHT types | 77 | -- DHT types |
@@ -156,15 +160,50 @@ resolveHostName NodeAddr {..} = do | |||
156 | -- | 160 | -- |
157 | -- This operation do block, use | 161 | -- This operation do block, use |
158 | -- 'Control.Concurrent.Async.Lifted.async' if needed. | 162 | -- 'Control.Concurrent.Async.Lifted.async' if needed. |
159 | -- | ||
160 | bootstrap :: Address ip => [NodeAddr ip] -> DHT ip () | 163 | bootstrap :: Address ip => [NodeAddr ip] -> DHT ip () |
161 | bootstrap startNodes = do | 164 | bootstrap startNodes = do |
162 | $(logInfoS) "bootstrap" "Start node bootstrapping" | 165 | $(logInfoS) "bootstrap" "Start node bootstrapping" |
163 | nid <- asks thisNodeId | 166 | nid <- asks thisNodeId |
164 | -- TODO filter duplicated in startNodes list | 167 | let searchAll aliveNodes |
165 | -- TODO retransmissions for startNodes | 168 | = C.sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume |
166 | aliveNodes <- queryParallel (pingQ <$> startNodes) | 169 | -- Step 1: Use iterative searches to flesh out the table.. |
167 | _ <- sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume | 170 | do knowns <- map (map $ nodeAddr . fst) . T.toList <$> getTable |
171 | alive_knowns <- queryParallel (pingQ <$> concat knowns) | ||
172 | nss <- searchAll alive_knowns | ||
173 | -- We only use the supplied bootstrap nodes when we don't know of any | ||
174 | -- others to try. | ||
175 | when (null nss) $ do | ||
176 | -- TODO filter duplicated in startNodes list | ||
177 | -- TODO retransmissions for startNodes | ||
178 | aliveNodes <- queryParallel (pingQ <$> startNodes) | ||
179 | _ <- searchAll aliveNodes | ||
180 | return () | ||
181 | -- Step 2: Repeatedly refresh incomplete buckets until the table is full. | ||
182 | maxbuckets <- asks $ optBucketCount . options | ||
183 | flip fix 0 $ \loop icnt -> do | ||
184 | tbl <- getTable | ||
185 | let unfull = filter ((/=defaultBucketSize) . snd) | ||
186 | us = zip | ||
187 | -- is_last = True for the last bucket | ||
188 | (True:repeat False) | ||
189 | -- Only non-full buckets unless it is the last one and the | ||
190 | -- maximum number of buckets has not been reached. | ||
191 | $ case reverse $ zip [0..] $ T.shape tbl of | ||
192 | p@(n,_):ps | n+1==maxbuckets -> unfull (p:ps) | ||
193 | p:ps -> p:unfull ps | ||
194 | [] -> [] | ||
195 | forM_ us $ \(is_last,(index,_)) -> do | ||
196 | sample <- liftIO $ genBucketSample nid (bucketRange index is_last) | ||
197 | $(logDebugS) "bootstrapping" | ||
198 | $ "BOOTSTRAP sample" | ||
199 | <> Text.pack (show (is_last,index,T.shape tbl)) | ||
200 | <> " " <> Text.pack (render $ pPrint sample) | ||
201 | refreshNodes sample | ||
202 | $(logDebugS) "bootstrapping" | ||
203 | $ "BOOTSTRAP finished iteration " | ||
204 | <> Text.pack (show (icnt,T.shape tbl,us,defaultBucketSize)) | ||
205 | when (not (null us) && icnt < div (3*maxbuckets) 2) | ||
206 | $ loop (succ icnt) | ||
168 | $(logInfoS) "bootstrap" "Node bootstrapping finished" | 207 | $(logInfoS) "bootstrap" "Node bootstrapping finished" |
169 | 208 | ||
170 | -- | Check if this node is already bootstrapped. | 209 | -- | Check if this node is already bootstrapped. |
@@ -183,18 +222,17 @@ isBootstrapped = T.full <$> getTable | |||
183 | -- | 222 | -- |
184 | -- This is blocking operation, use | 223 | -- This is blocking operation, use |
185 | -- 'Control.Concurrent.Async.Lifted.async' if needed. | 224 | -- 'Control.Concurrent.Async.Lifted.async' if needed. |
186 | restore :: Address ip => ByteString -> DHT ip () | 225 | restore :: Address ip => BS.ByteString -> DHT ip () |
187 | restore bs = do | 226 | restore bs = do |
188 | tblvar <- asks routingTable | ||
189 | case decode bs of | 227 | case decode bs of |
190 | Right tbl -> restoreTable tbl | 228 | Right tbl -> restoreTable tbl |
191 | Left _ -> return () | 229 | Left e -> $(logWarnS) "restore" (Text.pack e) |
192 | 230 | ||
193 | -- | Serialize current DHT session to byte string. | 231 | -- | Serialize current DHT session to byte string. |
194 | -- | 232 | -- |
195 | -- This is blocking operation, use | 233 | -- This is blocking operation, use |
196 | -- 'Control.Concurrent.Async.Lifted.async' if needed. | 234 | -- 'Control.Concurrent.Async.Lifted.async' if needed. |
197 | snapshot :: Address ip => DHT ip ByteString | 235 | snapshot :: Address ip => DHT ip BS.ByteString |
198 | snapshot = do | 236 | snapshot = do |
199 | tbl <- getTable | 237 | tbl <- getTable |
200 | return $ encode tbl | 238 | return $ encode tbl |
@@ -207,10 +245,10 @@ snapshot = do | |||
207 | -- | 245 | -- |
208 | -- This operation is incremental and do block. | 246 | -- This operation is incremental and do block. |
209 | -- | 247 | -- |
210 | lookup :: Address ip => InfoHash -> DHT ip `Source` [PeerAddr ip] | 248 | lookup :: Address ip => InfoHash -> DHT ip `C.Source` [PeerAddr ip] |
211 | lookup topic = do -- TODO retry getClosest if bucket is empty | 249 | lookup topic = do -- TODO retry getClosest if bucket is empty |
212 | closest <- lift $ getClosest topic | 250 | closest <- lift $ getClosest topic |
213 | sourceList [closest] $= search topic (getPeersQ topic) | 251 | C.sourceList [closest] $= search topic (getPeersQ topic) |
214 | 252 | ||
215 | -- TODO do not republish if the topic is already in announceSet | 253 | -- TODO do not republish if the topic is already in announceSet |
216 | 254 | ||