diff options
Diffstat (limited to 'src/Network/BitTorrent/DHT/Session.hs')
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 251 |
1 files changed, 251 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs new file mode 100644 index 00000000..71400609 --- /dev/null +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -0,0 +1,251 @@ | |||
1 | {-# LANGUAGE RecordWildCards #-} | ||
2 | {-# LANGUAGE FlexibleContexts #-} | ||
3 | {-# LANGUAGE FlexibleInstances #-} | ||
4 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
5 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
6 | {-# LANGUAGE ScopedTypeVariables #-} | ||
7 | {-# LANGUAGE TypeFamilies #-} | ||
8 | |||
9 | {-# LANGUAGE RankNTypes #-} -- TODO remove | ||
10 | module Network.BitTorrent.DHT.Session | ||
11 | ( -- * Session | ||
12 | DHT | ||
13 | , runDHT | ||
14 | |||
15 | -- * Tokens | ||
16 | , grantToken | ||
17 | , checkToken | ||
18 | |||
19 | -- * Routing table | ||
20 | , getNodeId | ||
21 | , getClosest | ||
22 | , getClosestHash | ||
23 | , insertNode | ||
24 | |||
25 | -- * Peer storage | ||
26 | , insertPeer | ||
27 | , getPeerList | ||
28 | |||
29 | -- * Messaging | ||
30 | , (<@>) | ||
31 | , NodeHandler | ||
32 | , nodeHandler | ||
33 | ) where | ||
34 | |||
35 | import Control.Applicative | ||
36 | import Control.Concurrent.STM | ||
37 | import Control.Exception hiding (Handler) | ||
38 | import Control.Monad.Reader | ||
39 | import Control.Monad.Base | ||
40 | import Control.Monad.Trans.Control | ||
41 | import Control.Monad.Trans.Resource | ||
42 | import Data.Default | ||
43 | import Data.Hashable | ||
44 | import Data.List as L | ||
45 | import Data.Time | ||
46 | import Data.Time.Clock.POSIX | ||
47 | import System.Random (randomIO) | ||
48 | |||
49 | import Data.Torrent.InfoHash | ||
50 | import Network.KRPC | ||
51 | import Network.BitTorrent.Core | ||
52 | import Network.BitTorrent.Core.PeerAddr as P | ||
53 | import Network.BitTorrent.DHT.Message | ||
54 | import Network.BitTorrent.DHT.Routing as R | ||
55 | import Network.BitTorrent.DHT.Token as T | ||
56 | |||
57 | |||
58 | {----------------------------------------------------------------------- | ||
59 | -- Tokens policy | ||
60 | -----------------------------------------------------------------------} | ||
61 | |||
62 | data SessionTokens = SessionTokens | ||
63 | { tokenMap :: !TokenMap | ||
64 | , lastUpdate :: !UTCTime | ||
65 | , maxInterval :: !NominalDiffTime | ||
66 | } | ||
67 | |||
68 | nullSessionTokens :: IO SessionTokens | ||
69 | nullSessionTokens = SessionTokens | ||
70 | <$> (tokens <$> liftIO randomIO) | ||
71 | <*> liftIO getCurrentTime | ||
72 | <*> pure defaultUpdateInterval | ||
73 | |||
74 | invalidateTokens :: UTCTime -> SessionTokens -> SessionTokens | ||
75 | invalidateTokens curTime ts @ SessionTokens {..} | ||
76 | | curTime `diffUTCTime` lastUpdate > maxInterval = SessionTokens | ||
77 | { tokenMap = update tokenMap | ||
78 | , lastUpdate = curTime | ||
79 | , maxInterval = maxInterval | ||
80 | } | ||
81 | | otherwise = ts | ||
82 | |||
83 | {----------------------------------------------------------------------- | ||
84 | -- Session | ||
85 | -----------------------------------------------------------------------} | ||
86 | |||
87 | data Node ip = Node | ||
88 | { manager :: !(Manager (DHT ip)) | ||
89 | , routingTable :: !(TVar (Table ip)) | ||
90 | , contactInfo :: !(TVar (PeerStore ip)) | ||
91 | , sessionTokens :: !(TVar SessionTokens) | ||
92 | } | ||
93 | |||
94 | newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) (ResourceT IO) a } | ||
95 | deriving ( Functor, Applicative, Monad | ||
96 | , MonadIO, MonadBase IO | ||
97 | , MonadReader (Node ip) | ||
98 | ) | ||
99 | instance MonadBaseControl IO (DHT ip) where | ||
100 | newtype StM (DHT ip) a = StM { | ||
101 | unSt :: StM (ReaderT (Node ip) (ResourceT IO)) a | ||
102 | } | ||
103 | liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> | ||
104 | cc $ \ (DHT m) -> StM <$> cc' m | ||
105 | {-# INLINE liftBaseWith #-} | ||
106 | |||
107 | restoreM = DHT . restoreM . unSt | ||
108 | {-# INLINE restoreM #-} | ||
109 | |||
110 | instance MonadKRPC (DHT ip) (DHT ip) where | ||
111 | getManager = asks manager | ||
112 | |||
113 | runDHT :: forall ip a. Address ip | ||
114 | => NodeAddr ip -- ^ node address to bind; | ||
115 | -> [Handler (DHT ip)] -- ^ handlers to run on accepted queries; | ||
116 | -> DHT ip a -- ^ DHT action to run; | ||
117 | -> IO a -- ^ result. | ||
118 | runDHT naddr handlers action = runResourceT $ do | ||
119 | (_, m) <- allocate (newManager (toSockAddr naddr) handlers) closeManager | ||
120 | myId <- liftIO genNodeId | ||
121 | node <- liftIO $ Node m | ||
122 | <$> newTVarIO (nullTable myId) | ||
123 | <*> newTVarIO def | ||
124 | <*> (newTVarIO =<< nullSessionTokens) | ||
125 | runReaderT (unDHT (listen >> action)) node | ||
126 | |||
127 | {----------------------------------------------------------------------- | ||
128 | -- Routing | ||
129 | -----------------------------------------------------------------------} | ||
130 | |||
131 | -- TODO fork? | ||
132 | routing :: Address ip => Routing ip a -> DHT ip a | ||
133 | routing = runRouting ping refreshNodes getTimestamp | ||
134 | |||
135 | -- TODO add timeout | ||
136 | ping :: Address ip => NodeAddr ip -> DHT ip Bool | ||
137 | ping addr = do | ||
138 | Ping <- Ping <@> addr | ||
139 | return True | ||
140 | |||
141 | -- FIXME do not use getClosest sinse we should /refresh/ them | ||
142 | refreshNodes :: Address ip => NodeId -> DHT ip [NodeInfo ip] | ||
143 | refreshNodes nid = do | ||
144 | nodes <- getClosest nid | ||
145 | nss <- forM (nodeAddr <$> nodes) $ \ addr -> do | ||
146 | NodeFound ns <- FindNode nid <@> addr | ||
147 | return ns | ||
148 | return $ L.concat nss | ||
149 | |||
150 | getTimestamp :: DHT ip Timestamp | ||
151 | getTimestamp = liftIO $ utcTimeToPOSIXSeconds <$> getCurrentTime | ||
152 | |||
153 | {----------------------------------------------------------------------- | ||
154 | -- Tokens | ||
155 | -----------------------------------------------------------------------} | ||
156 | |||
157 | tryUpdateSecret :: DHT ip () | ||
158 | tryUpdateSecret = do | ||
159 | curTime <- liftIO getCurrentTime | ||
160 | toks <- asks sessionTokens | ||
161 | liftIO $ atomically $ modifyTVar' toks (invalidateTokens curTime) | ||
162 | |||
163 | grantToken :: Hashable a => NodeAddr a -> DHT ip Token | ||
164 | grantToken addr = do | ||
165 | tryUpdateSecret | ||
166 | toks <- asks sessionTokens >>= liftIO . readTVarIO | ||
167 | return $ T.lookup addr $ tokenMap toks | ||
168 | |||
169 | -- | Throws 'ProtocolError' if token is invalid or already expired. | ||
170 | checkToken :: Hashable a => NodeAddr a -> Token -> DHT ip () | ||
171 | checkToken addr questionableToken = do | ||
172 | tryUpdateSecret | ||
173 | toks <- asks sessionTokens >>= liftIO . readTVarIO | ||
174 | unless (member addr questionableToken (tokenMap toks)) $ | ||
175 | liftIO $ throwIO $ KError ProtocolError "bad token" "" | ||
176 | -- todo reset transaction id in krpc | ||
177 | |||
178 | {----------------------------------------------------------------------- | ||
179 | -- Routing table | ||
180 | -----------------------------------------------------------------------} | ||
181 | |||
182 | getTable :: DHT ip (Table ip) | ||
183 | getTable = do | ||
184 | var <- asks routingTable | ||
185 | liftIO (readTVarIO var) | ||
186 | |||
187 | putTable :: Table ip -> DHT ip () | ||
188 | putTable table = do | ||
189 | var <- asks routingTable | ||
190 | liftIO (atomically (writeTVar var table)) | ||
191 | |||
192 | getNodeId :: DHT ip NodeId | ||
193 | getNodeId = thisId <$> getTable | ||
194 | |||
195 | getClosest :: Eq ip => NodeId -> DHT ip [NodeInfo ip] | ||
196 | getClosest nid = kclosest 8 nid <$> getTable | ||
197 | |||
198 | getClosestHash :: Eq ip => InfoHash -> DHT ip [NodeInfo ip] | ||
199 | getClosestHash ih = kclosestHash 8 ih <$> getTable | ||
200 | |||
201 | insertNode :: Address ip => NodeInfo ip -> DHT ip () | ||
202 | insertNode info = do | ||
203 | t <- getTable | ||
204 | t' <- routing (R.insert info t) | ||
205 | putTable t' | ||
206 | |||
207 | {----------------------------------------------------------------------- | ||
208 | -- Peer storage | ||
209 | -----------------------------------------------------------------------} | ||
210 | |||
211 | insertPeer :: Eq ip => InfoHash -> PeerAddr ip -> DHT ip () | ||
212 | insertPeer ih addr = do | ||
213 | var <- asks contactInfo | ||
214 | liftIO $ atomically $ modifyTVar' var (P.insert ih addr) | ||
215 | |||
216 | lookupPeers :: InfoHash -> DHT ip [PeerAddr ip] | ||
217 | lookupPeers ih = do | ||
218 | var <- asks contactInfo | ||
219 | liftIO $ P.lookup ih <$> readTVarIO var | ||
220 | |||
221 | type PeerList ip = Either [NodeInfo ip] [PeerAddr ip] | ||
222 | |||
223 | getPeerList :: Eq ip => InfoHash -> DHT ip (PeerList ip) | ||
224 | getPeerList ih = do | ||
225 | ps <- lookupPeers ih | ||
226 | if L.null ps | ||
227 | then Left <$> getClosestHash ih | ||
228 | else return (Right ps) | ||
229 | |||
230 | {----------------------------------------------------------------------- | ||
231 | -- Messaging | ||
232 | -----------------------------------------------------------------------} | ||
233 | |||
234 | (<@>) :: Address ip => KRPC (Query a) (Response b) | ||
235 | => a -> NodeAddr ip -> DHT ip b | ||
236 | q <@> addr = do | ||
237 | nid <- getNodeId | ||
238 | Response remoteId r <- query (toSockAddr addr) (Query nid q) | ||
239 | insertNode (NodeInfo remoteId addr) | ||
240 | return r | ||
241 | |||
242 | type NodeHandler ip = Handler (DHT ip) | ||
243 | |||
244 | nodeHandler :: Address ip => KRPC (Query a) (Response b) | ||
245 | => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip | ||
246 | nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do | ||
247 | case fromSockAddr sockAddr of | ||
248 | Nothing -> liftIO $ throwIO $ KError GenericError "bad address" "" | ||
249 | Just naddr -> do | ||
250 | insertNode (NodeInfo remoteId naddr) | ||
251 | Response <$> getNodeId <*> action naddr q | ||