summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent')
-rw-r--r--src/Network/BitTorrent/DHT.hs65
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs68
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs91
3 files changed, 142 insertions, 82 deletions
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs
index 8bc423a3..6d31eab2 100644
--- a/src/Network/BitTorrent/DHT.hs
+++ b/src/Network/BitTorrent/DHT.hs
@@ -76,8 +76,13 @@ import Data.Typeable
76import Data.Monoid 76import Data.Monoid
77import Network.DatagramServer.Mainline (KMessageOf) 77import Network.DatagramServer.Mainline (KMessageOf)
78import qualified Network.DatagramServer as KRPC (listen, Protocol(..)) 78import qualified Network.DatagramServer as KRPC (listen, Protocol(..))
79 79import Network.DatagramServer.Types
80 80import Network.DHT.Types
81import Data.Bits
82import Data.Default
83import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
84import Network.KRPC.Method
85import Network.BitTorrent.DHT.Query (DataHandlers)
81 86
82{----------------------------------------------------------------------- 87{-----------------------------------------------------------------------
83-- DHT types 88-- DHT types
@@ -96,7 +101,31 @@ fullLogging :: LogSource -> LogLevel -> Bool
96fullLogging _ _ = True 101fullLogging _ _ = True
97 102
98-- | Run DHT on specified port. <add note about resources> 103-- | Run DHT on specified port. <add note about resources>
99dht :: (Ord ip, Address ip) 104dht ::
105 ( Ord ip
106 , Address ip
107 , Functor dht
108 , Ord (NodeId dht)
109 , FiniteBits (NodeId dht)
110 , Serialize (NodeId dht)
111 , Show (NodeId dht)
112 , SerializableTo raw (Response dht (Ping dht))
113 , SerializableTo raw (Query dht (Ping dht))
114 , SerializableTo raw (Response dht (NodeFound dht ip))
115 , SerializableTo raw (Query dht (FindNode dht ip))
116 , Ord (TransactionID dht)
117 , Serialize (TransactionID dht)
118 , Eq (QueryMethod dht)
119 , Show (QueryMethod dht)
120 , Pretty (NodeInfo dht ip u)
121 , Kademlia dht
122 , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
123 , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
124 , DataHandlers raw dht
125 , WireFormat raw dht
126 , Show u
127 , Default u
128 )
100 => Options -- ^ normally you need to use 'Data.Default.def'; 129 => Options -- ^ normally you need to use 'Data.Default.def';
101 -> NodeAddr ip -- ^ address to bind this node; 130 -> NodeAddr ip -- ^ address to bind this node;
102 -> (LogSource -> LogLevel -> Bool) -- ^ use 'fullLogging' as a noisy default 131 -> (LogSource -> LogLevel -> Bool) -- ^ use 'fullLogging' as a noisy default
@@ -179,7 +208,33 @@ resolveHostName NodeAddr {..} = do
179-- 208--
180-- This operation do block, use 209-- This operation do block, use
181-- 'Control.Concurrent.Async.Lifted.async' if needed. 210-- 'Control.Concurrent.Async.Lifted.async' if needed.
182bootstrap :: forall raw dht u ip. Address ip => Maybe BS.ByteString -> [NodeAddr ip] -> DHT raw dht u ip () 211bootstrap :: forall raw dht u ip.
212 ( Ord ip
213 , Address ip
214 , Functor dht
215 , Ord (NodeId dht)
216 , FiniteBits (NodeId dht)
217 , Serialize (NodeId dht)
218 , Show (NodeId dht)
219 , Pretty (NodeId dht)
220 , SerializableTo raw (Response dht (Ping dht))
221 , SerializableTo raw (Query dht (Ping dht))
222 , SerializableTo raw (Response dht (NodeFound dht ip))
223 , SerializableTo raw (Query dht (FindNode dht ip))
224 , Ord (TransactionID dht)
225 , Serialize (TransactionID dht)
226 , Eq (QueryMethod dht)
227 , Show (QueryMethod dht)
228 , Pretty (NodeInfo dht ip u)
229 , Kademlia dht
230 , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
231 , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
232 , DataHandlers raw dht
233 , WireFormat raw dht
234 , Show u
235 , Default u
236 , Serialize u
237 ) => Maybe BS.ByteString -> [NodeAddr ip] -> DHT raw dht u ip ()
183bootstrap mbs startNodes = do 238bootstrap mbs startNodes = do
184 restored <- 239 restored <-
185 case decode <$> mbs of 240 case decode <$> mbs of
@@ -192,7 +247,7 @@ bootstrap mbs startNodes = do
192 let searchAll aliveNodes = do 247 let searchAll aliveNodes = do
193 nid <- myNodeIdAccordingTo (error "FIXME") 248 nid <- myNodeIdAccordingTo (error "FIXME")
194 ns <- bgsearch ioFindNodes nid 249 ns <- bgsearch ioFindNodes nid
195 return ( ns :: [NodeInfo KMessageOf ip ()] ) 250 return ( ns :: [NodeInfo dht ip u] )
196 input_nodes <- (restored ++) . T.toList <$> getTable 251 input_nodes <- (restored ++) . T.toList <$> getTable
197 -- Step 1: Use iterative searches to flesh out the table.. 252 -- Step 1: Use iterative searches to flesh out the table..
198 do let knowns = map (map $ nodeAddr . fst) input_nodes 253 do let knowns = map (map $ nodeAddr . fst) input_nodes
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
index e5d9bd5f..67dc4541 100644
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -16,6 +16,8 @@
16{-# LANGUAGE TupleSections #-} 16{-# LANGUAGE TupleSections #-}
17{-# LANGUAGE PartialTypeSignatures #-} 17{-# LANGUAGE PartialTypeSignatures #-}
18{-# LANGUAGE GADTs #-} 18{-# LANGUAGE GADTs #-}
19{-# LANGUAGE RankNTypes #-}
20{-# LANGUAGE MultiParamTypeClasses #-}
19module Network.BitTorrent.DHT.Query 21module Network.BitTorrent.DHT.Query
20 ( -- * Handler 22 ( -- * Handler
21 -- | To bind specific set of handlers you need to pass 23 -- | To bind specific set of handlers you need to pass
@@ -25,6 +27,7 @@ module Network.BitTorrent.DHT.Query
25 , getPeersH 27 , getPeersH
26 , announceH 28 , announceH
27 , defaultHandlers 29 , defaultHandlers
30 , DataHandlers
28 31
29 -- * Query 32 -- * Query
30 -- ** Basic 33 -- ** Basic
@@ -113,6 +116,7 @@ import Data.Serialize
113import System.IO.Unsafe (unsafeInterleaveIO) 116import System.IO.Unsafe (unsafeInterleaveIO)
114import Data.String 117import Data.String
115 118
119
116{----------------------------------------------------------------------- 120{-----------------------------------------------------------------------
117-- Handlers 121-- Handlers
118-----------------------------------------------------------------------} 122-----------------------------------------------------------------------}
@@ -215,20 +219,68 @@ kademliaHandlers logger = do
215 , handler (nameFindNodes dht) $ findNodeH getclosest 219 , handler (nameFindNodes dht) $ findNodeH getclosest
216 ] 220 ]
217 221
222class DataHandlers raw dht where
223 dataHandlers ::
224 ( Ord ip , Hashable ip, Typeable ip, Serialize ip) =>
225 (NodeId dht -> IO [NodeInfo dht ip ()])
226 -> DHTData dht ip
227 -> [MethodHandler raw dht ip]
228
229instance DataHandlers BValue KMessageOf where
230 dataHandlers = bthandlers
231
232bthandlers ::
233 ( Ord ip , Hashable ip, Typeable ip, Serialize ip) =>
234 (NodeId KMessageOf -> IO [NodeInfo KMessageOf ip ()])
235 -> DHTData KMessageOf ip
236 -> [MethodHandler BValue KMessageOf ip]
237bthandlers getclosest dta =
238 [ MethodHandler "get_peers" $ getPeersH (getpeers dta) (sessionTokens dta)
239 , MethodHandler "announce_peer" $ announceH (contactInfo dta) (sessionTokens dta)
240 ]
241 where
242 getpeers dta ih = do
243 ps <- lookupPeers (contactInfo dta) ih
244 if L.null ps
245 then Left <$> getclosest (toNodeId ih)
246 else return (Right ps)
247
248data MethodHandler raw dht ip =
249 forall a b. ( SerializableTo raw (Response dht b)
250 , SerializableTo raw (Query dht a)
251 ) => MethodHandler (QueryMethod dht) (NodeAddr ip -> a -> IO b)
218 252
219-- | Includes all default query handlers. 253-- | Includes all default query handlers.
220defaultHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler] 254defaultHandlers :: forall raw dht u ip.
255 ( Ord (TransactionID dht)
256 , Ord (NodeId dht)
257 , Show u
258 , SerializableTo raw (Response dht (Ping dht))
259 , SerializableTo raw (Query dht (Ping dht))
260 , Show (QueryMethod dht)
261 , Show (NodeId dht)
262 , FiniteBits (NodeId dht)
263 , Default u
264 , Serialize (TransactionID dht)
265 , WireFormat raw dht
266 , Kademlia dht
267 , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
268 , Functor dht
269 , Pretty (NodeInfo dht ip u)
270 , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
271 , SerializableTo raw (Response dht (NodeFound dht ip))
272 , SerializableTo raw (Query dht (FindNode dht ip))
273 , Eq ip, Ord ip, Address ip, DataHandlers raw dht
274 ) => LogFun -> DHT raw dht u ip [Handler IO dht raw]
221defaultHandlers logger = do 275defaultHandlers logger = do
222 groknode <- insertNode1 276 groknode <- insertNode1
223 mynid <- myNodeIdAccordingTo1 277 mynid <- myNodeIdAccordingTo1
224 let handler :: KRPC (Query KMessageOf a) (Response KMessageOf b) => QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler 278 let handler :: MethodHandler raw dht ip -> Handler IO dht raw
225 handler = nodeHandler groknode mynid (logt logger) 279 handler (MethodHandler name action) = nodeHandler groknode mynid (logt logger) name action
226 toks <- asks sessionTokens 280 dta <- asks dhtData
227 peers <- asks contactInfo 281 getclosest <- getClosest1
228 getpeers <- getPeerList1
229 hs <- kademliaHandlers logger 282 hs <- kademliaHandlers logger
230 return $ hs ++ [ handler "get_peers" $ getPeersH getpeers toks 283 return $ hs ++ L.map handler (dataHandlers (fmap (fmap (fmap (const ()))) . getclosest) dta)
231 , handler "announce_peer" $ announceH peers toks ]
232 284
233{----------------------------------------------------------------------- 285{-----------------------------------------------------------------------
234-- Basic queries 286-- Basic queries
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index f96ba707..d94f028f 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -35,9 +35,10 @@ module Network.BitTorrent.DHT.Session
35 , routingInfo 35 , routingInfo
36 , routableAddress 36 , routableAddress
37 , getTimestamp 37 , getTimestamp
38 , SessionTokens 38 -- , SessionTokens
39 , sessionTokens 39 -- , sessionTokens
40 , contactInfo 40 -- , contactInfo
41 , dhtData
41 , PeerStore 42 , PeerStore
42 , manager 43 , manager
43 44
@@ -55,8 +56,8 @@ module Network.BitTorrent.DHT.Session
55 , runDHT 56 , runDHT
56 57
57 -- ** Tokens 58 -- ** Tokens
58 , grantToken 59 -- , grantToken
59 , checkToken 60 -- , checkToken
60 61
61 -- ** Routing table 62 -- ** Routing table
62 , getTable 63 , getTable
@@ -68,6 +69,7 @@ module Network.BitTorrent.DHT.Session
68 , insertPeer 69 , insertPeer
69 , getPeerList 70 , getPeerList
70 , getPeerList1 71 , getPeerList1
72 , lookupPeers
71 , insertTopic 73 , insertTopic
72 , deleteTopic 74 , deleteTopic
73 , getSwarms 75 , getSwarms
@@ -113,6 +115,7 @@ import Data.Time.Clock.POSIX
113import Data.Text as Text 115import Data.Text as Text
114import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) 116import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
115import Data.Serialize as S 117import Data.Serialize as S
118import Network.DHT.Types
116 119
117 120
118import Data.Torrent as Torrent 121import Data.Torrent as Torrent
@@ -228,33 +231,6 @@ instance Default Options where
228 231
229seconds :: NominalDiffTime -> Int 232seconds :: NominalDiffTime -> Int
230seconds dt = fromEnum (realToFrac dt :: Uni) 233seconds dt = fromEnum (realToFrac dt :: Uni)
231
232{-----------------------------------------------------------------------
233-- Tokens policy
234-----------------------------------------------------------------------}
235
236data SessionTokens = SessionTokens
237 { tokenMap :: !TokenMap
238 , lastUpdate :: !UTCTime
239 , maxInterval :: !NominalDiffTime
240 }
241
242nullSessionTokens :: IO SessionTokens
243nullSessionTokens = SessionTokens
244 <$> (tokens <$> liftIO randomIO)
245 <*> liftIO getCurrentTime
246 <*> pure defaultUpdateInterval
247
248-- TODO invalidate *twice* if needed
249invalidateTokens :: UTCTime -> SessionTokens -> SessionTokens
250invalidateTokens curTime ts @ SessionTokens {..}
251 | curTime `diffUTCTime` lastUpdate > maxInterval = SessionTokens
252 { tokenMap = update tokenMap
253 , lastUpdate = curTime
254 , maxInterval = maxInterval
255 }
256 | otherwise = ts
257
258{----------------------------------------------------------------------- 234{-----------------------------------------------------------------------
259-- Session 235-- Session
260-----------------------------------------------------------------------} 236-----------------------------------------------------------------------}
@@ -277,9 +253,8 @@ data Node raw dht u ip = Node
277 , resources :: !InternalState 253 , resources :: !InternalState
278 , manager :: !(Manager raw dht) -- ^ RPC manager; 254 , manager :: !(Manager raw dht) -- ^ RPC manager;
279 , routingInfo :: !(TVar (Maybe (R.Info dht ip u))) -- ^ search table; 255 , routingInfo :: !(TVar (Maybe (R.Info dht ip u))) -- ^ search table;
280 , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes;
281 , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; 256 , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node;
282 , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. 257 , dhtData :: DHTData dht ip
283 , loggerFun :: !LogFun 258 , loggerFun :: !LogFun
284 } 259 }
285 260
@@ -371,6 +346,7 @@ locFromCS cs = case getCallStack cs of
371newNode :: ( Address ip 346newNode :: ( Address ip
372 , FiniteBits (NodeId dht) 347 , FiniteBits (NodeId dht)
373 , Serialize (NodeId dht) 348 , Serialize (NodeId dht)
349 , Kademlia dht
374 ) 350 )
375 => -- [NodeHandler] -- ^ handlers to run on accepted queries; 351 => -- [NodeHandler] -- ^ handlers to run on accepted queries;
376 Options -- ^ various dht options; 352 Options -- ^ various dht options;
@@ -389,12 +365,12 @@ newNode opts naddr logger mbid = do
389 s <- getInternalState 365 s <- getInternalState
390 (_, m) <- allocate (newManager rpcOpts (logt logger) nodeAddr []) closeManager 366 (_, m) <- allocate (newManager rpcOpts (logt logger) nodeAddr []) closeManager
391 liftIO $ do 367 liftIO $ do
368 dta <- initializeDHTData
392 myId <- maybe genNodeId return mbid 369 myId <- maybe genNodeId return mbid
393 node <- Node opts myId s m 370 node <- Node opts myId s m
394 <$> atomically (newTVar Nothing) 371 <$> atomically (newTVar Nothing)
395 <*> newTVarIO def
396 <*> newTVarIO S.empty 372 <*> newTVarIO S.empty
397 <*> (newTVarIO =<< nullSessionTokens) 373 <*> pure dta
398 <*> pure logger 374 <*> pure logger
399 return node 375 return node
400 376
@@ -415,29 +391,6 @@ runDHT node action = runReaderT (unDHT action) node
415-- /pick a random ID/ in the range of the bucket and perform a 391-- /pick a random ID/ in the range of the bucket and perform a
416-- find_nodes search on it. 392-- find_nodes search on it.
417 393
418{-----------------------------------------------------------------------
419-- Tokens
420-----------------------------------------------------------------------}
421
422tryUpdateSecret :: TVar SessionTokens -> IO ()
423tryUpdateSecret toks = do
424 curTime <- liftIO getCurrentTime
425 liftIO $ atomically $ modifyTVar' toks (invalidateTokens curTime)
426
427grantToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> IO Token
428grantToken sessionTokens addr = do
429 tryUpdateSecret sessionTokens
430 toks <- readTVarIO sessionTokens
431 return $ T.lookup addr $ tokenMap toks
432
433-- | Throws 'HandlerError' if the token is invalid or already
434-- expired. See 'TokenMap' for details.
435checkToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> Token -> IO Bool
436checkToken sessionTokens addr questionableToken = do
437 tryUpdateSecret sessionTokens
438 toks <- readTVarIO sessionTokens
439 return $ T.member addr questionableToken (tokenMap toks)
440
441 394
442{----------------------------------------------------------------------- 395{-----------------------------------------------------------------------
443-- Routing table 396-- Routing table
@@ -475,28 +428,28 @@ getTable = do
475 let nil = nullTable myId (optBucketCount opts) 428 let nil = nullTable myId (optBucketCount opts)
476 liftIO (maybe nil R.myBuckets <$> atomically (readTVar var)) 429 liftIO (maybe nil R.myBuckets <$> atomically (readTVar var))
477 430
478getSwarms :: Ord ip => DHT raw dht u ip [ (InfoHash, Int, Maybe ByteString) ] 431getSwarms :: Ord ip => DHT raw KMessageOf u ip [ (InfoHash, Int, Maybe ByteString) ]
479getSwarms = do 432getSwarms = do
480 store <- asks contactInfo >>= liftIO . atomically . readTVar 433 store <- asks (contactInfo . dhtData) >>= liftIO . atomically . readTVar
481 return $ P.knownSwarms store 434 return $ P.knownSwarms store
482 435
483savePeerStore :: (Ord ip, Address ip) => DHT raw dht u ip ByteString 436savePeerStore :: (Ord ip, Address ip) => DHT raw KMessageOf u ip ByteString
484savePeerStore = do 437savePeerStore = do
485 var <- asks contactInfo 438 var <- asks (contactInfo . dhtData)
486 peers <- liftIO $ atomically $ readTVar var 439 peers <- liftIO $ atomically $ readTVar var
487 return $ S.encode peers 440 return $ S.encode peers
488 441
489mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw dht u ip () 442mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw KMessageOf u ip ()
490mergeSavedPeers bs = do 443mergeSavedPeers bs = do
491 var <- asks contactInfo 444 var <- asks (contactInfo . dhtData)
492 case S.decode bs of 445 case S.decode bs of
493 Right newbies -> liftIO $ atomically $ modifyTVar' var (<> newbies) 446 Right newbies -> liftIO $ atomically $ modifyTVar' var (<> newbies)
494 Left _ -> return () 447 Left _ -> return ()
495 448
496 449
497allPeers :: Ord ip => InfoHash -> DHT raw dht u ip [ PeerAddr ip ] 450allPeers :: Ord ip => InfoHash -> DHT raw KMessageOf u ip [ PeerAddr ip ]
498allPeers ih = do 451allPeers ih = do
499 store <- asks contactInfo >>= liftIO . atomically . readTVar 452 store <- asks (contactInfo . dhtData) >>= liftIO . atomically . readTVar
500 return $ P.lookup ih store 453 return $ P.lookup ih store
501 454
502-- | Find a set of closest nodes from routing table of this node. (in 455-- | Find a set of closest nodes from routing table of this node. (in
@@ -566,7 +519,7 @@ getTimestamp = do
566-- 519--
567getPeerList :: Ord ip => InfoHash -> DHT raw KMessageOf () ip (PeerList ip) 520getPeerList :: Ord ip => InfoHash -> DHT raw KMessageOf () ip (PeerList ip)
568getPeerList ih = do 521getPeerList ih = do
569 var <- asks contactInfo 522 var <- asks (contactInfo . dhtData)
570 ps <- liftIO $ lookupPeers var ih 523 ps <- liftIO $ lookupPeers var ih
571 if L.null ps 524 if L.null ps
572 then Left <$> getClosest ih 525 then Left <$> getClosest ih
@@ -574,7 +527,7 @@ getPeerList ih = do
574 527
575getPeerList1 :: Ord ip => DHT raw KMessageOf () ip (InfoHash -> IO (PeerList ip)) 528getPeerList1 :: Ord ip => DHT raw KMessageOf () ip (InfoHash -> IO (PeerList ip))
576getPeerList1 = do 529getPeerList1 = do
577 var <- asks contactInfo 530 var <- asks (contactInfo . dhtData)
578 getclosest <- getClosest1 531 getclosest <- getClosest1
579 return $ \ih -> do 532 return $ \ih -> do
580 ps <- lookupPeers var ih 533 ps <- lookupPeers var ih