diff options
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/BitTorrent/DHT.hs | 1 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 141 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 126 |
3 files changed, 145 insertions, 123 deletions
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index 6b57e451..b5dc868d 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs | |||
@@ -62,6 +62,7 @@ import Network.Socket | |||
62 | import Data.Torrent (tNodes) | 62 | import Data.Torrent (tNodes) |
63 | import Data.Torrent.InfoHash | 63 | import Data.Torrent.InfoHash |
64 | import Network.BitTorrent.Core | 64 | import Network.BitTorrent.Core |
65 | import Network.BitTorrent.DHT.Query | ||
65 | import Network.BitTorrent.DHT.Session | 66 | import Network.BitTorrent.DHT.Session |
66 | import Network.BitTorrent.DHT.Routing as T | 67 | import Network.BitTorrent.DHT.Routing as T |
67 | 68 | ||
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs new file mode 100644 index 00000000..abe1ef5f --- /dev/null +++ b/src/Network/BitTorrent/DHT/Query.hs | |||
@@ -0,0 +1,141 @@ | |||
1 | {-# LANGUAGE FlexibleContexts #-} | ||
2 | {-# LANGUAGE TemplateHaskell #-} | ||
3 | module Network.BitTorrent.DHT.Query | ||
4 | ( -- * Handler | ||
5 | pingH | ||
6 | , findNodeH | ||
7 | , getPeersH | ||
8 | , announceH | ||
9 | , handlers | ||
10 | |||
11 | -- * Search | ||
12 | -- ** Step | ||
13 | , Iteration | ||
14 | , findNodeQ | ||
15 | , getPeersQ | ||
16 | , announceQ | ||
17 | |||
18 | -- ** Traversal | ||
19 | , Search | ||
20 | , search | ||
21 | , publish | ||
22 | ) where | ||
23 | |||
24 | import Control.Applicative | ||
25 | import Control.Concurrent.Lifted hiding (yield) | ||
26 | import Control.Exception.Lifted hiding (Handler) | ||
27 | import Control.Monad.Reader | ||
28 | import Control.Monad.Logger | ||
29 | import Data.Conduit | ||
30 | import Data.Conduit.List as C hiding (mapMaybe, mapM_) | ||
31 | import Data.Either | ||
32 | import Data.List as L | ||
33 | import Data.Monoid | ||
34 | import Data.Text as T | ||
35 | import Network | ||
36 | import Text.PrettyPrint as PP hiding ((<>), ($$)) | ||
37 | import Text.PrettyPrint.Class | ||
38 | |||
39 | import Network.KRPC hiding (Options, def) | ||
40 | import Data.Torrent.InfoHash | ||
41 | import Network.BitTorrent.Core | ||
42 | import Network.BitTorrent.DHT.Message | ||
43 | import Network.BitTorrent.DHT.Routing | ||
44 | import Network.BitTorrent.DHT.Session | ||
45 | |||
46 | {----------------------------------------------------------------------- | ||
47 | -- Handlers | ||
48 | -----------------------------------------------------------------------} | ||
49 | |||
50 | nodeHandler :: Address ip => KRPC (Query a) (Response b) | ||
51 | => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip | ||
52 | nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do | ||
53 | case fromSockAddr sockAddr of | ||
54 | Nothing -> throwIO BadAddress | ||
55 | Just naddr -> do | ||
56 | insertNode (NodeInfo remoteId naddr) | ||
57 | Response <$> getNodeId <*> action naddr q | ||
58 | |||
59 | pingH :: Address ip => NodeHandler ip | ||
60 | pingH = nodeHandler $ \ _ Ping -> do | ||
61 | return Ping | ||
62 | |||
63 | findNodeH :: Address ip => NodeHandler ip | ||
64 | findNodeH = nodeHandler $ \ _ (FindNode nid) -> do | ||
65 | NodeFound <$> getClosest nid | ||
66 | |||
67 | getPeersH :: Address ip => NodeHandler ip | ||
68 | getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do | ||
69 | GotPeers <$> getPeerList ih <*> grantToken naddr | ||
70 | |||
71 | announceH :: Address ip => NodeHandler ip | ||
72 | announceH = nodeHandler $ \ naddr @ NodeAddr {..} (Announce {..}) -> do | ||
73 | checkToken naddr sessionToken | ||
74 | let annPort = if impliedPort then nodePort else port | ||
75 | let peerAddr = PeerAddr Nothing nodeHost annPort | ||
76 | insertPeer topic peerAddr | ||
77 | return Announced | ||
78 | |||
79 | handlers :: Address ip => [NodeHandler ip] | ||
80 | handlers = [pingH, findNodeH, getPeersH, announceH] | ||
81 | |||
82 | {----------------------------------------------------------------------- | ||
83 | -- Search | ||
84 | -----------------------------------------------------------------------} | ||
85 | |||
86 | type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip]) | ||
87 | |||
88 | -- TODO match with expected node id | ||
89 | findNodeQ :: Address ip => NodeId -> Iteration ip NodeInfo | ||
90 | findNodeQ nid NodeInfo {..} = do | ||
91 | NodeFound closest <- FindNode nid <@> nodeAddr | ||
92 | return $ Right closest | ||
93 | |||
94 | isLeft :: Either a b -> Bool | ||
95 | isLeft (Right _) = False | ||
96 | isLeft (Left _) = True | ||
97 | |||
98 | getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr | ||
99 | getPeersQ topic NodeInfo {..} = do | ||
100 | GotPeers {..} <- GetPeers topic <@> nodeAddr | ||
101 | let dist = distance (toNodeId topic) nodeId | ||
102 | $(logInfoS) "getPeersQ" $ T.pack | ||
103 | $ "distance: " <> render (pretty dist) <> " , result: " | ||
104 | <> if isLeft peers then "NODES" else "PEERS" | ||
105 | return peers | ||
106 | |||
107 | announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr | ||
108 | announceQ ih p NodeInfo {..} = do | ||
109 | GotPeers {..} <- GetPeers ih <@> nodeAddr | ||
110 | case peers of | ||
111 | Left ns | ||
112 | | False -> undefined -- TODO check if we can announce | ||
113 | | otherwise -> return (Left ns) | ||
114 | Right ps -> do -- TODO *probably* add to peer cache | ||
115 | Announced <- Announce False ih p grantedToken <@> nodeAddr | ||
116 | return (Right [nodeAddr]) | ||
117 | |||
118 | type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] | ||
119 | |||
120 | -- TODO: use reorder and filter (Traversal option) leftovers | ||
121 | search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o | ||
122 | search k action = do | ||
123 | awaitForever $ \ batch -> unless (L.null batch) $ do | ||
124 | $(logWarnS) "search" "start query" | ||
125 | responses <- lift $ queryParallel (action <$> batch) | ||
126 | let (nodes, results) = partitionEithers responses | ||
127 | $(logWarnS) "search" "done query" | ||
128 | leftover $ L.concat nodes | ||
129 | mapM_ yield results | ||
130 | |||
131 | publish :: Address ip => InfoHash -> PortNumber -> DHT ip () | ||
132 | publish ih p = do | ||
133 | nodes <- getClosest ih | ||
134 | r <- asks (optReplication . options) | ||
135 | _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r | ||
136 | return () | ||
137 | |||
138 | republish :: DHT ip ThreadId | ||
139 | republish = fork $ do | ||
140 | i <- askOption optReannounce | ||
141 | error "DHT.republish: not implemented" | ||
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index 50ca6db3..01950c10 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -22,8 +22,10 @@ module Network.BitTorrent.DHT.Session | |||
22 | , Options (..) | 22 | , Options (..) |
23 | 23 | ||
24 | -- * Node | 24 | -- * Node |
25 | , LogFun | ||
26 | , Node | 25 | , Node |
26 | , options | ||
27 | , LogFun | ||
28 | , NodeHandler | ||
27 | , startNode | 29 | , startNode |
28 | 30 | ||
29 | -- * Session | 31 | -- * Session |
@@ -48,32 +50,10 @@ module Network.BitTorrent.DHT.Session | |||
48 | , deleteTopic | 50 | , deleteTopic |
49 | 51 | ||
50 | -- * Messaging | 52 | -- * Messaging |
51 | -- ** Initiate | ||
52 | , queryNode | 53 | , queryNode |
53 | , queryParallel | 54 | , queryParallel |
54 | , (<@>) | 55 | , (<@>) |
55 | , ping | 56 | , ping |
56 | |||
57 | -- ** Accept | ||
58 | , NodeHandler | ||
59 | , nodeHandler | ||
60 | , pingH | ||
61 | , findNodeH | ||
62 | , getPeersH | ||
63 | , announceH | ||
64 | , handlers | ||
65 | |||
66 | -- * Search | ||
67 | -- ** Step | ||
68 | , Iteration | ||
69 | , findNodeQ | ||
70 | , getPeersQ | ||
71 | , announceQ | ||
72 | |||
73 | -- ** Traversal | ||
74 | , Search | ||
75 | , search | ||
76 | , publish | ||
77 | ) where | 57 | ) where |
78 | 58 | ||
79 | import Prelude hiding (ioError) | 59 | import Prelude hiding (ioError) |
@@ -88,10 +68,7 @@ import Control.Monad.Logger | |||
88 | import Control.Monad.Reader | 68 | import Control.Monad.Reader |
89 | import Control.Monad.Trans.Control | 69 | import Control.Monad.Trans.Control |
90 | import Control.Monad.Trans.Resource | 70 | import Control.Monad.Trans.Resource |
91 | import Data.Conduit | ||
92 | import Data.Conduit.List as C hiding (mapMaybe, mapM_) | ||
93 | import Data.Default | 71 | import Data.Default |
94 | import Data.Either | ||
95 | import Data.Fixed | 72 | import Data.Fixed |
96 | import Data.Hashable | 73 | import Data.Hashable |
97 | import Data.List as L | 74 | import Data.List as L |
@@ -493,100 +470,3 @@ ping :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip) | |||
493 | ping addr = do | 470 | ping addr = do |
494 | (nid, Ping) <- queryNode addr Ping | 471 | (nid, Ping) <- queryNode addr Ping |
495 | return (NodeInfo nid addr) | 472 | return (NodeInfo nid addr) |
496 | |||
497 | nodeHandler :: Address ip => KRPC (Query a) (Response b) | ||
498 | => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip | ||
499 | nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do | ||
500 | case fromSockAddr sockAddr of | ||
501 | Nothing -> throwIO BadAddress | ||
502 | Just naddr -> do | ||
503 | insertNode (NodeInfo remoteId naddr) | ||
504 | Response <$> getNodeId <*> action naddr q | ||
505 | |||
506 | {----------------------------------------------------------------------- | ||
507 | -- Search | ||
508 | -----------------------------------------------------------------------} | ||
509 | |||
510 | type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip]) | ||
511 | |||
512 | -- TODO match with expected node id | ||
513 | findNodeQ :: Address ip => NodeId -> Iteration ip NodeInfo | ||
514 | findNodeQ nid NodeInfo {..} = do | ||
515 | NodeFound closest <- FindNode nid <@> nodeAddr | ||
516 | return $ Right closest | ||
517 | |||
518 | isLeft :: Either a b -> Bool | ||
519 | isLeft (Right _) = False | ||
520 | isLeft (Left _) = True | ||
521 | |||
522 | getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr | ||
523 | getPeersQ topic NodeInfo {..} = do | ||
524 | GotPeers {..} <- GetPeers topic <@> nodeAddr | ||
525 | let dist = distance (toNodeId topic) nodeId | ||
526 | $(logInfoS) "getPeersQ" $ T.pack | ||
527 | $ "distance: " <> render (pretty dist) <> " , result: " | ||
528 | <> if isLeft peers then "NODES" else "PEERS" | ||
529 | return peers | ||
530 | |||
531 | announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr | ||
532 | announceQ ih p NodeInfo {..} = do | ||
533 | GotPeers {..} <- GetPeers ih <@> nodeAddr | ||
534 | case peers of | ||
535 | Left ns | ||
536 | | False -> undefined -- TODO check if we can announce | ||
537 | | otherwise -> return (Left ns) | ||
538 | Right ps -> do -- TODO *probably* add to peer cache | ||
539 | Announced <- Announce False ih p grantedToken <@> nodeAddr | ||
540 | return (Right [nodeAddr]) | ||
541 | |||
542 | type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] | ||
543 | |||
544 | -- TODO: use reorder and filter (Traversal option) leftovers | ||
545 | search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o | ||
546 | search k action = do | ||
547 | awaitForever $ \ batch -> unless (L.null batch) $ do | ||
548 | $(logWarnS) "search" "start query" | ||
549 | responses <- lift $ queryParallel (action <$> batch) | ||
550 | let (nodes, results) = partitionEithers responses | ||
551 | $(logWarnS) "search" "done query" | ||
552 | leftover $ L.concat nodes | ||
553 | mapM_ yield results | ||
554 | |||
555 | publish :: Address ip => InfoHash -> PortNumber -> DHT ip () | ||
556 | publish ih p = do | ||
557 | nodes <- getClosest ih | ||
558 | r <- asks (optReplication . options) | ||
559 | _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r | ||
560 | return () | ||
561 | |||
562 | republish :: DHT ip ThreadId | ||
563 | republish = fork $ do | ||
564 | i <- askOption optReannounce | ||
565 | error "DHT.republish: not implemented" | ||
566 | |||
567 | {----------------------------------------------------------------------- | ||
568 | -- Handlers | ||
569 | -----------------------------------------------------------------------} | ||
570 | |||
571 | pingH :: Address ip => NodeHandler ip | ||
572 | pingH = nodeHandler $ \ _ Ping -> do | ||
573 | return Ping | ||
574 | |||
575 | findNodeH :: Address ip => NodeHandler ip | ||
576 | findNodeH = nodeHandler $ \ _ (FindNode nid) -> do | ||
577 | NodeFound <$> getClosest nid | ||
578 | |||
579 | getPeersH :: Address ip => NodeHandler ip | ||
580 | getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do | ||
581 | GotPeers <$> getPeerList ih <*> grantToken naddr | ||
582 | |||
583 | announceH :: Address ip => NodeHandler ip | ||
584 | announceH = nodeHandler $ \ naddr @ NodeAddr {..} (Announce {..}) -> do | ||
585 | checkToken naddr sessionToken | ||
586 | let annPort = if impliedPort then nodePort else port | ||
587 | let peerAddr = PeerAddr Nothing nodeHost annPort | ||
588 | insertPeer topic peerAddr | ||
589 | return Announced | ||
590 | |||
591 | handlers :: Address ip => [NodeHandler ip] | ||
592 | handlers = [pingH, findNodeH, getPeersH, announceH] | ||