diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 29 |
1 files changed, 23 insertions, 6 deletions
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index e26cbad1..3adbb840 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -84,6 +84,7 @@ import Data.Hashable | |||
84 | import Data.List as L | 84 | import Data.List as L |
85 | import Data.Maybe | 85 | import Data.Maybe |
86 | import Data.Monoid | 86 | import Data.Monoid |
87 | import Data.Set as S | ||
87 | import Data.Text as T | 88 | import Data.Text as T |
88 | import Data.Time | 89 | import Data.Time |
89 | import Data.Time.Clock.POSIX | 90 | import Data.Time.Clock.POSIX |
@@ -227,14 +228,19 @@ invalidateTokens curTime ts @ SessionTokens {..} | |||
227 | -- Session | 228 | -- Session |
228 | -----------------------------------------------------------------------} | 229 | -----------------------------------------------------------------------} |
229 | 230 | ||
231 | type AnnounceSet = Set (InfoHash, PortNumber) | ||
232 | |||
233 | type Logger = Loc -> LogSource -> LogLevel -> LogStr -> IO () | ||
234 | |||
230 | data Node ip = Node | 235 | data Node ip = Node |
231 | { options :: !Options | 236 | { options :: !Options |
232 | , thisNodeId :: !NodeId | 237 | , thisNodeId :: !NodeId |
233 | , manager :: !(Manager (DHT ip)) | 238 | , manager :: !(Manager (DHT ip)) |
234 | , routingTable :: !(MVar (Table ip)) | 239 | , routingTable :: !(MVar (Table ip)) |
235 | , contactInfo :: !(TVar (PeerStore ip)) | 240 | , contactInfo :: !(TVar (PeerStore ip)) |
241 | , announceInfo :: !(TVar AnnounceSet ) | ||
236 | , sessionTokens :: !(TVar SessionTokens) | 242 | , sessionTokens :: !(TVar SessionTokens) |
237 | , loggerFun :: !(Loc -> LogSource -> LogLevel -> LogStr -> IO ()) | 243 | , loggerFun :: !Logger |
238 | } | 244 | } |
239 | 245 | ||
240 | newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) (ResourceT IO) a } | 246 | newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) (ResourceT IO) a } |
@@ -277,6 +283,7 @@ runDHT handlers opts naddr action = runResourceT $ do | |||
277 | node <- liftIO $ Node opts myId m | 283 | node <- liftIO $ Node opts myId m |
278 | <$> newMVar (nullTable myId (optBucketCount opts)) | 284 | <$> newMVar (nullTable myId (optBucketCount opts)) |
279 | <*> newTVarIO def | 285 | <*> newTVarIO def |
286 | <*> newTVarIO S.empty | ||
280 | <*> (newTVarIO =<< nullSessionTokens) | 287 | <*> (newTVarIO =<< nullSessionTokens) |
281 | <*> pure logger | 288 | <*> pure logger |
282 | runReaderT (unDHT (listen >> action)) node | 289 | runReaderT (unDHT (listen >> action)) node |
@@ -336,7 +343,7 @@ checkToken :: Hashable a => NodeAddr a -> Token -> DHT ip () | |||
336 | checkToken addr questionableToken = do | 343 | checkToken addr questionableToken = do |
337 | tryUpdateSecret | 344 | tryUpdateSecret |
338 | toks <- asks sessionTokens >>= liftIO . readTVarIO | 345 | toks <- asks sessionTokens >>= liftIO . readTVarIO |
339 | unless (member addr questionableToken (tokenMap toks)) $ | 346 | unless (T.member addr questionableToken (tokenMap toks)) $ |
340 | throwIO $ InvalidParameter "token" | 347 | throwIO $ InvalidParameter "token" |
341 | 348 | ||
342 | {----------------------------------------------------------------------- | 349 | {----------------------------------------------------------------------- |
@@ -412,10 +419,19 @@ getPeerList ih = do | |||
412 | else return (Right ps) | 419 | else return (Right ps) |
413 | 420 | ||
414 | insertTopic :: InfoHash -> PortNumber -> DHT ip () | 421 | insertTopic :: InfoHash -> PortNumber -> DHT ip () |
415 | insertTopic = undefined | 422 | insertTopic ih p = do |
423 | var <- asks announceInfo | ||
424 | liftIO $ atomically $ modifyTVar' var (S.insert (ih, p)) | ||
416 | 425 | ||
417 | deleteTopic :: InfoHash -> PortNumber -> DHT ip () | 426 | deleteTopic :: InfoHash -> PortNumber -> DHT ip () |
418 | deleteTopic = undefined | 427 | deleteTopic ih p = do |
428 | var <- asks announceInfo | ||
429 | liftIO $ atomically $ modifyTVar' var (S.delete (ih, p)) | ||
430 | |||
431 | republish :: DHT ip ThreadId | ||
432 | republish = fork $ do | ||
433 | i <- asks (optReannounce . options) | ||
434 | error "DHT.republish: not implemented" | ||
419 | 435 | ||
420 | {----------------------------------------------------------------------- | 436 | {----------------------------------------------------------------------- |
421 | -- Messaging | 437 | -- Messaging |
@@ -512,9 +528,10 @@ search k action = do | |||
512 | mapM_ yield results | 528 | mapM_ yield results |
513 | 529 | ||
514 | publish :: Address ip => InfoHash -> PortNumber -> DHT ip () | 530 | publish :: Address ip => InfoHash -> PortNumber -> DHT ip () |
515 | publish ih port = do | 531 | publish ih p = do |
516 | nodes <- getClosest ih | 532 | nodes <- getClosest ih |
517 | _ <- sourceList [nodes] $= search ih (announceQ ih port) $$ C.take 20 | 533 | r <- asks (optReplication . options) |
534 | _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r | ||
518 | return () | 535 | return () |
519 | 536 | ||
520 | {----------------------------------------------------------------------- | 537 | {----------------------------------------------------------------------- |