diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Network/BitTorrent/Core.hs | 50 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT.hs | 86 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Protocol.hs | 329 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Routing.hs | 22 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 251 |
5 files changed, 396 insertions, 342 deletions
diff --git a/src/Network/BitTorrent/Core.hs b/src/Network/BitTorrent/Core.hs index 9cfb3dd7..6024f5a5 100644 --- a/src/Network/BitTorrent/Core.hs +++ b/src/Network/BitTorrent/Core.hs | |||
@@ -9,6 +9,7 @@ | |||
9 | -- | 9 | -- |
10 | module Network.BitTorrent.Core | 10 | module Network.BitTorrent.Core |
11 | ( module Core | 11 | ( module Core |
12 | , Address (..) | ||
12 | 13 | ||
13 | -- * Re-exports from Data.IP | 14 | -- * Re-exports from Data.IP |
14 | , IPv4 | 15 | , IPv4 |
@@ -16,9 +17,58 @@ module Network.BitTorrent.Core | |||
16 | , IP (..) | 17 | , IP (..) |
17 | ) where | 18 | ) where |
18 | 19 | ||
20 | import Control.Applicative | ||
19 | import Data.IP | 21 | import Data.IP |
22 | import Data.Serialize | ||
23 | import Data.Typeable | ||
24 | import Network.Socket (SockAddr (..), PortNumber) | ||
20 | 25 | ||
21 | import Network.BitTorrent.Core.Fingerprint as Core | 26 | import Network.BitTorrent.Core.Fingerprint as Core |
22 | import Network.BitTorrent.Core.Node as Core | 27 | import Network.BitTorrent.Core.Node as Core |
23 | import Network.BitTorrent.Core.PeerId as Core | 28 | import Network.BitTorrent.Core.PeerId as Core |
24 | import Network.BitTorrent.Core.PeerAddr as Core | 29 | import Network.BitTorrent.Core.PeerAddr as Core |
30 | |||
31 | |||
32 | class (Eq a, Serialize a, Typeable a) => Address a where | ||
33 | toSockAddr :: a -> SockAddr | ||
34 | fromSockAddr :: SockAddr -> Maybe a | ||
35 | |||
36 | -- | Note that port is zeroed. | ||
37 | instance Address IPv4 where | ||
38 | toSockAddr = SockAddrInet 0 . toHostAddress | ||
39 | fromSockAddr (SockAddrInet _ h) = Just (fromHostAddress h) | ||
40 | fromSockAddr _ = Nothing | ||
41 | |||
42 | -- | Note that port is zeroed. | ||
43 | instance Address IPv6 where | ||
44 | toSockAddr h = SockAddrInet6 0 0 (toHostAddress6 h) 0 | ||
45 | fromSockAddr (SockAddrInet6 _ _ h _) = Just (fromHostAddress6 h) | ||
46 | fromSockAddr _ = Nothing | ||
47 | |||
48 | -- | Note that port is zeroed. | ||
49 | instance Address IP where | ||
50 | toSockAddr (IPv4 h) = toSockAddr h | ||
51 | toSockAddr (IPv6 h) = toSockAddr h | ||
52 | fromSockAddr sa = | ||
53 | IPv4 <$> fromSockAddr sa | ||
54 | <|> IPv6 <$> fromSockAddr sa | ||
55 | |||
56 | setPort :: PortNumber -> SockAddr -> SockAddr | ||
57 | setPort port (SockAddrInet _ h ) = SockAddrInet port h | ||
58 | setPort port (SockAddrInet6 _ f h s) = SockAddrInet6 port f h s | ||
59 | setPort _ (SockAddrUnix s ) = SockAddrUnix s | ||
60 | {-# INLINE setPort #-} | ||
61 | |||
62 | getPort :: SockAddr -> Maybe PortNumber | ||
63 | getPort (SockAddrInet p _ ) = Just p | ||
64 | getPort (SockAddrInet6 p _ _ _) = Just p | ||
65 | getPort (SockAddrUnix _ ) = Nothing | ||
66 | {-# INLINE getPort #-} | ||
67 | |||
68 | instance Address a => Address (NodeAddr a) where | ||
69 | toSockAddr NodeAddr {..} = setPort nodePort $ toSockAddr nodeHost | ||
70 | fromSockAddr sa = NodeAddr <$> fromSockAddr sa <*> getPort sa | ||
71 | |||
72 | instance Address a => Address (PeerAddr a) where | ||
73 | toSockAddr PeerAddr {..} = setPort peerPort $ toSockAddr peerHost | ||
74 | fromSockAddr sa = PeerAddr Nothing <$> fromSockAddr sa <*> getPort sa | ||
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index b0aac002..bdb76c76 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs | |||
@@ -1,6 +1,86 @@ | |||
1 | module Network.BitTorrent.DHT | 1 | module Network.BitTorrent.DHT |
2 | ( newNodeSession | 2 | ( dht |
3 | , dhtServer | 3 | , ping |
4 | , Network.BitTorrent.DHT.bootstrap | ||
5 | , Network.BitTorrent.DHT.lookup | ||
6 | , Network.BitTorrent.DHT.insert | ||
4 | ) where | 7 | ) where |
5 | 8 | ||
6 | import Network.BitTorrent.DHT.Protocol \ No newline at end of file | 9 | import Control.Applicative |
10 | import Control.Monad | ||
11 | import Control.Monad.Reader | ||
12 | import Data.List as L | ||
13 | import Network.Socket (PortNumber) | ||
14 | |||
15 | import Data.Torrent.InfoHash | ||
16 | import Network.BitTorrent.Core | ||
17 | import Network.BitTorrent.DHT.Message | ||
18 | import Network.BitTorrent.DHT.Session | ||
19 | |||
20 | |||
21 | {----------------------------------------------------------------------- | ||
22 | -- Handlers | ||
23 | -----------------------------------------------------------------------} | ||
24 | |||
25 | pingH :: Address ip => NodeHandler ip | ||
26 | pingH = nodeHandler $ \ _ Ping -> return Ping | ||
27 | |||
28 | {- | ||
29 | findNodeH :: (Eq ip, Serialize ip, Typeable ip) => Handler (DHT ip) | ||
30 | findNodeH = dhtHandler $ \ _ (FindNode nid) -> | ||
31 | NodeFound <$> getClosest nid | ||
32 | |||
33 | getPeersH :: (Eq ip, Serialize ip, Typeable ip) => Handler (DHT ip) | ||
34 | getPeersH = dhtHandler $ \ addr (GetPeers ih) -> | ||
35 | GotPeers <$> getPeerList ih <*> grantToken addr | ||
36 | |||
37 | announceH :: Handler (DHT ip) | ||
38 | announceH = dhtHandler $ \ addr (Announce {..}) -> do | ||
39 | checkToken addr sessionToken | ||
40 | insertPeer topic undefined -- PeerAddr (add, port) | ||
41 | return Announced | ||
42 | -} | ||
43 | |||
44 | handlers :: Address ip => [NodeHandler ip] | ||
45 | handlers = [pingH] | ||
46 | |||
47 | {----------------------------------------------------------------------- | ||
48 | -- Query | ||
49 | -----------------------------------------------------------------------} | ||
50 | |||
51 | -- | Run DHT on specified port. <add note about resources> | ||
52 | dht :: Address ip => NodeAddr ip -> DHT ip a -> IO a | ||
53 | dht addr = runDHT addr handlers | ||
54 | |||
55 | ping :: Address ip => NodeAddr ip -> DHT ip () | ||
56 | ping addr = do | ||
57 | Ping <- Ping <@> addr | ||
58 | return () | ||
59 | |||
60 | -- | One good node may be sufficient. <note about 'Data.Torrent.tNodes'> | ||
61 | bootstrap :: Address ip => [NodeAddr ip] -> DHT ip () | ||
62 | bootstrap = mapM_ insertClosest | ||
63 | where | ||
64 | insertClosest addr = do | ||
65 | nid <- getNodeId | ||
66 | NodeFound closest <- FindNode nid <@> addr | ||
67 | forM_ closest insertNode | ||
68 | |||
69 | -- | Get list of peers which downloading | ||
70 | lookup :: Address ip => InfoHash -> DHT ip [PeerAddr ip] | ||
71 | lookup ih = getClosestHash ih >>= collect | ||
72 | where | ||
73 | collect nodes = L.concat <$> forM (nodeAddr <$> nodes) retrieve | ||
74 | retrieve addr = do | ||
75 | GotPeers {..} <- GetPeers ih <@> addr | ||
76 | either collect pure peers | ||
77 | |||
78 | -- | Announce that /this/ peer may have some pieces of the specified | ||
79 | -- torrent. | ||
80 | insert :: Address ip => InfoHash -> PortNumber -> DHT ip () | ||
81 | insert ih port = do | ||
82 | nodes <- getClosestHash ih | ||
83 | forM_ (nodeAddr <$> nodes) $ \ addr -> do | ||
84 | -- GotPeers {..} <- GetPeers ih <@> addr | ||
85 | -- Announced <- Announce False ih undefined grantedToken <@> addr | ||
86 | return () | ||
diff --git a/src/Network/BitTorrent/DHT/Protocol.hs b/src/Network/BitTorrent/DHT/Protocol.hs deleted file mode 100644 index 8528f0e0..00000000 --- a/src/Network/BitTorrent/DHT/Protocol.hs +++ /dev/null | |||
@@ -1,329 +0,0 @@ | |||
1 | module Network.BitTorrent.DHT.Protocol | ||
2 | ( | ||
3 | newNodeSession | ||
4 | |||
5 | -- * Tracker | ||
6 | , ping | ||
7 | , findNode | ||
8 | , getPeers | ||
9 | , announcePeer | ||
10 | |||
11 | -- * Server | ||
12 | , dhtServer | ||
13 | ) where | ||
14 | |||
15 | import Control.Applicative | ||
16 | import Control.Concurrent | ||
17 | import Control.Concurrent.STM | ||
18 | import Control.Monad | ||
19 | import Control.Exception | ||
20 | import Data.ByteString | ||
21 | import Data.Serialize as S | ||
22 | import Data.Function | ||
23 | import Data.Ord | ||
24 | import Data.Maybe | ||
25 | import Data.List as L | ||
26 | import Data.Map as M | ||
27 | import Data.HashMap.Strict as HM | ||
28 | import Network | ||
29 | import Network.Socket | ||
30 | import System.Entropy | ||
31 | |||
32 | import Data.BEncode | ||
33 | import Network.KRPC | ||
34 | import Network.KRPC.Protocol | ||
35 | import Network.BitTorrent.Peer | ||
36 | import Network.BitTorrent.Exchange.Protocol () | ||
37 | |||
38 | {----------------------------------------------------------------------- | ||
39 | Node | ||
40 | -----------------------------------------------------------------------} | ||
41 | |||
42 | type NodeId = ByteString | ||
43 | |||
44 | -- TODO WARN is the 'system' random suitable for this? | ||
45 | -- | Generate random NodeID used for the entire session. | ||
46 | -- Distribution of ID's should be as uniform as possible. | ||
47 | -- | ||
48 | genNodeId :: IO NodeId | ||
49 | genNodeId = getEntropy 20 | ||
50 | |||
51 | data NodeAddr = NodeAddr { | ||
52 | nodeIP :: {-# UNPACK #-} !HostAddress | ||
53 | , nodePort :: {-# UNPACK #-} !PortNumber | ||
54 | } deriving (Show, Eq) | ||
55 | |||
56 | instance Serialize NodeAddr where | ||
57 | get = NodeAddr <$> getWord32be <*> get | ||
58 | put NodeAddr {..} = putWord32be nodeIP >> put nodePort | ||
59 | |||
60 | data NodeInfo = NodeInfo { | ||
61 | nodeID :: !NodeId | ||
62 | , nodeAddr :: !NodeAddr | ||
63 | } deriving (Show, Eq) | ||
64 | |||
65 | instance Serialize NodeInfo where | ||
66 | get = NodeInfo <$> getByteString 20 <*> get | ||
67 | put NodeInfo {..} = put nodeID >> put nodeAddr | ||
68 | |||
69 | type CompactInfo = ByteString | ||
70 | |||
71 | decodeCompact :: CompactInfo -> [NodeInfo] | ||
72 | decodeCompact = either (const []) id . S.runGet (many get) | ||
73 | |||
74 | encodeCompact :: [NodeId] -> CompactInfo | ||
75 | encodeCompact = S.runPut . mapM_ put | ||
76 | |||
77 | decodePeerList :: [BEncode] -> [PeerAddr] | ||
78 | decodePeerList = undefined | ||
79 | |||
80 | encodePeerList :: [PeerAddr] -> [BEncode] | ||
81 | encodePeerList = undefined | ||
82 | |||
83 | type Distance = NodeId | ||
84 | |||
85 | {----------------------------------------------------------------------- | ||
86 | Tokens | ||
87 | -----------------------------------------------------------------------} | ||
88 | |||
89 | type Secret = Int | ||
90 | |||
91 | genSecret :: IO Secret | ||
92 | genSecret = error "secret" | ||
93 | |||
94 | -- | Instead of periodically loop over the all nodes in the routing | ||
95 | -- table with some given interval (or some other tricky method | ||
96 | -- e.g. using timeouts) we can just update tokens on demand - if no | ||
97 | -- one asks for a token then the token _should_ not change at all. | ||
98 | -- | ||
99 | type Token = ByteString | ||
100 | |||
101 | defaultToken :: Token | ||
102 | defaultToken = "0xdeadbeef" | ||
103 | |||
104 | genToken :: NodeAddr -> Secret -> Token | ||
105 | genToken _ _ = defaultToken | ||
106 | |||
107 | {----------------------------------------------------------------------- | ||
108 | Routing table | ||
109 | -----------------------------------------------------------------------} | ||
110 | |||
111 | type ContactInfo = HashMap InfoHash [PeerAddr] | ||
112 | |||
113 | insertPeer :: InfoHash -> PeerAddr -> ContactInfo -> ContactInfo | ||
114 | insertPeer ih addr = HM.insertWith (++) ih [addr] | ||
115 | |||
116 | lookupPeers :: InfoHash -> ContactInfo -> [PeerAddr] | ||
117 | lookupPeers ih = fromMaybe [] . HM.lookup ih | ||
118 | |||
119 | -- TODO use more compact routing table | ||
120 | type RoutingTable = HashMap NodeId NodeAddr | ||
121 | |||
122 | insertNode :: NodeId -> NodeAddr -> RoutingTable -> RoutingTable | ||
123 | insertNode = HM.insert | ||
124 | |||
125 | type Alpha = Int | ||
126 | |||
127 | defaultAlpha :: Alpha | ||
128 | defaultAlpha = 8 | ||
129 | |||
130 | -- TODO | ||
131 | kclosest :: Int -> NodeId -> RoutingTable -> [NodeId] | ||
132 | kclosest = undefined | ||
133 | |||
134 | {----------------------------------------------------------------------- | ||
135 | Node session | ||
136 | -----------------------------------------------------------------------} | ||
137 | |||
138 | data NodeSession = NodeSession { | ||
139 | nodeId :: !NodeId | ||
140 | , routingTable :: !(TVar RoutingTable) | ||
141 | , contactInfo :: !(TVar ContactInfo) | ||
142 | -- , currentSecret :: !(TVar Secret) | ||
143 | -- , secretTimestamp :: !(TVar Timestamp) | ||
144 | , alpha :: !Alpha | ||
145 | , listenerPort :: !PortNumber | ||
146 | } | ||
147 | |||
148 | instance Eq NodeSession where | ||
149 | (==) = (==) `on` nodeId | ||
150 | |||
151 | instance Ord NodeSession where | ||
152 | compare = comparing nodeId | ||
153 | |||
154 | newNodeSession :: PortNumber -> IO NodeSession | ||
155 | newNodeSession lport | ||
156 | = NodeSession | ||
157 | <$> genNodeId | ||
158 | <*> newTVarIO HM.empty | ||
159 | <*> newTVarIO HM.empty | ||
160 | <*> pure defaultAlpha | ||
161 | <*> pure lport | ||
162 | |||
163 | assignToken :: NodeSession -> NodeId -> IO Token | ||
164 | assignToken _ _ = return "" | ||
165 | |||
166 | -- TODO | ||
167 | checkToken :: NodeId -> Token -> NodeSession -> IO Bool | ||
168 | checkToken _ _ _ = return True | ||
169 | |||
170 | updateTimestamp :: NodeSession -> NodeId -> IO () | ||
171 | updateTimestamp = error "updateTimestamp" | ||
172 | |||
173 | updateToken :: NodeSession -> NodeId -> Token -> IO () | ||
174 | updateToken _ _ _ = error "updateToken" | ||
175 | |||
176 | {----------------------------------------------------------------------- | ||
177 | DHT Queries | ||
178 | -----------------------------------------------------------------------} | ||
179 | |||
180 | pingM :: Method NodeId NodeId | ||
181 | pingM = method "ping" ["id"] ["id"] | ||
182 | |||
183 | findNodeM :: Method (NodeId, NodeId) (NodeId, CompactInfo) | ||
184 | findNodeM = method "find_node" ["id", "target"] ["id", "nodes"] | ||
185 | |||
186 | -- | Lookup peers by a torrent infohash. This method might return | ||
187 | -- different kind of responses depending on the routing table of | ||
188 | -- queried node: | ||
189 | -- | ||
190 | -- * If quieried node contains a peer list for the given infohash | ||
191 | -- then the node should return the list in a "value" key. Note that | ||
192 | -- list is encoded as compact peer address, not a compact node info. | ||
193 | -- The result of 'get_peers' method have the following scheme: | ||
194 | -- | ||
195 | -- > { "id" : "dht_server_node_id" | ||
196 | -- > , "token" : "assigned_token" | ||
197 | -- > , "values" : ["_IP_PO", "_ip_po"] | ||
198 | -- > } | ||
199 | -- | ||
200 | -- * If quieried node does not contain a list of peers associated | ||
201 | -- with the given infohash, then node should return | ||
202 | -- | ||
203 | -- > { "id" : "dht_server_node_id" | ||
204 | -- > , "token" : "assigned_token" | ||
205 | -- > , "nodes" : "compact_nodes_info" | ||
206 | -- > } | ||
207 | -- | ||
208 | -- The resulting dictionaries might differ only in a values\/nodes | ||
209 | -- keys. | ||
210 | -- | ||
211 | getPeersM :: Method (NodeId, InfoHash) BEncode | ||
212 | getPeersM = method "get_peers" ["id", "info_hash"] [] | ||
213 | |||
214 | -- | Used to announce that the peer, controlling the quering node is | ||
215 | -- downloading a torrent on a port. | ||
216 | announcePeerM :: Method (NodeId, InfoHash, PortNumber, Token) NodeId | ||
217 | announcePeerM = method "announce_peer" ["id", "info_hash", "port", "token"] ["id"] | ||
218 | |||
219 | {----------------------------------------------------------------------- | ||
220 | DHT Tracker | ||
221 | -----------------------------------------------------------------------} | ||
222 | -- TODO: update node timestamp on each successful call | ||
223 | |||
224 | -- | Note that tracker side query functions could throw RPCException. | ||
225 | type DHT a b = NodeSession -> NodeAddr -> a -> IO b | ||
226 | |||
227 | ping :: DHT () () | ||
228 | ping NodeSession {..} addr @ NodeAddr {..} () = do | ||
229 | nid <- call (nodeIP, nodePort) pingM nodeId | ||
230 | atomically $ modifyTVar' routingTable $ HM.insert nid addr | ||
231 | |||
232 | findNode :: DHT NodeId [NodeInfo] | ||
233 | findNode ses @ NodeSession {..} NodeAddr {..} qnid = do | ||
234 | (nid, info) <- call (nodeIP, nodePort) findNodeM (nodeId, qnid) | ||
235 | updateTimestamp ses nid | ||
236 | return (decodeCompact info) | ||
237 | |||
238 | getPeers :: DHT InfoHash (Either [NodeInfo] [PeerAddr]) | ||
239 | getPeers ses @ NodeSession {..} NodeAddr {..} ih = do | ||
240 | resp <- call (nodeIP, nodePort) getPeersM (nodeId, ih) | ||
241 | (nid, tok, res) <- extrResp resp | ||
242 | updateTimestamp ses nid | ||
243 | updateToken ses nid tok | ||
244 | return res | ||
245 | where | ||
246 | extrResp (BDict d) | ||
247 | | Just (BString nid ) <- M.lookup "id" d | ||
248 | , Just (BString tok ) <- M.lookup "token" d | ||
249 | , Just (BList values) <- M.lookup "values" d | ||
250 | = return $ (nid, tok, Right $ decodePeerList values) | ||
251 | |||
252 | | Just (BString nid ) <- M.lookup "id" d | ||
253 | , Just (BString tok ) <- M.lookup "token" d | ||
254 | , Just (BString nodes) <- M.lookup "nodes" d | ||
255 | = return (nid, tok, Left $ decodeCompact nodes) | ||
256 | |||
257 | extrResp _ = throw $ RPCException msg | ||
258 | where msg = ProtocolError "unable to extract getPeers resp" | ||
259 | |||
260 | -- remove token from signature, handle the all token stuff by NodeSession | ||
261 | |||
262 | -- | Note that before ever calling this method you should call the | ||
263 | -- getPeerList. | ||
264 | announcePeer :: DHT (InfoHash, Token) NodeId | ||
265 | announcePeer ses @ NodeSession {..} NodeAddr {..} (ih, tok) = do | ||
266 | nid <- call (nodeIP, nodePort) announcePeerM (nodeId, ih, listenerPort, tok) | ||
267 | updateTimestamp ses nid | ||
268 | return nid | ||
269 | |||
270 | {----------------------------------------------------------------------- | ||
271 | DHT Server | ||
272 | -----------------------------------------------------------------------} | ||
273 | -- TODO: update node timestamp on each successful call | ||
274 | -- NOTE: ensure all server operations run in O(1) | ||
275 | |||
276 | type ServerHandler a b = NodeSession -> NodeAddr -> a -> IO b | ||
277 | |||
278 | pingS :: ServerHandler NodeId NodeId | ||
279 | pingS NodeSession {..} addr nid = do | ||
280 | atomically $ modifyTVar' routingTable $ insertNode nid addr | ||
281 | return nodeId | ||
282 | |||
283 | findNodeS :: ServerHandler (NodeId, NodeId) (NodeId, CompactInfo) | ||
284 | findNodeS ses @ NodeSession {..} _ (nid, qnid) = do | ||
285 | updateTimestamp ses nid | ||
286 | rt <- atomically $ readTVar routingTable | ||
287 | return (nodeId, encodeCompact $ kclosest alpha qnid rt) | ||
288 | |||
289 | getPeersS :: ServerHandler (NodeId, InfoHash) BEncode | ||
290 | getPeersS ses @ NodeSession {..} _ (nid, ih) = do | ||
291 | updateTimestamp ses nid | ||
292 | mkResp <$> assignToken ses nid <*> findPeers | ||
293 | where | ||
294 | findPeers = do | ||
295 | list <- lookupPeers ih <$> readTVarIO contactInfo | ||
296 | if not (L.null list) | ||
297 | then return $ Right list | ||
298 | else do | ||
299 | rt <- readTVarIO routingTable | ||
300 | let nodes = kclosest alpha (getInfoHash ih) rt | ||
301 | return $ Left nodes | ||
302 | |||
303 | mkDict tok res = [("id",BString nodeId), ("token", BString tok), res] | ||
304 | mkResult (Left nodes ) = ("nodes", BString $ encodeCompact nodes) | ||
305 | mkResult (Right values) = ("values", BList $ encodePeerList values) | ||
306 | mkResp tok = BDict . M.fromList . mkDict tok . mkResult | ||
307 | |||
308 | announcePeerS :: ServerHandler (NodeId, InfoHash, PortNumber, Token) NodeId | ||
309 | announcePeerS ses @ NodeSession {..} NodeAddr {..} (nid, ih, port, token) = do | ||
310 | updateTimestamp ses nid | ||
311 | registered <- checkToken nid token ses | ||
312 | when registered $ do | ||
313 | atomically $ do | ||
314 | let peerAddr = PeerAddr Nothing nodeIP port | ||
315 | modifyTVar contactInfo $ insertPeer ih peerAddr | ||
316 | return nodeId | ||
317 | |||
318 | dhtTracker :: NodeSession -> InfoHash -> Chan PeerAddr -> IO () | ||
319 | dhtTracker = undefined | ||
320 | |||
321 | dhtServer :: NodeSession -> PortNumber -> IO () | ||
322 | dhtServer s p = server p methods | ||
323 | where | ||
324 | methods = | ||
325 | [ pingM ==> pingS s undefined | ||
326 | , findNodeM ==> findNodeS s undefined | ||
327 | , getPeersM ==> getPeersS s undefined | ||
328 | , announcePeerM ==> announcePeerS s undefined | ||
329 | ] \ No newline at end of file | ||
diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs index 5f00a924..fd2197f0 100644 --- a/src/Network/BitTorrent/DHT/Routing.hs +++ b/src/Network/BitTorrent/DHT/Routing.hs | |||
@@ -8,12 +8,14 @@ | |||
8 | {-# LANGUAGE RecordWildCards #-} | 8 | {-# LANGUAGE RecordWildCards #-} |
9 | {-# LANGUAGE TypeOperators #-} | 9 | {-# LANGUAGE TypeOperators #-} |
10 | {-# LANGUAGE DeriveGeneric #-} | 10 | {-# LANGUAGE DeriveGeneric #-} |
11 | {-# OPTIONS_GHC -fno-warn-orphans #-} | ||
11 | module Network.BitTorrent.DHT.Routing | 12 | module Network.BitTorrent.DHT.Routing |
12 | ( -- * Routing table | 13 | ( -- * Routing table |
13 | Table | 14 | Table |
14 | , BucketCount | 15 | , BucketCount |
15 | 16 | ||
16 | -- * Routing | 17 | -- * Routing |
18 | , Timestamp | ||
17 | , Routing | 19 | , Routing |
18 | , runRouting | 20 | , runRouting |
19 | 21 | ||
@@ -89,12 +91,11 @@ insert ping (k, v) = go 0 | |||
89 | -----------------------------------------------------------------------} | 91 | -----------------------------------------------------------------------} |
90 | 92 | ||
91 | type Timestamp = POSIXTime | 93 | type Timestamp = POSIXTime |
92 | type PingInterval = POSIXTime | ||
93 | 94 | ||
94 | data Routing ip result | 95 | data Routing ip result |
95 | = Full result | 96 | = Full result |
96 | | Done (Timestamp -> result) | 97 | | Done (Timestamp -> result) |
97 | | Refresh (NodeAddr ip) (([NodeInfo ip], Timestamp) -> Routing ip result) | 98 | | Refresh NodeId (([NodeInfo ip], Timestamp) -> Routing ip result) |
98 | | NeedPing (NodeAddr ip) (Maybe Timestamp -> Routing ip result) | 99 | | NeedPing (NodeAddr ip) (Maybe Timestamp -> Routing ip result) |
99 | 100 | ||
100 | instance Functor (Routing ip) where | 101 | instance Functor (Routing ip) where |
@@ -107,23 +108,24 @@ runRouting :: (Monad m, Eq ip) | |||
107 | => (NodeAddr ip -> m Bool) -- ^ ping_node | 108 | => (NodeAddr ip -> m Bool) -- ^ ping_node |
108 | -> (NodeId -> m [NodeInfo ip]) -- ^ find_nodes | 109 | -> (NodeId -> m [NodeInfo ip]) -- ^ find_nodes |
109 | -> m Timestamp -- ^ timestamper | 110 | -> m Timestamp -- ^ timestamper |
110 | -> Routing ip f | 111 | -> Routing ip f -- ^ action |
111 | -> m f -- ^ result | 112 | -> m f -- ^ result |
112 | runRouting ping_node find_nodes timestamp = go | 113 | runRouting ping_node find_nodes timestamper = go |
113 | where | 114 | where |
114 | go (Full r) = return r | 115 | go (Full r) = return r |
115 | go (Done f) = liftM f timestamp | 116 | go (Done f) = liftM f timestamper |
116 | go (NeedPing addr f) = do | 117 | go (NeedPing addr f) = do |
117 | pong <- ping_node addr | 118 | pong <- ping_node addr |
118 | if pong | 119 | if pong |
119 | then do | 120 | then do |
120 | time <- timestamp | 121 | time <- timestamper |
121 | go (f (Just time)) | 122 | go (f (Just time)) |
122 | else go (f Nothing) | 123 | else go (f Nothing) |
123 | 124 | ||
124 | go (Refresh nodes f) = do | 125 | go (Refresh nid f) = do |
125 | let nid = undefined | 126 | infos <- find_nodes nid |
126 | go (f undefined) | 127 | time <- timestamper |
128 | go (f (infos, time)) | ||
127 | 129 | ||
128 | {----------------------------------------------------------------------- | 130 | {----------------------------------------------------------------------- |
129 | Bucket | 131 | Bucket |
@@ -186,7 +188,7 @@ insertNode info bucket | |||
186 | -- update the all bucket if it is too outdated | 188 | -- update the all bucket if it is too outdated |
187 | | Just (NodeInfo {..} :-> lastSeen) <- lastChanged bucket | 189 | | Just (NodeInfo {..} :-> lastSeen) <- lastChanged bucket |
188 | , lastSeen > delta | 190 | , lastSeen > delta |
189 | = Refresh nodeAddr $ \ (infos, t) -> | 191 | = Refresh nodeId $ \ (infos, t) -> |
190 | insertNode info $ | 192 | insertNode info $ |
191 | L.foldr (\ x -> PSQ.insertWith max x t) bucket infos | 193 | L.foldr (\ x -> PSQ.insertWith max x t) bucket infos |
192 | 194 | ||
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs new file mode 100644 index 00000000..71400609 --- /dev/null +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -0,0 +1,251 @@ | |||
1 | {-# LANGUAGE RecordWildCards #-} | ||
2 | {-# LANGUAGE FlexibleContexts #-} | ||
3 | {-# LANGUAGE FlexibleInstances #-} | ||
4 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
5 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
6 | {-# LANGUAGE ScopedTypeVariables #-} | ||
7 | {-# LANGUAGE TypeFamilies #-} | ||
8 | |||
9 | {-# LANGUAGE RankNTypes #-} -- TODO remove | ||
10 | module Network.BitTorrent.DHT.Session | ||
11 | ( -- * Session | ||
12 | DHT | ||
13 | , runDHT | ||
14 | |||
15 | -- * Tokens | ||
16 | , grantToken | ||
17 | , checkToken | ||
18 | |||
19 | -- * Routing table | ||
20 | , getNodeId | ||
21 | , getClosest | ||
22 | , getClosestHash | ||
23 | , insertNode | ||
24 | |||
25 | -- * Peer storage | ||
26 | , insertPeer | ||
27 | , getPeerList | ||
28 | |||
29 | -- * Messaging | ||
30 | , (<@>) | ||
31 | , NodeHandler | ||
32 | , nodeHandler | ||
33 | ) where | ||
34 | |||
35 | import Control.Applicative | ||
36 | import Control.Concurrent.STM | ||
37 | import Control.Exception hiding (Handler) | ||
38 | import Control.Monad.Reader | ||
39 | import Control.Monad.Base | ||
40 | import Control.Monad.Trans.Control | ||
41 | import Control.Monad.Trans.Resource | ||
42 | import Data.Default | ||
43 | import Data.Hashable | ||
44 | import Data.List as L | ||
45 | import Data.Time | ||
46 | import Data.Time.Clock.POSIX | ||
47 | import System.Random (randomIO) | ||
48 | |||
49 | import Data.Torrent.InfoHash | ||
50 | import Network.KRPC | ||
51 | import Network.BitTorrent.Core | ||
52 | import Network.BitTorrent.Core.PeerAddr as P | ||
53 | import Network.BitTorrent.DHT.Message | ||
54 | import Network.BitTorrent.DHT.Routing as R | ||
55 | import Network.BitTorrent.DHT.Token as T | ||
56 | |||
57 | |||
58 | {----------------------------------------------------------------------- | ||
59 | -- Tokens policy | ||
60 | -----------------------------------------------------------------------} | ||
61 | |||
62 | data SessionTokens = SessionTokens | ||
63 | { tokenMap :: !TokenMap | ||
64 | , lastUpdate :: !UTCTime | ||
65 | , maxInterval :: !NominalDiffTime | ||
66 | } | ||
67 | |||
68 | nullSessionTokens :: IO SessionTokens | ||
69 | nullSessionTokens = SessionTokens | ||
70 | <$> (tokens <$> liftIO randomIO) | ||
71 | <*> liftIO getCurrentTime | ||
72 | <*> pure defaultUpdateInterval | ||
73 | |||
74 | invalidateTokens :: UTCTime -> SessionTokens -> SessionTokens | ||
75 | invalidateTokens curTime ts @ SessionTokens {..} | ||
76 | | curTime `diffUTCTime` lastUpdate > maxInterval = SessionTokens | ||
77 | { tokenMap = update tokenMap | ||
78 | , lastUpdate = curTime | ||
79 | , maxInterval = maxInterval | ||
80 | } | ||
81 | | otherwise = ts | ||
82 | |||
83 | {----------------------------------------------------------------------- | ||
84 | -- Session | ||
85 | -----------------------------------------------------------------------} | ||
86 | |||
87 | data Node ip = Node | ||
88 | { manager :: !(Manager (DHT ip)) | ||
89 | , routingTable :: !(TVar (Table ip)) | ||
90 | , contactInfo :: !(TVar (PeerStore ip)) | ||
91 | , sessionTokens :: !(TVar SessionTokens) | ||
92 | } | ||
93 | |||
94 | newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) (ResourceT IO) a } | ||
95 | deriving ( Functor, Applicative, Monad | ||
96 | , MonadIO, MonadBase IO | ||
97 | , MonadReader (Node ip) | ||
98 | ) | ||
99 | instance MonadBaseControl IO (DHT ip) where | ||
100 | newtype StM (DHT ip) a = StM { | ||
101 | unSt :: StM (ReaderT (Node ip) (ResourceT IO)) a | ||
102 | } | ||
103 | liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> | ||
104 | cc $ \ (DHT m) -> StM <$> cc' m | ||
105 | {-# INLINE liftBaseWith #-} | ||
106 | |||
107 | restoreM = DHT . restoreM . unSt | ||
108 | {-# INLINE restoreM #-} | ||
109 | |||
110 | instance MonadKRPC (DHT ip) (DHT ip) where | ||
111 | getManager = asks manager | ||
112 | |||
113 | runDHT :: forall ip a. Address ip | ||
114 | => NodeAddr ip -- ^ node address to bind; | ||
115 | -> [Handler (DHT ip)] -- ^ handlers to run on accepted queries; | ||
116 | -> DHT ip a -- ^ DHT action to run; | ||
117 | -> IO a -- ^ result. | ||
118 | runDHT naddr handlers action = runResourceT $ do | ||
119 | (_, m) <- allocate (newManager (toSockAddr naddr) handlers) closeManager | ||
120 | myId <- liftIO genNodeId | ||
121 | node <- liftIO $ Node m | ||
122 | <$> newTVarIO (nullTable myId) | ||
123 | <*> newTVarIO def | ||
124 | <*> (newTVarIO =<< nullSessionTokens) | ||
125 | runReaderT (unDHT (listen >> action)) node | ||
126 | |||
127 | {----------------------------------------------------------------------- | ||
128 | -- Routing | ||
129 | -----------------------------------------------------------------------} | ||
130 | |||
131 | -- TODO fork? | ||
132 | routing :: Address ip => Routing ip a -> DHT ip a | ||
133 | routing = runRouting ping refreshNodes getTimestamp | ||
134 | |||
135 | -- TODO add timeout | ||
136 | ping :: Address ip => NodeAddr ip -> DHT ip Bool | ||
137 | ping addr = do | ||
138 | Ping <- Ping <@> addr | ||
139 | return True | ||
140 | |||
141 | -- FIXME do not use getClosest sinse we should /refresh/ them | ||
142 | refreshNodes :: Address ip => NodeId -> DHT ip [NodeInfo ip] | ||
143 | refreshNodes nid = do | ||
144 | nodes <- getClosest nid | ||
145 | nss <- forM (nodeAddr <$> nodes) $ \ addr -> do | ||
146 | NodeFound ns <- FindNode nid <@> addr | ||
147 | return ns | ||
148 | return $ L.concat nss | ||
149 | |||
150 | getTimestamp :: DHT ip Timestamp | ||
151 | getTimestamp = liftIO $ utcTimeToPOSIXSeconds <$> getCurrentTime | ||
152 | |||
153 | {----------------------------------------------------------------------- | ||
154 | -- Tokens | ||
155 | -----------------------------------------------------------------------} | ||
156 | |||
157 | tryUpdateSecret :: DHT ip () | ||
158 | tryUpdateSecret = do | ||
159 | curTime <- liftIO getCurrentTime | ||
160 | toks <- asks sessionTokens | ||
161 | liftIO $ atomically $ modifyTVar' toks (invalidateTokens curTime) | ||
162 | |||
163 | grantToken :: Hashable a => NodeAddr a -> DHT ip Token | ||
164 | grantToken addr = do | ||
165 | tryUpdateSecret | ||
166 | toks <- asks sessionTokens >>= liftIO . readTVarIO | ||
167 | return $ T.lookup addr $ tokenMap toks | ||
168 | |||
169 | -- | Throws 'ProtocolError' if token is invalid or already expired. | ||
170 | checkToken :: Hashable a => NodeAddr a -> Token -> DHT ip () | ||
171 | checkToken addr questionableToken = do | ||
172 | tryUpdateSecret | ||
173 | toks <- asks sessionTokens >>= liftIO . readTVarIO | ||
174 | unless (member addr questionableToken (tokenMap toks)) $ | ||
175 | liftIO $ throwIO $ KError ProtocolError "bad token" "" | ||
176 | -- todo reset transaction id in krpc | ||
177 | |||
178 | {----------------------------------------------------------------------- | ||
179 | -- Routing table | ||
180 | -----------------------------------------------------------------------} | ||
181 | |||
182 | getTable :: DHT ip (Table ip) | ||
183 | getTable = do | ||
184 | var <- asks routingTable | ||
185 | liftIO (readTVarIO var) | ||
186 | |||
187 | putTable :: Table ip -> DHT ip () | ||
188 | putTable table = do | ||
189 | var <- asks routingTable | ||
190 | liftIO (atomically (writeTVar var table)) | ||
191 | |||
192 | getNodeId :: DHT ip NodeId | ||
193 | getNodeId = thisId <$> getTable | ||
194 | |||
195 | getClosest :: Eq ip => NodeId -> DHT ip [NodeInfo ip] | ||
196 | getClosest nid = kclosest 8 nid <$> getTable | ||
197 | |||
198 | getClosestHash :: Eq ip => InfoHash -> DHT ip [NodeInfo ip] | ||
199 | getClosestHash ih = kclosestHash 8 ih <$> getTable | ||
200 | |||
201 | insertNode :: Address ip => NodeInfo ip -> DHT ip () | ||
202 | insertNode info = do | ||
203 | t <- getTable | ||
204 | t' <- routing (R.insert info t) | ||
205 | putTable t' | ||
206 | |||
207 | {----------------------------------------------------------------------- | ||
208 | -- Peer storage | ||
209 | -----------------------------------------------------------------------} | ||
210 | |||
211 | insertPeer :: Eq ip => InfoHash -> PeerAddr ip -> DHT ip () | ||
212 | insertPeer ih addr = do | ||
213 | var <- asks contactInfo | ||
214 | liftIO $ atomically $ modifyTVar' var (P.insert ih addr) | ||
215 | |||
216 | lookupPeers :: InfoHash -> DHT ip [PeerAddr ip] | ||
217 | lookupPeers ih = do | ||
218 | var <- asks contactInfo | ||
219 | liftIO $ P.lookup ih <$> readTVarIO var | ||
220 | |||
221 | type PeerList ip = Either [NodeInfo ip] [PeerAddr ip] | ||
222 | |||
223 | getPeerList :: Eq ip => InfoHash -> DHT ip (PeerList ip) | ||
224 | getPeerList ih = do | ||
225 | ps <- lookupPeers ih | ||
226 | if L.null ps | ||
227 | then Left <$> getClosestHash ih | ||
228 | else return (Right ps) | ||
229 | |||
230 | {----------------------------------------------------------------------- | ||
231 | -- Messaging | ||
232 | -----------------------------------------------------------------------} | ||
233 | |||
234 | (<@>) :: Address ip => KRPC (Query a) (Response b) | ||
235 | => a -> NodeAddr ip -> DHT ip b | ||
236 | q <@> addr = do | ||
237 | nid <- getNodeId | ||
238 | Response remoteId r <- query (toSockAddr addr) (Query nid q) | ||
239 | insertNode (NodeInfo remoteId addr) | ||
240 | return r | ||
241 | |||
242 | type NodeHandler ip = Handler (DHT ip) | ||
243 | |||
244 | nodeHandler :: Address ip => KRPC (Query a) (Response b) | ||
245 | => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip | ||
246 | nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do | ||
247 | case fromSockAddr sockAddr of | ||
248 | Nothing -> liftIO $ throwIO $ KError GenericError "bad address" "" | ||
249 | Just naddr -> do | ||
250 | insertNode (NodeInfo remoteId naddr) | ||
251 | Response <$> getNodeId <*> action naddr q | ||