diff options
Diffstat (limited to 'dht/src/Network/BitTorrent')
-rw-r--r-- | dht/src/Network/BitTorrent/DHT/ContactInfo.hs | 254 | ||||
-rw-r--r-- | dht/src/Network/BitTorrent/DHT/Readme.md | 13 | ||||
-rw-r--r-- | dht/src/Network/BitTorrent/DHT/Token.hs | 201 | ||||
-rw-r--r-- | dht/src/Network/BitTorrent/MainlineDHT.hs | 1169 | ||||
-rw-r--r-- | dht/src/Network/BitTorrent/MainlineDHT/Symbols.hs | 24 |
5 files changed, 1661 insertions, 0 deletions
diff --git a/dht/src/Network/BitTorrent/DHT/ContactInfo.hs b/dht/src/Network/BitTorrent/DHT/ContactInfo.hs new file mode 100644 index 00000000..ec7e6658 --- /dev/null +++ b/dht/src/Network/BitTorrent/DHT/ContactInfo.hs | |||
@@ -0,0 +1,254 @@ | |||
1 | {-# LANGUAGE BangPatterns #-} | ||
2 | module Network.BitTorrent.DHT.ContactInfo | ||
3 | ( PeerStore | ||
4 | , PeerAddr(..) | ||
5 | , Network.BitTorrent.DHT.ContactInfo.lookup | ||
6 | , Network.BitTorrent.DHT.ContactInfo.freshPeers | ||
7 | , Network.BitTorrent.DHT.ContactInfo.insertPeer | ||
8 | , deleteOlderThan | ||
9 | , knownSwarms | ||
10 | ) where | ||
11 | |||
12 | import Control.Applicative | ||
13 | import Data.Default | ||
14 | import Data.List as L | ||
15 | import Data.Maybe | ||
16 | import Data.HashMap.Strict as HM | ||
17 | import Data.Serialize | ||
18 | import Data.Semigroup | ||
19 | import Data.Wrapper.PSQ as PSQ | ||
20 | import Data.Time.Clock.POSIX | ||
21 | import Data.ByteString (ByteString) | ||
22 | import Data.Word | ||
23 | |||
24 | import Data.Torrent | ||
25 | import Network.Address | ||
26 | |||
27 | -- {- | ||
28 | -- import Data.HashMap.Strict as HM | ||
29 | -- | ||
30 | -- import Data.Torrent.InfoHash | ||
31 | -- import Network.Address | ||
32 | -- | ||
33 | -- -- increase prefix when table is too large | ||
34 | -- -- decrease prefix when table is too small | ||
35 | -- -- filter outdated peers | ||
36 | -- | ||
37 | -- {----------------------------------------------------------------------- | ||
38 | -- -- PeerSet | ||
39 | -- -----------------------------------------------------------------------} | ||
40 | -- | ||
41 | -- type PeerSet a = [(PeerAddr, NodeInfo a, Timestamp)] | ||
42 | -- | ||
43 | -- -- compare PSQueue vs Ordered list | ||
44 | -- | ||
45 | -- takeNewest :: PeerSet a -> [PeerAddr] | ||
46 | -- takeNewest = undefined | ||
47 | -- | ||
48 | -- dropOld :: Timestamp -> PeerSet a -> PeerSet a | ||
49 | -- dropOld = undefined | ||
50 | -- | ||
51 | -- insert :: PeerAddr -> Timestamp -> PeerSet a -> PeerSet a | ||
52 | -- insert = undefined | ||
53 | -- | ||
54 | -- type Mask = Int | ||
55 | -- type Size = Int | ||
56 | -- type Timestamp = Int | ||
57 | -- | ||
58 | -- {----------------------------------------------------------------------- | ||
59 | -- -- InfoHashMap | ||
60 | -- -----------------------------------------------------------------------} | ||
61 | -- | ||
62 | -- -- compare handwritten prefix tree versus IntMap | ||
63 | -- | ||
64 | -- data Tree a | ||
65 | -- = Nil | ||
66 | -- | Tip !InfoHash !(PeerSet a) | ||
67 | -- | Bin !InfoHash !Mask !Size !Timestamp (Tree a) (Tree a) | ||
68 | -- | ||
69 | -- insertTree :: InfoHash -> a -> Tree a -> Tree a | ||
70 | -- insertTree = undefined | ||
71 | -- | ||
72 | -- type Prio = Int | ||
73 | -- | ||
74 | -- --shrink :: ContactInfo ip -> Int | ||
75 | -- shrink Nil = Nil | ||
76 | -- shrink (Tip _ _) = undefined | ||
77 | -- shrink (Bin _ _) = undefined | ||
78 | -- | ||
79 | -- {----------------------------------------------------------------------- | ||
80 | -- -- InfoHashMap | ||
81 | -- -----------------------------------------------------------------------} | ||
82 | -- | ||
83 | -- -- compare new design versus HashMap | ||
84 | -- | ||
85 | -- data IntMap k p a | ||
86 | -- type ContactInfo = Map InfoHash Timestamp (Set (PeerAddr IP) Timestamp) | ||
87 | -- | ||
88 | -- data ContactInfo ip = PeerStore | ||
89 | -- { maxSize :: Int | ||
90 | -- , prefixSize :: Int | ||
91 | -- , thisNodeId :: NodeId | ||
92 | -- | ||
93 | -- , count :: Int -- ^ Cached size of the 'peerSet' | ||
94 | -- , peerSet :: HashMap InfoHash [PeerAddr ip] | ||
95 | -- } | ||
96 | -- | ||
97 | -- size :: ContactInfo ip -> Int | ||
98 | -- size = undefined | ||
99 | -- | ||
100 | -- prefixSize :: ContactInfo ip -> Int | ||
101 | -- prefixSize = undefined | ||
102 | -- | ||
103 | -- lookup :: InfoHash -> ContactInfo ip -> [PeerAddr ip] | ||
104 | -- lookup = undefined | ||
105 | -- | ||
106 | -- insert :: InfoHash -> PeerAddr ip -> ContactInfo ip -> ContactInfo ip | ||
107 | -- insert = undefined | ||
108 | -- | ||
109 | -- -- | Limit in size. | ||
110 | -- prune :: NodeId -> Int -> ContactInfo ip -> ContactInfo ip | ||
111 | -- prune pref targetSize Nil = Nil | ||
112 | -- prune pref targetSize (Tip _ _) = undefined | ||
113 | -- | ||
114 | -- -- | Remove expired entries. | ||
115 | -- splitGT :: Timestamp -> ContactInfo ip -> ContactInfo ip | ||
116 | -- splitGT = undefined | ||
117 | -- -} | ||
118 | |||
119 | -- | Storage used to keep track a set of known peers in client, | ||
120 | -- tracker or DHT sessions. | ||
121 | newtype PeerStore = PeerStore (HashMap InfoHash SwarmData) | ||
122 | |||
123 | type Timestamp = POSIXTime | ||
124 | |||
125 | data SwarmData = SwarmData | ||
126 | { peers :: !(PSQ PeerAddr Timestamp) | ||
127 | , name :: !(Maybe ByteString) | ||
128 | } | ||
129 | |||
130 | -- | This wrapper will serialize an ip address with a '4' or '6' prefix byte | ||
131 | -- to indicate whether it is IPv4 or IPv6. | ||
132 | -- | ||
133 | -- Note: it does not serialize port numbers. | ||
134 | newtype SerializeAddress a = SerializeAddress { unserializeAddress :: a } | ||
135 | |||
136 | instance Address a => Serialize (SerializeAddress a) where | ||
137 | get = SerializeAddress <$> do | ||
138 | c <- get | ||
139 | case (c::Word8) of | ||
140 | 0x34 -> do ip4 <- get | ||
141 | return $ fromJust $ fromAddr (ip4::IPv4) | ||
142 | 0x36 -> do ip6 <- get | ||
143 | return $ fromJust $ fromAddr (ip6::IPv6) | ||
144 | _ -> return $ error "cannot deserialize non-IP SerializeAddress" | ||
145 | put (SerializeAddress a) | ||
146 | | Just ip4 <- fromAddr a | ||
147 | = put (0x34::Word8) >> put (ip4::IPv4) | ||
148 | | Just ip6 <- fromAddr a | ||
149 | = put (0x36::Word8) >> put (ip6::IPv6) | ||
150 | | otherwise = return $ error "cannot serialize non-IP SerializeAddress" | ||
151 | |||
152 | |||
153 | instance Serialize SwarmData where | ||
154 | get = flip SwarmData <$> get | ||
155 | <*> ( PSQ.fromList . L.map parseAddr <$> get ) | ||
156 | where | ||
157 | parseAddr (pid,addr,port) = PeerAddr { peerId = pid | ||
158 | , peerHost = unserializeAddress addr | ||
159 | , peerPort = port | ||
160 | } | ||
161 | :-> 0 | ||
162 | |||
163 | put SwarmData{..} = do | ||
164 | put name | ||
165 | put $ L.map (\(addr :-> _) -> (peerId addr, SerializeAddress addr, peerPort addr)) | ||
166 | -- XXX: should we serialize the timestamp? | ||
167 | $ PSQ.toList peers | ||
168 | |||
169 | knownSwarms :: PeerStore -> [ (InfoHash, Int, Maybe ByteString) ] | ||
170 | knownSwarms (PeerStore m) = L.map (\(ih,SwarmData q n) -> (ih,PSQ.size q,n)) $ HM.toList m | ||
171 | |||
172 | swarmSingleton :: PeerAddr -> SwarmData | ||
173 | swarmSingleton a = SwarmData | ||
174 | { peers = PSQ.singleton a 0 | ||
175 | , name = Nothing } | ||
176 | |||
177 | swarmInsert :: SwarmData -> SwarmData -> SwarmData | ||
178 | swarmInsert new old = SwarmData | ||
179 | { peers = L.foldl' (\q (a :-> t) -> PSQ.insertWith newerTimeStamp a t q) (peers old) (PSQ.toList $ peers new) | ||
180 | , name = name new <|> name old -- TODO: decodeUtf8' check | ||
181 | } | ||
182 | where | ||
183 | newerTimeStamp newtime oldtime = if newtime > oldtime then newtime else oldtime | ||
184 | |||
185 | isSwarmOccupied :: SwarmData -> Bool | ||
186 | isSwarmOccupied SwarmData{..} = not $ PSQ.null peers | ||
187 | |||
188 | -- | Empty store. | ||
189 | instance Default (PeerStore) where | ||
190 | def = PeerStore HM.empty | ||
191 | {-# INLINE def #-} | ||
192 | |||
193 | instance Semigroup PeerStore where | ||
194 | PeerStore a <> PeerStore b = | ||
195 | PeerStore (HM.unionWith swarmInsert a b) | ||
196 | {-# INLINE (<>) #-} | ||
197 | |||
198 | -- | Monoid under union operation. | ||
199 | instance Monoid PeerStore where | ||
200 | mempty = def | ||
201 | {-# INLINE mempty #-} | ||
202 | |||
203 | mappend (PeerStore a) (PeerStore b) = | ||
204 | PeerStore (HM.unionWith swarmInsert a b) | ||
205 | {-# INLINE mappend #-} | ||
206 | |||
207 | -- | Can be used to store peers between invocations of the client | ||
208 | -- software. | ||
209 | instance Serialize PeerStore where | ||
210 | get = PeerStore . HM.fromList <$> get | ||
211 | put (PeerStore m) = put (L.filter (isSwarmOccupied . snd) $ HM.toList m) | ||
212 | |||
213 | -- | Returns all peers associated with a given info hash. | ||
214 | lookup :: InfoHash -> PeerStore -> [PeerAddr] | ||
215 | lookup ih (PeerStore m) = maybe [] (PSQ.keys . peers) $ HM.lookup ih m | ||
216 | |||
217 | batchSize :: Int | ||
218 | batchSize = 64 | ||
219 | |||
220 | -- | Used in 'get_peers' DHT queries. | ||
221 | freshPeers :: InfoHash -> Timestamp -> PeerStore -> ([PeerAddr], PeerStore) | ||
222 | freshPeers ih tm (PeerStore m) = fromMaybe ([],PeerStore m) $ do | ||
223 | swarm <- HM.lookup ih m | ||
224 | let ps0 = take batchSize $ unfoldr (incomp minView) (peers swarm) | ||
225 | peers' = case reverse ps0 of | ||
226 | (_,psq):_ -> psq | ||
227 | _ -> peers swarm | ||
228 | ps = L.map (key . fst) ps0 | ||
229 | m' = HM.insert ih swarm { peers = L.foldl' (\q p -> PSQ.insert p tm q) peers' ps } m | ||
230 | return $! m' `seq` (ps,PeerStore m') | ||
231 | |||
232 | incomp :: (x -> Maybe (r,x)) -> x -> Maybe ((r,x),x) | ||
233 | incomp !f !x = do | ||
234 | (result,x') <- f x | ||
235 | pure $! ( (result,x'), x' ) | ||
236 | |||
237 | -- | Used in 'announce_peer' DHT queries. | ||
238 | insertPeer :: InfoHash -> Maybe ByteString -> PeerAddr -> PeerStore -> PeerStore | ||
239 | insertPeer !ih !name !a !(PeerStore m) = seq a' $ PeerStore (HM.insertWith swarmInsert ih a' m) | ||
240 | where | ||
241 | a' = SwarmData { peers = PSQ.singleton a 0 | ||
242 | , name = name } | ||
243 | |||
244 | deleteOlderThan :: POSIXTime -> PeerStore -> PeerStore | ||
245 | deleteOlderThan cutoff (PeerStore m) = PeerStore $ HM.mapMaybe gc m | ||
246 | where | ||
247 | gc :: SwarmData -> Maybe SwarmData | ||
248 | gc swarms = fmap (\ps -> swarms { peers = ps }) $ gcPSQ (peers swarms) | ||
249 | |||
250 | gcPSQ :: PSQKey a => PSQ a Timestamp -> Maybe (PSQ a Timestamp) | ||
251 | gcPSQ ps = case minView ps of | ||
252 | Nothing -> Nothing | ||
253 | Just (_ :-> tm, ps') | tm < cutoff -> gcPSQ ps' | ||
254 | Just _ -> Just ps | ||
diff --git a/dht/src/Network/BitTorrent/DHT/Readme.md b/dht/src/Network/BitTorrent/DHT/Readme.md new file mode 100644 index 00000000..e2352f10 --- /dev/null +++ b/dht/src/Network/BitTorrent/DHT/Readme.md | |||
@@ -0,0 +1,13 @@ | |||
1 | References | ||
2 | ========== | ||
3 | |||
4 | Some good references excluding BEPs: | ||
5 | |||
6 | * [Kademlia wiki page][kademlia-wiki] | ||
7 | * [Kademlia: A Peer-to-peer Information System Based on the XOR Metric][kademlia-paper] | ||
8 | * [BitTorrent Mainline DHT Measurement][mldht] | ||
9 | * Profiling a Million User DHT. (paper) | ||
10 | |||
11 | [kademlia-wiki]: http://en.wikipedia.org/wiki/Kademlia | ||
12 | [kademlia-paper]: http://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf | ||
13 | [mldht]: http://www.cs.helsinki.fi/u/jakangas/MLDHT/ | ||
diff --git a/dht/src/Network/BitTorrent/DHT/Token.hs b/dht/src/Network/BitTorrent/DHT/Token.hs new file mode 100644 index 00000000..171cc8be --- /dev/null +++ b/dht/src/Network/BitTorrent/DHT/Token.hs | |||
@@ -0,0 +1,201 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- The return value for a query for peers includes an opaque value | ||
9 | -- known as the 'Token'. For a node to announce that its controlling | ||
10 | -- peer is downloading a torrent, it must present the token received | ||
11 | -- from the same queried node in a recent query for peers. When a node | ||
12 | -- attempts to \"announce\" a torrent, the queried node checks the | ||
13 | -- token against the querying node's 'IP' address. This is to prevent | ||
14 | -- malicious hosts from signing up other hosts for torrents. Since the | ||
15 | -- token is merely returned by the querying node to the same node it | ||
16 | -- received the token from, the implementation is not defined. Tokens | ||
17 | -- must be accepted for a reasonable amount of time after they have | ||
18 | -- been distributed. | ||
19 | -- | ||
20 | {-# LANGUAGE GeneralizedNewtypeDeriving, CPP #-} | ||
21 | module Network.BitTorrent.DHT.Token | ||
22 | ( -- * Token | ||
23 | Token | ||
24 | , maxInterval | ||
25 | , toPaddedByteString | ||
26 | , fromPaddedByteString | ||
27 | |||
28 | -- * Session tokens | ||
29 | , TokenMap | ||
30 | , SessionTokens | ||
31 | , nullSessionTokens | ||
32 | , checkToken | ||
33 | , grantToken | ||
34 | |||
35 | -- ** Construction | ||
36 | , Network.BitTorrent.DHT.Token.tokens | ||
37 | |||
38 | -- ** Query | ||
39 | , Network.BitTorrent.DHT.Token.lookup | ||
40 | , Network.BitTorrent.DHT.Token.member | ||
41 | |||
42 | -- ** Modification | ||
43 | , Network.BitTorrent.DHT.Token.defaultUpdateInterval | ||
44 | , Network.BitTorrent.DHT.Token.update | ||
45 | ) where | ||
46 | |||
47 | import Control.Arrow | ||
48 | import Control.Monad.State | ||
49 | #ifdef VERSION_bencoding | ||
50 | import Data.BEncode (BEncode) | ||
51 | #endif | ||
52 | import Data.ByteString as BS | ||
53 | import Data.ByteString.Char8 as B8 | ||
54 | import Data.ByteString.Lazy as BL | ||
55 | import Data.ByteString.Lazy.Builder as BS | ||
56 | import qualified Data.ByteString.Base16 as Base16 | ||
57 | import Data.Default | ||
58 | import Data.List as L | ||
59 | import Data.Hashable | ||
60 | import Data.String | ||
61 | import Data.Time | ||
62 | import System.Random | ||
63 | import Control.Concurrent.STM | ||
64 | |||
65 | -- TODO use ShortByteString | ||
66 | |||
67 | -- | An opaque value. | ||
68 | newtype Token = Token BS.ByteString | ||
69 | deriving ( Eq, IsString | ||
70 | #ifdef VERSION_bencoding | ||
71 | , BEncode | ||
72 | #endif | ||
73 | ) | ||
74 | |||
75 | instance Show Token where | ||
76 | show (Token bs) = B8.unpack $ Base16.encode bs | ||
77 | |||
78 | instance Read Token where | ||
79 | readsPrec i s = pure $ (Token *** B8.unpack) $ Base16.decode (B8.pack s) | ||
80 | |||
81 | -- | Meaningless token, for testing purposes only. | ||
82 | instance Default Token where | ||
83 | def = makeToken (0::Int) 0 | ||
84 | |||
85 | -- | Prepend token with 0x20 bytes to fill the available width. | ||
86 | -- | ||
87 | -- If n > 8, then this will also guarantee a nonzero token, which is useful for | ||
88 | -- Tox ping-id values for announce responses. | ||
89 | toPaddedByteString :: Int -> Token -> BS.ByteString | ||
90 | toPaddedByteString n (Token bs) = BS.append (BS.replicate (n - BS.length bs) 0x20) bs | ||
91 | |||
92 | fromPaddedByteString :: Int -> BS.ByteString -> Token | ||
93 | fromPaddedByteString n bs = Token $ BS.drop (n - len) bs | ||
94 | where | ||
95 | len = BS.length tok where Token tok = def | ||
96 | |||
97 | -- | The secret value used as salt. | ||
98 | type Secret = Int | ||
99 | |||
100 | -- The BitTorrent implementation uses the SHA1 hash of the IP address | ||
101 | -- concatenated onto a secret, we use hashable instead. | ||
102 | makeToken :: Hashable a => a -> Secret -> Token | ||
103 | makeToken n s = Token $ toBS $ hashWithSalt s n | ||
104 | where | ||
105 | toBS = toStrict . toLazyByteString . int64BE . fromIntegral | ||
106 | {-# INLINE makeToken #-} | ||
107 | |||
108 | -- | Constant space 'Node' to 'Token' map based on the secret value. | ||
109 | data TokenMap = TokenMap | ||
110 | { prevSecret :: {-# UNPACK #-} !Secret | ||
111 | , curSecret :: {-# UNPACK #-} !Secret | ||
112 | , generator :: {-# UNPACK #-} !StdGen | ||
113 | } deriving Show | ||
114 | |||
115 | -- | A new token map based on the specified seed value. Returned token | ||
116 | -- map should be periodicatically 'update'd. | ||
117 | -- | ||
118 | -- Normally, the seed value should vary between invocations of the | ||
119 | -- client software. | ||
120 | tokens :: Int -> TokenMap | ||
121 | tokens seed = (`evalState` mkStdGen seed) $ | ||
122 | TokenMap <$> state next | ||
123 | <*> state next | ||
124 | <*> get | ||
125 | |||
126 | -- | Get token for the given node. A token becomes invalid after 2 | ||
127 | -- 'update's. | ||
128 | -- | ||
129 | -- Typically used to handle find_peers query. | ||
130 | lookup :: Hashable a => a -> TokenMap -> Token | ||
131 | lookup addr TokenMap {..} = makeToken addr curSecret | ||
132 | |||
133 | -- | Check if token is valid. | ||
134 | -- | ||
135 | -- Typically used to handle 'Network.DHT.Mainline.Announce' | ||
136 | -- query. If token is invalid the 'Network.KRPC.ProtocolError' should | ||
137 | -- be sent back to the malicious node. | ||
138 | member :: Hashable a => a -> Token -> TokenMap -> Bool | ||
139 | member addr token TokenMap {..} = token `L.elem` valid | ||
140 | where valid = makeToken addr <$> [curSecret, prevSecret] | ||
141 | |||
142 | -- | Secret changes every five minutes and tokens up to ten minutes old | ||
143 | -- are accepted. | ||
144 | defaultUpdateInterval :: NominalDiffTime | ||
145 | defaultUpdateInterval = 5 * 60 | ||
146 | |||
147 | -- | Update current tokens. | ||
148 | update :: TokenMap -> TokenMap | ||
149 | update TokenMap {..} = TokenMap | ||
150 | { prevSecret = curSecret | ||
151 | , curSecret = newSecret | ||
152 | , generator = newGen | ||
153 | } | ||
154 | where | ||
155 | (newSecret, newGen) = next generator | ||
156 | |||
157 | data SessionTokens = SessionTokens | ||
158 | { tokenMap :: !TokenMap | ||
159 | , lastUpdate :: !UTCTime | ||
160 | , maxInterval :: !NominalDiffTime | ||
161 | } | ||
162 | |||
163 | nullSessionTokens :: IO SessionTokens | ||
164 | nullSessionTokens = SessionTokens | ||
165 | <$> (tokens <$> randomIO) | ||
166 | <*> getCurrentTime | ||
167 | <*> pure defaultUpdateInterval | ||
168 | |||
169 | -- TODO invalidate *twice* if needed | ||
170 | invalidateTokens :: UTCTime -> SessionTokens -> SessionTokens | ||
171 | invalidateTokens curTime ts @ SessionTokens {..} | ||
172 | | curTime `diffUTCTime` lastUpdate > maxInterval = SessionTokens | ||
173 | { tokenMap = update tokenMap | ||
174 | , lastUpdate = curTime | ||
175 | , maxInterval = maxInterval | ||
176 | } | ||
177 | | otherwise = ts | ||
178 | |||
179 | {----------------------------------------------------------------------- | ||
180 | -- Tokens | ||
181 | -----------------------------------------------------------------------} | ||
182 | |||
183 | tryUpdateSecret :: TVar SessionTokens -> IO () | ||
184 | tryUpdateSecret toks = do | ||
185 | curTime <- getCurrentTime | ||
186 | atomically $ modifyTVar' toks (invalidateTokens curTime) | ||
187 | |||
188 | grantToken :: Hashable addr => TVar SessionTokens -> addr -> IO Token | ||
189 | grantToken sessionTokens addr = do | ||
190 | tryUpdateSecret sessionTokens | ||
191 | toks <- readTVarIO sessionTokens | ||
192 | return $ Network.BitTorrent.DHT.Token.lookup addr $ tokenMap toks | ||
193 | |||
194 | -- | Throws 'HandlerError' if the token is invalid or already | ||
195 | -- expired. See 'TokenMap' for details. | ||
196 | checkToken :: Hashable addr => TVar SessionTokens -> addr -> Token -> IO Bool | ||
197 | checkToken sessionTokens addr questionableToken = do | ||
198 | tryUpdateSecret sessionTokens | ||
199 | toks <- readTVarIO sessionTokens | ||
200 | return $ member addr questionableToken (tokenMap toks) | ||
201 | |||
diff --git a/dht/src/Network/BitTorrent/MainlineDHT.hs b/dht/src/Network/BitTorrent/MainlineDHT.hs new file mode 100644 index 00000000..89851e88 --- /dev/null +++ b/dht/src/Network/BitTorrent/MainlineDHT.hs | |||
@@ -0,0 +1,1169 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE DeriveDataTypeable #-} | ||
3 | {-# LANGUAGE DeriveFoldable #-} | ||
4 | {-# LANGUAGE DeriveFunctor #-} | ||
5 | {-# LANGUAGE DeriveTraversable #-} | ||
6 | {-# LANGUAGE FlexibleInstances #-} | ||
7 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
8 | {-# LANGUAGE LambdaCase #-} | ||
9 | {-# LANGUAGE NamedFieldPuns #-} | ||
10 | {-# LANGUAGE PatternSynonyms #-} | ||
11 | {-# LANGUAGE StandaloneDeriving #-} | ||
12 | {-# LANGUAGE TupleSections #-} | ||
13 | module Network.BitTorrent.MainlineDHT where | ||
14 | |||
15 | import Control.Applicative | ||
16 | import Control.Arrow | ||
17 | import Control.Concurrent.STM | ||
18 | import Control.Monad | ||
19 | import Crypto.Random | ||
20 | import Data.BEncode as BE | ||
21 | import qualified Data.BEncode.BDict as BE | ||
22 | ;import Data.BEncode.BDict (BKey) | ||
23 | import Data.BEncode.Pretty | ||
24 | import Data.BEncode.Types (BDict) | ||
25 | import Data.Bits | ||
26 | import Data.Bits.ByteString () | ||
27 | import Data.Bool | ||
28 | import Data.ByteArray (ByteArrayAccess) | ||
29 | import qualified Data.ByteString as B | ||
30 | ;import Data.ByteString (ByteString) | ||
31 | import qualified Data.ByteString.Base16 as Base16 | ||
32 | import qualified Data.ByteString.Char8 as C8 | ||
33 | import Data.ByteString.Lazy (toStrict) | ||
34 | import qualified Data.ByteString.Lazy.Char8 as L8 | ||
35 | import Data.Char | ||
36 | import Data.Coerce | ||
37 | import Data.Data | ||
38 | import Data.Default | ||
39 | import Data.Digest.CRC32C | ||
40 | import Data.Function (fix) | ||
41 | import Data.Hashable | ||
42 | #if MIN_VERSION_iproute(1,7,4) | ||
43 | import Data.IP hiding (fromSockAddr) | ||
44 | #else | ||
45 | import Data.IP | ||
46 | #endif | ||
47 | import Data.Maybe | ||
48 | import Data.Monoid | ||
49 | import Data.Ord | ||
50 | import qualified Data.Serialize as S | ||
51 | import Data.Set (Set) | ||
52 | import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime) | ||
53 | import Data.Torrent | ||
54 | import Data.Word | ||
55 | import qualified Data.Wrapper.PSQInt as Int | ||
56 | import Debug.Trace | ||
57 | import Network.BitTorrent.MainlineDHT.Symbols | ||
58 | import Network.Kademlia | ||
59 | import Network.Kademlia.Bootstrap | ||
60 | import Network.Address (fromSockAddr, | ||
61 | setPort, sockAddrPort, testIdBit, | ||
62 | toSockAddr, genBucketSample', WantIP(..), | ||
63 | un4map,either4or6,ipFamily) | ||
64 | import Network.BitTorrent.DHT.ContactInfo as Peers | ||
65 | import Network.Kademlia.Search (Search (..)) | ||
66 | import Network.BitTorrent.DHT.Token as Token | ||
67 | import qualified Network.Kademlia.Routing as R | ||
68 | ;import Network.Kademlia.Routing (getTimestamp) | ||
69 | import Network.QueryResponse | ||
70 | import Network.Socket | ||
71 | import System.IO.Error | ||
72 | import System.IO.Unsafe (unsafeInterleaveIO) | ||
73 | import qualified Text.ParserCombinators.ReadP as RP | ||
74 | #ifdef THREAD_DEBUG | ||
75 | import Control.Concurrent.Lifted.Instrument | ||
76 | #else | ||
77 | import Control.Concurrent.Lifted | ||
78 | import GHC.Conc (labelThread) | ||
79 | #endif | ||
80 | import qualified Data.Aeson as JSON | ||
81 | ;import Data.Aeson (FromJSON, ToJSON, (.=)) | ||
82 | import Text.Read | ||
83 | import System.Global6 | ||
84 | import Control.TriadCommittee | ||
85 | import Data.TableMethods | ||
86 | import DPut | ||
87 | import DebugTag | ||
88 | |||
89 | newtype NodeId = NodeId ByteString | ||
90 | deriving (Eq,Ord,ByteArrayAccess, Bits, Hashable) | ||
91 | |||
92 | instance BEncode NodeId where | ||
93 | fromBEncode bval = do | ||
94 | bs <- fromBEncode bval | ||
95 | if B.length bs /= 20 | ||
96 | then Left "Invalid length node id." | ||
97 | else Right $ NodeId bs | ||
98 | |||
99 | toBEncode (NodeId bs) = toBEncode bs | ||
100 | |||
101 | instance Show NodeId where | ||
102 | show (NodeId bs) = C8.unpack $ Base16.encode bs | ||
103 | |||
104 | instance S.Serialize NodeId where | ||
105 | get = NodeId <$> S.getBytes 20 | ||
106 | put (NodeId bs) = S.putByteString bs | ||
107 | |||
108 | instance FiniteBits NodeId where | ||
109 | finiteBitSize _ = 160 | ||
110 | |||
111 | instance Read NodeId where | ||
112 | readsPrec _ str | ||
113 | | (bs, xs) <- Base16.decode $ C8.pack str | ||
114 | , B.length bs == 20 | ||
115 | = [ (NodeId bs, drop 40 str) ] | ||
116 | | otherwise = [] | ||
117 | |||
118 | zeroID :: NodeId | ||
119 | zeroID = NodeId $ B.replicate 20 0 | ||
120 | |||
121 | data NodeInfo = NodeInfo | ||
122 | { nodeId :: NodeId | ||
123 | , nodeIP :: IP | ||
124 | , nodePort :: PortNumber | ||
125 | } | ||
126 | deriving (Eq,Ord) | ||
127 | |||
128 | instance ToJSON NodeInfo where | ||
129 | toJSON (NodeInfo nid (IPv4 ip) port) | ||
130 | = JSON.object [ "node-id" .= show nid | ||
131 | , "ipv4" .= show ip | ||
132 | , "port" .= (fromIntegral port :: Int) | ||
133 | ] | ||
134 | toJSON (NodeInfo nid (IPv6 ip6) port) | ||
135 | | Just ip <- un4map ip6 | ||
136 | = JSON.object [ "node-id" .= show nid | ||
137 | , "ipv4" .= show ip | ||
138 | , "port" .= (fromIntegral port :: Int) | ||
139 | ] | ||
140 | | otherwise | ||
141 | = JSON.object [ "node-id" .= show nid | ||
142 | , "ipv6" .= show ip6 | ||
143 | , "port" .= (fromIntegral port :: Int) | ||
144 | ] | ||
145 | instance FromJSON NodeInfo where | ||
146 | parseJSON (JSON.Object v) = do | ||
147 | nidstr <- v JSON..: "node-id" | ||
148 | ip6str <- v JSON..:? "ipv6" | ||
149 | ip4str <- v JSON..:? "ipv4" | ||
150 | portnum <- v JSON..: "port" | ||
151 | ip <- maybe empty (return . IPv6) (ip6str >>= readMaybe) | ||
152 | <|> maybe empty (return . IPv4) (ip4str >>= readMaybe) | ||
153 | let (bs,_) = Base16.decode (C8.pack nidstr) | ||
154 | guard (B.length bs == 20) | ||
155 | return $ NodeInfo (NodeId bs) ip (fromIntegral (portnum :: Word16)) | ||
156 | |||
157 | hexdigit :: Char -> Bool | ||
158 | hexdigit c = ('0' <= c && c <= '9') || ( 'a' <= c && c <= 'f') || ( 'A' <= c && c <= 'F') | ||
159 | |||
160 | instance Read NodeInfo where | ||
161 | readsPrec i = RP.readP_to_S $ do | ||
162 | RP.skipSpaces | ||
163 | let n = 40 -- characters in node id. | ||
164 | parseAddr = RP.between (RP.char '(') (RP.char ')') (RP.munch (/=')')) | ||
165 | RP.+++ RP.munch (not . isSpace) | ||
166 | nodeidAt = do hexhash <- sequence $ replicate n (RP.satisfy hexdigit) | ||
167 | RP.char '@' RP.+++ RP.satisfy isSpace | ||
168 | addrstr <- parseAddr | ||
169 | nid <- case Base16.decode $ C8.pack hexhash of | ||
170 | (bs,_) | B.length bs==20 -> return (NodeId bs) | ||
171 | _ -> fail "Bad node id." | ||
172 | return (nid,addrstr) | ||
173 | (nid,addrstr) <- ( nodeidAt RP.+++ ( (zeroID,) <$> parseAddr) ) | ||
174 | let raddr = do | ||
175 | ip <- RP.between (RP.char '[') (RP.char ']') | ||
176 | (IPv6 <$> RP.readS_to_P (readsPrec i)) | ||
177 | RP.+++ (IPv4 <$> RP.readS_to_P (readsPrec i)) | ||
178 | _ <- RP.char ':' | ||
179 | port <- toEnum <$> RP.readS_to_P (readsPrec i) | ||
180 | return (ip, port) | ||
181 | |||
182 | (ip,port) <- case RP.readP_to_S raddr addrstr of | ||
183 | [] -> fail "Bad address." | ||
184 | ((ip,port),_):_ -> return (ip,port) | ||
185 | return $ NodeInfo nid ip port | ||
186 | |||
187 | |||
188 | |||
189 | -- The Hashable instance depends only on the IP address and port number. It is | ||
190 | -- used to compute the announce token. | ||
191 | instance Hashable NodeInfo where | ||
192 | hashWithSalt s ni = hashWithSalt s (nodeIP ni , nodePort ni) | ||
193 | {-# INLINE hashWithSalt #-} | ||
194 | |||
195 | |||
196 | instance Show NodeInfo where | ||
197 | showsPrec _ (NodeInfo nid ip port) = | ||
198 | shows nid . ('@' :) . showsip . (':' :) . shows port | ||
199 | where | ||
200 | showsip | ||
201 | | IPv4 ip4 <- ip = shows ip4 | ||
202 | | IPv6 ip6 <- ip , Just ip4 <- un4map ip6 = shows ip4 | ||
203 | | otherwise = ('[' :) . shows ip . (']' :) | ||
204 | |||
205 | {- | ||
206 | |||
207 | -- | KRPC 'compact list' compatible encoding: contact information for | ||
208 | -- nodes is encoded as a 26-byte string. Also known as "Compact node | ||
209 | -- info" the 20-byte Node ID in network byte order has the compact | ||
210 | -- IP-address/port info concatenated to the end. | ||
211 | get = NodeInfo <$> (NodeId <$> S.getBytes 20 ) <*> S.get <*> S.get | ||
212 | -} | ||
213 | |||
214 | getNodeInfo4 :: S.Get NodeInfo | ||
215 | getNodeInfo4 = NodeInfo <$> (NodeId <$> S.getBytes 20) | ||
216 | <*> (IPv4 <$> S.get) | ||
217 | <*> S.get | ||
218 | |||
219 | putNodeInfo4 :: NodeInfo -> S.Put | ||
220 | putNodeInfo4 (NodeInfo (NodeId nid) ip port) | ||
221 | | IPv4 ip4 <- ip = put4 ip4 | ||
222 | | IPv6 ip6 <- ip , Just ip4 <- un4map ip6 = put4 ip4 | ||
223 | | otherwise = return () | ||
224 | where | ||
225 | put4 ip4 = S.putByteString nid >> S.put ip4 >> S.put port | ||
226 | |||
227 | getNodeInfo6 :: S.Get NodeInfo | ||
228 | getNodeInfo6 = NodeInfo <$> (NodeId <$> S.getBytes 20) | ||
229 | <*> (IPv6 <$> S.get) | ||
230 | <*> S.get | ||
231 | |||
232 | putNodeInfo6 :: NodeInfo -> S.Put | ||
233 | putNodeInfo6 (NodeInfo (NodeId nid) (IPv6 ip) port) | ||
234 | = S.putByteString nid >> S.put ip >> S.put port | ||
235 | putNodeInfo6 _ = return () | ||
236 | |||
237 | |||
238 | -- | TODO: This should depend on the bind address to support IPv4-only. For | ||
239 | -- now, in order to support dual-stack listen, we're going to assume IPv6 is | ||
240 | -- wanted and map IPv4 addresses accordingly. | ||
241 | nodeAddr :: NodeInfo -> SockAddr | ||
242 | nodeAddr (NodeInfo _ ip port) = | ||
243 | case ip of | ||
244 | IPv4 ip4 -> setPort port $ toSockAddr (ipv4ToIPv6 ip4) | ||
245 | IPv6 ip6 -> setPort port $ toSockAddr ip6 | ||
246 | |||
247 | nodeInfo :: NodeId -> SockAddr -> Either String NodeInfo | ||
248 | nodeInfo nid saddr | ||
249 | | Just ip <- fromSockAddr saddr | ||
250 | , Just port <- sockAddrPort saddr = Right $ NodeInfo nid ip port | ||
251 | | otherwise = Left "Address family not supported." | ||
252 | |||
253 | -- | Types of RPC errors. | ||
254 | data ErrorCode | ||
255 | -- | Some error doesn't fit in any other category. | ||
256 | = GenericError | ||
257 | |||
258 | -- | Occurs when server fail to process procedure call. | ||
259 | | ServerError | ||
260 | |||
261 | -- | Malformed packet, invalid arguments or bad token. | ||
262 | | ProtocolError | ||
263 | |||
264 | -- | Occurs when client trying to call method server don't know. | ||
265 | | MethodUnknown | ||
266 | deriving (Show, Read, Eq, Ord, Bounded, Typeable, Data) | ||
267 | |||
268 | -- | According to the table: | ||
269 | -- <http://bittorrent.org/beps/bep_0005.html#errors> | ||
270 | instance Enum ErrorCode where | ||
271 | fromEnum GenericError = 201 | ||
272 | fromEnum ServerError = 202 | ||
273 | fromEnum ProtocolError = 203 | ||
274 | fromEnum MethodUnknown = 204 | ||
275 | {-# INLINE fromEnum #-} | ||
276 | toEnum 201 = GenericError | ||
277 | toEnum 202 = ServerError | ||
278 | toEnum 203 = ProtocolError | ||
279 | toEnum 204 = MethodUnknown | ||
280 | toEnum _ = GenericError | ||
281 | {-# INLINE toEnum #-} | ||
282 | |||
283 | instance BEncode ErrorCode where | ||
284 | toBEncode = toBEncode . fromEnum | ||
285 | {-# INLINE toBEncode #-} | ||
286 | fromBEncode b = toEnum <$> fromBEncode b | ||
287 | {-# INLINE fromBEncode #-} | ||
288 | |||
289 | data Error = Error | ||
290 | { errorCode :: !ErrorCode -- ^ The type of error. | ||
291 | , errorMessage :: !ByteString -- ^ Human-readable text message. | ||
292 | } deriving ( Show, Eq, Ord, Typeable, Data, Read ) | ||
293 | |||
294 | newtype TransactionId = TransactionId ByteString | ||
295 | deriving (Eq, Ord, Show, BEncode) | ||
296 | |||
297 | newtype Method = Method ByteString | ||
298 | deriving (Eq, Ord, Show, BEncode) | ||
299 | |||
300 | data Message a = Q { msgOrigin :: NodeId | ||
301 | , msgID :: TransactionId | ||
302 | , qryPayload :: a | ||
303 | , qryMethod :: Method | ||
304 | , qryReadOnly :: Bool } | ||
305 | |||
306 | | R { msgOrigin :: NodeId | ||
307 | , msgID :: TransactionId | ||
308 | , rspPayload :: Either Error a | ||
309 | , rspReflectedIP :: Maybe SockAddr } | ||
310 | |||
311 | showBE :: BValue -> String | ||
312 | showBE bval = L8.unpack (showBEncode bval) | ||
313 | |||
314 | instance BE.BEncode (Message BValue) where | ||
315 | toBEncode m = encodeMessage m | ||
316 | {- | ||
317 | in case m of | ||
318 | Q {} -> trace ("encoded(query): "++showBE r) r | ||
319 | R {} -> trace ("encoded(response): "++showBE r) r -} | ||
320 | fromBEncode bval = decodeMessage bval | ||
321 | {- | ||
322 | in case r of | ||
323 | Left e -> trace (show e) r | ||
324 | Right (Q {}) -> trace ("decoded(query): "++showBE bval) r | ||
325 | Right (R {}) -> trace ("decoded(response): "++showBE bval) r -} | ||
326 | |||
327 | decodeMessage :: BValue -> Either String (Message BValue) | ||
328 | decodeMessage = fromDict $ do | ||
329 | key <- lookAhead (field (req "y")) | ||
330 | let _ = key :: BKey | ||
331 | f <- case key of | ||
332 | "q" -> do a <- field (req "a") | ||
333 | g <- either fail return $ flip fromDict a $ do | ||
334 | who <- field (req "id") | ||
335 | ro <- fromMaybe False <$> optional (field (req "ro")) | ||
336 | return $ \meth tid -> Q who tid a meth ro | ||
337 | meth <- field (req "q") | ||
338 | return $ g meth | ||
339 | "r" -> do ip <- do | ||
340 | ipstr <- optional (field (req "ip")) | ||
341 | mapM (either fail return . decodeAddr) ipstr | ||
342 | vals <- field (req "r") | ||
343 | either fail return $ flip fromDict vals $ do | ||
344 | who <- field (req "id") | ||
345 | return $ \tid -> R who tid (Right vals) ip | ||
346 | "e" -> do (ecode,emsg) <- field (req "e") | ||
347 | ip <- do | ||
348 | ipstr <- optional (field (req "ip")) | ||
349 | mapM (either fail return . decodeAddr) ipstr | ||
350 | -- FIXME:Spec does not give us the NodeId of the sender. | ||
351 | -- Using 'zeroID' as place holder. | ||
352 | -- We should ignore the msgOrigin for errors in 'updateRouting'. | ||
353 | -- We should consider making msgOrigin a Maybe value. | ||
354 | return $ \tid -> R zeroID tid (Left (Error ecode emsg)) ip | ||
355 | _ -> fail $ "Mainline message is not a query, response, or an error: " | ||
356 | ++ show key | ||
357 | tid <- field (req "t") | ||
358 | return $ f (tid :: TransactionId) | ||
359 | |||
360 | |||
361 | encodeMessage :: Message BValue -> BValue | ||
362 | encodeMessage (Q origin tid a meth ro) | ||
363 | = case a of | ||
364 | BDict args -> encodeQuery tid meth (BDict $ genericArgs origin ro `BE.union` args) | ||
365 | _ -> encodeQuery tid meth a -- XXX: Not really a valid query. | ||
366 | encodeMessage (R origin tid v ip) | ||
367 | = case v of | ||
368 | Right (BDict vals) -> encodeResponse tid (BDict $ genericArgs origin False `BE.union` vals) ip | ||
369 | Left err -> encodeError tid err | ||
370 | |||
371 | |||
372 | encodeAddr :: SockAddr -> ByteString | ||
373 | encodeAddr = either encode4 encode6 . either4or6 | ||
374 | where | ||
375 | encode4 (SockAddrInet port addr) | ||
376 | = S.runPut (S.putWord32host addr >> S.putWord16be (fromIntegral port)) | ||
377 | |||
378 | encode6 (SockAddrInet6 port _ addr _) | ||
379 | = S.runPut (S.put addr >> S.putWord16be (fromIntegral port)) | ||
380 | encode6 _ = B.empty | ||
381 | |||
382 | decodeAddr :: ByteString -> Either String SockAddr | ||
383 | decodeAddr bs = S.runGet g bs | ||
384 | where | ||
385 | g | (B.length bs == 6) = flip SockAddrInet <$> S.getWord32host <*> (fromIntegral <$> S.getWord16be) | ||
386 | | otherwise = do host <- S.get -- TODO: Is this right? | ||
387 | port <- fromIntegral <$> S.getWord16be | ||
388 | return $ SockAddrInet6 port 0 host 0 | ||
389 | |||
390 | genericArgs :: BEncode a => a -> Bool -> BDict | ||
391 | genericArgs nodeid ro = | ||
392 | "id" .=! nodeid | ||
393 | .: "ro" .=? bool Nothing (Just (1 :: Int)) ro | ||
394 | .: endDict | ||
395 | |||
396 | encodeError :: BEncode a => a -> Error -> BValue | ||
397 | encodeError tid (Error ecode emsg) = encodeAny tid "e" (ecode,emsg) id | ||
398 | |||
399 | encodeResponse :: (BEncode tid, BEncode vals) => | ||
400 | tid -> vals -> Maybe SockAddr -> BValue | ||
401 | encodeResponse tid rvals rip = | ||
402 | encodeAny tid "r" rvals ("ip" .=? (BString . encodeAddr <$> rip) .:) | ||
403 | |||
404 | encodeQuery :: (BEncode args, BEncode tid, BEncode method) => | ||
405 | tid -> method -> args -> BValue | ||
406 | encodeQuery tid qmeth qargs = encodeAny tid "q" qmeth ("a" .=! qargs .:) | ||
407 | |||
408 | encodeAny :: | ||
409 | (BEncode tid, BEncode a) => | ||
410 | tid -> BKey -> a -> (BDict -> BDict) -> BValue | ||
411 | encodeAny tid key val aux = toDict $ | ||
412 | aux $ key .=! val | ||
413 | .: "t" .=! tid | ||
414 | .: "y" .=! key | ||
415 | .: endDict | ||
416 | |||
417 | |||
418 | showPacket :: ([L8.ByteString] -> [L8.ByteString]) -> SockAddr -> L8.ByteString -> ByteString -> String | ||
419 | showPacket f addr flow bs = L8.unpack $ L8.unlines es | ||
420 | where | ||
421 | es = map (L8.append prefix) (f $ L8.lines pp) | ||
422 | |||
423 | prefix = L8.pack (either show show $ either4or6 addr) <> flow | ||
424 | |||
425 | pp = either L8.pack showBEncode $ BE.decode bs | ||
426 | |||
427 | -- Add detailed printouts for every packet. | ||
428 | addVerbosity :: Transport err SockAddr ByteString -> Transport err SockAddr ByteString | ||
429 | addVerbosity tr = | ||
430 | tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do | ||
431 | forM_ m $ mapM_ $ \(msg,addr) -> do | ||
432 | dput XBitTorrent (showPacket id addr " --> " msg) | ||
433 | kont m | ||
434 | , sendMessage = \addr msg -> do | ||
435 | dput XBitTorrent (showPacket id addr " <-- " msg) | ||
436 | sendMessage tr addr msg | ||
437 | } | ||
438 | |||
439 | |||
440 | showParseError :: ByteString -> SockAddr -> String -> String | ||
441 | showParseError bs addr err = showPacket (L8.pack err :) addr " --> " bs | ||
442 | |||
443 | parsePacket :: ByteString -> SockAddr -> Either String (Message BValue, NodeInfo) | ||
444 | parsePacket bs addr = left (showParseError bs addr) $ do | ||
445 | pkt <- BE.decode bs | ||
446 | -- TODO: Error packets do not include a valid msgOrigin. | ||
447 | -- The BE.decode method is using 'zeroID' as a placeholder. | ||
448 | ni <- nodeInfo (msgOrigin pkt) addr | ||
449 | return (pkt, ni) | ||
450 | |||
451 | encodePacket :: Message BValue -> NodeInfo -> (ByteString, SockAddr) | ||
452 | encodePacket msg ni = ( toStrict $ BE.encode msg | ||
453 | , nodeAddr ni ) | ||
454 | |||
455 | classify :: Message BValue -> MessageClass String Method TransactionId NodeInfo (Message BValue) | ||
456 | classify (Q { msgID = tid, qryMethod = meth }) = IsQuery meth tid | ||
457 | classify (R { msgID = tid }) = IsResponse tid | ||
458 | |||
459 | encodeResponsePayload :: BEncode a => TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue | ||
460 | encodeResponsePayload tid self dest b = R (nodeId self) tid (Right $ BE.toBEncode b) (Just $ nodeAddr dest) | ||
461 | |||
462 | encodeQueryPayload :: BEncode a => | ||
463 | Method -> Bool -> TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue | ||
464 | encodeQueryPayload meth isReadonly tid self dest b = Q (nodeId self) tid (BE.toBEncode b) meth isReadonly | ||
465 | |||
466 | errorPayload :: TransactionId -> NodeInfo -> NodeInfo -> Error -> Message a | ||
467 | errorPayload tid self dest e = R (nodeId self) tid (Left e) (Just $ nodeAddr dest) | ||
468 | |||
469 | decodePayload :: BEncode a => Message BValue -> Either String a | ||
470 | decodePayload msg = BE.fromBEncode $ qryPayload msg | ||
471 | |||
472 | type Handler = MethodHandler String TransactionId NodeInfo (Message BValue) | ||
473 | |||
474 | handler :: ( BEncode a | ||
475 | , BEncode b | ||
476 | ) => | ||
477 | (NodeInfo -> a -> IO b) -> Maybe Handler | ||
478 | handler f = Just $ MethodHandler decodePayload encodeResponsePayload f | ||
479 | |||
480 | |||
481 | handlerE :: ( BEncode a | ||
482 | , BEncode b | ||
483 | ) => | ||
484 | (NodeInfo -> a -> IO (Either Error b)) -> Maybe Handler | ||
485 | handlerE f = Just $ MethodHandler decodePayload enc f | ||
486 | where | ||
487 | enc tid self dest (Left e) = errorPayload tid self dest e | ||
488 | enc tid self dest (Right b) = encodeResponsePayload tid self dest b | ||
489 | |||
490 | type AnnounceSet = Set (InfoHash, PortNumber) | ||
491 | |||
492 | data SwarmsDatabase = SwarmsDatabase | ||
493 | { contactInfo :: !( TVar PeerStore ) -- ^ Published by other nodes. | ||
494 | , sessionTokens :: !( TVar SessionTokens ) -- ^ Query session IDs. | ||
495 | , announceInfo :: !( TVar AnnounceSet ) -- ^ To publish by this node. | ||
496 | } | ||
497 | |||
498 | newSwarmsDatabase :: IO SwarmsDatabase | ||
499 | newSwarmsDatabase = do | ||
500 | toks <- nullSessionTokens | ||
501 | atomically | ||
502 | $ SwarmsDatabase <$> newTVar def | ||
503 | <*> newTVar toks | ||
504 | <*> newTVar def | ||
505 | |||
506 | data Routing = Routing | ||
507 | { tentativeId :: NodeInfo | ||
508 | , committee4 :: TriadCommittee NodeId SockAddr | ||
509 | , committee6 :: TriadCommittee NodeId SockAddr | ||
510 | , refresher4 :: BucketRefresher NodeId NodeInfo | ||
511 | , refresher6 :: BucketRefresher NodeId NodeInfo | ||
512 | } | ||
513 | |||
514 | sched4 :: Routing -> TVar (Int.PSQ POSIXTime) | ||
515 | sched4 Routing { refresher4 = BucketRefresher { refreshQueue } } = refreshQueue | ||
516 | |||
517 | sched6 :: Routing -> TVar (Int.PSQ POSIXTime) | ||
518 | sched6 Routing { refresher6 = BucketRefresher { refreshQueue } } = refreshQueue | ||
519 | |||
520 | routing4 :: Routing -> TVar (R.BucketList NodeInfo) | ||
521 | routing4 Routing { refresher4 = BucketRefresher { refreshBuckets } } = refreshBuckets | ||
522 | |||
523 | routing6 :: Routing -> TVar (R.BucketList NodeInfo) | ||
524 | routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets | ||
525 | |||
526 | traced :: Show tid => TableMethods t tid -> TableMethods t tid | ||
527 | traced (TableMethods ins del lkup) | ||
528 | = TableMethods (\tid mvar t -> trace ("insert "++show tid) $ ins tid mvar t) | ||
529 | (\tid t -> trace ("del "++show tid) $ del tid t) | ||
530 | (\tid t -> trace ("lookup "++show tid) $ lkup tid t) | ||
531 | |||
532 | |||
533 | type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue) | ||
534 | |||
535 | -- | Like 'nodeInfo' but falls back to 'iNADDR_ANY' for nodeIP' and 'nodePort'. | ||
536 | mkNodeInfo :: NodeId -> SockAddr -> NodeInfo | ||
537 | mkNodeInfo nid addr = NodeInfo | ||
538 | { nodeId = nid | ||
539 | , nodeIP = fromMaybe (toEnum 0) $ fromSockAddr addr | ||
540 | , nodePort = fromMaybe 0 $ sockAddrPort addr | ||
541 | } | ||
542 | |||
543 | newClient :: SwarmsDatabase -> SockAddr | ||
544 | -> IO ( MainlineClient | ||
545 | , Routing | ||
546 | , [NodeInfo] -> [NodeInfo] -> IO () | ||
547 | , [NodeInfo] -> [NodeInfo] -> IO () | ||
548 | ) | ||
549 | newClient swarms addr = do | ||
550 | udp <- udpTransport addr | ||
551 | nid <- NodeId <$> getRandomBytes 20 | ||
552 | let tentative_info = mkNodeInfo nid addr | ||
553 | tentative_info6 <- | ||
554 | maybe tentative_info | ||
555 | (\ip6 -> tentative_info { nodeId = fromMaybe (nodeId tentative_info) | ||
556 | $ bep42 (toSockAddr ip6) (nodeId tentative_info) | ||
557 | , nodeIP = IPv6 ip6 | ||
558 | }) | ||
559 | <$> global6 | ||
560 | addr4 <- atomically $ newTChan | ||
561 | addr6 <- atomically $ newTChan | ||
562 | mkrouting <- atomically $ do | ||
563 | -- We defer initializing the refreshSearch and refreshPing until we | ||
564 | -- have a client to send queries with. | ||
565 | let nullPing = const $ return False | ||
566 | nullSearch = mainlineSearch $ Left $ \_ _ -> return Nothing | ||
567 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info R.defaultBucketCount | ||
568 | refresher4 <- newBucketRefresher tbl4 nullSearch nullPing | ||
569 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount | ||
570 | refresher6 <- newBucketRefresher tbl6 nullSearch nullPing | ||
571 | let updateIPVote tblvar addrvar a = do | ||
572 | bkts <- readTVar tblvar | ||
573 | case bep42 a (nodeId $ R.thisNode bkts) of | ||
574 | Just nid -> do | ||
575 | let tbl = R.nullTable (comparing nodeId) | ||
576 | (\s -> hashWithSalt s . nodeId) | ||
577 | (mkNodeInfo nid a) | ||
578 | (R.defaultBucketCount) | ||
579 | writeTVar tblvar tbl | ||
580 | writeTChan addrvar (a,map fst $ concat $ R.toList bkts) | ||
581 | Nothing -> return () | ||
582 | committee4 <- newTriadCommittee $ updateIPVote tbl4 addr4 | ||
583 | committee6 <- newTriadCommittee $ updateIPVote tbl6 addr6 | ||
584 | return $ \client -> | ||
585 | -- Now we have a client, so tell the BucketRefresher how to search and ping. | ||
586 | let updIO r = updateRefresherIO (nodeSearch client) (ping client) r | ||
587 | in Routing tentative_info committee4 committee6 (updIO refresher4) (updIO refresher6) | ||
588 | map_var <- atomically $ newTVar (0, mempty) | ||
589 | |||
590 | let routing = mkrouting outgoingClient | ||
591 | |||
592 | net = onInbound (updateRouting outgoingClient routing) | ||
593 | $ layerTransport parsePacket encodePacket | ||
594 | $ udp | ||
595 | |||
596 | -- Paranoid: It's safe to define /net/ and /client/ to be mutually | ||
597 | -- recursive since 'updateRouting' does not invoke 'awaitMessage' which | ||
598 | -- which was modified by 'onInbound'. However, I'm going to avoid the | ||
599 | -- mutual reference just to be safe. | ||
600 | outgoingClient = client { clientNet = net { awaitMessage = ($ Nothing) } } | ||
601 | |||
602 | dispatch = DispatchMethods | ||
603 | { classifyInbound = classify -- :: x -> MessageClass err meth tid addr x | ||
604 | , lookupHandler = handlers -- :: meth -> Maybe (MethodHandler err tid addr x) | ||
605 | , tableMethods = mapT -- :: TransactionMethods tbl tid x | ||
606 | } | ||
607 | |||
608 | handlers :: Method -> Maybe Handler | ||
609 | handlers ( Method "ping" ) = handler pingH | ||
610 | handlers ( Method "find_node" ) = handler $ findNodeH routing | ||
611 | handlers ( Method "get_peers" ) = handler $ getPeersH routing swarms | ||
612 | handlers ( Method "announce_peer" ) = handlerE $ announceH swarms | ||
613 | handlers ( Method meth ) = Just $ defaultHandler meth | ||
614 | |||
615 | mapT = transactionMethods mapMethods gen | ||
616 | |||
617 | gen :: Word16 -> (TransactionId, Word16) | ||
618 | gen cnt = (TransactionId $ S.encode cnt, cnt+1) | ||
619 | |||
620 | ignoreParseError :: String -> IO () | ||
621 | ignoreParseError _ = return () | ||
622 | |||
623 | client = Client | ||
624 | { clientNet = addHandler ignoreParseError (handleMessage client) net | ||
625 | , clientDispatcher = dispatch | ||
626 | , clientErrorReporter = ignoreErrors -- printErrors stderr | ||
627 | , clientPending = map_var | ||
628 | , clientAddress = \maddr -> atomically $ do | ||
629 | let var = case flip prefer4or6 Nothing <$> maddr of | ||
630 | Just Want_IP6 -> routing6 routing | ||
631 | _ -> routing4 routing | ||
632 | R.thisNode <$> readTVar var | ||
633 | , clientResponseId = return | ||
634 | } | ||
635 | |||
636 | -- TODO: Provide some means of shutting down these five auxillary threads: | ||
637 | |||
638 | fork $ fix $ \again -> do | ||
639 | myThreadId >>= flip labelThread "addr4" | ||
640 | (addr, ns) <- atomically $ readTChan addr4 | ||
641 | dput XBitTorrent $ "External IPv4: "++show (addr, length ns) | ||
642 | forM_ ns $ \n -> do | ||
643 | dput XBitTorrent $ "Change IP, ping: "++show n | ||
644 | ping outgoingClient n | ||
645 | -- TODO: trigger bootstrap ipv4 | ||
646 | again | ||
647 | fork $ fix $ \again -> do | ||
648 | myThreadId >>= flip labelThread "addr6" | ||
649 | (addr,ns) <- atomically $ readTChan addr6 | ||
650 | dput XBitTorrent $ "External IPv6: "++show (addr, length ns) | ||
651 | forM_ ns $ \n -> do | ||
652 | dput XBitTorrent $ "Change IP, ping: "++show n | ||
653 | ping outgoingClient n | ||
654 | -- TODO: trigger bootstrap ipv6 | ||
655 | again | ||
656 | |||
657 | |||
658 | refresh_thread4 <- forkPollForRefresh $ refresher4 routing | ||
659 | refresh_thread6 <- forkPollForRefresh $ refresher6 routing | ||
660 | |||
661 | forkAnnouncedInfohashesGC (contactInfo swarms) | ||
662 | |||
663 | return (client, routing, bootstrap (refresher4 routing), bootstrap (refresher6 routing)) | ||
664 | |||
665 | -- Note that you should call .put() every hour for content that you want to | ||
666 | -- keep alive, since nodes may discard data nodes older than 2 hours. (source: | ||
667 | -- https://www.npmjs.com/package/bittorrent-dht) | ||
668 | -- | ||
669 | -- This function will discard records between 3 and 6 hours old. | ||
670 | forkAnnouncedInfohashesGC :: TVar PeerStore -> IO ThreadId | ||
671 | forkAnnouncedInfohashesGC vpeers = fork $ do | ||
672 | myThreadId >>= flip labelThread "gc:bt-peers" | ||
673 | fix $ \loop -> do | ||
674 | cutoff <- getPOSIXTime | ||
675 | threadDelay 10800000000 -- 3 hours | ||
676 | atomically $ modifyTVar' vpeers $ deleteOlderThan cutoff | ||
677 | loop | ||
678 | |||
679 | -- | Modifies a purely random 'NodeId' to one that is related to a given | ||
680 | -- routable address in accordance with BEP 42. | ||
681 | -- | ||
682 | -- Test vectors from the spec: | ||
683 | -- | ||
684 | -- IP rand example node ID | ||
685 | -- ============ ===== ========================================== | ||
686 | -- 124.31.75.21 1 5fbfbf f10c5d6a4ec8a88e4c6ab4c28b95eee4 01 | ||
687 | -- 21.75.31.124 86 5a3ce9 c14e7a08645677bbd1cfe7d8f956d532 56 | ||
688 | -- 65.23.51.170 22 a5d432 20bc8f112a3d426c84764f8c2a1150e6 16 | ||
689 | -- 84.124.73.14 65 1b0321 dd1bb1fe518101ceef99462b947a01ff 41 | ||
690 | -- 43.213.53.83 90 e56f6c bf5b7c4be0237986d5243b87aa6d5130 5a | ||
691 | bep42 :: SockAddr -> NodeId -> Maybe NodeId | ||
692 | bep42 addr0 (NodeId r) | ||
693 | | let addr = either id id $ either4or6 addr0 -- unmap 4mapped SockAddrs | ||
694 | , Just ip <- fmap S.encode (fromSockAddr addr :: Maybe IPv4) | ||
695 | <|> fmap S.encode (fromSockAddr addr :: Maybe IPv6) | ||
696 | = genBucketSample' retr (NodeId $ crc $ applyMask ip) (3,0x07,0) | ||
697 | | otherwise | ||
698 | = Nothing | ||
699 | where | ||
700 | ip4mask = "\x03\x0f\x3f\xff" :: ByteString | ||
701 | ip6mask = "\x01\x03\x07\x0f\x1f\x3f\x7f\xff" :: ByteString | ||
702 | nbhood_select = B.last r .&. 7 | ||
703 | retr n = pure $ B.drop (B.length r - n) r | ||
704 | crc = S.encode . crc32c . B.pack | ||
705 | applyMask ip = case B.zipWith (.&.) msk ip of | ||
706 | (b:bs) -> (b .|. shiftL nbhood_select 5) : bs | ||
707 | bs -> bs | ||
708 | where msk | B.length ip == 4 = ip4mask | ||
709 | | otherwise = ip6mask | ||
710 | |||
711 | |||
712 | |||
713 | defaultHandler :: ByteString -> Handler | ||
714 | defaultHandler meth = MethodHandler decodePayload errorPayload returnError | ||
715 | where | ||
716 | returnError :: NodeInfo -> BValue -> IO Error | ||
717 | returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth) | ||
718 | |||
719 | mainlineKademlia :: MainlineClient | ||
720 | -> TriadCommittee NodeId SockAddr | ||
721 | -> BucketRefresher NodeId NodeInfo | ||
722 | -> Kademlia NodeId NodeInfo | ||
723 | mainlineKademlia client committee refresher | ||
724 | = Kademlia quietInsertions | ||
725 | mainlineSpace | ||
726 | (vanillaIO (refreshBuckets refresher) $ ping client) | ||
727 | { tblTransition = \tr -> do | ||
728 | io1 <- transitionCommittee committee tr | ||
729 | io2 <- touchBucket refresher tr | ||
730 | return $ do | ||
731 | io1 >> io2 | ||
732 | {- noisy (timestamp updates are currently reported as transitions to Accepted) | ||
733 | dput XBitTorrent $ unwords | ||
734 | [ show (transitionedTo tr) | ||
735 | , show (transitioningNode tr) | ||
736 | ] -} | ||
737 | } | ||
738 | |||
739 | |||
740 | mainlineSpace :: R.KademliaSpace NodeId NodeInfo | ||
741 | mainlineSpace = R.KademliaSpace | ||
742 | { R.kademliaLocation = nodeId | ||
743 | , R.kademliaTestBit = testIdBit | ||
744 | , R.kademliaXor = xor | ||
745 | , R.kademliaSample = genBucketSample' | ||
746 | } | ||
747 | |||
748 | transitionCommittee :: TriadCommittee NodeId SockAddr -> RoutingTransition NodeInfo -> STM (IO ()) | ||
749 | transitionCommittee committee (RoutingTransition ni Stranger) = do | ||
750 | delVote committee (nodeId ni) | ||
751 | return $ do | ||
752 | dput XBitTorrent $ "delVote "++show (nodeId ni) | ||
753 | transitionCommittee committee _ = return $ return () | ||
754 | |||
755 | updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () | ||
756 | updateRouting client routing naddr msg = do | ||
757 | case prefer4or6 naddr Nothing of | ||
758 | Want_IP4 -> go (committee4 routing) (refresher4 routing) | ||
759 | Want_IP6 -> go (committee6 routing) (refresher6 routing) | ||
760 | where | ||
761 | go committee refresher = do | ||
762 | self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher) | ||
763 | when (nodeIP self /= nodeIP naddr) $ do | ||
764 | case msg of | ||
765 | R { rspReflectedIP = Just sockaddr } | ||
766 | -> do | ||
767 | -- dput XBitTorrent $ "External: "++show (nodeId naddr,sockaddr) | ||
768 | atomically $ addVote committee (nodeId naddr) sockaddr | ||
769 | _ -> return () | ||
770 | insertNode (mainlineKademlia client committee refresher) naddr | ||
771 | |||
772 | data Ping = Ping deriving Show | ||
773 | |||
774 | -- Pong is the same as Ping. | ||
775 | type Pong = Ping | ||
776 | pattern Pong = Ping | ||
777 | |||
778 | instance BEncode Ping where | ||
779 | toBEncode Ping = toDict endDict | ||
780 | fromBEncode _ = pure Ping | ||
781 | |||
782 | wantList :: WantIP -> [ByteString] | ||
783 | wantList Want_IP4 = ["ip4"] | ||
784 | wantList Want_IP6 = ["ip6"] | ||
785 | wantList Want_Both = ["ip4","ip6"] | ||
786 | |||
787 | instance BEncode WantIP where | ||
788 | toBEncode w = toBEncode $ wantList w | ||
789 | fromBEncode bval = do | ||
790 | wants <- fromBEncode bval | ||
791 | let _ = wants :: [ByteString] | ||
792 | case (elem "ip4" wants, elem "ip6" wants) of | ||
793 | (True,True) -> Right Want_Both | ||
794 | (True,False) -> Right Want_IP4 | ||
795 | (False,True) -> Right Want_IP6 | ||
796 | _ -> Left "Unrecognized IP type." | ||
797 | |||
798 | data FindNode = FindNode NodeId (Maybe WantIP) | ||
799 | |||
800 | instance BEncode FindNode where | ||
801 | toBEncode (FindNode nid iptyp) = toDict $ target_key .=! nid | ||
802 | .: want_key .=? iptyp | ||
803 | .: endDict | ||
804 | fromBEncode = fromDict $ FindNode <$>! target_key | ||
805 | <*>? want_key | ||
806 | |||
807 | data NodeFound = NodeFound | ||
808 | { nodes4 :: [NodeInfo] | ||
809 | , nodes6 :: [NodeInfo] | ||
810 | } | ||
811 | |||
812 | instance BEncode NodeFound where | ||
813 | toBEncode (NodeFound ns ns6) = toDict $ | ||
814 | nodes_key .=? | ||
815 | (if Prelude.null ns then Nothing | ||
816 | else Just (S.runPut (mapM_ putNodeInfo4 ns))) | ||
817 | .: nodes6_key .=? | ||
818 | (if Prelude.null ns6 then Nothing | ||
819 | else Just (S.runPut (mapM_ putNodeInfo6 ns6))) | ||
820 | .: endDict | ||
821 | |||
822 | fromBEncode bval = NodeFound <$> ns4 <*> ns6 | ||
823 | where | ||
824 | opt ns = fromMaybe [] <$> optional ns | ||
825 | ns4 = opt $ fromDict (binary getNodeInfo4 nodes_key) bval | ||
826 | ns6 = opt $ fromDict (binary getNodeInfo6 nodes6_key) bval | ||
827 | |||
828 | binary :: S.Get a -> BKey -> BE.Get [a] | ||
829 | binary get k = field (req k) >>= either (fail . format) return . | ||
830 | S.runGet (many get) | ||
831 | where | ||
832 | format str = "fail to deserialize " ++ show k ++ " field: " ++ str | ||
833 | |||
834 | pingH :: NodeInfo -> Ping -> IO Pong | ||
835 | pingH _ Ping = return Pong | ||
836 | |||
837 | prefer4or6 :: NodeInfo -> Maybe WantIP -> WantIP | ||
838 | prefer4or6 addr iptyp = fromMaybe (ipFamily $ nodeIP addr) iptyp | ||
839 | |||
840 | findNodeH :: Routing -> NodeInfo -> FindNode -> IO NodeFound | ||
841 | findNodeH routing addr (FindNode node iptyp) = do | ||
842 | let preferred = prefer4or6 addr iptyp | ||
843 | |||
844 | (append4,append6) <- atomically $ do | ||
845 | ni4 <- R.thisNode <$> readTVar (routing4 routing) | ||
846 | ni6 <- R.thisNode <$> readTVar (routing6 routing) | ||
847 | return $ case ipFamily (nodeIP addr) of | ||
848 | Want_IP4 -> (id, (++ [ni6])) | ||
849 | Want_IP6 -> ((++ [ni4]), id) | ||
850 | ks <- bool (return []) (go append4 $ routing4 routing) (preferred /= Want_IP6) | ||
851 | ks6 <- bool (return []) (go append6 $ routing6 routing) (preferred /= Want_IP4) | ||
852 | return $ NodeFound ks ks6 | ||
853 | where | ||
854 | go f var = f . R.kclosest mainlineSpace k node <$> atomically (readTVar var) | ||
855 | |||
856 | k = R.defaultK | ||
857 | |||
858 | |||
859 | data GetPeers = GetPeers InfoHash (Maybe WantIP) | ||
860 | |||
861 | instance BEncode GetPeers where | ||
862 | toBEncode (GetPeers ih iptyp) | ||
863 | = toDict $ info_hash_key .=! ih | ||
864 | .: want_key .=? iptyp | ||
865 | .: endDict | ||
866 | fromBEncode = fromDict $ GetPeers <$>! info_hash_key <*>? want_key | ||
867 | |||
868 | |||
869 | data GotPeers = GotPeers | ||
870 | { -- | If the queried node has no peers for the infohash, returned | ||
871 | -- the K nodes in the queried nodes routing table closest to the | ||
872 | -- infohash supplied in the query. | ||
873 | peers :: [PeerAddr] | ||
874 | |||
875 | , nodes :: NodeFound | ||
876 | |||
877 | -- | The token value is a required argument for a future | ||
878 | -- announce_peer query. | ||
879 | , grantedToken :: Token | ||
880 | } -- deriving (Show, Eq, Typeable) | ||
881 | |||
882 | nodeIsIPv6 :: NodeInfo -> Bool | ||
883 | nodeIsIPv6 (NodeInfo _ (IPv6 _) _) = True | ||
884 | nodeIsIPv6 _ = False | ||
885 | |||
886 | instance BEncode GotPeers where | ||
887 | toBEncode GotPeers { nodes = NodeFound ns4 ns6, ..} = toDict $ | ||
888 | nodes_key .=? (if null ns4 then Nothing | ||
889 | else Just $ S.runPut (mapM_ putNodeInfo4 ns4)) | ||
890 | .: nodes6_key .=? (if null ns6 then Nothing | ||
891 | else Just $ S.runPut (mapM_ putNodeInfo4 ns6)) | ||
892 | .: token_key .=! grantedToken | ||
893 | .: peers_key .=! map S.encode peers | ||
894 | .: endDict | ||
895 | |||
896 | fromBEncode = fromDict $ do | ||
897 | ns4 <- fromMaybe [] <$> optional (binary getNodeInfo4 nodes_key) -- "nodes" | ||
898 | ns6 <- fromMaybe [] <$> optional (binary getNodeInfo6 nodes6_key) -- "nodes6" | ||
899 | -- TODO: BEP 42... | ||
900 | -- | ||
901 | -- Once enforced, responses to get_peers requests whose node ID does not | ||
902 | -- match its external IP should be considered to not contain a token and | ||
903 | -- thus not be eligible as storage target. Implementations should take | ||
904 | -- care that they find the closest set of nodes which return a token and | ||
905 | -- whose IDs matches their IPs before sending a store request to those | ||
906 | -- nodes. | ||
907 | -- | ||
908 | -- Sounds like something to take care of at peer-search time, so I'll | ||
909 | -- ignore it for now. | ||
910 | tok <- field (req token_key) -- "token" | ||
911 | ps <- fromMaybe [] <$> optional (field (req peers_key) >>= decodePeers) -- "values" | ||
912 | pure $ GotPeers ps (NodeFound ns4 ns6) tok | ||
913 | where | ||
914 | decodePeers = either fail pure . mapM S.decode | ||
915 | |||
916 | getPeersH :: Routing -> SwarmsDatabase -> NodeInfo -> GetPeers -> IO GotPeers | ||
917 | getPeersH routing (SwarmsDatabase peers toks _) naddr (GetPeers ih iptyp) = do | ||
918 | ps <- do | ||
919 | tm <- getTimestamp | ||
920 | atomically $ do | ||
921 | (ps,store') <- Peers.freshPeers ih tm <$> readTVar peers | ||
922 | writeTVar peers store' | ||
923 | return ps | ||
924 | -- Filter peer results to only a single address family, IPv4 or IPv6, as | ||
925 | -- per BEP 32. | ||
926 | let notboth = iptyp >>= \case Want_Both -> Nothing | ||
927 | specific -> Just specific | ||
928 | selected = prefer4or6 naddr notboth | ||
929 | ps' = filter ( (== selected) . ipFamily . peerHost ) ps | ||
930 | tok <- grantToken toks naddr | ||
931 | ns <- findNodeH routing naddr (FindNode (coerce ih) iptyp) | ||
932 | return $ GotPeers ps' ns tok | ||
933 | |||
934 | -- | Announce that the peer, controlling the querying node, is | ||
935 | -- downloading a torrent on a port. | ||
936 | data Announce = Announce | ||
937 | { -- | If set, the 'port' field should be ignored and the source | ||
938 | -- port of the UDP packet should be used as the peer's port | ||
939 | -- instead. This is useful for peers behind a NAT that may not | ||
940 | -- know their external port, and supporting uTP, they accept | ||
941 | -- incoming connections on the same port as the DHT port. | ||
942 | impliedPort :: Bool | ||
943 | |||
944 | -- | infohash of the torrent; | ||
945 | , topic :: InfoHash | ||
946 | |||
947 | -- | some clients announce the friendly name of the torrent here. | ||
948 | , announcedName :: Maybe ByteString | ||
949 | |||
950 | -- | the port /this/ peer is listening; | ||
951 | , port :: PortNumber | ||
952 | |||
953 | -- TODO: optional boolean "seed" key | ||
954 | |||
955 | -- | received in response to a previous get_peers query. | ||
956 | , sessionToken :: Token | ||
957 | |||
958 | } deriving (Show, Eq, Typeable) | ||
959 | |||
960 | mkAnnounce :: PortNumber -> InfoHash -> Token -> Announce | ||
961 | mkAnnounce portnum info token = Announce | ||
962 | { topic = info | ||
963 | , port = portnum | ||
964 | , sessionToken = token | ||
965 | , announcedName = Nothing | ||
966 | , impliedPort = False | ||
967 | } | ||
968 | |||
969 | |||
970 | instance BEncode Announce where | ||
971 | toBEncode Announce {..} = toDict $ | ||
972 | implied_port_key .=? flagField impliedPort | ||
973 | .: info_hash_key .=! topic | ||
974 | .: name_key .=? announcedName | ||
975 | .: port_key .=! port | ||
976 | .: token_key .=! sessionToken | ||
977 | .: endDict | ||
978 | where | ||
979 | flagField flag = if flag then Just (1 :: Int) else Nothing | ||
980 | |||
981 | fromBEncode = fromDict $ do | ||
982 | Announce <$> (boolField <$> optional (field (req implied_port_key))) | ||
983 | <*>! info_hash_key | ||
984 | <*>? name_key | ||
985 | <*>! port_key | ||
986 | <*>! token_key | ||
987 | where | ||
988 | boolField = maybe False (/= (0 :: Int)) | ||
989 | |||
990 | |||
991 | |||
992 | -- | The queried node must verify that the token was previously sent | ||
993 | -- to the same IP address as the querying node. Then the queried node | ||
994 | -- should store the IP address of the querying node and the supplied | ||
995 | -- port number under the infohash in its store of peer contact | ||
996 | -- information. | ||
997 | data Announced = Announced | ||
998 | deriving (Show, Eq, Typeable) | ||
999 | |||
1000 | instance BEncode Announced where | ||
1001 | toBEncode _ = toBEncode Ping | ||
1002 | fromBEncode _ = pure Announced | ||
1003 | |||
1004 | announceH :: SwarmsDatabase -> NodeInfo -> Announce -> IO (Either Error Announced) | ||
1005 | announceH (SwarmsDatabase peers toks _) naddr announcement = do | ||
1006 | checkToken toks naddr (sessionToken announcement) | ||
1007 | >>= bool (Left <$> return (Error ProtocolError "invalid parameter: token")) | ||
1008 | (Right <$> go) | ||
1009 | where | ||
1010 | go = atomically $ do | ||
1011 | modifyTVar' peers | ||
1012 | $ insertPeer (topic announcement) (announcedName announcement) | ||
1013 | $ PeerAddr | ||
1014 | { peerId = Nothing | ||
1015 | -- Avoid storing IPv4-mapped addresses. | ||
1016 | , peerHost = case nodeIP naddr of | ||
1017 | IPv6 ip6 | Just ip4 <- un4map ip6 -> IPv4 ip4 | ||
1018 | a -> a | ||
1019 | , peerPort = if impliedPort announcement | ||
1020 | then nodePort naddr | ||
1021 | else port announcement | ||
1022 | } | ||
1023 | return Announced | ||
1024 | |||
1025 | isReadonlyClient :: MainlineClient -> Bool | ||
1026 | isReadonlyClient client = False -- TODO | ||
1027 | |||
1028 | mainlineSend :: ( BEncode a | ||
1029 | , BEncode a2 | ||
1030 | ) => Method | ||
1031 | -> (a2 -> b) | ||
1032 | -> (t -> a) | ||
1033 | -> MainlineClient | ||
1034 | -> t | ||
1035 | -> NodeInfo | ||
1036 | -> IO (Maybe b) | ||
1037 | mainlineSend meth unwrap msg client nid addr = do | ||
1038 | reply <- sendQuery client (mainlineSerializeer meth unwrap client) (msg nid) addr | ||
1039 | -- sendQuery will return (Just (Left _)) on a parse error. We're going to | ||
1040 | -- blow it away with the join-either sequence. | ||
1041 | -- TODO: Do something with parse errors. | ||
1042 | return $ join $ either (const Nothing) Just <$> reply | ||
1043 | |||
1044 | mainlineAsync :: (BEncode a1, BEncode a2) => | ||
1045 | Method | ||
1046 | -> (a2 -> a3) | ||
1047 | -> (t -> a1) | ||
1048 | -> Client String Method TransactionId NodeInfo (Message BValue) | ||
1049 | -> t | ||
1050 | -> NodeInfo | ||
1051 | -> (Maybe a3 -> IO ()) | ||
1052 | -> IO () | ||
1053 | mainlineAsync meth unwrap msg client nid addr onresult = do | ||
1054 | asyncQuery client (mainlineSerializeer meth unwrap client) (msg nid) addr | ||
1055 | $ \reply -> | ||
1056 | -- sendQuery will return (Just (Left _)) on a parse error. We're going to | ||
1057 | -- blow it away with the join-either sequence. | ||
1058 | -- TODO: Do something with parse errors. | ||
1059 | onresult $ join $ either (const Nothing) Just <$> reply | ||
1060 | |||
1061 | mainlineSerializeer :: (BEncode a2, BEncode a1) => | ||
1062 | Method | ||
1063 | -> (a2 -> b) | ||
1064 | -> MainlineClient | ||
1065 | -> MethodSerializer | ||
1066 | TransactionId NodeInfo (Message BValue) Method a1 (Either Error b) | ||
1067 | mainlineSerializeer meth unwrap client = MethodSerializer | ||
1068 | { methodTimeout = \_ ni -> return (ni, 5000000) | ||
1069 | , method = meth | ||
1070 | , wrapQuery = encodeQueryPayload meth (isReadonlyClient client) | ||
1071 | , unwrapResponse = (>>= either (Left . Error GenericError . C8.pack) | ||
1072 | (Right . unwrap) | ||
1073 | . BE.fromBEncode) | ||
1074 | . rspPayload | ||
1075 | } | ||
1076 | |||
1077 | ping :: MainlineClient -> NodeInfo -> IO Bool | ||
1078 | ping client addr = | ||
1079 | fromMaybe False | ||
1080 | <$> mainlineSend (Method "ping") (\Pong -> True) (const Ping) client () addr | ||
1081 | |||
1082 | -- searchQuery :: ni -> IO (Maybe [ni], [r], tok)) | ||
1083 | getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[NodeInfo],Maybe ())) | ||
1084 | getNodes = mainlineSend (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) | ||
1085 | |||
1086 | asyncGetNodes :: Client String Method TransactionId NodeInfo (Message BValue) | ||
1087 | -> NodeId | ||
1088 | -> NodeInfo | ||
1089 | -> (Maybe ([NodeInfo], [NodeInfo], Maybe ()) -> IO ()) | ||
1090 | -> IO () | ||
1091 | asyncGetNodes = mainlineAsync (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) | ||
1092 | |||
1093 | unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ()) | ||
1094 | unwrapNodes (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6, Just ()) | ||
1095 | |||
1096 | getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[PeerAddr],Maybe Token)) | ||
1097 | getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce | ||
1098 | |||
1099 | asyncGetPeers :: Client String Method TransactionId NodeInfo (Message BValue) | ||
1100 | -> NodeId | ||
1101 | -> NodeInfo | ||
1102 | -> (Maybe ([NodeInfo], [PeerAddr], Maybe Token) -> IO ()) | ||
1103 | -> IO () | ||
1104 | asyncGetPeers = mainlineAsync (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce | ||
1105 | |||
1106 | unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token) | ||
1107 | unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, Just tok) | ||
1108 | |||
1109 | mainlineSearch :: Either (NodeId -> NodeInfo -> IO (Maybe ([NodeInfo], [r], Maybe tok))) | ||
1110 | (NodeId -> NodeInfo -> (Maybe ([NodeInfo], [r], Maybe tok) -> IO ()) -> IO ()) | ||
1111 | -> Search NodeId (IP, PortNumber) tok NodeInfo r | ||
1112 | mainlineSearch qry = Search | ||
1113 | { searchSpace = mainlineSpace | ||
1114 | , searchNodeAddress = nodeIP &&& nodePort | ||
1115 | , searchQuery = qry | ||
1116 | , searchAlpha = 8 | ||
1117 | , searchK = 16 | ||
1118 | } | ||
1119 | |||
1120 | nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo | ||
1121 | nodeSearch client = mainlineSearch (Right $ asyncGetNodes client) | ||
1122 | |||
1123 | peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr | ||
1124 | peerSearch client = mainlineSearch (Right $ asyncGetPeers client) | ||
1125 | |||
1126 | -- | List of bootstrap nodes maintained by different bittorrent | ||
1127 | -- software authors. | ||
1128 | bootstrapNodes :: WantIP -> IO [NodeInfo] | ||
1129 | bootstrapNodes want = unsafeInterleaveIO $ do | ||
1130 | let wellknowns = | ||
1131 | [ "router.bittorrent.com:6881" -- by BitTorrent Inc. | ||
1132 | |||
1133 | -- doesn't work at the moment (use git blame) of commit | ||
1134 | , "dht.transmissionbt.com:6881" -- by Transmission project | ||
1135 | |||
1136 | , "router.utorrent.com:6881" | ||
1137 | ] | ||
1138 | nss <- forM wellknowns $ \hostAndPort -> do | ||
1139 | e <- resolve want hostAndPort | ||
1140 | case e of | ||
1141 | Left _ -> return [] | ||
1142 | Right sockaddr -> either (const $ return []) | ||
1143 | (return . (: [])) | ||
1144 | $ nodeInfo zeroID sockaddr | ||
1145 | return $ concat nss | ||
1146 | |||
1147 | -- | Resolve either a numeric network address or a hostname to a | ||
1148 | -- numeric IP address of the node. | ||
1149 | resolve :: WantIP -> String -> IO (Either IOError SockAddr) | ||
1150 | resolve want hostAndPort = do | ||
1151 | let hints = defaultHints { addrSocketType = Datagram | ||
1152 | , addrFamily = case want of | ||
1153 | Want_IP4 -> AF_INET | ||
1154 | _ -> AF_INET6 | ||
1155 | } | ||
1156 | (rport,rhost) = span (/= ':') $ reverse hostAndPort | ||
1157 | (host,port) = case rhost of | ||
1158 | [] -> (hostAndPort, Nothing) | ||
1159 | (_:hs) -> (reverse hs, Just (reverse rport)) | ||
1160 | tryIOError $ do | ||
1161 | -- getAddrInfo throws exception on empty list, so this | ||
1162 | -- pattern matching never fails. | ||
1163 | info : _ <- getAddrInfo (Just hints) (Just host) port | ||
1164 | return $ addrAddress info | ||
1165 | |||
1166 | |||
1167 | announce :: MainlineClient -> Announce -> NodeInfo -> IO (Maybe Announced) | ||
1168 | announce client msg addr = do | ||
1169 | mainlineSend (Method "announce_peer") id (\() -> msg) client () addr | ||
diff --git a/dht/src/Network/BitTorrent/MainlineDHT/Symbols.hs b/dht/src/Network/BitTorrent/MainlineDHT/Symbols.hs new file mode 100644 index 00000000..05a64014 --- /dev/null +++ b/dht/src/Network/BitTorrent/MainlineDHT/Symbols.hs | |||
@@ -0,0 +1,24 @@ | |||
1 | {-# OPTIONS_GHC -fno-warn-missing-signatures #-} | ||
2 | module Network.BitTorrent.MainlineDHT.Symbols where | ||
3 | |||
4 | import Data.BEncode.BDict | ||
5 | |||
6 | peer_ip_key = "ip" :: BKey | ||
7 | peer_id_key = "peer id" :: BKey | ||
8 | peer_port_key = "port" :: BKey | ||
9 | msg_type_key = "msg_type" :: BKey | ||
10 | piece_key = "piece" :: BKey | ||
11 | total_size_key = "total_size" :: BKey | ||
12 | node_id_key = "id" :: BKey | ||
13 | read_only_key = "ro" :: BKey | ||
14 | want_key = "want" :: BKey | ||
15 | target_key = "target" :: BKey | ||
16 | nodes_key = "nodes" :: BKey | ||
17 | nodes6_key = "nodes6" :: BKey | ||
18 | info_hash_key = "info_hash" :: BKey | ||
19 | peers_key = "values" :: BKey | ||
20 | token_key = "token" :: BKey | ||
21 | name_key = "name" :: BKey | ||
22 | port_key = "port" :: BKey | ||
23 | implied_port_key = "implied_port" :: BKey | ||
24 | |||