summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/DHT
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/DHT')
-rw-r--r--src/Network/BitTorrent/DHT/Protocol.hs329
-rw-r--r--src/Network/BitTorrent/DHT/Routing.hs22
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs251
3 files changed, 263 insertions, 339 deletions
diff --git a/src/Network/BitTorrent/DHT/Protocol.hs b/src/Network/BitTorrent/DHT/Protocol.hs
deleted file mode 100644
index 8528f0e0..00000000
--- a/src/Network/BitTorrent/DHT/Protocol.hs
+++ /dev/null
@@ -1,329 +0,0 @@
1module Network.BitTorrent.DHT.Protocol
2 (
3 newNodeSession
4
5 -- * Tracker
6 , ping
7 , findNode
8 , getPeers
9 , announcePeer
10
11 -- * Server
12 , dhtServer
13 ) where
14
15import Control.Applicative
16import Control.Concurrent
17import Control.Concurrent.STM
18import Control.Monad
19import Control.Exception
20import Data.ByteString
21import Data.Serialize as S
22import Data.Function
23import Data.Ord
24import Data.Maybe
25import Data.List as L
26import Data.Map as M
27import Data.HashMap.Strict as HM
28import Network
29import Network.Socket
30import System.Entropy
31
32import Data.BEncode
33import Network.KRPC
34import Network.KRPC.Protocol
35import Network.BitTorrent.Peer
36import Network.BitTorrent.Exchange.Protocol ()
37
38{-----------------------------------------------------------------------
39 Node
40-----------------------------------------------------------------------}
41
42type NodeId = ByteString
43
44-- TODO WARN is the 'system' random suitable for this?
45-- | Generate random NodeID used for the entire session.
46-- Distribution of ID's should be as uniform as possible.
47--
48genNodeId :: IO NodeId
49genNodeId = getEntropy 20
50
51data NodeAddr = NodeAddr {
52 nodeIP :: {-# UNPACK #-} !HostAddress
53 , nodePort :: {-# UNPACK #-} !PortNumber
54 } deriving (Show, Eq)
55
56instance Serialize NodeAddr where
57 get = NodeAddr <$> getWord32be <*> get
58 put NodeAddr {..} = putWord32be nodeIP >> put nodePort
59
60data NodeInfo = NodeInfo {
61 nodeID :: !NodeId
62 , nodeAddr :: !NodeAddr
63 } deriving (Show, Eq)
64
65instance Serialize NodeInfo where
66 get = NodeInfo <$> getByteString 20 <*> get
67 put NodeInfo {..} = put nodeID >> put nodeAddr
68
69type CompactInfo = ByteString
70
71decodeCompact :: CompactInfo -> [NodeInfo]
72decodeCompact = either (const []) id . S.runGet (many get)
73
74encodeCompact :: [NodeId] -> CompactInfo
75encodeCompact = S.runPut . mapM_ put
76
77decodePeerList :: [BEncode] -> [PeerAddr]
78decodePeerList = undefined
79
80encodePeerList :: [PeerAddr] -> [BEncode]
81encodePeerList = undefined
82
83type Distance = NodeId
84
85{-----------------------------------------------------------------------
86 Tokens
87-----------------------------------------------------------------------}
88
89type Secret = Int
90
91genSecret :: IO Secret
92genSecret = error "secret"
93
94-- | Instead of periodically loop over the all nodes in the routing
95-- table with some given interval (or some other tricky method
96-- e.g. using timeouts) we can just update tokens on demand - if no
97-- one asks for a token then the token _should_ not change at all.
98--
99type Token = ByteString
100
101defaultToken :: Token
102defaultToken = "0xdeadbeef"
103
104genToken :: NodeAddr -> Secret -> Token
105genToken _ _ = defaultToken
106
107{-----------------------------------------------------------------------
108 Routing table
109-----------------------------------------------------------------------}
110
111type ContactInfo = HashMap InfoHash [PeerAddr]
112
113insertPeer :: InfoHash -> PeerAddr -> ContactInfo -> ContactInfo
114insertPeer ih addr = HM.insertWith (++) ih [addr]
115
116lookupPeers :: InfoHash -> ContactInfo -> [PeerAddr]
117lookupPeers ih = fromMaybe [] . HM.lookup ih
118
119-- TODO use more compact routing table
120type RoutingTable = HashMap NodeId NodeAddr
121
122insertNode :: NodeId -> NodeAddr -> RoutingTable -> RoutingTable
123insertNode = HM.insert
124
125type Alpha = Int
126
127defaultAlpha :: Alpha
128defaultAlpha = 8
129
130-- TODO
131kclosest :: Int -> NodeId -> RoutingTable -> [NodeId]
132kclosest = undefined
133
134{-----------------------------------------------------------------------
135 Node session
136-----------------------------------------------------------------------}
137
138data NodeSession = NodeSession {
139 nodeId :: !NodeId
140 , routingTable :: !(TVar RoutingTable)
141 , contactInfo :: !(TVar ContactInfo)
142-- , currentSecret :: !(TVar Secret)
143-- , secretTimestamp :: !(TVar Timestamp)
144 , alpha :: !Alpha
145 , listenerPort :: !PortNumber
146 }
147
148instance Eq NodeSession where
149 (==) = (==) `on` nodeId
150
151instance Ord NodeSession where
152 compare = comparing nodeId
153
154newNodeSession :: PortNumber -> IO NodeSession
155newNodeSession lport
156 = NodeSession
157 <$> genNodeId
158 <*> newTVarIO HM.empty
159 <*> newTVarIO HM.empty
160 <*> pure defaultAlpha
161 <*> pure lport
162
163assignToken :: NodeSession -> NodeId -> IO Token
164assignToken _ _ = return ""
165
166-- TODO
167checkToken :: NodeId -> Token -> NodeSession -> IO Bool
168checkToken _ _ _ = return True
169
170updateTimestamp :: NodeSession -> NodeId -> IO ()
171updateTimestamp = error "updateTimestamp"
172
173updateToken :: NodeSession -> NodeId -> Token -> IO ()
174updateToken _ _ _ = error "updateToken"
175
176{-----------------------------------------------------------------------
177 DHT Queries
178-----------------------------------------------------------------------}
179
180pingM :: Method NodeId NodeId
181pingM = method "ping" ["id"] ["id"]
182
183findNodeM :: Method (NodeId, NodeId) (NodeId, CompactInfo)
184findNodeM = method "find_node" ["id", "target"] ["id", "nodes"]
185
186-- | Lookup peers by a torrent infohash. This method might return
187-- different kind of responses depending on the routing table of
188-- queried node:
189--
190-- * If quieried node contains a peer list for the given infohash
191-- then the node should return the list in a "value" key. Note that
192-- list is encoded as compact peer address, not a compact node info.
193-- The result of 'get_peers' method have the following scheme:
194--
195-- > { "id" : "dht_server_node_id"
196-- > , "token" : "assigned_token"
197-- > , "values" : ["_IP_PO", "_ip_po"]
198-- > }
199--
200-- * If quieried node does not contain a list of peers associated
201-- with the given infohash, then node should return
202--
203-- > { "id" : "dht_server_node_id"
204-- > , "token" : "assigned_token"
205-- > , "nodes" : "compact_nodes_info"
206-- > }
207--
208-- The resulting dictionaries might differ only in a values\/nodes
209-- keys.
210--
211getPeersM :: Method (NodeId, InfoHash) BEncode
212getPeersM = method "get_peers" ["id", "info_hash"] []
213
214-- | Used to announce that the peer, controlling the quering node is
215-- downloading a torrent on a port.
216announcePeerM :: Method (NodeId, InfoHash, PortNumber, Token) NodeId
217announcePeerM = method "announce_peer" ["id", "info_hash", "port", "token"] ["id"]
218
219{-----------------------------------------------------------------------
220 DHT Tracker
221-----------------------------------------------------------------------}
222-- TODO: update node timestamp on each successful call
223
224-- | Note that tracker side query functions could throw RPCException.
225type DHT a b = NodeSession -> NodeAddr -> a -> IO b
226
227ping :: DHT () ()
228ping NodeSession {..} addr @ NodeAddr {..} () = do
229 nid <- call (nodeIP, nodePort) pingM nodeId
230 atomically $ modifyTVar' routingTable $ HM.insert nid addr
231
232findNode :: DHT NodeId [NodeInfo]
233findNode ses @ NodeSession {..} NodeAddr {..} qnid = do
234 (nid, info) <- call (nodeIP, nodePort) findNodeM (nodeId, qnid)
235 updateTimestamp ses nid
236 return (decodeCompact info)
237
238getPeers :: DHT InfoHash (Either [NodeInfo] [PeerAddr])
239getPeers ses @ NodeSession {..} NodeAddr {..} ih = do
240 resp <- call (nodeIP, nodePort) getPeersM (nodeId, ih)
241 (nid, tok, res) <- extrResp resp
242 updateTimestamp ses nid
243 updateToken ses nid tok
244 return res
245 where
246 extrResp (BDict d)
247 | Just (BString nid ) <- M.lookup "id" d
248 , Just (BString tok ) <- M.lookup "token" d
249 , Just (BList values) <- M.lookup "values" d
250 = return $ (nid, tok, Right $ decodePeerList values)
251
252 | Just (BString nid ) <- M.lookup "id" d
253 , Just (BString tok ) <- M.lookup "token" d
254 , Just (BString nodes) <- M.lookup "nodes" d
255 = return (nid, tok, Left $ decodeCompact nodes)
256
257 extrResp _ = throw $ RPCException msg
258 where msg = ProtocolError "unable to extract getPeers resp"
259
260-- remove token from signature, handle the all token stuff by NodeSession
261
262-- | Note that before ever calling this method you should call the
263-- getPeerList.
264announcePeer :: DHT (InfoHash, Token) NodeId
265announcePeer ses @ NodeSession {..} NodeAddr {..} (ih, tok) = do
266 nid <- call (nodeIP, nodePort) announcePeerM (nodeId, ih, listenerPort, tok)
267 updateTimestamp ses nid
268 return nid
269
270{-----------------------------------------------------------------------
271 DHT Server
272-----------------------------------------------------------------------}
273-- TODO: update node timestamp on each successful call
274-- NOTE: ensure all server operations run in O(1)
275
276type ServerHandler a b = NodeSession -> NodeAddr -> a -> IO b
277
278pingS :: ServerHandler NodeId NodeId
279pingS NodeSession {..} addr nid = do
280 atomically $ modifyTVar' routingTable $ insertNode nid addr
281 return nodeId
282
283findNodeS :: ServerHandler (NodeId, NodeId) (NodeId, CompactInfo)
284findNodeS ses @ NodeSession {..} _ (nid, qnid) = do
285 updateTimestamp ses nid
286 rt <- atomically $ readTVar routingTable
287 return (nodeId, encodeCompact $ kclosest alpha qnid rt)
288
289getPeersS :: ServerHandler (NodeId, InfoHash) BEncode
290getPeersS ses @ NodeSession {..} _ (nid, ih) = do
291 updateTimestamp ses nid
292 mkResp <$> assignToken ses nid <*> findPeers
293 where
294 findPeers = do
295 list <- lookupPeers ih <$> readTVarIO contactInfo
296 if not (L.null list)
297 then return $ Right list
298 else do
299 rt <- readTVarIO routingTable
300 let nodes = kclosest alpha (getInfoHash ih) rt
301 return $ Left nodes
302
303 mkDict tok res = [("id",BString nodeId), ("token", BString tok), res]
304 mkResult (Left nodes ) = ("nodes", BString $ encodeCompact nodes)
305 mkResult (Right values) = ("values", BList $ encodePeerList values)
306 mkResp tok = BDict . M.fromList . mkDict tok . mkResult
307
308announcePeerS :: ServerHandler (NodeId, InfoHash, PortNumber, Token) NodeId
309announcePeerS ses @ NodeSession {..} NodeAddr {..} (nid, ih, port, token) = do
310 updateTimestamp ses nid
311 registered <- checkToken nid token ses
312 when registered $ do
313 atomically $ do
314 let peerAddr = PeerAddr Nothing nodeIP port
315 modifyTVar contactInfo $ insertPeer ih peerAddr
316 return nodeId
317
318dhtTracker :: NodeSession -> InfoHash -> Chan PeerAddr -> IO ()
319dhtTracker = undefined
320
321dhtServer :: NodeSession -> PortNumber -> IO ()
322dhtServer s p = server p methods
323 where
324 methods =
325 [ pingM ==> pingS s undefined
326 , findNodeM ==> findNodeS s undefined
327 , getPeersM ==> getPeersS s undefined
328 , announcePeerM ==> announcePeerS s undefined
329 ] \ No newline at end of file
diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs
index 5f00a924..fd2197f0 100644
--- a/src/Network/BitTorrent/DHT/Routing.hs
+++ b/src/Network/BitTorrent/DHT/Routing.hs
@@ -8,12 +8,14 @@
8{-# LANGUAGE RecordWildCards #-} 8{-# LANGUAGE RecordWildCards #-}
9{-# LANGUAGE TypeOperators #-} 9{-# LANGUAGE TypeOperators #-}
10{-# LANGUAGE DeriveGeneric #-} 10{-# LANGUAGE DeriveGeneric #-}
11{-# OPTIONS_GHC -fno-warn-orphans #-}
11module Network.BitTorrent.DHT.Routing 12module Network.BitTorrent.DHT.Routing
12 ( -- * Routing table 13 ( -- * Routing table
13 Table 14 Table
14 , BucketCount 15 , BucketCount
15 16
16 -- * Routing 17 -- * Routing
18 , Timestamp
17 , Routing 19 , Routing
18 , runRouting 20 , runRouting
19 21
@@ -89,12 +91,11 @@ insert ping (k, v) = go 0
89-----------------------------------------------------------------------} 91-----------------------------------------------------------------------}
90 92
91type Timestamp = POSIXTime 93type Timestamp = POSIXTime
92type PingInterval = POSIXTime
93 94
94data Routing ip result 95data Routing ip result
95 = Full result 96 = Full result
96 | Done (Timestamp -> result) 97 | Done (Timestamp -> result)
97 | Refresh (NodeAddr ip) (([NodeInfo ip], Timestamp) -> Routing ip result) 98 | Refresh NodeId (([NodeInfo ip], Timestamp) -> Routing ip result)
98 | NeedPing (NodeAddr ip) (Maybe Timestamp -> Routing ip result) 99 | NeedPing (NodeAddr ip) (Maybe Timestamp -> Routing ip result)
99 100
100instance Functor (Routing ip) where 101instance Functor (Routing ip) where
@@ -107,23 +108,24 @@ runRouting :: (Monad m, Eq ip)
107 => (NodeAddr ip -> m Bool) -- ^ ping_node 108 => (NodeAddr ip -> m Bool) -- ^ ping_node
108 -> (NodeId -> m [NodeInfo ip]) -- ^ find_nodes 109 -> (NodeId -> m [NodeInfo ip]) -- ^ find_nodes
109 -> m Timestamp -- ^ timestamper 110 -> m Timestamp -- ^ timestamper
110 -> Routing ip f 111 -> Routing ip f -- ^ action
111 -> m f -- ^ result 112 -> m f -- ^ result
112runRouting ping_node find_nodes timestamp = go 113runRouting ping_node find_nodes timestamper = go
113 where 114 where
114 go (Full r) = return r 115 go (Full r) = return r
115 go (Done f) = liftM f timestamp 116 go (Done f) = liftM f timestamper
116 go (NeedPing addr f) = do 117 go (NeedPing addr f) = do
117 pong <- ping_node addr 118 pong <- ping_node addr
118 if pong 119 if pong
119 then do 120 then do
120 time <- timestamp 121 time <- timestamper
121 go (f (Just time)) 122 go (f (Just time))
122 else go (f Nothing) 123 else go (f Nothing)
123 124
124 go (Refresh nodes f) = do 125 go (Refresh nid f) = do
125 let nid = undefined 126 infos <- find_nodes nid
126 go (f undefined) 127 time <- timestamper
128 go (f (infos, time))
127 129
128{----------------------------------------------------------------------- 130{-----------------------------------------------------------------------
129 Bucket 131 Bucket
@@ -186,7 +188,7 @@ insertNode info bucket
186 -- update the all bucket if it is too outdated 188 -- update the all bucket if it is too outdated
187 | Just (NodeInfo {..} :-> lastSeen) <- lastChanged bucket 189 | Just (NodeInfo {..} :-> lastSeen) <- lastChanged bucket
188 , lastSeen > delta 190 , lastSeen > delta
189 = Refresh nodeAddr $ \ (infos, t) -> 191 = Refresh nodeId $ \ (infos, t) ->
190 insertNode info $ 192 insertNode info $
191 L.foldr (\ x -> PSQ.insertWith max x t) bucket infos 193 L.foldr (\ x -> PSQ.insertWith max x t) bucket infos
192 194
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
new file mode 100644
index 00000000..71400609
--- /dev/null
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -0,0 +1,251 @@
1{-# LANGUAGE RecordWildCards #-}
2{-# LANGUAGE FlexibleContexts #-}
3{-# LANGUAGE FlexibleInstances #-}
4{-# LANGUAGE GeneralizedNewtypeDeriving #-}
5{-# LANGUAGE MultiParamTypeClasses #-}
6{-# LANGUAGE ScopedTypeVariables #-}
7{-# LANGUAGE TypeFamilies #-}
8
9{-# LANGUAGE RankNTypes #-} -- TODO remove
10module Network.BitTorrent.DHT.Session
11 ( -- * Session
12 DHT
13 , runDHT
14
15 -- * Tokens
16 , grantToken
17 , checkToken
18
19 -- * Routing table
20 , getNodeId
21 , getClosest
22 , getClosestHash
23 , insertNode
24
25 -- * Peer storage
26 , insertPeer
27 , getPeerList
28
29 -- * Messaging
30 , (<@>)
31 , NodeHandler
32 , nodeHandler
33 ) where
34
35import Control.Applicative
36import Control.Concurrent.STM
37import Control.Exception hiding (Handler)
38import Control.Monad.Reader
39import Control.Monad.Base
40import Control.Monad.Trans.Control
41import Control.Monad.Trans.Resource
42import Data.Default
43import Data.Hashable
44import Data.List as L
45import Data.Time
46import Data.Time.Clock.POSIX
47import System.Random (randomIO)
48
49import Data.Torrent.InfoHash
50import Network.KRPC
51import Network.BitTorrent.Core
52import Network.BitTorrent.Core.PeerAddr as P
53import Network.BitTorrent.DHT.Message
54import Network.BitTorrent.DHT.Routing as R
55import Network.BitTorrent.DHT.Token as T
56
57
58{-----------------------------------------------------------------------
59-- Tokens policy
60-----------------------------------------------------------------------}
61
62data SessionTokens = SessionTokens
63 { tokenMap :: !TokenMap
64 , lastUpdate :: !UTCTime
65 , maxInterval :: !NominalDiffTime
66 }
67
68nullSessionTokens :: IO SessionTokens
69nullSessionTokens = SessionTokens
70 <$> (tokens <$> liftIO randomIO)
71 <*> liftIO getCurrentTime
72 <*> pure defaultUpdateInterval
73
74invalidateTokens :: UTCTime -> SessionTokens -> SessionTokens
75invalidateTokens curTime ts @ SessionTokens {..}
76 | curTime `diffUTCTime` lastUpdate > maxInterval = SessionTokens
77 { tokenMap = update tokenMap
78 , lastUpdate = curTime
79 , maxInterval = maxInterval
80 }
81 | otherwise = ts
82
83{-----------------------------------------------------------------------
84-- Session
85-----------------------------------------------------------------------}
86
87data Node ip = Node
88 { manager :: !(Manager (DHT ip))
89 , routingTable :: !(TVar (Table ip))
90 , contactInfo :: !(TVar (PeerStore ip))
91 , sessionTokens :: !(TVar SessionTokens)
92 }
93
94newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) (ResourceT IO) a }
95 deriving ( Functor, Applicative, Monad
96 , MonadIO, MonadBase IO
97 , MonadReader (Node ip)
98 )
99instance MonadBaseControl IO (DHT ip) where
100 newtype StM (DHT ip) a = StM {
101 unSt :: StM (ReaderT (Node ip) (ResourceT IO)) a
102 }
103 liftBaseWith cc = DHT $ liftBaseWith $ \ cc' ->
104 cc $ \ (DHT m) -> StM <$> cc' m
105 {-# INLINE liftBaseWith #-}
106
107 restoreM = DHT . restoreM . unSt
108 {-# INLINE restoreM #-}
109
110instance MonadKRPC (DHT ip) (DHT ip) where
111 getManager = asks manager
112
113runDHT :: forall ip a. Address ip
114 => NodeAddr ip -- ^ node address to bind;
115 -> [Handler (DHT ip)] -- ^ handlers to run on accepted queries;
116 -> DHT ip a -- ^ DHT action to run;
117 -> IO a -- ^ result.
118runDHT naddr handlers action = runResourceT $ do
119 (_, m) <- allocate (newManager (toSockAddr naddr) handlers) closeManager
120 myId <- liftIO genNodeId
121 node <- liftIO $ Node m
122 <$> newTVarIO (nullTable myId)
123 <*> newTVarIO def
124 <*> (newTVarIO =<< nullSessionTokens)
125 runReaderT (unDHT (listen >> action)) node
126
127{-----------------------------------------------------------------------
128-- Routing
129-----------------------------------------------------------------------}
130
131-- TODO fork?
132routing :: Address ip => Routing ip a -> DHT ip a
133routing = runRouting ping refreshNodes getTimestamp
134
135-- TODO add timeout
136ping :: Address ip => NodeAddr ip -> DHT ip Bool
137ping addr = do
138 Ping <- Ping <@> addr
139 return True
140
141-- FIXME do not use getClosest sinse we should /refresh/ them
142refreshNodes :: Address ip => NodeId -> DHT ip [NodeInfo ip]
143refreshNodes nid = do
144 nodes <- getClosest nid
145 nss <- forM (nodeAddr <$> nodes) $ \ addr -> do
146 NodeFound ns <- FindNode nid <@> addr
147 return ns
148 return $ L.concat nss
149
150getTimestamp :: DHT ip Timestamp
151getTimestamp = liftIO $ utcTimeToPOSIXSeconds <$> getCurrentTime
152
153{-----------------------------------------------------------------------
154-- Tokens
155-----------------------------------------------------------------------}
156
157tryUpdateSecret :: DHT ip ()
158tryUpdateSecret = do
159 curTime <- liftIO getCurrentTime
160 toks <- asks sessionTokens
161 liftIO $ atomically $ modifyTVar' toks (invalidateTokens curTime)
162
163grantToken :: Hashable a => NodeAddr a -> DHT ip Token
164grantToken addr = do
165 tryUpdateSecret
166 toks <- asks sessionTokens >>= liftIO . readTVarIO
167 return $ T.lookup addr $ tokenMap toks
168
169-- | Throws 'ProtocolError' if token is invalid or already expired.
170checkToken :: Hashable a => NodeAddr a -> Token -> DHT ip ()
171checkToken addr questionableToken = do
172 tryUpdateSecret
173 toks <- asks sessionTokens >>= liftIO . readTVarIO
174 unless (member addr questionableToken (tokenMap toks)) $
175 liftIO $ throwIO $ KError ProtocolError "bad token" ""
176 -- todo reset transaction id in krpc
177
178{-----------------------------------------------------------------------
179-- Routing table
180-----------------------------------------------------------------------}
181
182getTable :: DHT ip (Table ip)
183getTable = do
184 var <- asks routingTable
185 liftIO (readTVarIO var)
186
187putTable :: Table ip -> DHT ip ()
188putTable table = do
189 var <- asks routingTable
190 liftIO (atomically (writeTVar var table))
191
192getNodeId :: DHT ip NodeId
193getNodeId = thisId <$> getTable
194
195getClosest :: Eq ip => NodeId -> DHT ip [NodeInfo ip]
196getClosest nid = kclosest 8 nid <$> getTable
197
198getClosestHash :: Eq ip => InfoHash -> DHT ip [NodeInfo ip]
199getClosestHash ih = kclosestHash 8 ih <$> getTable
200
201insertNode :: Address ip => NodeInfo ip -> DHT ip ()
202insertNode info = do
203 t <- getTable
204 t' <- routing (R.insert info t)
205 putTable t'
206
207{-----------------------------------------------------------------------
208-- Peer storage
209-----------------------------------------------------------------------}
210
211insertPeer :: Eq ip => InfoHash -> PeerAddr ip -> DHT ip ()
212insertPeer ih addr = do
213 var <- asks contactInfo
214 liftIO $ atomically $ modifyTVar' var (P.insert ih addr)
215
216lookupPeers :: InfoHash -> DHT ip [PeerAddr ip]
217lookupPeers ih = do
218 var <- asks contactInfo
219 liftIO $ P.lookup ih <$> readTVarIO var
220
221type PeerList ip = Either [NodeInfo ip] [PeerAddr ip]
222
223getPeerList :: Eq ip => InfoHash -> DHT ip (PeerList ip)
224getPeerList ih = do
225 ps <- lookupPeers ih
226 if L.null ps
227 then Left <$> getClosestHash ih
228 else return (Right ps)
229
230{-----------------------------------------------------------------------
231-- Messaging
232-----------------------------------------------------------------------}
233
234(<@>) :: Address ip => KRPC (Query a) (Response b)
235 => a -> NodeAddr ip -> DHT ip b
236q <@> addr = do
237 nid <- getNodeId
238 Response remoteId r <- query (toSockAddr addr) (Query nid q)
239 insertNode (NodeInfo remoteId addr)
240 return r
241
242type NodeHandler ip = Handler (DHT ip)
243
244nodeHandler :: Address ip => KRPC (Query a) (Response b)
245 => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip
246nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do
247 case fromSockAddr sockAddr of
248 Nothing -> liftIO $ throwIO $ KError GenericError "bad address" ""
249 Just naddr -> do
250 insertNode (NodeInfo remoteId naddr)
251 Response <$> getNodeId <*> action naddr q