summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/DHT
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/DHT')
-rw-r--r--src/Network/BitTorrent/DHT/ContactInfo.hs1
-rw-r--r--src/Network/BitTorrent/DHT/Message.hs4
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs34
-rw-r--r--src/Network/BitTorrent/DHT/Routing.hs53
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs15
-rw-r--r--src/Network/BitTorrent/DHT/Token.hs3
6 files changed, 37 insertions, 73 deletions
diff --git a/src/Network/BitTorrent/DHT/ContactInfo.hs b/src/Network/BitTorrent/DHT/ContactInfo.hs
index baa240b4..4293506d 100644
--- a/src/Network/BitTorrent/DHT/ContactInfo.hs
+++ b/src/Network/BitTorrent/DHT/ContactInfo.hs
@@ -7,7 +7,6 @@ module Network.BitTorrent.DHT.ContactInfo
7import Data.Default 7import Data.Default
8import Data.List as L 8import Data.List as L
9import Data.Maybe 9import Data.Maybe
10import Data.Monoid
11import Data.HashMap.Strict as HM 10import Data.HashMap.Strict as HM
12import Data.Serialize 11import Data.Serialize
13 12
diff --git a/src/Network/BitTorrent/DHT/Message.hs b/src/Network/BitTorrent/DHT/Message.hs
index b8f272c3..9d66741f 100644
--- a/src/Network/BitTorrent/DHT/Message.hs
+++ b/src/Network/BitTorrent/DHT/Message.hs
@@ -94,7 +94,7 @@ import Network
94import Network.KRPC 94import Network.KRPC
95import Data.Maybe 95import Data.Maybe
96 96
97import Data.Torrent 97import Data.Torrent (InfoHash)
98import Network.BitTorrent.Address 98import Network.BitTorrent.Address
99import Network.BitTorrent.DHT.Token 99import Network.BitTorrent.DHT.Token
100import Network.KRPC () 100import Network.KRPC ()
@@ -204,7 +204,7 @@ binary k = field (req k) >>= either (fail . format) return .
204 where 204 where
205 format str = "fail to deserialize " ++ show k ++ " field: " ++ str 205 format str = "fail to deserialize " ++ show k ++ " field: " ++ str
206 206
207instance (Typeable ip, Address ip) => BEncode (NodeFound ip) where 207instance Address ip => BEncode (NodeFound ip) where
208 toBEncode (NodeFound ns) = toDict $ 208 toBEncode (NodeFound ns) = toDict $
209 nodes_key .=! runPut (mapM_ put ns) 209 nodes_key .=! runPut (mapM_ put ns)
210 .: endDict 210 .: endDict
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
index d1fa36e5..e067ab52 100644
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -49,7 +49,6 @@ module Network.BitTorrent.DHT.Query
49 , (<@>) 49 , (<@>)
50 ) where 50 ) where
51 51
52import Control.Applicative
53import Control.Concurrent.Lifted hiding (yield) 52import Control.Concurrent.Lifted hiding (yield)
54import Control.Exception.Lifted hiding (Handler) 53import Control.Exception.Lifted hiding (Handler)
55import Control.Monad.Reader 54import Control.Monad.Reader
@@ -168,7 +167,7 @@ announceQ ih p NodeInfo {..} = do
168 Left ns 167 Left ns
169 | False -> undefined -- TODO check if we can announce 168 | False -> undefined -- TODO check if we can announce
170 | otherwise -> return (Left ns) 169 | otherwise -> return (Left ns)
171 Right ps -> do -- TODO *probably* add to peer cache 170 Right _ -> do -- TODO *probably* add to peer cache
172 Announced <- Announce False ih p grantedToken <@> nodeAddr 171 Announced <- Announce False ih p grantedToken <@> nodeAddr
173 return (Right [nodeAddr]) 172 return (Right [nodeAddr])
174 173
@@ -179,7 +178,7 @@ announceQ ih p NodeInfo {..} = do
179type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] 178type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip]
180 179
181-- TODO: use reorder and filter (Traversal option) leftovers 180-- TODO: use reorder and filter (Traversal option) leftovers
182search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o 181search :: k -> Iteration ip o -> Search ip o
183search _ action = do 182search _ action = do
184 awaitForever $ \ batch -> unless (L.null batch) $ do 183 awaitForever $ \ batch -> unless (L.null batch) $ do
185 $(logWarnS) "search" "start query" 184 $(logWarnS) "search" "start query"
@@ -196,11 +195,6 @@ publish ih p = do
196 _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r 195 _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r
197 return () 196 return ()
198 197
199republish :: DHT ip ThreadId
200republish = fork $ do
201 i <- asks (optReannounce . options)
202 error "DHT.republish: not implemented"
203
204getTimestamp :: DHT ip Timestamp 198getTimestamp :: DHT ip Timestamp
205getTimestamp = do 199getTimestamp = do
206 utcTime <- liftIO $ getCurrentTime 200 utcTime <- liftIO $ getCurrentTime
@@ -229,7 +223,7 @@ refreshNodes nid = do
229 -- nss <- sourceList [[addr]] $= search nid (findNodeQ nid) $$ C.consume 223 -- nss <- sourceList [[addr]] $= search nid (findNodeQ nid) $$ C.consume
230 nss <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume 224 nss <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume
231 $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes." 225 $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes."
232 queryParallel $ flip L.map (L.concat nss) $ \n -> do 226 _ <- queryParallel $ flip L.map (L.concat nss) $ \n -> do
233 $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) 227 $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n))
234 pingQ (nodeAddr n) 228 pingQ (nodeAddr n)
235 -- pingQ takes care of inserting the node. 229 -- pingQ takes care of inserting the node.
@@ -239,15 +233,14 @@ refreshNodes nid = do
239-- | This operation do not block but acquire exclusive access to 233-- | This operation do not block but acquire exclusive access to
240-- routing table. 234-- routing table.
241insertNode :: Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId 235insertNode :: Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId
242insertNode info witnessed_ip = fork $ do 236insertNode info witnessed_ip0 = fork $ do
243 var <- asks routingInfo 237 var <- asks routingInfo
244 tm <- getTimestamp 238 tm <- getTimestamp
245 let showTable = do 239 let showTable = do
246 t <- getTable 240 t <- getTable
247 let logMsg = "Routing table: " <> pPrint t 241 let logMsg = "Routing table: " <> pPrint t
248 $(logDebugS) "insertNode" (T.pack (render logMsg)) 242 $(logDebugS) "insertNode" (T.pack (render logMsg))
249 t <- liftIO $ atomically $ readTVar var 243 let arrival0 = TryInsert info
250 let arrival = TryInsert info
251 arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4) 244 arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4)
252 $(logDebugS) "insertNode" $ T.pack (show arrival4) 245 $(logDebugS) "insertNode" $ T.pack (show arrival4)
253 maxbuckets <- asks (optBucketCount . options) 246 maxbuckets <- asks (optBucketCount . options)
@@ -259,13 +252,13 @@ insertNode info witnessed_ip = fork $ do
259 $ rank id (nodeId $ foreignNode arrival) 252 $ rank id (nodeId $ foreignNode arrival)
260 $ bep42s ip fallbackid 253 $ bep42s ip fallbackid
261 case minfo of 254 case minfo of
262 Just info -> do 255 Just inf -> do
263 (ps,t') <- R.insert tm arrival $ myBuckets info 256 (ps,t') <- R.insert tm arrival $ myBuckets inf
264 writeTVar var $ Just $ info { myBuckets = t' } 257 writeTVar var $ Just $ inf { myBuckets = t' }
265 return $ do 258 return $ do
266 case witnessed_ip of 259 case witnessed_ip of
267 Just (ReflectedIP ip0) 260 Just (ReflectedIP ip0)
268 | fromSockAddr ip0 /= Just (myAddress info) 261 | fromSockAddr ip0 /= Just (myAddress inf)
269 -> $(logInfo) ( T.pack $ L.unwords 262 -> $(logInfo) ( T.pack $ L.unwords
270 $ [ "Possible NAT?" 263 $ [ "Possible NAT?"
271 , show (toSockAddr $ nodeAddr $ foreignNode arrival) 264 , show (toSockAddr $ nodeAddr $ foreignNode arrival)
@@ -298,15 +291,14 @@ insertNode info witnessed_ip = fork $ do
298 <> ")" 291 <> ")"
299 ] ) 292 ] )
300 return ps 293 return ps
301 ps <- join $ liftIO $ atomically $ atomicInsert arrival witnessed_ip 294 ps <- join $ liftIO $ atomically $ atomicInsert arrival0 witnessed_ip0
302 showTable 295 showTable
303 fork $ forM_ ps $ \(CheckPing ns)-> do 296 _ <- fork $ forM_ ps $ \(CheckPing ns)-> do
304 forM_ ns $ \n -> do 297 forM_ ns $ \n -> do
305 (b,mip) <- probeNode (nodeAddr n) 298 (b,mip) <- probeNode (nodeAddr n)
306 let alive = PingResult n b 299 let alive = PingResult n b
307 $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) 300 $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b))
308 tm <- getTimestamp 301 _ <- join $ liftIO $ atomically $ atomicInsert alive mip
309 join $ liftIO $ atomically $ atomicInsert alive mip
310 showTable 302 showTable
311 return () 303 return ()
312 304
@@ -323,7 +315,7 @@ queryNode' addr q = do
323 (Response remoteId r, witnessed_ip) <- query' (toSockAddr addr) (Query nid read_only q) 315 (Response remoteId r, witnessed_ip) <- query' (toSockAddr addr) (Query nid read_only q)
324 -- $(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) 316 -- $(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip)
325 -- <> " by " <> T.pack (show (toSockAddr addr)) 317 -- <> " by " <> T.pack (show (toSockAddr addr))
326 insertNode (NodeInfo remoteId addr) witnessed_ip 318 _ <- insertNode (NodeInfo remoteId addr) witnessed_ip
327 return (remoteId, r, witnessed_ip) 319 return (remoteId, r, witnessed_ip)
328 320
329-- | Infix version of 'queryNode' function. 321-- | Infix version of 'queryNode' function.
diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs
index 84e4d4ce..f9d64eea 100644
--- a/src/Network/BitTorrent/DHT/Routing.hs
+++ b/src/Network/BitTorrent/DHT/Routing.hs
@@ -144,7 +144,7 @@ instance Alternative (Routing ip) where
144 Refresh n f <|> m = Refresh n (f <|> m) 144 Refresh n f <|> m = Refresh n (f <|> m)
145 145
146-- | Run routing table operation. 146-- | Run routing table operation.
147runRouting :: (Monad m, Eq ip) 147runRouting :: Monad m
148 => (NodeAddr ip -> m Bool) -- ^ ping the specific node; 148 => (NodeAddr ip -> m Bool) -- ^ ping the specific node;
149 -> (NodeId -> m ()) -- ^ refresh nodes; 149 -> (NodeId -> m ()) -- ^ refresh nodes;
150 -> m Timestamp -- ^ get current time; 150 -> m Timestamp -- ^ get current time;
@@ -166,18 +166,6 @@ runRouting ping_node find_nodes timestamper = go
166 find_nodes nid 166 find_nodes nid
167 go f 167 go f
168 168
169getTime :: Routing ip Timestamp
170getTime = GetTime return
171{-# INLINE getTime #-}
172
173needPing :: NodeAddr ip -> Routing ip Bool
174needPing addr = NeedPing addr return
175{-# INLINE needPing #-}
176
177refresh :: NodeId -> Routing ip ()
178refresh nid = Refresh nid (Done ())
179{-# INLINE refresh #-}
180
181{----------------------------------------------------------------------- 169{-----------------------------------------------------------------------
182 Bucket 170 Bucket
183-----------------------------------------------------------------------} 171-----------------------------------------------------------------------}
@@ -213,6 +201,7 @@ data QueueMethods m elem fifo = QueueMethods
213 , emptyQueue :: m fifo 201 , emptyQueue :: m fifo
214 } 202 }
215 203
204{-
216fromQ :: Functor m => 205fromQ :: Functor m =>
217 ( a -> b ) 206 ( a -> b )
218 -> ( b -> a ) 207 -> ( b -> a )
@@ -223,6 +212,7 @@ fromQ embed project QueueMethods{..} =
223 , popFront = fmap (second embed) . popFront . project 212 , popFront = fmap (second embed) . popFront . project
224 , emptyQueue = fmap embed emptyQueue 213 , emptyQueue = fmap embed emptyQueue
225 } 214 }
215-}
226 216
227seqQ :: QueueMethods Identity (NodeInfo ip) (Seq.Seq (NodeInfo ip)) 217seqQ :: QueueMethods Identity (NodeInfo ip) (Seq.Seq (NodeInfo ip))
228seqQ = QueueMethods 218seqQ = QueueMethods
@@ -256,17 +246,6 @@ instance (Serialize k, Serialize v, Ord k, Ord v)
256 get = PSQ.fromList <$> get 246 get = PSQ.fromList <$> get
257 put = put . PSQ.toList 247 put = put . PSQ.toList
258 248
259-- | Get the most recently changed node entry, if any.
260lastChanged :: Eq ip => Bucket ip -> Maybe (NodeEntry ip)
261lastChanged bucket
262 | L.null timestamps = Nothing
263 | otherwise = Just (L.maximumBy (compare `on` prio) timestamps)
264 where
265 timestamps = PSQ.toList $ bktNodes bucket
266
267leastRecently :: Eq ip => Bucket ip -> Maybe (NodeEntry ip, Bucket ip)
268leastRecently b = fmap (\(e,ns) -> (e, b { bktNodes = ns })) $ minView $ bktNodes b
269
270-- | Update interval, in seconds. 249-- | Update interval, in seconds.
271delta :: NominalDiffTime 250delta :: NominalDiffTime
272delta = 15 * 60 251delta = 15 * 60
@@ -303,29 +282,30 @@ insertBucket curTime (TryInsert info) bucket
303 map_q f = bucket { bktQ = runIdentity $ f (bktQ bucket) } 282 map_q f = bucket { bktQ = runIdentity $ f (bktQ bucket) }
304 283
305insertBucket curTime (PingResult bad_node got_response) bucket 284insertBucket curTime (PingResult bad_node got_response) bucket
306 = pure ([], Bucket (update $ bktNodes bucket) popped) 285 = pure ([], Bucket (upd $ bktNodes bucket) popped)
307 where 286 where
308 (top, popped) = runIdentity $ popFront bucketQ (bktQ bucket) 287 (top, popped) = runIdentity $ popFront bucketQ (bktQ bucket)
309 update | got_response = id 288 upd | got_response = id
310 | Just info <- top = PSQ.insert info curTime . PSQ.delete bad_node 289 | Just info <- top = PSQ.insert info curTime . PSQ.delete bad_node
311 | otherwise = id 290 | otherwise = id
312 291
313type BitIx = Word 292type BitIx = Word
314 293
315partitionQ imp pred q = do 294partitionQ :: Monad f => QueueMethods f elem b -> (elem -> Bool) -> b -> f (b, b)
316 pass <- emptyQueue imp 295partitionQ imp test q0 = do
317 fail <- emptyQueue imp 296 pass0 <- emptyQueue imp
297 fail0 <- emptyQueue imp
318 let flipfix a b f = fix f a b 298 let flipfix a b f = fix f a b
319 flipfix q (pass,fail) $ \loop q qs -> do 299 flipfix q0 (pass0,fail0) $ \rec q qs -> do
320 (mb,q') <- popFront imp q 300 (mb,q') <- popFront imp q
321 case mb of 301 case mb of
322 Nothing -> return qs 302 Nothing -> return qs
323 Just e -> do qs' <- select (pushBack imp e) qs 303 Just e -> do qs' <- select (pushBack imp e) qs
324 loop q' qs' 304 rec q' qs'
325 where 305 where
326 select :: Functor f => (b -> f b) -> (b, b) -> f (b, b) 306 select :: Functor f => (b -> f b) -> (b, b) -> f (b, b)
327 select f = if pred e then \(a,b) -> flip (,) b <$> f a 307 select f = if test e then \(a,b) -> flip (,) b <$> f a
328 else \(a,b) -> (,) a <$> f b 308 else \(a,b) -> (,) a <$> f b
329 309
330split :: Eq ip => BitIx -> Bucket ip -> (Bucket ip, Bucket ip) 310split :: Eq ip => BitIx -> Bucket ip -> (Bucket ip, Bucket ip)
331split i b = (Bucket ns qs, Bucket ms rs) 311split i b = (Bucket ns qs, Bucket ms rs)
@@ -529,7 +509,7 @@ splitTip nid n i bucket
529-- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia 509-- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia
530-- paper. The rule requiring additional splits is in section 2.4. 510-- paper. The rule requiring additional splits is in section 2.4.
531modifyBucket 511modifyBucket
532 :: forall f ip xs. (Alternative f, Eq ip, Monoid xs) => 512 :: forall f ip xs. (Alternative f, Eq ip) =>
533 NodeId -> (Bucket ip -> f (xs, Bucket ip)) -> Table ip -> f (xs,Table ip) 513 NodeId -> (Bucket ip -> f (xs, Bucket ip)) -> Table ip -> f (xs,Table ip)
534modifyBucket nodeId f = go (0 :: BitIx) 514modifyBucket nodeId f = go (0 :: BitIx)
535 where 515 where
@@ -552,6 +532,7 @@ data Event ip = TryInsert { foreignNode :: NodeInfo ip }
552 } 532 }
553 deriving (Eq,Ord,Show) 533 deriving (Eq,Ord,Show)
554 534
535eventId :: Event ip -> NodeId
555eventId (TryInsert NodeInfo{..}) = nodeId 536eventId (TryInsert NodeInfo{..}) = nodeId
556eventId (PingResult NodeInfo{..} _) = nodeId 537eventId (PingResult NodeInfo{..} _) = nodeId
557 538
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index 44a5f0e9..d9a50a15 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -66,9 +66,7 @@ module Network.BitTorrent.DHT.Session
66 66
67import Prelude hiding (ioError) 67import Prelude hiding (ioError)
68 68
69import Control.Applicative
70import Control.Concurrent.STM 69import Control.Concurrent.STM
71import Control.Concurrent.Lifted hiding (yield)
72import Control.Concurrent.Async.Lifted 70import Control.Concurrent.Async.Lifted
73import Control.Exception.Lifted hiding (Handler) 71import Control.Exception.Lifted hiding (Handler)
74import Control.Monad.Base 72import Control.Monad.Base
@@ -82,16 +80,10 @@ import Data.Fixed
82import Data.Hashable 80import Data.Hashable
83import Data.List as L 81import Data.List as L
84import Data.Maybe 82import Data.Maybe
85import Data.Monoid
86import Data.Set as S 83import Data.Set as S
87import Data.Text as T
88import Data.Time 84import Data.Time
89import Data.Time.Clock.POSIX
90import Network (PortNumber) 85import Network (PortNumber)
91import System.Log.FastLogger
92import System.Random (randomIO) 86import System.Random (randomIO)
93import Text.PrettyPrint as PP hiding ((<>), ($$))
94import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
95 87
96import Data.Torrent as Torrent 88import Data.Torrent as Torrent
97import Network.KRPC as KRPC hiding (Options, def) 89import Network.KRPC as KRPC hiding (Options, def)
@@ -118,7 +110,7 @@ defaultAlpha = 3
118 110
119-- TODO do not insert infohash -> peeraddr if infohash is too far from 111-- TODO do not insert infohash -> peeraddr if infohash is too far from
120-- this node id 112-- this node id
121 113{-
122data Order 114data Order
123 = NearFirst 115 = NearFirst
124 | FarFirst 116 | FarFirst
@@ -127,6 +119,7 @@ data Order
127data Traversal 119data Traversal
128 = Greedy -- ^ aggressive short-circuit traversal 120 = Greedy -- ^ aggressive short-circuit traversal
129 | Exhaustive -- ^ 121 | Exhaustive -- ^
122-}
130 123
131-- | Original Kamelia DHT uses term /publish/ for data replication 124-- | Original Kamelia DHT uses term /publish/ for data replication
132-- process. BitTorrent DHT uses term /announce/ since the purpose of 125-- process. BitTorrent DHT uses term /announce/ since the purpose of
@@ -460,11 +453,11 @@ deleteTopic ih p = do
460-- Messaging 453-- Messaging
461-----------------------------------------------------------------------} 454-----------------------------------------------------------------------}
462 455
463-- TODO: use alpha
464-- | Failed queries are ignored. 456-- | Failed queries are ignored.
465queryParallel :: [DHT ip a] -> DHT ip [a] 457queryParallel :: [DHT ip a] -> DHT ip [a]
466queryParallel queries = do 458queryParallel queries = do
467 alpha <- asks (optAlpha . options) 459 -- TODO: use alpha
460 -- alpha <- asks (optAlpha . options)
468 cleanup <$> mapConcurrently try queries 461 cleanup <$> mapConcurrently try queries
469 where 462 where
470 cleanup :: [Either QueryFailure a] -> [a] 463 cleanup :: [Either QueryFailure a] -> [a]
diff --git a/src/Network/BitTorrent/DHT/Token.hs b/src/Network/BitTorrent/DHT/Token.hs
index a0ed428b..7aaaf2b7 100644
--- a/src/Network/BitTorrent/DHT/Token.hs
+++ b/src/Network/BitTorrent/DHT/Token.hs
@@ -37,7 +37,6 @@ module Network.BitTorrent.DHT.Token
37 , Network.BitTorrent.DHT.Token.update 37 , Network.BitTorrent.DHT.Token.update
38 ) where 38 ) where
39 39
40import Control.Applicative
41import Control.Monad.State 40import Control.Monad.State
42import Data.BEncode (BEncode) 41import Data.BEncode (BEncode)
43import Data.ByteString as BS 42import Data.ByteString as BS
@@ -119,4 +118,4 @@ update TokenMap {..} = TokenMap
119 , generator = newGen 118 , generator = newGen
120 } 119 }
121 where 120 where
122 (newSecret, newGen) = next generator \ No newline at end of file 121 (newSecret, newGen) = next generator