diff options
author | Sam T <pxqr.sta@gmail.com> | 2013-07-13 05:22:31 +0400 |
---|---|---|
committer | Sam T <pxqr.sta@gmail.com> | 2013-07-13 05:22:31 +0400 |
commit | eecd91150b33d6363419daa8e0461984061ed06c (patch) | |
tree | f69c7c9231f717f6755dc9e59618f4afa4ba3c1a /src/Network/BitTorrent/DHT/Protocol.hs | |
parent | 9c9924831ccd975b359ea20101663502b467c99f (diff) |
+ Add listener service.
Diffstat (limited to 'src/Network/BitTorrent/DHT/Protocol.hs')
-rw-r--r-- | src/Network/BitTorrent/DHT/Protocol.hs | 339 |
1 files changed, 339 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/DHT/Protocol.hs b/src/Network/BitTorrent/DHT/Protocol.hs new file mode 100644 index 00000000..5267a916 --- /dev/null +++ b/src/Network/BitTorrent/DHT/Protocol.hs | |||
@@ -0,0 +1,339 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | ||
2 | {-# LANGUAGE RecordWildCards #-} | ||
3 | module Network.BitTorrent.DHT.Protocol | ||
4 | ( | ||
5 | newNodeSession | ||
6 | |||
7 | -- * Tracker | ||
8 | , ping | ||
9 | , findNode | ||
10 | , getPeers | ||
11 | , announcePeer | ||
12 | |||
13 | -- * Server | ||
14 | , dhtServer | ||
15 | ) where | ||
16 | |||
17 | import Control.Applicative | ||
18 | import Control.Concurrent | ||
19 | import Control.Concurrent.STM | ||
20 | import Control.Monad | ||
21 | import Control.Exception | ||
22 | import Data.ByteString | ||
23 | import Data.Serialize as S | ||
24 | import Data.Function | ||
25 | import Data.Ord | ||
26 | import Data.Maybe | ||
27 | import Data.List as L | ||
28 | import Data.Map as M | ||
29 | import Data.HashMap.Strict as HM | ||
30 | import Network | ||
31 | import Network.Socket | ||
32 | import System.Entropy | ||
33 | |||
34 | import Remote.KRPC | ||
35 | import Remote.KRPC.Protocol | ||
36 | import Data.BEncode | ||
37 | import Data.Torrent | ||
38 | import Network.BitTorrent.Peer | ||
39 | |||
40 | {----------------------------------------------------------------------- | ||
41 | Node | ||
42 | -----------------------------------------------------------------------} | ||
43 | |||
44 | type NodeId = ByteString | ||
45 | |||
46 | -- WARN is the 'system' random suitable for this? | ||
47 | -- | Generate random NodeID used for the entire session. | ||
48 | -- Distribution of ID's should be as uniform as possible. | ||
49 | -- | ||
50 | genNodeId :: IO NodeId | ||
51 | genNodeId = getEntropy 20 | ||
52 | |||
53 | instance Serialize PortNumber where | ||
54 | get = fromIntegral <$> getWord16be | ||
55 | put = putWord16be . fromIntegral | ||
56 | |||
57 | |||
58 | data NodeAddr = NodeAddr { | ||
59 | nodeIP :: {-# UNPACK #-} !HostAddress | ||
60 | , nodePort :: {-# UNPACK #-} !PortNumber | ||
61 | } deriving (Show, Eq) | ||
62 | |||
63 | instance Serialize NodeAddr where | ||
64 | get = NodeAddr <$> getWord32be <*> get | ||
65 | put NodeAddr {..} = do | ||
66 | putWord32be nodeIP | ||
67 | put nodePort | ||
68 | |||
69 | |||
70 | data NodeInfo = NodeInfo { | ||
71 | nodeID :: !NodeId | ||
72 | , nodeAddr :: !NodeAddr | ||
73 | } deriving (Show, Eq) | ||
74 | |||
75 | instance Serialize NodeInfo where | ||
76 | get = NodeInfo <$> getByteString 20 <*> get | ||
77 | put NodeInfo {..} = put nodeID >> put nodeAddr | ||
78 | |||
79 | type CompactInfo = ByteString | ||
80 | |||
81 | decodeCompact :: CompactInfo -> [NodeInfo] | ||
82 | decodeCompact = either (const []) id . S.runGet (many get) | ||
83 | |||
84 | encodeCompact :: [NodeId] -> CompactInfo | ||
85 | encodeCompact = S.runPut . mapM_ put | ||
86 | |||
87 | decodePeerList :: [BEncode] -> [PeerAddr] | ||
88 | decodePeerList = undefined | ||
89 | |||
90 | encodePeerList :: [PeerAddr] -> [BEncode] | ||
91 | encodePeerList = undefined | ||
92 | |||
93 | type Distance = NodeId | ||
94 | |||
95 | {----------------------------------------------------------------------- | ||
96 | Tokens | ||
97 | -----------------------------------------------------------------------} | ||
98 | |||
99 | type Secret = Int | ||
100 | |||
101 | genSecret :: IO Secret | ||
102 | genSecret = error "secret" | ||
103 | |||
104 | -- | Instead of periodically loop over the all nodes in the routing | ||
105 | -- table with some given interval (or some other tricky method | ||
106 | -- e.g. using timeouts) we can just update tokens on demand - if no | ||
107 | -- one asks for a token then the token _should_ not change at all. | ||
108 | -- | ||
109 | type Token = ByteString | ||
110 | |||
111 | defaultToken :: Token | ||
112 | defaultToken = "0xdeadbeef" | ||
113 | |||
114 | genToken :: NodeAddr -> Secret -> Token | ||
115 | genToken _ _ = defaultToken | ||
116 | |||
117 | {----------------------------------------------------------------------- | ||
118 | Routing table | ||
119 | -----------------------------------------------------------------------} | ||
120 | |||
121 | type ContactInfo = HashMap InfoHash [PeerAddr] | ||
122 | |||
123 | insertPeer :: InfoHash -> PeerAddr -> ContactInfo -> ContactInfo | ||
124 | insertPeer ih addr = HM.insertWith (++) ih [addr] | ||
125 | |||
126 | lookupPeers :: InfoHash -> ContactInfo -> [PeerAddr] | ||
127 | lookupPeers ih = fromMaybe [] . HM.lookup ih | ||
128 | |||
129 | -- TODO use more compact routing table | ||
130 | type RoutingTable = HashMap NodeId NodeAddr | ||
131 | |||
132 | insertNode :: NodeId -> NodeAddr -> RoutingTable -> RoutingTable | ||
133 | insertNode = HM.insert | ||
134 | |||
135 | type Alpha = Int | ||
136 | |||
137 | defaultAlpha :: Alpha | ||
138 | defaultAlpha = 8 | ||
139 | |||
140 | -- TODO | ||
141 | kclosest :: Int -> NodeId -> RoutingTable -> [NodeId] | ||
142 | kclosest = undefined | ||
143 | |||
144 | {----------------------------------------------------------------------- | ||
145 | Node session | ||
146 | -----------------------------------------------------------------------} | ||
147 | |||
148 | data NodeSession = NodeSession { | ||
149 | nodeId :: !NodeId | ||
150 | , routingTable :: !(TVar RoutingTable) | ||
151 | , contactInfo :: !(TVar ContactInfo) | ||
152 | -- , currentSecret :: !(TVar Secret) | ||
153 | -- , secretTimestamp :: !(TVar Timestamp) | ||
154 | , alpha :: !Alpha | ||
155 | , listenerPort :: !PortNumber | ||
156 | } | ||
157 | |||
158 | instance Eq NodeSession where | ||
159 | (==) = (==) `on` nodeId | ||
160 | |||
161 | instance Ord NodeSession where | ||
162 | compare = comparing nodeId | ||
163 | |||
164 | newNodeSession :: PortNumber -> IO NodeSession | ||
165 | newNodeSession lport | ||
166 | = NodeSession | ||
167 | <$> genNodeId | ||
168 | <*> newTVarIO HM.empty | ||
169 | <*> newTVarIO HM.empty | ||
170 | <*> pure defaultAlpha | ||
171 | <*> pure lport | ||
172 | |||
173 | assignToken :: NodeSession -> NodeId -> IO Token | ||
174 | assignToken _ _ = return "" | ||
175 | |||
176 | -- TODO | ||
177 | checkToken :: NodeId -> Token -> NodeSession -> IO Bool | ||
178 | checkToken nid token _ = return True | ||
179 | |||
180 | updateTimestamp :: NodeSession -> NodeId -> IO () | ||
181 | updateTimestamp = error "updateTimestamp" | ||
182 | |||
183 | updateToken :: NodeSession -> NodeId -> Token -> IO () | ||
184 | updateToken _ _ _ = error "updateToken" | ||
185 | |||
186 | {----------------------------------------------------------------------- | ||
187 | DHT Queries | ||
188 | -----------------------------------------------------------------------} | ||
189 | |||
190 | pingM :: Method NodeId NodeId | ||
191 | pingM = method "ping" ["id"] ["id"] | ||
192 | |||
193 | findNodeM :: Method (NodeId, NodeId) (NodeId, CompactInfo) | ||
194 | findNodeM = method "find_node" ["id", "target"] ["id", "nodes"] | ||
195 | |||
196 | -- | Lookup peers by a torrent infohash. This method might return | ||
197 | -- different kind of responses depending on the routing table of | ||
198 | -- queried node: | ||
199 | -- | ||
200 | -- * If quieried node contains a peer list for the given infohash | ||
201 | -- then the node should return the list in a "value" key. Note that | ||
202 | -- list is encoded as compact peer address, not a compact node info. | ||
203 | -- The result of 'get_peers' method have the following scheme: | ||
204 | -- | ||
205 | -- > { "id" : "dht_server_node_id" | ||
206 | -- > , "token" : "assigned_token" | ||
207 | -- > , "values" : ["_IP_PO", "_ip_po"] | ||
208 | -- > } | ||
209 | -- | ||
210 | -- * If quieried node does not contain a list of peers associated | ||
211 | -- with the given infohash, then node should return | ||
212 | -- | ||
213 | -- > { "id" : "dht_server_node_id" | ||
214 | -- > , "token" : "assigned_token" | ||
215 | -- > , "nodes" : "compact_nodes_info" | ||
216 | -- > } | ||
217 | -- | ||
218 | -- The resulting dictionaries might differ only in a values\/nodes | ||
219 | -- keys. | ||
220 | -- | ||
221 | getPeersM :: Method (NodeId, InfoHash) BEncode | ||
222 | getPeersM = method "get_peers" ["id", "info_hash"] [] | ||
223 | |||
224 | -- | Used to announce that the peer, controlling the quering node is | ||
225 | -- downloading a torrent on a port. | ||
226 | announcePeerM :: Method (NodeId, InfoHash, PortNumber, Token) NodeId | ||
227 | announcePeerM = method "announce_peer" ["id", "info_hash", "port", "token"] ["id"] | ||
228 | |||
229 | {----------------------------------------------------------------------- | ||
230 | DHT Tracker | ||
231 | -----------------------------------------------------------------------} | ||
232 | -- TODO: update node timestamp on each successful call | ||
233 | |||
234 | -- | Note that tracker side query functions could throw RPCException. | ||
235 | type DHT a b = NodeSession -> NodeAddr -> a -> IO b | ||
236 | |||
237 | ping :: DHT () () | ||
238 | ping NodeSession {..} addr @ NodeAddr {..} () = do | ||
239 | nid <- call (nodeIP, nodePort) pingM nodeId | ||
240 | atomically $ modifyTVar' routingTable $ HM.insert nid addr | ||
241 | |||
242 | findNode :: DHT NodeId [NodeInfo] | ||
243 | findNode ses @ NodeSession {..} NodeAddr {..} qnid = do | ||
244 | (nid, info) <- call (nodeIP, nodePort) findNodeM (nodeId, qnid) | ||
245 | updateTimestamp ses nid | ||
246 | return (decodeCompact info) | ||
247 | |||
248 | getPeers :: DHT InfoHash (Either [NodeInfo] [PeerAddr]) | ||
249 | getPeers ses @ NodeSession {..} NodeAddr {..} ih = do | ||
250 | resp <- call (nodeIP, nodePort) getPeersM (nodeId, ih) | ||
251 | (nid, tok, res) <- extrResp resp | ||
252 | updateTimestamp ses nid | ||
253 | updateToken ses nid tok | ||
254 | return res | ||
255 | where | ||
256 | extrResp (BDict d) | ||
257 | | Just (BString nid ) <- M.lookup "id" d | ||
258 | , Just (BString tok ) <- M.lookup "token" d | ||
259 | , Just (BList values) <- M.lookup "values" d | ||
260 | = return $ (nid, tok, Right $ decodePeerList values) | ||
261 | |||
262 | | Just (BString nid ) <- M.lookup "id" d | ||
263 | , Just (BString tok ) <- M.lookup "token" d | ||
264 | , Just (BString nodes) <- M.lookup "nodes" d | ||
265 | = return (nid, tok, Left $ decodeCompact nodes) | ||
266 | |||
267 | extrResp _ = throw $ RPCException msg | ||
268 | where msg = ProtocolError "unable to extract getPeers resp" | ||
269 | |||
270 | -- remove token from signature, handle the all token stuff by NodeSession | ||
271 | |||
272 | -- | Note that before ever calling this method you should call the | ||
273 | -- getPeerList. | ||
274 | announcePeer :: DHT (InfoHash, Token) NodeId | ||
275 | announcePeer ses @ NodeSession {..} NodeAddr {..} (ih, tok) = do | ||
276 | nid <- call (nodeIP, nodePort) announcePeerM (nodeId, ih, listenerPort, tok) | ||
277 | updateTimestamp ses nid | ||
278 | return nid | ||
279 | |||
280 | {----------------------------------------------------------------------- | ||
281 | DHT Server | ||
282 | -----------------------------------------------------------------------} | ||
283 | -- TODO: update node timestamp on each successful call | ||
284 | -- NOTE: ensure all server operations run in O(1) | ||
285 | |||
286 | type ServerHandler a b = NodeSession -> NodeAddr -> a -> IO b | ||
287 | |||
288 | pingS :: ServerHandler NodeId NodeId | ||
289 | pingS NodeSession {..} addr nid = do | ||
290 | atomically $ modifyTVar' routingTable $ insertNode nid addr | ||
291 | return nodeId | ||
292 | |||
293 | findNodeS :: ServerHandler (NodeId, NodeId) (NodeId, CompactInfo) | ||
294 | findNodeS ses @ NodeSession {..} _ (nid, qnid) = do | ||
295 | updateTimestamp ses nid | ||
296 | rt <- atomically $ readTVar routingTable | ||
297 | return (nodeId, encodeCompact $ kclosest alpha qnid rt) | ||
298 | |||
299 | getPeersS :: ServerHandler (NodeId, InfoHash) BEncode | ||
300 | getPeersS ses @ NodeSession {..} _ (nid, ih) = do | ||
301 | updateTimestamp ses nid | ||
302 | mkResp <$> assignToken ses nid <*> findPeers | ||
303 | where | ||
304 | findPeers = do | ||
305 | list <- lookupPeers ih <$> readTVarIO contactInfo | ||
306 | if not (L.null list) | ||
307 | then return $ Right list | ||
308 | else do | ||
309 | rt <- readTVarIO routingTable | ||
310 | let nodes = kclosest alpha (getInfoHash ih) rt | ||
311 | return $ Left nodes | ||
312 | |||
313 | mkDict tok res = [("id",BString nodeId), ("token", BString tok), res] | ||
314 | mkResult (Left nodes ) = ("nodes", BString $ encodeCompact nodes) | ||
315 | mkResult (Right values) = ("values", BList $ encodePeerList values) | ||
316 | mkResp tok = BDict . M.fromList . mkDict tok . mkResult | ||
317 | |||
318 | announcePeerS :: ServerHandler (NodeId, InfoHash, PortNumber, Token) NodeId | ||
319 | announcePeerS ses @ NodeSession {..} NodeAddr {..} (nid, ih, port, token) = do | ||
320 | updateTimestamp ses nid | ||
321 | registered <- checkToken nid token ses | ||
322 | when registered $ do | ||
323 | atomically $ do | ||
324 | let peerAddr = PeerAddr Nothing nodeIP port | ||
325 | modifyTVar contactInfo $ insertPeer ih peerAddr | ||
326 | return nodeId | ||
327 | |||
328 | dhtTracker :: NodeSession -> InfoHash -> Chan PeerAddr -> IO () | ||
329 | dhtTracker = undefined | ||
330 | |||
331 | dhtServer :: NodeSession -> PortNumber -> IO () | ||
332 | dhtServer s p = server p methods | ||
333 | where | ||
334 | methods = | ||
335 | [ pingM ==> pingS s undefined | ||
336 | , findNodeM ==> findNodeS s undefined | ||
337 | , getPeersM ==> getPeersS s undefined | ||
338 | , announcePeerM ==> announcePeerS s undefined | ||
339 | ] \ No newline at end of file | ||