summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Truzjan <pxqr.sta@gmail.com>2013-12-28 08:47:02 +0400
committerSam Truzjan <pxqr.sta@gmail.com>2013-12-28 08:47:02 +0400
commitf0df039183e7027a49eafe51de53340fc43723e3 (patch)
tree4dfeaa5b2dbe8a7b8474a79839bc7441b042a3bf
parentfe6cb6e8a5de55406ad3663cf5c0a0d73189a519 (diff)
Add node sessions
-rw-r--r--bittorrent.cabal6
-rw-r--r--src/Network/BitTorrent/Core.hs50
-rw-r--r--src/Network/BitTorrent/DHT.hs86
-rw-r--r--src/Network/BitTorrent/DHT/Protocol.hs329
-rw-r--r--src/Network/BitTorrent/DHT/Routing.hs22
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs251
6 files changed, 400 insertions, 344 deletions
diff --git a/bittorrent.cabal b/bittorrent.cabal
index 48fe51dc..9bc91647 100644
--- a/bittorrent.cabal
+++ b/bittorrent.cabal
@@ -57,10 +57,10 @@ library
57 Network.BitTorrent.Core.Node 57 Network.BitTorrent.Core.Node
58 Network.BitTorrent.Core.PeerId 58 Network.BitTorrent.Core.PeerId
59 Network.BitTorrent.Core.PeerAddr 59 Network.BitTorrent.Core.PeerAddr
60-- Network.BitTorrent.DHT 60 Network.BitTorrent.DHT
61 Network.BitTorrent.DHT.Message 61 Network.BitTorrent.DHT.Message
62-- Network.BitTorrent.DHT.Protocol
63 Network.BitTorrent.DHT.Routing 62 Network.BitTorrent.DHT.Routing
63 Network.BitTorrent.DHT.Session
64 Network.BitTorrent.DHT.Token 64 Network.BitTorrent.DHT.Token
65-- Network.BitTorrent.Exchange 65-- Network.BitTorrent.Exchange
66 Network.BitTorrent.Exchange.Assembler 66 Network.BitTorrent.Exchange.Assembler
@@ -95,6 +95,8 @@ library
95 , lens >= 3.0 95 , lens >= 3.0
96 , resourcet >= 0.4 96 , resourcet >= 0.4
97 , mtl 97 , mtl
98 , monad-control
99 , transformers-base
98 100
99 -- Concurrency 101 -- Concurrency
100-- , SafeSemaphore 102-- , SafeSemaphore
diff --git a/src/Network/BitTorrent/Core.hs b/src/Network/BitTorrent/Core.hs
index 9cfb3dd7..6024f5a5 100644
--- a/src/Network/BitTorrent/Core.hs
+++ b/src/Network/BitTorrent/Core.hs
@@ -9,6 +9,7 @@
9-- 9--
10module Network.BitTorrent.Core 10module Network.BitTorrent.Core
11 ( module Core 11 ( module Core
12 , Address (..)
12 13
13 -- * Re-exports from Data.IP 14 -- * Re-exports from Data.IP
14 , IPv4 15 , IPv4
@@ -16,9 +17,58 @@ module Network.BitTorrent.Core
16 , IP (..) 17 , IP (..)
17 ) where 18 ) where
18 19
20import Control.Applicative
19import Data.IP 21import Data.IP
22import Data.Serialize
23import Data.Typeable
24import Network.Socket (SockAddr (..), PortNumber)
20 25
21import Network.BitTorrent.Core.Fingerprint as Core 26import Network.BitTorrent.Core.Fingerprint as Core
22import Network.BitTorrent.Core.Node as Core 27import Network.BitTorrent.Core.Node as Core
23import Network.BitTorrent.Core.PeerId as Core 28import Network.BitTorrent.Core.PeerId as Core
24import Network.BitTorrent.Core.PeerAddr as Core 29import Network.BitTorrent.Core.PeerAddr as Core
30
31
32class (Eq a, Serialize a, Typeable a) => Address a where
33 toSockAddr :: a -> SockAddr
34 fromSockAddr :: SockAddr -> Maybe a
35
36-- | Note that port is zeroed.
37instance Address IPv4 where
38 toSockAddr = SockAddrInet 0 . toHostAddress
39 fromSockAddr (SockAddrInet _ h) = Just (fromHostAddress h)
40 fromSockAddr _ = Nothing
41
42-- | Note that port is zeroed.
43instance Address IPv6 where
44 toSockAddr h = SockAddrInet6 0 0 (toHostAddress6 h) 0
45 fromSockAddr (SockAddrInet6 _ _ h _) = Just (fromHostAddress6 h)
46 fromSockAddr _ = Nothing
47
48-- | Note that port is zeroed.
49instance Address IP where
50 toSockAddr (IPv4 h) = toSockAddr h
51 toSockAddr (IPv6 h) = toSockAddr h
52 fromSockAddr sa =
53 IPv4 <$> fromSockAddr sa
54 <|> IPv6 <$> fromSockAddr sa
55
56setPort :: PortNumber -> SockAddr -> SockAddr
57setPort port (SockAddrInet _ h ) = SockAddrInet port h
58setPort port (SockAddrInet6 _ f h s) = SockAddrInet6 port f h s
59setPort _ (SockAddrUnix s ) = SockAddrUnix s
60{-# INLINE setPort #-}
61
62getPort :: SockAddr -> Maybe PortNumber
63getPort (SockAddrInet p _ ) = Just p
64getPort (SockAddrInet6 p _ _ _) = Just p
65getPort (SockAddrUnix _ ) = Nothing
66{-# INLINE getPort #-}
67
68instance Address a => Address (NodeAddr a) where
69 toSockAddr NodeAddr {..} = setPort nodePort $ toSockAddr nodeHost
70 fromSockAddr sa = NodeAddr <$> fromSockAddr sa <*> getPort sa
71
72instance Address a => Address (PeerAddr a) where
73 toSockAddr PeerAddr {..} = setPort peerPort $ toSockAddr peerHost
74 fromSockAddr sa = PeerAddr Nothing <$> fromSockAddr sa <*> getPort sa
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs
index b0aac002..bdb76c76 100644
--- a/src/Network/BitTorrent/DHT.hs
+++ b/src/Network/BitTorrent/DHT.hs
@@ -1,6 +1,86 @@
1module Network.BitTorrent.DHT 1module Network.BitTorrent.DHT
2 ( newNodeSession 2 ( dht
3 , dhtServer 3 , ping
4 , Network.BitTorrent.DHT.bootstrap
5 , Network.BitTorrent.DHT.lookup
6 , Network.BitTorrent.DHT.insert
4 ) where 7 ) where
5 8
6import Network.BitTorrent.DHT.Protocol \ No newline at end of file 9import Control.Applicative
10import Control.Monad
11import Control.Monad.Reader
12import Data.List as L
13import Network.Socket (PortNumber)
14
15import Data.Torrent.InfoHash
16import Network.BitTorrent.Core
17import Network.BitTorrent.DHT.Message
18import Network.BitTorrent.DHT.Session
19
20
21{-----------------------------------------------------------------------
22-- Handlers
23-----------------------------------------------------------------------}
24
25pingH :: Address ip => NodeHandler ip
26pingH = nodeHandler $ \ _ Ping -> return Ping
27
28{-
29findNodeH :: (Eq ip, Serialize ip, Typeable ip) => Handler (DHT ip)
30findNodeH = dhtHandler $ \ _ (FindNode nid) ->
31 NodeFound <$> getClosest nid
32
33getPeersH :: (Eq ip, Serialize ip, Typeable ip) => Handler (DHT ip)
34getPeersH = dhtHandler $ \ addr (GetPeers ih) ->
35 GotPeers <$> getPeerList ih <*> grantToken addr
36
37announceH :: Handler (DHT ip)
38announceH = dhtHandler $ \ addr (Announce {..}) -> do
39 checkToken addr sessionToken
40 insertPeer topic undefined -- PeerAddr (add, port)
41 return Announced
42-}
43
44handlers :: Address ip => [NodeHandler ip]
45handlers = [pingH]
46
47{-----------------------------------------------------------------------
48-- Query
49-----------------------------------------------------------------------}
50
51-- | Run DHT on specified port. <add note about resources>
52dht :: Address ip => NodeAddr ip -> DHT ip a -> IO a
53dht addr = runDHT addr handlers
54
55ping :: Address ip => NodeAddr ip -> DHT ip ()
56ping addr = do
57 Ping <- Ping <@> addr
58 return ()
59
60-- | One good node may be sufficient. <note about 'Data.Torrent.tNodes'>
61bootstrap :: Address ip => [NodeAddr ip] -> DHT ip ()
62bootstrap = mapM_ insertClosest
63 where
64 insertClosest addr = do
65 nid <- getNodeId
66 NodeFound closest <- FindNode nid <@> addr
67 forM_ closest insertNode
68
69-- | Get list of peers which downloading
70lookup :: Address ip => InfoHash -> DHT ip [PeerAddr ip]
71lookup ih = getClosestHash ih >>= collect
72 where
73 collect nodes = L.concat <$> forM (nodeAddr <$> nodes) retrieve
74 retrieve addr = do
75 GotPeers {..} <- GetPeers ih <@> addr
76 either collect pure peers
77
78-- | Announce that /this/ peer may have some pieces of the specified
79-- torrent.
80insert :: Address ip => InfoHash -> PortNumber -> DHT ip ()
81insert ih port = do
82 nodes <- getClosestHash ih
83 forM_ (nodeAddr <$> nodes) $ \ addr -> do
84-- GotPeers {..} <- GetPeers ih <@> addr
85-- Announced <- Announce False ih undefined grantedToken <@> addr
86 return ()
diff --git a/src/Network/BitTorrent/DHT/Protocol.hs b/src/Network/BitTorrent/DHT/Protocol.hs
deleted file mode 100644
index 8528f0e0..00000000
--- a/src/Network/BitTorrent/DHT/Protocol.hs
+++ /dev/null
@@ -1,329 +0,0 @@
1module Network.BitTorrent.DHT.Protocol
2 (
3 newNodeSession
4
5 -- * Tracker
6 , ping
7 , findNode
8 , getPeers
9 , announcePeer
10
11 -- * Server
12 , dhtServer
13 ) where
14
15import Control.Applicative
16import Control.Concurrent
17import Control.Concurrent.STM
18import Control.Monad
19import Control.Exception
20import Data.ByteString
21import Data.Serialize as S
22import Data.Function
23import Data.Ord
24import Data.Maybe
25import Data.List as L
26import Data.Map as M
27import Data.HashMap.Strict as HM
28import Network
29import Network.Socket
30import System.Entropy
31
32import Data.BEncode
33import Network.KRPC
34import Network.KRPC.Protocol
35import Network.BitTorrent.Peer
36import Network.BitTorrent.Exchange.Protocol ()
37
38{-----------------------------------------------------------------------
39 Node
40-----------------------------------------------------------------------}
41
42type NodeId = ByteString
43
44-- TODO WARN is the 'system' random suitable for this?
45-- | Generate random NodeID used for the entire session.
46-- Distribution of ID's should be as uniform as possible.
47--
48genNodeId :: IO NodeId
49genNodeId = getEntropy 20
50
51data NodeAddr = NodeAddr {
52 nodeIP :: {-# UNPACK #-} !HostAddress
53 , nodePort :: {-# UNPACK #-} !PortNumber
54 } deriving (Show, Eq)
55
56instance Serialize NodeAddr where
57 get = NodeAddr <$> getWord32be <*> get
58 put NodeAddr {..} = putWord32be nodeIP >> put nodePort
59
60data NodeInfo = NodeInfo {
61 nodeID :: !NodeId
62 , nodeAddr :: !NodeAddr
63 } deriving (Show, Eq)
64
65instance Serialize NodeInfo where
66 get = NodeInfo <$> getByteString 20 <*> get
67 put NodeInfo {..} = put nodeID >> put nodeAddr
68
69type CompactInfo = ByteString
70
71decodeCompact :: CompactInfo -> [NodeInfo]
72decodeCompact = either (const []) id . S.runGet (many get)
73
74encodeCompact :: [NodeId] -> CompactInfo
75encodeCompact = S.runPut . mapM_ put
76
77decodePeerList :: [BEncode] -> [PeerAddr]
78decodePeerList = undefined
79
80encodePeerList :: [PeerAddr] -> [BEncode]
81encodePeerList = undefined
82
83type Distance = NodeId
84
85{-----------------------------------------------------------------------
86 Tokens
87-----------------------------------------------------------------------}
88
89type Secret = Int
90
91genSecret :: IO Secret
92genSecret = error "secret"
93
94-- | Instead of periodically loop over the all nodes in the routing
95-- table with some given interval (or some other tricky method
96-- e.g. using timeouts) we can just update tokens on demand - if no
97-- one asks for a token then the token _should_ not change at all.
98--
99type Token = ByteString
100
101defaultToken :: Token
102defaultToken = "0xdeadbeef"
103
104genToken :: NodeAddr -> Secret -> Token
105genToken _ _ = defaultToken
106
107{-----------------------------------------------------------------------
108 Routing table
109-----------------------------------------------------------------------}
110
111type ContactInfo = HashMap InfoHash [PeerAddr]
112
113insertPeer :: InfoHash -> PeerAddr -> ContactInfo -> ContactInfo
114insertPeer ih addr = HM.insertWith (++) ih [addr]
115
116lookupPeers :: InfoHash -> ContactInfo -> [PeerAddr]
117lookupPeers ih = fromMaybe [] . HM.lookup ih
118
119-- TODO use more compact routing table
120type RoutingTable = HashMap NodeId NodeAddr
121
122insertNode :: NodeId -> NodeAddr -> RoutingTable -> RoutingTable
123insertNode = HM.insert
124
125type Alpha = Int
126
127defaultAlpha :: Alpha
128defaultAlpha = 8
129
130-- TODO
131kclosest :: Int -> NodeId -> RoutingTable -> [NodeId]
132kclosest = undefined
133
134{-----------------------------------------------------------------------
135 Node session
136-----------------------------------------------------------------------}
137
138data NodeSession = NodeSession {
139 nodeId :: !NodeId
140 , routingTable :: !(TVar RoutingTable)
141 , contactInfo :: !(TVar ContactInfo)
142-- , currentSecret :: !(TVar Secret)
143-- , secretTimestamp :: !(TVar Timestamp)
144 , alpha :: !Alpha
145 , listenerPort :: !PortNumber
146 }
147
148instance Eq NodeSession where
149 (==) = (==) `on` nodeId
150
151instance Ord NodeSession where
152 compare = comparing nodeId
153
154newNodeSession :: PortNumber -> IO NodeSession
155newNodeSession lport
156 = NodeSession
157 <$> genNodeId
158 <*> newTVarIO HM.empty
159 <*> newTVarIO HM.empty
160 <*> pure defaultAlpha
161 <*> pure lport
162
163assignToken :: NodeSession -> NodeId -> IO Token
164assignToken _ _ = return ""
165
166-- TODO
167checkToken :: NodeId -> Token -> NodeSession -> IO Bool
168checkToken _ _ _ = return True
169
170updateTimestamp :: NodeSession -> NodeId -> IO ()
171updateTimestamp = error "updateTimestamp"
172
173updateToken :: NodeSession -> NodeId -> Token -> IO ()
174updateToken _ _ _ = error "updateToken"
175
176{-----------------------------------------------------------------------
177 DHT Queries
178-----------------------------------------------------------------------}
179
180pingM :: Method NodeId NodeId
181pingM = method "ping" ["id"] ["id"]
182
183findNodeM :: Method (NodeId, NodeId) (NodeId, CompactInfo)
184findNodeM = method "find_node" ["id", "target"] ["id", "nodes"]
185
186-- | Lookup peers by a torrent infohash. This method might return
187-- different kind of responses depending on the routing table of
188-- queried node:
189--
190-- * If quieried node contains a peer list for the given infohash
191-- then the node should return the list in a "value" key. Note that
192-- list is encoded as compact peer address, not a compact node info.
193-- The result of 'get_peers' method have the following scheme:
194--
195-- > { "id" : "dht_server_node_id"
196-- > , "token" : "assigned_token"
197-- > , "values" : ["_IP_PO", "_ip_po"]
198-- > }
199--
200-- * If quieried node does not contain a list of peers associated
201-- with the given infohash, then node should return
202--
203-- > { "id" : "dht_server_node_id"
204-- > , "token" : "assigned_token"
205-- > , "nodes" : "compact_nodes_info"
206-- > }
207--
208-- The resulting dictionaries might differ only in a values\/nodes
209-- keys.
210--
211getPeersM :: Method (NodeId, InfoHash) BEncode
212getPeersM = method "get_peers" ["id", "info_hash"] []
213
214-- | Used to announce that the peer, controlling the quering node is
215-- downloading a torrent on a port.
216announcePeerM :: Method (NodeId, InfoHash, PortNumber, Token) NodeId
217announcePeerM = method "announce_peer" ["id", "info_hash", "port", "token"] ["id"]
218
219{-----------------------------------------------------------------------
220 DHT Tracker
221-----------------------------------------------------------------------}
222-- TODO: update node timestamp on each successful call
223
224-- | Note that tracker side query functions could throw RPCException.
225type DHT a b = NodeSession -> NodeAddr -> a -> IO b
226
227ping :: DHT () ()
228ping NodeSession {..} addr @ NodeAddr {..} () = do
229 nid <- call (nodeIP, nodePort) pingM nodeId
230 atomically $ modifyTVar' routingTable $ HM.insert nid addr
231
232findNode :: DHT NodeId [NodeInfo]
233findNode ses @ NodeSession {..} NodeAddr {..} qnid = do
234 (nid, info) <- call (nodeIP, nodePort) findNodeM (nodeId, qnid)
235 updateTimestamp ses nid
236 return (decodeCompact info)
237
238getPeers :: DHT InfoHash (Either [NodeInfo] [PeerAddr])
239getPeers ses @ NodeSession {..} NodeAddr {..} ih = do
240 resp <- call (nodeIP, nodePort) getPeersM (nodeId, ih)
241 (nid, tok, res) <- extrResp resp
242 updateTimestamp ses nid
243 updateToken ses nid tok
244 return res
245 where
246 extrResp (BDict d)
247 | Just (BString nid ) <- M.lookup "id" d
248 , Just (BString tok ) <- M.lookup "token" d
249 , Just (BList values) <- M.lookup "values" d
250 = return $ (nid, tok, Right $ decodePeerList values)
251
252 | Just (BString nid ) <- M.lookup "id" d
253 , Just (BString tok ) <- M.lookup "token" d
254 , Just (BString nodes) <- M.lookup "nodes" d
255 = return (nid, tok, Left $ decodeCompact nodes)
256
257 extrResp _ = throw $ RPCException msg
258 where msg = ProtocolError "unable to extract getPeers resp"
259
260-- remove token from signature, handle the all token stuff by NodeSession
261
262-- | Note that before ever calling this method you should call the
263-- getPeerList.
264announcePeer :: DHT (InfoHash, Token) NodeId
265announcePeer ses @ NodeSession {..} NodeAddr {..} (ih, tok) = do
266 nid <- call (nodeIP, nodePort) announcePeerM (nodeId, ih, listenerPort, tok)
267 updateTimestamp ses nid
268 return nid
269
270{-----------------------------------------------------------------------
271 DHT Server
272-----------------------------------------------------------------------}
273-- TODO: update node timestamp on each successful call
274-- NOTE: ensure all server operations run in O(1)
275
276type ServerHandler a b = NodeSession -> NodeAddr -> a -> IO b
277
278pingS :: ServerHandler NodeId NodeId
279pingS NodeSession {..} addr nid = do
280 atomically $ modifyTVar' routingTable $ insertNode nid addr
281 return nodeId
282
283findNodeS :: ServerHandler (NodeId, NodeId) (NodeId, CompactInfo)
284findNodeS ses @ NodeSession {..} _ (nid, qnid) = do
285 updateTimestamp ses nid
286 rt <- atomically $ readTVar routingTable
287 return (nodeId, encodeCompact $ kclosest alpha qnid rt)
288
289getPeersS :: ServerHandler (NodeId, InfoHash) BEncode
290getPeersS ses @ NodeSession {..} _ (nid, ih) = do
291 updateTimestamp ses nid
292 mkResp <$> assignToken ses nid <*> findPeers
293 where
294 findPeers = do
295 list <- lookupPeers ih <$> readTVarIO contactInfo
296 if not (L.null list)
297 then return $ Right list
298 else do
299 rt <- readTVarIO routingTable
300 let nodes = kclosest alpha (getInfoHash ih) rt
301 return $ Left nodes
302
303 mkDict tok res = [("id",BString nodeId), ("token", BString tok), res]
304 mkResult (Left nodes ) = ("nodes", BString $ encodeCompact nodes)
305 mkResult (Right values) = ("values", BList $ encodePeerList values)
306 mkResp tok = BDict . M.fromList . mkDict tok . mkResult
307
308announcePeerS :: ServerHandler (NodeId, InfoHash, PortNumber, Token) NodeId
309announcePeerS ses @ NodeSession {..} NodeAddr {..} (nid, ih, port, token) = do
310 updateTimestamp ses nid
311 registered <- checkToken nid token ses
312 when registered $ do
313 atomically $ do
314 let peerAddr = PeerAddr Nothing nodeIP port
315 modifyTVar contactInfo $ insertPeer ih peerAddr
316 return nodeId
317
318dhtTracker :: NodeSession -> InfoHash -> Chan PeerAddr -> IO ()
319dhtTracker = undefined
320
321dhtServer :: NodeSession -> PortNumber -> IO ()
322dhtServer s p = server p methods
323 where
324 methods =
325 [ pingM ==> pingS s undefined
326 , findNodeM ==> findNodeS s undefined
327 , getPeersM ==> getPeersS s undefined
328 , announcePeerM ==> announcePeerS s undefined
329 ] \ No newline at end of file
diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs
index 5f00a924..fd2197f0 100644
--- a/src/Network/BitTorrent/DHT/Routing.hs
+++ b/src/Network/BitTorrent/DHT/Routing.hs
@@ -8,12 +8,14 @@
8{-# LANGUAGE RecordWildCards #-} 8{-# LANGUAGE RecordWildCards #-}
9{-# LANGUAGE TypeOperators #-} 9{-# LANGUAGE TypeOperators #-}
10{-# LANGUAGE DeriveGeneric #-} 10{-# LANGUAGE DeriveGeneric #-}
11{-# OPTIONS_GHC -fno-warn-orphans #-}
11module Network.BitTorrent.DHT.Routing 12module Network.BitTorrent.DHT.Routing
12 ( -- * Routing table 13 ( -- * Routing table
13 Table 14 Table
14 , BucketCount 15 , BucketCount
15 16
16 -- * Routing 17 -- * Routing
18 , Timestamp
17 , Routing 19 , Routing
18 , runRouting 20 , runRouting
19 21
@@ -89,12 +91,11 @@ insert ping (k, v) = go 0
89-----------------------------------------------------------------------} 91-----------------------------------------------------------------------}
90 92
91type Timestamp = POSIXTime 93type Timestamp = POSIXTime
92type PingInterval = POSIXTime
93 94
94data Routing ip result 95data Routing ip result
95 = Full result 96 = Full result
96 | Done (Timestamp -> result) 97 | Done (Timestamp -> result)
97 | Refresh (NodeAddr ip) (([NodeInfo ip], Timestamp) -> Routing ip result) 98 | Refresh NodeId (([NodeInfo ip], Timestamp) -> Routing ip result)
98 | NeedPing (NodeAddr ip) (Maybe Timestamp -> Routing ip result) 99 | NeedPing (NodeAddr ip) (Maybe Timestamp -> Routing ip result)
99 100
100instance Functor (Routing ip) where 101instance Functor (Routing ip) where
@@ -107,23 +108,24 @@ runRouting :: (Monad m, Eq ip)
107 => (NodeAddr ip -> m Bool) -- ^ ping_node 108 => (NodeAddr ip -> m Bool) -- ^ ping_node
108 -> (NodeId -> m [NodeInfo ip]) -- ^ find_nodes 109 -> (NodeId -> m [NodeInfo ip]) -- ^ find_nodes
109 -> m Timestamp -- ^ timestamper 110 -> m Timestamp -- ^ timestamper
110 -> Routing ip f 111 -> Routing ip f -- ^ action
111 -> m f -- ^ result 112 -> m f -- ^ result
112runRouting ping_node find_nodes timestamp = go 113runRouting ping_node find_nodes timestamper = go
113 where 114 where
114 go (Full r) = return r 115 go (Full r) = return r
115 go (Done f) = liftM f timestamp 116 go (Done f) = liftM f timestamper
116 go (NeedPing addr f) = do 117 go (NeedPing addr f) = do
117 pong <- ping_node addr 118 pong <- ping_node addr
118 if pong 119 if pong
119 then do 120 then do
120 time <- timestamp 121 time <- timestamper
121 go (f (Just time)) 122 go (f (Just time))
122 else go (f Nothing) 123 else go (f Nothing)
123 124
124 go (Refresh nodes f) = do 125 go (Refresh nid f) = do
125 let nid = undefined 126 infos <- find_nodes nid
126 go (f undefined) 127 time <- timestamper
128 go (f (infos, time))
127 129
128{----------------------------------------------------------------------- 130{-----------------------------------------------------------------------
129 Bucket 131 Bucket
@@ -186,7 +188,7 @@ insertNode info bucket
186 -- update the all bucket if it is too outdated 188 -- update the all bucket if it is too outdated
187 | Just (NodeInfo {..} :-> lastSeen) <- lastChanged bucket 189 | Just (NodeInfo {..} :-> lastSeen) <- lastChanged bucket
188 , lastSeen > delta 190 , lastSeen > delta
189 = Refresh nodeAddr $ \ (infos, t) -> 191 = Refresh nodeId $ \ (infos, t) ->
190 insertNode info $ 192 insertNode info $
191 L.foldr (\ x -> PSQ.insertWith max x t) bucket infos 193 L.foldr (\ x -> PSQ.insertWith max x t) bucket infos
192 194
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
new file mode 100644
index 00000000..71400609
--- /dev/null
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -0,0 +1,251 @@
1{-# LANGUAGE RecordWildCards #-}
2{-# LANGUAGE FlexibleContexts #-}
3{-# LANGUAGE FlexibleInstances #-}
4{-# LANGUAGE GeneralizedNewtypeDeriving #-}
5{-# LANGUAGE MultiParamTypeClasses #-}
6{-# LANGUAGE ScopedTypeVariables #-}
7{-# LANGUAGE TypeFamilies #-}
8
9{-# LANGUAGE RankNTypes #-} -- TODO remove
10module Network.BitTorrent.DHT.Session
11 ( -- * Session
12 DHT
13 , runDHT
14
15 -- * Tokens
16 , grantToken
17 , checkToken
18
19 -- * Routing table
20 , getNodeId
21 , getClosest
22 , getClosestHash
23 , insertNode
24
25 -- * Peer storage
26 , insertPeer
27 , getPeerList
28
29 -- * Messaging
30 , (<@>)
31 , NodeHandler
32 , nodeHandler
33 ) where
34
35import Control.Applicative
36import Control.Concurrent.STM
37import Control.Exception hiding (Handler)
38import Control.Monad.Reader
39import Control.Monad.Base
40import Control.Monad.Trans.Control
41import Control.Monad.Trans.Resource
42import Data.Default
43import Data.Hashable
44import Data.List as L
45import Data.Time
46import Data.Time.Clock.POSIX
47import System.Random (randomIO)
48
49import Data.Torrent.InfoHash
50import Network.KRPC
51import Network.BitTorrent.Core
52import Network.BitTorrent.Core.PeerAddr as P
53import Network.BitTorrent.DHT.Message
54import Network.BitTorrent.DHT.Routing as R
55import Network.BitTorrent.DHT.Token as T
56
57
58{-----------------------------------------------------------------------
59-- Tokens policy
60-----------------------------------------------------------------------}
61
62data SessionTokens = SessionTokens
63 { tokenMap :: !TokenMap
64 , lastUpdate :: !UTCTime
65 , maxInterval :: !NominalDiffTime
66 }
67
68nullSessionTokens :: IO SessionTokens
69nullSessionTokens = SessionTokens
70 <$> (tokens <$> liftIO randomIO)
71 <*> liftIO getCurrentTime
72 <*> pure defaultUpdateInterval
73
74invalidateTokens :: UTCTime -> SessionTokens -> SessionTokens
75invalidateTokens curTime ts @ SessionTokens {..}
76 | curTime `diffUTCTime` lastUpdate > maxInterval = SessionTokens
77 { tokenMap = update tokenMap
78 , lastUpdate = curTime
79 , maxInterval = maxInterval
80 }
81 | otherwise = ts
82
83{-----------------------------------------------------------------------
84-- Session
85-----------------------------------------------------------------------}
86
87data Node ip = Node
88 { manager :: !(Manager (DHT ip))
89 , routingTable :: !(TVar (Table ip))
90 , contactInfo :: !(TVar (PeerStore ip))
91 , sessionTokens :: !(TVar SessionTokens)
92 }
93
94newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) (ResourceT IO) a }
95 deriving ( Functor, Applicative, Monad
96 , MonadIO, MonadBase IO
97 , MonadReader (Node ip)
98 )
99instance MonadBaseControl IO (DHT ip) where
100 newtype StM (DHT ip) a = StM {
101 unSt :: StM (ReaderT (Node ip) (ResourceT IO)) a
102 }
103 liftBaseWith cc = DHT $ liftBaseWith $ \ cc' ->
104 cc $ \ (DHT m) -> StM <$> cc' m
105 {-# INLINE liftBaseWith #-}
106
107 restoreM = DHT . restoreM . unSt
108 {-# INLINE restoreM #-}
109
110instance MonadKRPC (DHT ip) (DHT ip) where
111 getManager = asks manager
112
113runDHT :: forall ip a. Address ip
114 => NodeAddr ip -- ^ node address to bind;
115 -> [Handler (DHT ip)] -- ^ handlers to run on accepted queries;
116 -> DHT ip a -- ^ DHT action to run;
117 -> IO a -- ^ result.
118runDHT naddr handlers action = runResourceT $ do
119 (_, m) <- allocate (newManager (toSockAddr naddr) handlers) closeManager
120 myId <- liftIO genNodeId
121 node <- liftIO $ Node m
122 <$> newTVarIO (nullTable myId)
123 <*> newTVarIO def
124 <*> (newTVarIO =<< nullSessionTokens)
125 runReaderT (unDHT (listen >> action)) node
126
127{-----------------------------------------------------------------------
128-- Routing
129-----------------------------------------------------------------------}
130
131-- TODO fork?
132routing :: Address ip => Routing ip a -> DHT ip a
133routing = runRouting ping refreshNodes getTimestamp
134
135-- TODO add timeout
136ping :: Address ip => NodeAddr ip -> DHT ip Bool
137ping addr = do
138 Ping <- Ping <@> addr
139 return True
140
141-- FIXME do not use getClosest sinse we should /refresh/ them
142refreshNodes :: Address ip => NodeId -> DHT ip [NodeInfo ip]
143refreshNodes nid = do
144 nodes <- getClosest nid
145 nss <- forM (nodeAddr <$> nodes) $ \ addr -> do
146 NodeFound ns <- FindNode nid <@> addr
147 return ns
148 return $ L.concat nss
149
150getTimestamp :: DHT ip Timestamp
151getTimestamp = liftIO $ utcTimeToPOSIXSeconds <$> getCurrentTime
152
153{-----------------------------------------------------------------------
154-- Tokens
155-----------------------------------------------------------------------}
156
157tryUpdateSecret :: DHT ip ()
158tryUpdateSecret = do
159 curTime <- liftIO getCurrentTime
160 toks <- asks sessionTokens
161 liftIO $ atomically $ modifyTVar' toks (invalidateTokens curTime)
162
163grantToken :: Hashable a => NodeAddr a -> DHT ip Token
164grantToken addr = do
165 tryUpdateSecret
166 toks <- asks sessionTokens >>= liftIO . readTVarIO
167 return $ T.lookup addr $ tokenMap toks
168
169-- | Throws 'ProtocolError' if token is invalid or already expired.
170checkToken :: Hashable a => NodeAddr a -> Token -> DHT ip ()
171checkToken addr questionableToken = do
172 tryUpdateSecret
173 toks <- asks sessionTokens >>= liftIO . readTVarIO
174 unless (member addr questionableToken (tokenMap toks)) $
175 liftIO $ throwIO $ KError ProtocolError "bad token" ""
176 -- todo reset transaction id in krpc
177
178{-----------------------------------------------------------------------
179-- Routing table
180-----------------------------------------------------------------------}
181
182getTable :: DHT ip (Table ip)
183getTable = do
184 var <- asks routingTable
185 liftIO (readTVarIO var)
186
187putTable :: Table ip -> DHT ip ()
188putTable table = do
189 var <- asks routingTable
190 liftIO (atomically (writeTVar var table))
191
192getNodeId :: DHT ip NodeId
193getNodeId = thisId <$> getTable
194
195getClosest :: Eq ip => NodeId -> DHT ip [NodeInfo ip]
196getClosest nid = kclosest 8 nid <$> getTable
197
198getClosestHash :: Eq ip => InfoHash -> DHT ip [NodeInfo ip]
199getClosestHash ih = kclosestHash 8 ih <$> getTable
200
201insertNode :: Address ip => NodeInfo ip -> DHT ip ()
202insertNode info = do
203 t <- getTable
204 t' <- routing (R.insert info t)
205 putTable t'
206
207{-----------------------------------------------------------------------
208-- Peer storage
209-----------------------------------------------------------------------}
210
211insertPeer :: Eq ip => InfoHash -> PeerAddr ip -> DHT ip ()
212insertPeer ih addr = do
213 var <- asks contactInfo
214 liftIO $ atomically $ modifyTVar' var (P.insert ih addr)
215
216lookupPeers :: InfoHash -> DHT ip [PeerAddr ip]
217lookupPeers ih = do
218 var <- asks contactInfo
219 liftIO $ P.lookup ih <$> readTVarIO var
220
221type PeerList ip = Either [NodeInfo ip] [PeerAddr ip]
222
223getPeerList :: Eq ip => InfoHash -> DHT ip (PeerList ip)
224getPeerList ih = do
225 ps <- lookupPeers ih
226 if L.null ps
227 then Left <$> getClosestHash ih
228 else return (Right ps)
229
230{-----------------------------------------------------------------------
231-- Messaging
232-----------------------------------------------------------------------}
233
234(<@>) :: Address ip => KRPC (Query a) (Response b)
235 => a -> NodeAddr ip -> DHT ip b
236q <@> addr = do
237 nid <- getNodeId
238 Response remoteId r <- query (toSockAddr addr) (Query nid q)
239 insertNode (NodeInfo remoteId addr)
240 return r
241
242type NodeHandler ip = Handler (DHT ip)
243
244nodeHandler :: Address ip => KRPC (Query a) (Response b)
245 => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip
246nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do
247 case fromSockAddr sockAddr of
248 Nothing -> liftIO $ throwIO $ KError GenericError "bad address" ""
249 Just naddr -> do
250 insertNode (NodeInfo remoteId naddr)
251 Response <$> getNodeId <*> action naddr q