summaryrefslogtreecommitdiff
path: root/src/Network
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network')
-rw-r--r--src/Network/BitTorrent/DHT.hs1
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs141
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs126
3 files changed, 145 insertions, 123 deletions
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs
index 6b57e451..b5dc868d 100644
--- a/src/Network/BitTorrent/DHT.hs
+++ b/src/Network/BitTorrent/DHT.hs
@@ -62,6 +62,7 @@ import Network.Socket
62import Data.Torrent (tNodes) 62import Data.Torrent (tNodes)
63import Data.Torrent.InfoHash 63import Data.Torrent.InfoHash
64import Network.BitTorrent.Core 64import Network.BitTorrent.Core
65import Network.BitTorrent.DHT.Query
65import Network.BitTorrent.DHT.Session 66import Network.BitTorrent.DHT.Session
66import Network.BitTorrent.DHT.Routing as T 67import Network.BitTorrent.DHT.Routing as T
67 68
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
new file mode 100644
index 00000000..abe1ef5f
--- /dev/null
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -0,0 +1,141 @@
1{-# LANGUAGE FlexibleContexts #-}
2{-# LANGUAGE TemplateHaskell #-}
3module Network.BitTorrent.DHT.Query
4 ( -- * Handler
5 pingH
6 , findNodeH
7 , getPeersH
8 , announceH
9 , handlers
10
11 -- * Search
12 -- ** Step
13 , Iteration
14 , findNodeQ
15 , getPeersQ
16 , announceQ
17
18 -- ** Traversal
19 , Search
20 , search
21 , publish
22 ) where
23
24import Control.Applicative
25import Control.Concurrent.Lifted hiding (yield)
26import Control.Exception.Lifted hiding (Handler)
27import Control.Monad.Reader
28import Control.Monad.Logger
29import Data.Conduit
30import Data.Conduit.List as C hiding (mapMaybe, mapM_)
31import Data.Either
32import Data.List as L
33import Data.Monoid
34import Data.Text as T
35import Network
36import Text.PrettyPrint as PP hiding ((<>), ($$))
37import Text.PrettyPrint.Class
38
39import Network.KRPC hiding (Options, def)
40import Data.Torrent.InfoHash
41import Network.BitTorrent.Core
42import Network.BitTorrent.DHT.Message
43import Network.BitTorrent.DHT.Routing
44import Network.BitTorrent.DHT.Session
45
46{-----------------------------------------------------------------------
47-- Handlers
48-----------------------------------------------------------------------}
49
50nodeHandler :: Address ip => KRPC (Query a) (Response b)
51 => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip
52nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do
53 case fromSockAddr sockAddr of
54 Nothing -> throwIO BadAddress
55 Just naddr -> do
56 insertNode (NodeInfo remoteId naddr)
57 Response <$> getNodeId <*> action naddr q
58
59pingH :: Address ip => NodeHandler ip
60pingH = nodeHandler $ \ _ Ping -> do
61 return Ping
62
63findNodeH :: Address ip => NodeHandler ip
64findNodeH = nodeHandler $ \ _ (FindNode nid) -> do
65 NodeFound <$> getClosest nid
66
67getPeersH :: Address ip => NodeHandler ip
68getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do
69 GotPeers <$> getPeerList ih <*> grantToken naddr
70
71announceH :: Address ip => NodeHandler ip
72announceH = nodeHandler $ \ naddr @ NodeAddr {..} (Announce {..}) -> do
73 checkToken naddr sessionToken
74 let annPort = if impliedPort then nodePort else port
75 let peerAddr = PeerAddr Nothing nodeHost annPort
76 insertPeer topic peerAddr
77 return Announced
78
79handlers :: Address ip => [NodeHandler ip]
80handlers = [pingH, findNodeH, getPeersH, announceH]
81
82{-----------------------------------------------------------------------
83-- Search
84-----------------------------------------------------------------------}
85
86type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip])
87
88-- TODO match with expected node id
89findNodeQ :: Address ip => NodeId -> Iteration ip NodeInfo
90findNodeQ nid NodeInfo {..} = do
91 NodeFound closest <- FindNode nid <@> nodeAddr
92 return $ Right closest
93
94isLeft :: Either a b -> Bool
95isLeft (Right _) = False
96isLeft (Left _) = True
97
98getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr
99getPeersQ topic NodeInfo {..} = do
100 GotPeers {..} <- GetPeers topic <@> nodeAddr
101 let dist = distance (toNodeId topic) nodeId
102 $(logInfoS) "getPeersQ" $ T.pack
103 $ "distance: " <> render (pretty dist) <> " , result: "
104 <> if isLeft peers then "NODES" else "PEERS"
105 return peers
106
107announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr
108announceQ ih p NodeInfo {..} = do
109 GotPeers {..} <- GetPeers ih <@> nodeAddr
110 case peers of
111 Left ns
112 | False -> undefined -- TODO check if we can announce
113 | otherwise -> return (Left ns)
114 Right ps -> do -- TODO *probably* add to peer cache
115 Announced <- Announce False ih p grantedToken <@> nodeAddr
116 return (Right [nodeAddr])
117
118type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip]
119
120-- TODO: use reorder and filter (Traversal option) leftovers
121search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o
122search k action = do
123 awaitForever $ \ batch -> unless (L.null batch) $ do
124 $(logWarnS) "search" "start query"
125 responses <- lift $ queryParallel (action <$> batch)
126 let (nodes, results) = partitionEithers responses
127 $(logWarnS) "search" "done query"
128 leftover $ L.concat nodes
129 mapM_ yield results
130
131publish :: Address ip => InfoHash -> PortNumber -> DHT ip ()
132publish ih p = do
133 nodes <- getClosest ih
134 r <- asks (optReplication . options)
135 _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r
136 return ()
137
138republish :: DHT ip ThreadId
139republish = fork $ do
140 i <- askOption optReannounce
141 error "DHT.republish: not implemented"
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index 50ca6db3..01950c10 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -22,8 +22,10 @@ module Network.BitTorrent.DHT.Session
22 , Options (..) 22 , Options (..)
23 23
24 -- * Node 24 -- * Node
25 , LogFun
26 , Node 25 , Node
26 , options
27 , LogFun
28 , NodeHandler
27 , startNode 29 , startNode
28 30
29 -- * Session 31 -- * Session
@@ -48,32 +50,10 @@ module Network.BitTorrent.DHT.Session
48 , deleteTopic 50 , deleteTopic
49 51
50 -- * Messaging 52 -- * Messaging
51 -- ** Initiate
52 , queryNode 53 , queryNode
53 , queryParallel 54 , queryParallel
54 , (<@>) 55 , (<@>)
55 , ping 56 , ping
56
57 -- ** Accept
58 , NodeHandler
59 , nodeHandler
60 , pingH
61 , findNodeH
62 , getPeersH
63 , announceH
64 , handlers
65
66 -- * Search
67 -- ** Step
68 , Iteration
69 , findNodeQ
70 , getPeersQ
71 , announceQ
72
73 -- ** Traversal
74 , Search
75 , search
76 , publish
77 ) where 57 ) where
78 58
79import Prelude hiding (ioError) 59import Prelude hiding (ioError)
@@ -88,10 +68,7 @@ import Control.Monad.Logger
88import Control.Monad.Reader 68import Control.Monad.Reader
89import Control.Monad.Trans.Control 69import Control.Monad.Trans.Control
90import Control.Monad.Trans.Resource 70import Control.Monad.Trans.Resource
91import Data.Conduit
92import Data.Conduit.List as C hiding (mapMaybe, mapM_)
93import Data.Default 71import Data.Default
94import Data.Either
95import Data.Fixed 72import Data.Fixed
96import Data.Hashable 73import Data.Hashable
97import Data.List as L 74import Data.List as L
@@ -493,100 +470,3 @@ ping :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip)
493ping addr = do 470ping addr = do
494 (nid, Ping) <- queryNode addr Ping 471 (nid, Ping) <- queryNode addr Ping
495 return (NodeInfo nid addr) 472 return (NodeInfo nid addr)
496
497nodeHandler :: Address ip => KRPC (Query a) (Response b)
498 => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip
499nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do
500 case fromSockAddr sockAddr of
501 Nothing -> throwIO BadAddress
502 Just naddr -> do
503 insertNode (NodeInfo remoteId naddr)
504 Response <$> getNodeId <*> action naddr q
505
506{-----------------------------------------------------------------------
507-- Search
508-----------------------------------------------------------------------}
509
510type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip])
511
512-- TODO match with expected node id
513findNodeQ :: Address ip => NodeId -> Iteration ip NodeInfo
514findNodeQ nid NodeInfo {..} = do
515 NodeFound closest <- FindNode nid <@> nodeAddr
516 return $ Right closest
517
518isLeft :: Either a b -> Bool
519isLeft (Right _) = False
520isLeft (Left _) = True
521
522getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr
523getPeersQ topic NodeInfo {..} = do
524 GotPeers {..} <- GetPeers topic <@> nodeAddr
525 let dist = distance (toNodeId topic) nodeId
526 $(logInfoS) "getPeersQ" $ T.pack
527 $ "distance: " <> render (pretty dist) <> " , result: "
528 <> if isLeft peers then "NODES" else "PEERS"
529 return peers
530
531announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr
532announceQ ih p NodeInfo {..} = do
533 GotPeers {..} <- GetPeers ih <@> nodeAddr
534 case peers of
535 Left ns
536 | False -> undefined -- TODO check if we can announce
537 | otherwise -> return (Left ns)
538 Right ps -> do -- TODO *probably* add to peer cache
539 Announced <- Announce False ih p grantedToken <@> nodeAddr
540 return (Right [nodeAddr])
541
542type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip]
543
544-- TODO: use reorder and filter (Traversal option) leftovers
545search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o
546search k action = do
547 awaitForever $ \ batch -> unless (L.null batch) $ do
548 $(logWarnS) "search" "start query"
549 responses <- lift $ queryParallel (action <$> batch)
550 let (nodes, results) = partitionEithers responses
551 $(logWarnS) "search" "done query"
552 leftover $ L.concat nodes
553 mapM_ yield results
554
555publish :: Address ip => InfoHash -> PortNumber -> DHT ip ()
556publish ih p = do
557 nodes <- getClosest ih
558 r <- asks (optReplication . options)
559 _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r
560 return ()
561
562republish :: DHT ip ThreadId
563republish = fork $ do
564 i <- askOption optReannounce
565 error "DHT.republish: not implemented"
566
567{-----------------------------------------------------------------------
568-- Handlers
569-----------------------------------------------------------------------}
570
571pingH :: Address ip => NodeHandler ip
572pingH = nodeHandler $ \ _ Ping -> do
573 return Ping
574
575findNodeH :: Address ip => NodeHandler ip
576findNodeH = nodeHandler $ \ _ (FindNode nid) -> do
577 NodeFound <$> getClosest nid
578
579getPeersH :: Address ip => NodeHandler ip
580getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do
581 GotPeers <$> getPeerList ih <*> grantToken naddr
582
583announceH :: Address ip => NodeHandler ip
584announceH = nodeHandler $ \ naddr @ NodeAddr {..} (Announce {..}) -> do
585 checkToken naddr sessionToken
586 let annPort = if impliedPort then nodePort else port
587 let peerAddr = PeerAddr Nothing nodeHost annPort
588 insertPeer topic peerAddr
589 return Announced
590
591handlers :: Address ip => [NodeHandler ip]
592handlers = [pingH, findNodeH, getPeersH, announceH]