summaryrefslogtreecommitdiff
path: root/dht/src/Network/BitTorrent
diff options
context:
space:
mode:
authorJames Crayne <jim.crayne@gmail.com>2019-09-28 13:43:29 -0400
committerJoe Crayne <joe@jerkface.net>2020-01-01 19:27:53 -0500
commit11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch)
tree5716463275c2d3e902889db619908ded2a73971c /dht/src/Network/BitTorrent
parentadd2c76bced51fde5e9917e7449ef52be70faf87 (diff)
Factor out some new libraries
word64-map: Data.Word64Map network-addr: Network.Address tox-crypto: Crypto.Tox lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ minmax-psq: Data.MinMaxPSQ tasks: Control.Concurrent.Tasks kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search
Diffstat (limited to 'dht/src/Network/BitTorrent')
-rw-r--r--dht/src/Network/BitTorrent/DHT/ContactInfo.hs254
-rw-r--r--dht/src/Network/BitTorrent/DHT/Readme.md13
-rw-r--r--dht/src/Network/BitTorrent/DHT/Token.hs201
-rw-r--r--dht/src/Network/BitTorrent/MainlineDHT.hs1169
-rw-r--r--dht/src/Network/BitTorrent/MainlineDHT/Symbols.hs24
5 files changed, 1661 insertions, 0 deletions
diff --git a/dht/src/Network/BitTorrent/DHT/ContactInfo.hs b/dht/src/Network/BitTorrent/DHT/ContactInfo.hs
new file mode 100644
index 00000000..ec7e6658
--- /dev/null
+++ b/dht/src/Network/BitTorrent/DHT/ContactInfo.hs
@@ -0,0 +1,254 @@
1{-# LANGUAGE BangPatterns #-}
2module Network.BitTorrent.DHT.ContactInfo
3 ( PeerStore
4 , PeerAddr(..)
5 , Network.BitTorrent.DHT.ContactInfo.lookup
6 , Network.BitTorrent.DHT.ContactInfo.freshPeers
7 , Network.BitTorrent.DHT.ContactInfo.insertPeer
8 , deleteOlderThan
9 , knownSwarms
10 ) where
11
12import Control.Applicative
13import Data.Default
14import Data.List as L
15import Data.Maybe
16import Data.HashMap.Strict as HM
17import Data.Serialize
18import Data.Semigroup
19import Data.Wrapper.PSQ as PSQ
20import Data.Time.Clock.POSIX
21import Data.ByteString (ByteString)
22import Data.Word
23
24import Data.Torrent
25import Network.Address
26
27-- {-
28-- import Data.HashMap.Strict as HM
29--
30-- import Data.Torrent.InfoHash
31-- import Network.Address
32--
33-- -- increase prefix when table is too large
34-- -- decrease prefix when table is too small
35-- -- filter outdated peers
36--
37-- {-----------------------------------------------------------------------
38-- -- PeerSet
39-- -----------------------------------------------------------------------}
40--
41-- type PeerSet a = [(PeerAddr, NodeInfo a, Timestamp)]
42--
43-- -- compare PSQueue vs Ordered list
44--
45-- takeNewest :: PeerSet a -> [PeerAddr]
46-- takeNewest = undefined
47--
48-- dropOld :: Timestamp -> PeerSet a -> PeerSet a
49-- dropOld = undefined
50--
51-- insert :: PeerAddr -> Timestamp -> PeerSet a -> PeerSet a
52-- insert = undefined
53--
54-- type Mask = Int
55-- type Size = Int
56-- type Timestamp = Int
57--
58-- {-----------------------------------------------------------------------
59-- -- InfoHashMap
60-- -----------------------------------------------------------------------}
61--
62-- -- compare handwritten prefix tree versus IntMap
63--
64-- data Tree a
65-- = Nil
66-- | Tip !InfoHash !(PeerSet a)
67-- | Bin !InfoHash !Mask !Size !Timestamp (Tree a) (Tree a)
68--
69-- insertTree :: InfoHash -> a -> Tree a -> Tree a
70-- insertTree = undefined
71--
72-- type Prio = Int
73--
74-- --shrink :: ContactInfo ip -> Int
75-- shrink Nil = Nil
76-- shrink (Tip _ _) = undefined
77-- shrink (Bin _ _) = undefined
78--
79-- {-----------------------------------------------------------------------
80-- -- InfoHashMap
81-- -----------------------------------------------------------------------}
82--
83-- -- compare new design versus HashMap
84--
85-- data IntMap k p a
86-- type ContactInfo = Map InfoHash Timestamp (Set (PeerAddr IP) Timestamp)
87--
88-- data ContactInfo ip = PeerStore
89-- { maxSize :: Int
90-- , prefixSize :: Int
91-- , thisNodeId :: NodeId
92--
93-- , count :: Int -- ^ Cached size of the 'peerSet'
94-- , peerSet :: HashMap InfoHash [PeerAddr ip]
95-- }
96--
97-- size :: ContactInfo ip -> Int
98-- size = undefined
99--
100-- prefixSize :: ContactInfo ip -> Int
101-- prefixSize = undefined
102--
103-- lookup :: InfoHash -> ContactInfo ip -> [PeerAddr ip]
104-- lookup = undefined
105--
106-- insert :: InfoHash -> PeerAddr ip -> ContactInfo ip -> ContactInfo ip
107-- insert = undefined
108--
109-- -- | Limit in size.
110-- prune :: NodeId -> Int -> ContactInfo ip -> ContactInfo ip
111-- prune pref targetSize Nil = Nil
112-- prune pref targetSize (Tip _ _) = undefined
113--
114-- -- | Remove expired entries.
115-- splitGT :: Timestamp -> ContactInfo ip -> ContactInfo ip
116-- splitGT = undefined
117-- -}
118
-- | Storage used to keep track of a set of known peers in client,
-- tracker or DHT sessions.  Each 'InfoHash' maps to the 'SwarmData'
-- recorded for that torrent.
newtype PeerStore = PeerStore (HashMap InfoHash SwarmData)
122
123type Timestamp = POSIXTime
124
-- | Everything the store remembers about a single swarm.
data SwarmData = SwarmData
  { peers :: !(PSQ PeerAddr Timestamp) -- ^ Known peers, prioritised by 'Timestamp'.
  , name :: !(Maybe ByteString)        -- ^ Optional name recorded for the swarm.
  }
129
130-- | This wrapper will serialize an ip address with a '4' or '6' prefix byte
131-- to indicate whether it is IPv4 or IPv6.
132--
133-- Note: it does not serialize port numbers.
134newtype SerializeAddress a = SerializeAddress { unserializeAddress :: a }
135
-- | Wire format: one tag byte (0x34, ASCII \'4\', for IPv4 or 0x36, ASCII
-- \'6\', for IPv6) followed by the serialized IP address.
instance Address a => Serialize (SerializeAddress a) where
    -- Malformed input is reported through the parser's 'fail' so that callers
    -- of 'runGet' / 'decode' receive a 'Left' instead of a value that explodes
    -- when it is eventually forced.
    get = do
        tag <- get
        case (tag :: Word8) of
            0x34 -> do ip4 <- get
                       fromIP $ fromAddr (ip4 :: IPv4)
            0x36 -> do ip6 <- get
                       fromIP $ fromAddr (ip6 :: IPv6)
            _    -> fail "cannot deserialize non-IP SerializeAddress"
      where
        -- 'fromAddr' yields 'Nothing' when the decoded IP cannot be expressed
        -- in the requested address type.
        fromIP = maybe (fail "cannot deserialize SerializeAddress: address family not representable")
                       (return . SerializeAddress)

    put (SerializeAddress a)
        | Just ip4 <- fromAddr a
        = put (0x34::Word8) >> put (ip4::IPv4)
        | Just ip6 <- fromAddr a
        = put (0x36::Word8) >> put (ip6::IPv6)
        -- NOTE(review): 'return $ error ...' never forces the error, so a
        -- non-IP address is silently written as zero bytes.  Left unchanged to
        -- avoid turning existing callers into crashes; confirm the intent.
        | otherwise = return $ error "cannot serialize non-IP SerializeAddress"
151
152
-- | A swarm is stored as its optional name followed by a list of
-- @(peer id, address, port)@ triples.  Timestamps are not stored; every
-- decoded peer starts with priority 0.
instance Serialize SwarmData where
    get = do
        swarmName <- get
        entries   <- get
        return SwarmData
            { peers = PSQ.fromList [ toBinding e | e <- entries ]
            , name  = swarmName
            }
      where
        toBinding (pid, addr, port) =
            PeerAddr { peerId   = pid
                     , peerHost = unserializeAddress addr
                     , peerPort = port
                     }
              :-> 0

    put swarm = do
        put (name swarm)
        -- XXX: should we serialize the timestamp?
        put [ (peerId addr, SerializeAddress addr, peerPort addr)
            | addr :-> _ <- PSQ.toList (peers swarm)
            ]
168
-- | List every stored swarm as @(info hash, number of peers, optional name)@.
knownSwarms :: PeerStore -> [ (InfoHash, Int, Maybe ByteString) ]
knownSwarms (PeerStore m) =
    [ (ih, PSQ.size q, n) | (ih, SwarmData q n) <- HM.toList m ]
171
-- | An unnamed swarm holding exactly one peer, with priority 0.
swarmSingleton :: PeerAddr -> SwarmData
swarmSingleton a = SwarmData (PSQ.singleton a 0) Nothing
176
-- | Merge the peers of @new@ into @old@.  When a peer is present in both, the
-- larger timestamp is kept.  The name of @new@ takes precedence over the name
-- of @old@.
swarmInsert :: SwarmData -> SwarmData -> SwarmData
swarmInsert new old = SwarmData
    { peers = L.foldl' mergePeer (peers old) (PSQ.toList (peers new))
    , name  = name new <|> name old -- TODO: decodeUtf8' check
    }
  where
    mergePeer q (a :-> t) = PSQ.insertWith latest a t q

    latest newtime oldtime
        | newtime > oldtime = newtime
        | otherwise         = oldtime
184
-- | 'True' when the swarm still has at least one peer.
isSwarmOccupied :: SwarmData -> Bool
isSwarmOccupied = not . PSQ.null . peers
187
188-- | Empty store.
189instance Default (PeerStore) where
190 def = PeerStore HM.empty
191 {-# INLINE def #-}
192
-- | Union of two stores; swarms present in both are merged with 'swarmInsert'.
instance Semigroup PeerStore where
    (<>) (PeerStore a) (PeerStore b) = PeerStore $ HM.unionWith swarmInsert a b
    {-# INLINE (<>) #-}
197
-- | Monoid under the union operation; the identity is the empty store.
instance Monoid PeerStore where
    mempty = def
    {-# INLINE mempty #-}

    -- Same union as the 'Semigroup' instance.
    mappend = (<>)
    {-# INLINE mappend #-}
206
-- | Can be used to store peers between invocations of the client
-- software.  Swarms without any peers are left out when writing.
instance Serialize PeerStore where
    get = fmap (PeerStore . HM.fromList) get
    put (PeerStore m) =
        put [ entry | entry@(_, swarm) <- HM.toList m, isSwarmOccupied swarm ]
212
-- | Returns all peers associated with a given info hash, or the empty list
-- when the info hash is unknown.
lookup :: InfoHash -> PeerStore -> [PeerAddr]
lookup ih (PeerStore m) =
    case HM.lookup ih m of
        Nothing    -> []
        Just swarm -> PSQ.keys (peers swarm)
216
-- | Upper bound on the number of peers 'freshPeers' hands out per call.
batchSize :: Int
batchSize = 64
219
-- | Used in 'get_peers' DHT queries.
--
-- Pops up to 'batchSize' peers from the front of the swarm's priority queue
-- (via 'minView'), re-inserts each of them with timestamp @tm@, and returns
-- the popped peers together with the updated store.  An unknown info hash
-- yields no peers and the store unchanged.
freshPeers :: InfoHash -> Timestamp -> PeerStore -> ([PeerAddr], PeerStore)
freshPeers ih tm store@(PeerStore m) =
    case HM.lookup ih m of
        Nothing    -> ([], store)
        Just swarm ->
            let -- Each element pairs a popped binding with the queue left after popping it.
                views     = take batchSize $ unfoldr (incomp minView) (peers swarm)
                -- Queue remaining once the whole batch has been removed.
                remaining = case reverse views of
                                (_, q) : _ -> q
                                _          -> peers swarm
                batch     = [ key binding | (binding, _) <- views ]
                refreshed = L.foldl' (\q p -> PSQ.insert p tm q) remaining batch
                m'        = HM.insert ih swarm { peers = refreshed } m
            in m' `seq` (batch, PeerStore m')
231
-- | Adapt an unfolding step so that every produced element also carries the
-- state remaining after that step.  Both arguments are forced before the step
-- is applied.
incomp :: (x -> Maybe (r,x)) -> x -> Maybe ((r,x),x)
incomp f x = f `seq` x `seq` step
  where
    step = case f x of
        Just (result, rest) -> Just ((result, rest), rest)
        Nothing             -> Nothing
236
-- | Used in 'announce_peer' DHT queries.
--
-- Adds the peer (with priority 0) and the optional swarm name to the entry
-- for the given info hash, merging with any existing entry via 'swarmInsert'.
insertPeer :: InfoHash -> Maybe ByteString -> PeerAddr -> PeerStore -> PeerStore
insertPeer !ih !name !a !(PeerStore m) =
    entry `seq` PeerStore (HM.insertWith swarmInsert ih entry m)
  where
    entry = SwarmData (PSQ.singleton a 0) name
243
-- | Drop every peer whose timestamp is strictly before @cutoff@.  A swarm left
-- without peers is removed from the store altogether.
deleteOlderThan :: POSIXTime -> PeerStore -> PeerStore
deleteOlderThan cutoff (PeerStore m) = PeerStore (HM.mapMaybe prune m)
  where
    prune :: SwarmData -> Maybe SwarmData
    prune swarm = do
        kept <- dropExpired (peers swarm)
        return swarm { peers = kept }

    -- Pop entries from the front of the queue while they are older than the
    -- cutoff; 'Nothing' means the queue ran empty.
    dropExpired :: PSQKey a => PSQ a Timestamp -> Maybe (PSQ a Timestamp)
    dropExpired q = case minView q of
        Just (_ :-> tm, rest)
            | tm < cutoff -> dropExpired rest
            | otherwise   -> Just q
        Nothing -> Nothing
diff --git a/dht/src/Network/BitTorrent/DHT/Readme.md b/dht/src/Network/BitTorrent/DHT/Readme.md
new file mode 100644
index 00000000..e2352f10
--- /dev/null
+++ b/dht/src/Network/BitTorrent/DHT/Readme.md
@@ -0,0 +1,13 @@
1References
2==========
3
4Some good references excluding BEPs:
5
6* [Kademlia wiki page][kademlia-wiki]
7* [Kademlia: A Peer-to-peer Information System Based on the XOR Metric][kademlia-paper]
8* [BitTorrent Mainline DHT Measurement][mldht]
9* Profiling a Million User DHT. (paper)
10
11[kademlia-wiki]: http://en.wikipedia.org/wiki/Kademlia
12[kademlia-paper]: http://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf
13[mldht]: http://www.cs.helsinki.fi/u/jakangas/MLDHT/
diff --git a/dht/src/Network/BitTorrent/DHT/Token.hs b/dht/src/Network/BitTorrent/DHT/Token.hs
new file mode 100644
index 00000000..171cc8be
--- /dev/null
+++ b/dht/src/Network/BitTorrent/DHT/Token.hs
@@ -0,0 +1,201 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- The return value for a query for peers includes an opaque value
9-- known as the 'Token'. For a node to announce that its controlling
10-- peer is downloading a torrent, it must present the token received
11-- from the same queried node in a recent query for peers. When a node
12-- attempts to \"announce\" a torrent, the queried node checks the
13-- token against the querying node's 'IP' address. This is to prevent
14-- malicious hosts from signing up other hosts for torrents. Since the
15-- token is merely returned by the querying node to the same node it
16-- received the token from, the implementation is not defined. Tokens
17-- must be accepted for a reasonable amount of time after they have
18-- been distributed.
19--
20{-# LANGUAGE GeneralizedNewtypeDeriving, CPP #-}
21module Network.BitTorrent.DHT.Token
22 ( -- * Token
23 Token
24 , maxInterval
25 , toPaddedByteString
26 , fromPaddedByteString
27
28 -- * Session tokens
29 , TokenMap
30 , SessionTokens
31 , nullSessionTokens
32 , checkToken
33 , grantToken
34
35 -- ** Construction
36 , Network.BitTorrent.DHT.Token.tokens
37
38 -- ** Query
39 , Network.BitTorrent.DHT.Token.lookup
40 , Network.BitTorrent.DHT.Token.member
41
42 -- ** Modification
43 , Network.BitTorrent.DHT.Token.defaultUpdateInterval
44 , Network.BitTorrent.DHT.Token.update
45 ) where
46
47import Control.Arrow
48import Control.Monad.State
49#ifdef VERSION_bencoding
50import Data.BEncode (BEncode)
51#endif
52import Data.ByteString as BS
53import Data.ByteString.Char8 as B8
54import Data.ByteString.Lazy as BL
55import Data.ByteString.Lazy.Builder as BS
56import qualified Data.ByteString.Base16 as Base16
57import Data.Default
58import Data.List as L
59import Data.Hashable
60import Data.String
61import Data.Time
62import System.Random
63import Control.Concurrent.STM
64
65-- TODO use ShortByteString
66
67-- | An opaque value.
68newtype Token = Token BS.ByteString
69 deriving ( Eq, IsString
70#ifdef VERSION_bencoding
71 , BEncode
72#endif
73 )
74
75instance Show Token where
76 show (Token bs) = B8.unpack $ Base16.encode bs
77
-- | Parses the base16 text produced by 'show'.  Always yields exactly one
-- parse: the decoded prefix and whatever 'Base16.decode' left over.
instance Read Token where
  readsPrec _ s = [ (Token decoded, B8.unpack leftover) ]
    where
      (decoded, leftover) = Base16.decode (B8.pack s)
80
81-- | Meaningless token, for testing purposes only.
82instance Default Token where
83 def = makeToken (0::Int) 0
84
-- | Left-pad the token with 0x20 bytes up to a total width of @n@ bytes.
-- Tokens that are already @n@ bytes or longer are returned unpadded.
--
-- If n > 8, then this will also guarantee a nonzero token, which is useful for
-- Tox ping-id values for announce responses.
toPaddedByteString :: Int -> Token -> BS.ByteString
toPaddedByteString n (Token bs) = padding `BS.append` bs
  where
    padding = BS.replicate (n - BS.length bs) 0x20
91
-- | Inverse of 'toPaddedByteString' for an @n@-byte buffer: drops the leading
-- @n - len@ bytes, where @len@ is the length of the default token.
fromPaddedByteString :: Int -> BS.ByteString -> Token
fromPaddedByteString n bs = Token (BS.drop (n - tokenLen) bs)
  where
    Token sample = def
    tokenLen     = BS.length sample
96
97-- | The secret value used as salt.
98type Secret = Int
99
-- The BitTorrent implementation uses the SHA1 hash of the IP address
-- concatenated onto a secret, we use hashable instead.
--
-- The salted hash is emitted as an 8-byte big-endian integer.
makeToken :: Hashable a => a -> Secret -> Token
makeToken n s = Token (toStrict (toLazyByteString (int64BE digest)))
  where
    digest = fromIntegral (hashWithSalt s n)
{-# INLINE makeToken #-}
107
108-- | Constant space 'Node' to 'Token' map based on the secret value.
109data TokenMap = TokenMap
110 { prevSecret :: {-# UNPACK #-} !Secret
111 , curSecret :: {-# UNPACK #-} !Secret
112 , generator :: {-# UNPACK #-} !StdGen
113 } deriving Show
114
-- | A new token map based on the specified seed value. The returned token
-- map should be periodically 'update'd.
--
-- Normally, the seed value should vary between invocations of the
-- client software.
tokens :: Int -> TokenMap
tokens seed = TokenMap
    { prevSecret = firstSecret
    , curSecret  = secondSecret
    , generator  = gen2
    }
  where
    -- Draw two secrets in sequence and keep the generator that follows them.
    (firstSecret,  gen1) = next (mkStdGen seed)
    (secondSecret, gen2) = next gen1
125
-- | Get the token for the given node, derived from the current secret.
-- A token becomes invalid after 2 'update's.
--
-- Typically used when answering a query for peers.
lookup :: Hashable a => a -> TokenMap -> Token
lookup addr tm = makeToken addr (curSecret tm)
132
-- | Check if a token is valid, i.e. whether it matches the token derived from
-- either the current or the previous secret.
--
-- Typically used to handle 'Network.DHT.Mainline.Announce'
-- query. If token is invalid the 'Network.KRPC.ProtocolError' should
-- be sent back to the malicious node.
member :: Hashable a => a -> Token -> TokenMap -> Bool
member addr token tm =
       token == makeToken addr (curSecret tm)
    || token == makeToken addr (prevSecret tm)
141
142-- | Secret changes every five minutes and tokens up to ten minutes old
143-- are accepted.
144defaultUpdateInterval :: NominalDiffTime
145defaultUpdateInterval = 5 * 60
146
-- | Rotate the secrets: the current secret becomes the previous one and a
-- fresh secret is drawn from the generator.
update :: TokenMap -> TokenMap
update tm = TokenMap
    { prevSecret = curSecret tm
    , curSecret  = fresh
    , generator  = gen'
    }
  where
    (fresh, gen') = next (generator tm)
156
157data SessionTokens = SessionTokens
158 { tokenMap :: !TokenMap
159 , lastUpdate :: !UTCTime
160 , maxInterval :: !NominalDiffTime
161 }
162
-- | Fresh session tokens: randomly seeded secrets, last-update time set to
-- now, and 'defaultUpdateInterval' as the rotation interval.
nullSessionTokens :: IO SessionTokens
nullSessionTokens = do
    seed <- randomIO
    now  <- getCurrentTime
    return SessionTokens
        { tokenMap    = tokens seed
        , lastUpdate  = now
        , maxInterval = defaultUpdateInterval
        }
168
-- | Rotate the secrets if more than 'maxInterval' has passed since the last
-- rotation.
--
-- When more than two intervals have passed, both the current and the previous
-- secret are stale, so the secrets are rotated twice.  Rotating only once
-- would keep the old current secret alive as the previous secret and thereby
-- accept tokens of arbitrary age after an idle period.
--
-- Field accessors are used instead of an as-pattern with record wildcards;
-- @ts \@ SessionTokens{..}@ with spaces around the \@ no longer parses on
-- GHC 9.0 and later.
invalidateTokens :: UTCTime -> SessionTokens -> SessionTokens
invalidateTokens curTime ts
    | elapsed > 2 * interval = rotated (update (update (tokenMap ts)))
    | elapsed > interval     = rotated (update (tokenMap ts))
    | otherwise              = ts
  where
    interval = maxInterval ts
    elapsed  = curTime `diffUTCTime` lastUpdate ts
    -- Install the rotated map and restart the interval from 'curTime'.
    rotated tm = ts { tokenMap = tm, lastUpdate = curTime }
178
179{-----------------------------------------------------------------------
180-- Tokens
181-----------------------------------------------------------------------}
182
-- | Apply 'invalidateTokens' to the shared session tokens using the current
-- wall-clock time.
tryUpdateSecret :: TVar SessionTokens -> IO ()
tryUpdateSecret toks =
    getCurrentTime >>= \now ->
        atomically (modifyTVar' toks (invalidateTokens now))
187
-- | Rotate the secrets if they are due, then issue the token for @addr@ based
-- on the current secret.
grantToken :: Hashable addr => TVar SessionTokens -> addr -> IO Token
grantToken sessionTokens addr = do
    tryUpdateSecret sessionTokens
    Network.BitTorrent.DHT.Token.lookup addr . tokenMap <$> readTVarIO sessionTokens
193
-- | Rotate the secrets if they are due, then report whether the presented
-- token is valid for @addr@.  Returns 'False' (it does not throw) when the
-- token is invalid or already expired.  See 'TokenMap' for details.
checkToken :: Hashable addr => TVar SessionTokens -> addr -> Token -> IO Bool
checkToken sessionTokens addr questionableToken = do
    tryUpdateSecret sessionTokens
    member addr questionableToken . tokenMap <$> readTVarIO sessionTokens
201
diff --git a/dht/src/Network/BitTorrent/MainlineDHT.hs b/dht/src/Network/BitTorrent/MainlineDHT.hs
new file mode 100644
index 00000000..89851e88
--- /dev/null
+++ b/dht/src/Network/BitTorrent/MainlineDHT.hs
@@ -0,0 +1,1169 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE DeriveDataTypeable #-}
3{-# LANGUAGE DeriveFoldable #-}
4{-# LANGUAGE DeriveFunctor #-}
5{-# LANGUAGE DeriveTraversable #-}
6{-# LANGUAGE FlexibleInstances #-}
7{-# LANGUAGE GeneralizedNewtypeDeriving #-}
8{-# LANGUAGE LambdaCase #-}
9{-# LANGUAGE NamedFieldPuns #-}
10{-# LANGUAGE PatternSynonyms #-}
11{-# LANGUAGE StandaloneDeriving #-}
12{-# LANGUAGE TupleSections #-}
13module Network.BitTorrent.MainlineDHT where
14
15import Control.Applicative
16import Control.Arrow
17import Control.Concurrent.STM
18import Control.Monad
19import Crypto.Random
20import Data.BEncode as BE
21import qualified Data.BEncode.BDict as BE
22 ;import Data.BEncode.BDict (BKey)
23import Data.BEncode.Pretty
24import Data.BEncode.Types (BDict)
25import Data.Bits
26import Data.Bits.ByteString ()
27import Data.Bool
28import Data.ByteArray (ByteArrayAccess)
29import qualified Data.ByteString as B
30 ;import Data.ByteString (ByteString)
31import qualified Data.ByteString.Base16 as Base16
32import qualified Data.ByteString.Char8 as C8
33import Data.ByteString.Lazy (toStrict)
34import qualified Data.ByteString.Lazy.Char8 as L8
35import Data.Char
36import Data.Coerce
37import Data.Data
38import Data.Default
39import Data.Digest.CRC32C
40import Data.Function (fix)
41import Data.Hashable
42#if MIN_VERSION_iproute(1,7,4)
43import Data.IP hiding (fromSockAddr)
44#else
45import Data.IP
46#endif
47import Data.Maybe
48import Data.Monoid
49import Data.Ord
50import qualified Data.Serialize as S
51import Data.Set (Set)
52import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime)
53import Data.Torrent
54import Data.Word
55import qualified Data.Wrapper.PSQInt as Int
56import Debug.Trace
57import Network.BitTorrent.MainlineDHT.Symbols
58import Network.Kademlia
59import Network.Kademlia.Bootstrap
60import Network.Address (fromSockAddr,
61 setPort, sockAddrPort, testIdBit,
62 toSockAddr, genBucketSample', WantIP(..),
63 un4map,either4or6,ipFamily)
64import Network.BitTorrent.DHT.ContactInfo as Peers
65import Network.Kademlia.Search (Search (..))
66import Network.BitTorrent.DHT.Token as Token
67import qualified Network.Kademlia.Routing as R
68 ;import Network.Kademlia.Routing (getTimestamp)
69import Network.QueryResponse
70import Network.Socket
71import System.IO.Error
72import System.IO.Unsafe (unsafeInterleaveIO)
73import qualified Text.ParserCombinators.ReadP as RP
74#ifdef THREAD_DEBUG
75import Control.Concurrent.Lifted.Instrument
76#else
77import Control.Concurrent.Lifted
78import GHC.Conc (labelThread)
79#endif
80import qualified Data.Aeson as JSON
81 ;import Data.Aeson (FromJSON, ToJSON, (.=))
82import Text.Read
83import System.Global6
84import Control.TriadCommittee
85import Data.TableMethods
86import DPut
87import DebugTag
88
89newtype NodeId = NodeId ByteString
90 deriving (Eq,Ord,ByteArrayAccess, Bits, Hashable)
91
-- | A node id is bencoded as a byte string; decoding insists on exactly
-- 20 bytes.
instance BEncode NodeId where
    fromBEncode bval = fromBEncode bval >>= checkLength
      where
        checkLength bs
            | B.length bs == 20 = Right (NodeId bs)
            | otherwise         = Left "Invalid length node id."

    toBEncode (NodeId bs) = toBEncode bs
100
101instance Show NodeId where
102 show (NodeId bs) = C8.unpack $ Base16.encode bs
103
104instance S.Serialize NodeId where
105 get = NodeId <$> S.getBytes 20
106 put (NodeId bs) = S.putByteString bs
107
108instance FiniteBits NodeId where
109 finiteBitSize _ = 160
110
-- | Reads a node id written as 40 hexadecimal characters; the rest of the
-- input after those 40 characters is returned as the remainder.
instance Read NodeId where
    readsPrec _ str =
        case Base16.decode (C8.pack str) of
            (bs, _) | B.length bs == 20 -> [ (NodeId bs, drop 40 str) ]
            _                           -> []
117
118zeroID :: NodeId
119zeroID = NodeId $ B.replicate 20 0
120
121data NodeInfo = NodeInfo
122 { nodeId :: NodeId
123 , nodeIP :: IP
124 , nodePort :: PortNumber
125 }
126 deriving (Eq,Ord)
127
-- | Rendered as an object with @node-id@, @port@ and either an @ipv4@ or an
-- @ipv6@ field.  IPv4-mapped IPv6 addresses are reported as @ipv4@.
instance ToJSON NodeInfo where
    toJSON (NodeInfo nid ip port) = JSON.object
        [ "node-id" .= show nid
        , addressField
        , "port"    .= (fromIntegral port :: Int)
        ]
      where
        addressField = case ip of
            IPv4 ip4 -> "ipv4" .= show ip4
            IPv6 ip6 -> case un4map ip6 of
                Just ip4 -> "ipv4" .= show ip4
                Nothing  -> "ipv6" .= show ip6
-- | Parses the object produced by the 'ToJSON' instance.  An @ipv6@ field is
-- preferred over @ipv4@ when both are present and readable.  The node id must
-- decode to exactly 20 bytes.
--
-- 'JSON.withObject' makes a non-object input (array, string, ...) a regular
-- parse failure rather than a pattern-match exception.
instance FromJSON NodeInfo where
    parseJSON = JSON.withObject "NodeInfo" $ \v -> do
        nidstr  <- v JSON..: "node-id"
        ip6str  <- v JSON..:? "ipv6"
        ip4str  <- v JSON..:? "ipv4"
        portnum <- v JSON..: "port"
        ip <- maybe empty (return . IPv6) (ip6str >>= readMaybe)
              <|> maybe empty (return . IPv4) (ip4str >>= readMaybe)
        let (bs,_) = Base16.decode (C8.pack nidstr)
        guard (B.length bs == 20)
        return $ NodeInfo (NodeId bs) ip (fromIntegral (portnum :: Word16))
156
-- | 'True' for the ASCII characters @0-9@, @a-f@ and @A-F@.
hexdigit :: Char -> Bool
hexdigit = isHexDigit
159
-- | Reads a node from text.  Accepted shapes, after optional leading spaces:
--
-- * 40 hex characters, then @\@@ or a single whitespace character, then an
--   address;
-- * an address alone, in which case the node id is 'zeroID'.
--
-- The address is either enclosed in parentheses or a run of non-space
-- characters, and must itself read as @[ipv6]:port@ or @ipv4:port@.
instance Read NodeInfo where
    readsPrec i = RP.readP_to_S $ do
        RP.skipSpaces
        let n = 40 -- characters in node id.
            -- Raw address text: "(...)" or everything up to the next space.
            parseAddr = RP.between (RP.char '(') (RP.char ')') (RP.munch (/=')'))
                        RP.+++ RP.munch (not . isSpace)
            -- Node id in hex, a separator, then the raw address text.
            nodeidAt = do hexhash <- sequence $ replicate n (RP.satisfy hexdigit)
                          RP.char '@' RP.+++ RP.satisfy isSpace
                          addrstr <- parseAddr
                          nid <- case Base16.decode $ C8.pack hexhash of
                                     (bs,_) | B.length bs==20 -> return (NodeId bs)
                                     _ -> fail "Bad node id."
                          return (nid,addrstr)
        (nid,addrstr) <- ( nodeidAt RP.+++ ( (zeroID,) <$> parseAddr) )
        -- Second-stage parser run over the raw address text.
        let raddr = do
                ip <- RP.between (RP.char '[') (RP.char ']')
                                 (IPv6 <$> RP.readS_to_P (readsPrec i))
                      RP.+++ (IPv4 <$> RP.readS_to_P (readsPrec i))
                _ <- RP.char ':'
                port <- toEnum <$> RP.readS_to_P (readsPrec i)
                return (ip, port)

        -- Only the first parse of the address text is used.
        (ip,port) <- case RP.readP_to_S raddr addrstr of
                        [] -> fail "Bad address."
                        ((ip,port),_):_ -> return (ip,port)
        return $ NodeInfo nid ip port
186
187
188
189-- The Hashable instance depends only on the IP address and port number. It is
190-- used to compute the announce token.
191instance Hashable NodeInfo where
192 hashWithSalt s ni = hashWithSalt s (nodeIP ni , nodePort ni)
193 {-# INLINE hashWithSalt #-}
194
195
-- | Shown as @nodeid\@address:port@.  IPv4 (and IPv4-mapped IPv6) addresses
-- are shown bare; other IPv6 addresses are wrapped in square brackets.
instance Show NodeInfo where
    showsPrec _ (NodeInfo nid ip port) =
        shows nid . showChar '@' . showsAddress . showChar ':' . shows port
      where
        showsAddress = case ip of
            IPv4 ip4                          -> shows ip4
            IPv6 ip6 | Just ip4 <- un4map ip6 -> shows ip4
            _                                 -> showChar '[' . shows ip . showChar ']'
204
205{-
206
207-- | KRPC 'compact list' compatible encoding: contact information for
208-- nodes is encoded as a 26-byte string. Also known as "Compact node
209-- info" the 20-byte Node ID in network byte order has the compact
210-- IP-address/port info concatenated to the end.
211 get = NodeInfo <$> (NodeId <$> S.getBytes 20 ) <*> S.get <*> S.get
212-}
213
-- | Read a 20-byte node id followed by an IPv4 address and a port.
getNodeInfo4 :: S.Get NodeInfo
getNodeInfo4 = do
    nid  <- S.getBytes 20
    ip4  <- S.get
    port <- S.get
    return (NodeInfo (NodeId nid) (IPv4 ip4) port)
218
-- | Write node id, IPv4 address and port.  IPv4-mapped IPv6 addresses are
-- written as IPv4; any other IPv6 node writes nothing at all.
putNodeInfo4 :: NodeInfo -> S.Put
putNodeInfo4 (NodeInfo (NodeId nid) ip port) =
    case ip of
        IPv4 ip4                          -> emit ip4
        IPv6 ip6 | Just ip4 <- un4map ip6 -> emit ip4
        _                                 -> return ()
  where
    emit ip4 = S.putByteString nid >> S.put ip4 >> S.put port
226
-- | Read a 20-byte node id followed by an IPv6 address and a port.
getNodeInfo6 :: S.Get NodeInfo
getNodeInfo6 = do
    nid  <- S.getBytes 20
    ip6  <- S.get
    port <- S.get
    return (NodeInfo (NodeId nid) (IPv6 ip6) port)
231
-- | Write node id, IPv6 address and port.  Nodes with an IPv4 address write
-- nothing at all.
putNodeInfo6 :: NodeInfo -> S.Put
putNodeInfo6 ni = case ni of
    NodeInfo (NodeId nid) (IPv6 ip) port ->
        S.putByteString nid >> S.put ip >> S.put port
    _ -> return ()
236
237
-- | Socket address of a node.  IPv4 addresses are converted with
-- 'ipv4ToIPv6', so the result is always built from an IPv6 address.
--
-- TODO: This should depend on the bind address to support IPv4-only.  For
-- now, in order to support dual-stack listen, we're going to assume IPv6 is
-- wanted and map IPv4 addresses accordingly.
nodeAddr :: NodeInfo -> SockAddr
nodeAddr (NodeInfo _ ip port) = setPort port (toSockAddr asIPv6)
  where
    asIPv6 = case ip of
        IPv4 ip4 -> ipv4ToIPv6 ip4
        IPv6 ip6 -> ip6
246
-- | Build a 'NodeInfo' from a node id and a socket address.  Fails when the
-- address yields no IP or no port.
nodeInfo :: NodeId -> SockAddr -> Either String NodeInfo
nodeInfo nid saddr =
    maybe (Left "Address family not supported.") Right $
        NodeInfo nid <$> fromSockAddr saddr <*> sockAddrPort saddr
252
253-- | Types of RPC errors.
254data ErrorCode
255 -- | Some error doesn't fit in any other category.
256 = GenericError
257
258 -- | Occurs when server fail to process procedure call.
259 | ServerError
260
261 -- | Malformed packet, invalid arguments or bad token.
262 | ProtocolError
263
264 -- | Occurs when client trying to call method server don't know.
265 | MethodUnknown
266 deriving (Show, Read, Eq, Ord, Bounded, Typeable, Data)
267
-- | According to the table:
-- <http://bittorrent.org/beps/bep_0005.html#errors>
--
-- Numeric codes outside 201..204 are read back as 'GenericError'.
instance Enum ErrorCode where
    fromEnum code = case code of
        GenericError  -> 201
        ServerError   -> 202
        ProtocolError -> 203
        MethodUnknown -> 204
    {-# INLINE fromEnum #-}

    toEnum n = case n of
        201 -> GenericError
        202 -> ServerError
        203 -> ProtocolError
        204 -> MethodUnknown
        _   -> GenericError
    {-# INLINE toEnum #-}
282
283instance BEncode ErrorCode where
284 toBEncode = toBEncode . fromEnum
285 {-# INLINE toBEncode #-}
286 fromBEncode b = toEnum <$> fromBEncode b
287 {-# INLINE fromBEncode #-}
288
289data Error = Error
290 { errorCode :: !ErrorCode -- ^ The type of error.
291 , errorMessage :: !ByteString -- ^ Human-readable text message.
292 } deriving ( Show, Eq, Ord, Typeable, Data, Read )
293
294newtype TransactionId = TransactionId ByteString
295 deriving (Eq, Ord, Show, BEncode)
296
297newtype Method = Method ByteString
298 deriving (Eq, Ord, Show, BEncode)
299
300data Message a = Q { msgOrigin :: NodeId
301 , msgID :: TransactionId
302 , qryPayload :: a
303 , qryMethod :: Method
304 , qryReadOnly :: Bool }
305
306 | R { msgOrigin :: NodeId
307 , msgID :: TransactionId
308 , rspPayload :: Either Error a
309 , rspReflectedIP :: Maybe SockAddr }
310
311showBE :: BValue -> String
312showBE bval = L8.unpack (showBEncode bval)
313
314instance BE.BEncode (Message BValue) where
315 toBEncode m = encodeMessage m
316 {-
317 in case m of
318 Q {} -> trace ("encoded(query): "++showBE r) r
319 R {} -> trace ("encoded(response): "++showBE r) r -}
320 fromBEncode bval = decodeMessage bval
321 {-
322 in case r of
323 Left e -> trace (show e) r
324 Right (Q {}) -> trace ("decoded(query): "++showBE bval) r
325 Right (R {}) -> trace ("decoded(response): "++showBE bval) r -}
326
-- | Decode a KRPC dictionary into a 'Message'.
--
-- The @y@ field selects the message kind: @q@ (query), @r@ (response) or
-- @e@ (error).  Errors are returned as responses carrying @Left@ and use
-- 'zeroID' as origin.  Any other @y@ value fails with a descriptive message.
decodeMessage :: BValue -> Either String (Message BValue)
decodeMessage = fromDict $ do
    -- Peek at the message kind without consuming it.
    key <- lookAhead (field (req "y"))
    let _ = key :: BKey
    -- Each branch yields a function still waiting for the transaction id.
    f <- case key of
        "q" -> do a <- field (req "a")
                  -- The sender id and read-only flag live inside the arguments.
                  g <- either fail return $ flip fromDict a $ do
                      who <- field (req "id")
                      ro <- fromMaybe False <$> optional (field (req "ro"))
                      return $ \meth tid -> Q who tid a meth ro
                  meth <- field (req "q")
                  return $ g meth
        "r" -> do ip <- do
                      ipstr <- optional (field (req "ip"))
                      mapM (either fail return . decodeAddr) ipstr
                  vals <- field (req "r")
                  -- The sender id lives inside the response values.
                  either fail return $ flip fromDict vals $ do
                      who <- field (req "id")
                      return $ \tid -> R who tid (Right vals) ip
        "e" -> do (ecode,emsg) <- field (req "e")
                  ip <- do
                      ipstr <- optional (field (req "ip"))
                      mapM (either fail return . decodeAddr) ipstr
                  -- FIXME:Spec does not give us the NodeId of the sender.
                  -- Using 'zeroID' as place holder.
                  -- We should ignore the msgOrigin for errors in 'updateRouting'.
                  -- We should consider making msgOrigin a Maybe value.
                  return $ \tid -> R zeroID tid (Left (Error ecode emsg)) ip
        _ -> fail $ "Mainline message is not a query, response, or an error: "
                        ++ show key
    tid <- field (req "t")
    return $ f (tid :: TransactionId)
359
360
-- | Encode a 'Message' as a KRPC dictionary.
--
-- For dictionary payloads the generic arguments (sender id, read-only flag)
-- are merged into the payload.  Non-dictionary payloads are passed through
-- unchanged for both queries and responses, so the function is total.
-- Responses carrying @Left@ are encoded as error messages.
encodeMessage :: Message BValue -> BValue
encodeMessage (Q origin tid a meth ro)
  = case a of
      BDict args -> encodeQuery tid meth (BDict $ genericArgs origin ro `BE.union` args)
      _          -> encodeQuery tid meth a -- XXX: Not really a valid query.
encodeMessage (R origin tid v ip)
  = case v of
      Right (BDict vals) -> encodeResponse tid (BDict $ genericArgs origin False `BE.union` vals) ip
      -- Previously unmatched, which crashed with a pattern-match failure.
      Right other        -> encodeResponse tid other ip -- XXX: Not really a valid response.
      Left err           -> encodeError tid err
370
371
-- | Compact encoding of a socket address: the raw host address followed by
-- the port as a big-endian 16-bit word.  An address that is neither
-- 'SockAddrInet' nor 'SockAddrInet6' encodes as the empty string.
encodeAddr :: SockAddr -> ByteString
encodeAddr = either encode4 encode6 . either4or6
  where
    encode4 (SockAddrInet port addr)
        = S.runPut (S.putWord32host addr >> S.putWord16be (fromIntegral port))
    -- Total like 'encode6': never crash on an unexpected constructor.
    encode4 _ = B.empty

    encode6 (SockAddrInet6 port _ addr _)
        = S.runPut (S.put addr >> S.putWord16be (fromIntegral port))
    encode6 _ = B.empty
381
-- | Decode a compact socket address.  A 6-byte input is read as an IPv4
-- address plus port; any other input is read as an IPv6 address plus port.
decodeAddr :: ByteString -> Either String SockAddr
decodeAddr bs = S.runGet parser bs
  where
    parser
        | B.length bs == 6 = do
            host <- S.getWord32host
            port <- S.getWord16be
            return (SockAddrInet (fromIntegral port) host)
        | otherwise = do
            host <- S.get -- TODO: Is this right?
            port <- S.getWord16be
            return (SockAddrInet6 (fromIntegral port) 0 host 0)
389
-- | Dictionary entries common to queries and responses: the sender's @id@
-- and, only when the read-only flag is set, @ro@ with value 1.
genericArgs :: BEncode a => a -> Bool -> BDict
genericArgs nodeid ro =
       "id" .=! nodeid
    .: "ro" .=? readOnlyFlag
    .: endDict
  where
    readOnlyFlag = if ro then Just (1 :: Int) else Nothing
395
396encodeError :: BEncode a => a -> Error -> BValue
397encodeError tid (Error ecode emsg) = encodeAny tid "e" (ecode,emsg) id
398
399encodeResponse :: (BEncode tid, BEncode vals) =>
400 tid -> vals -> Maybe SockAddr -> BValue
401encodeResponse tid rvals rip =
402 encodeAny tid "r" rvals ("ip" .=? (BString . encodeAddr <$> rip) .:)
403
404encodeQuery :: (BEncode args, BEncode tid, BEncode method) =>
405 tid -> method -> args -> BValue
406encodeQuery tid qmeth qargs = encodeAny tid "q" qmeth ("a" .=! qargs .:)
407
408encodeAny ::
409 (BEncode tid, BEncode a) =>
410 tid -> BKey -> a -> (BDict -> BDict) -> BValue
411encodeAny tid key val aux = toDict $
412 aux $ key .=! val
413 .: "t" .=! tid
414 .: "y" .=! key
415 .: endDict
416
417
-- | Render a packet for logging.  The packet is pretty-printed as bencode (or
-- the decode error is shown instead), the line list is passed through @f@,
-- and every resulting line is prefixed with the peer address and @flow@.
showPacket :: ([L8.ByteString] -> [L8.ByteString]) -> SockAddr -> L8.ByteString -> ByteString -> String
showPacket f addr flow bs = L8.unpack (L8.unlines annotated)
  where
    annotated = [ prefix `L8.append` ln | ln <- f (L8.lines rendered) ]

    prefix = L8.pack (either show show (either4or6 addr)) <> flow

    rendered = either L8.pack showBEncode (BE.decode bs)
426
-- | Wrap a transport so that every received and every sent packet is
-- logged (via 'dput' under 'XBitTorrent') before being passed along.
addVerbosity :: Transport err SockAddr ByteString -> Transport err SockAddr ByteString
addVerbosity tr = tr
    { awaitMessage = \kont -> awaitMessage tr $ \m -> do
        forM_ m $ mapM_ $ \(msg, addr) -> logPacket " --> " addr msg
        kont m
    , sendMessage = \addr msg -> do
        logPacket " <-- " addr msg
        sendMessage tr addr msg
    }
  where
    logPacket arrow addr msg = dput XBitTorrent (showPacket id addr arrow msg)
438
439
-- | Render an inbound packet that failed to parse, with the error message
-- placed on the first line.
showParseError :: ByteString -> SockAddr -> String -> String
showParseError bs addr err = showPacket prependError addr " --> " bs
  where
    prependError ls = L8.pack err : ls
442
-- | Decode an inbound datagram and pair it with the sender's 'NodeInfo'.
-- Any failure is reported through 'showParseError'.
parsePacket :: ByteString -> SockAddr -> Either String (Message BValue, NodeInfo)
parsePacket bs addr = left (showParseError bs addr) parsed
  where
    -- TODO: Error packets do not include a valid msgOrigin.
    --       The BE.decode method is using 'zeroID' as a placeholder.
    parsed = BE.decode bs >>= \pkt ->
        (,) pkt <$> nodeInfo (msgOrigin pkt) addr
450
-- | Serialise an outbound message and pair it with the destination
-- node's socket address.
encodePacket :: Message BValue -> NodeInfo -> (ByteString, SockAddr)
encodePacket msg ni = (payload, destination)
  where
    payload     = toStrict (BE.encode msg)
    destination = nodeAddr ni
454
-- | Classify an inbound message for the dispatcher: queries by method and
-- transaction id, responses by transaction id.
classify :: Message BValue -> MessageClass String Method TransactionId NodeInfo (Message BValue)
classify = \case
    Q { msgID = tid, qryMethod = meth } -> IsQuery meth tid
    R { msgID = tid }                   -> IsResponse tid
458
-- | Build a successful response originating from @self@, reflecting the
-- destination's address back to it.
encodeResponsePayload :: BEncode a => TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue
encodeResponsePayload tid self dest b = R origin tid payload reflected
  where
    origin    = nodeId self
    payload   = Right (BE.toBEncode b)
    reflected = Just (nodeAddr dest)
461
-- | Build a query message originating from @self@.  The destination node
-- is accepted for interface uniformity but is not used.
encodeQueryPayload :: BEncode a =>
                      Method -> Bool -> TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue
encodeQueryPayload meth isReadonly tid self _dest b =
    Q (nodeId self) tid (BE.toBEncode b) meth isReadonly
465
-- | Build an error response originating from @self@, reflecting the
-- destination's address back to it.
errorPayload :: TransactionId -> NodeInfo -> NodeInfo -> Error -> Message a
errorPayload tid self dest e = R origin tid (Left e) reflected
  where
    origin    = nodeId self
    reflected = Just (nodeAddr dest)
468
-- | Decode the argument payload of a query message.
decodePayload :: BEncode a => Message BValue -> Either String a
decodePayload = BE.fromBEncode . qryPayload
471
-- | Method handler specialised to the Mainline message types.
type Handler = MethodHandler String TransactionId NodeInfo (Message BValue)

-- | Wrap an action that always succeeds as a query handler.
handler :: ( BEncode a
           , BEncode b
           ) =>
           (NodeInfo -> a -> IO b) -> Maybe Handler
handler = Just . MethodHandler decodePayload encodeResponsePayload
479
480
-- | Wrap an action that may fail as a query handler: a 'Left' result is
-- sent as an error message, a 'Right' result as a normal response.
handlerE :: ( BEncode a
            , BEncode b
            ) =>
            (NodeInfo -> a -> IO (Either Error b)) -> Maybe Handler
handlerE f = Just (MethodHandler decodePayload enc f)
  where
    enc tid self dest =
        either (errorPayload tid self dest)
               (encodeResponsePayload tid self dest)
489
-- | Set of (infohash, port) pairs.
type AnnounceSet = Set (InfoHash, PortNumber)

-- | Mutable swarm-related state shared by the query handlers.
data SwarmsDatabase = SwarmsDatabase
    { contactInfo   :: !(TVar PeerStore)     -- ^ Published by other nodes.
    , sessionTokens :: !(TVar SessionTokens) -- ^ Query session IDs.
    , announceInfo  :: !(TVar AnnounceSet)   -- ^ To publish by this node.
    }
497
-- | Create a 'SwarmsDatabase' with default (empty) peer store and announce
-- set and freshly initialised session tokens.
newSwarmsDatabase :: IO SwarmsDatabase
newSwarmsDatabase = do
    toks <- nullSessionTokens
    atomically $ do
        peersVar    <- newTVar def
        tokensVar   <- newTVar toks
        announceVar <- newTVar def
        return (SwarmsDatabase peersVar tokensVar announceVar)
505
-- | Routing state of a Mainline node, kept separately per address family:
-- a committee and a bucket refresher each for IPv4 and IPv6.
data Routing = Routing
    { tentativeId :: NodeInfo
    , committee4  :: TriadCommittee NodeId SockAddr
    , committee6  :: TriadCommittee NodeId SockAddr
    , refresher4  :: BucketRefresher NodeId NodeInfo
    , refresher6  :: BucketRefresher NodeId NodeInfo
    }
513
-- | Refresh queue of the IPv4 bucket refresher.
sched4 :: Routing -> TVar (Int.PSQ POSIXTime)
sched4 routing = case refresher4 routing of
    BucketRefresher { refreshQueue } -> refreshQueue

-- | Refresh queue of the IPv6 bucket refresher.
sched6 :: Routing -> TVar (Int.PSQ POSIXTime)
sched6 routing = case refresher6 routing of
    BucketRefresher { refreshQueue } -> refreshQueue

-- | Bucket list of the IPv4 bucket refresher.
routing4 :: Routing -> TVar (R.BucketList NodeInfo)
routing4 routing = case refresher4 routing of
    BucketRefresher { refreshBuckets } -> refreshBuckets

-- | Bucket list of the IPv6 bucket refresher.
routing6 :: Routing -> TVar (R.BucketList NodeInfo)
routing6 routing = case refresher6 routing of
    BucketRefresher { refreshBuckets } -> refreshBuckets
525
-- | Debugging aid: wrap table methods so that each insert, delete and
-- lookup emits a 'trace' line naming the transaction id.
traced :: Show tid => TableMethods t tid -> TableMethods t tid
traced (TableMethods ins del lkup) = TableMethods
    (\tid mvar t -> note "insert " tid $ ins tid mvar t)
    (\tid t      -> note "del " tid    $ del tid t)
    (\tid t      -> note "lookup " tid $ lkup tid t)
  where
    note label tid = trace (label ++ show tid)
531
532
-- | Query/response client specialised to the Mainline message types.
type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue)

-- | Like 'nodeInfo', but never fails: when the socket address yields no IP
-- the 'nodeIP' falls back to @toEnum 0@, and when it yields no port the
-- 'nodePort' falls back to 0.
mkNodeInfo :: NodeId -> SockAddr -> NodeInfo
mkNodeInfo nid addr = NodeInfo
    { nodeId   = nid
    , nodeIP   = ip
    , nodePort = portnum
    }
  where
    ip      = fromMaybe (toEnum 0) (fromSockAddr addr)
    portnum = fromMaybe 0 (sockAddrPort addr)
542
-- | Create a Mainline DHT client on the given UDP address.
--
-- Returns the client, its 'Routing' state, and the bootstrap actions of
-- the IPv4 and IPv6 bucket refreshers (in that order).  Several background
-- threads are forked: two that react to external-address changes, two
-- bucket-refresh pollers and the peer-store garbage collector.
newClient :: SwarmsDatabase -> SockAddr
          -> IO ( MainlineClient
                , Routing
                , [NodeInfo] -> [NodeInfo] -> IO ()
                , [NodeInfo] -> [NodeInfo] -> IO ()
                )
newClient swarms addr = do
    udp <- udpTransport addr
    nid <- NodeId <$> getRandomBytes 20
    let tentative_info = mkNodeInfo nid addr
        -- Variant of the tentative identity for a known global IPv6
        -- address; the node id is adjusted with 'bep42' when possible.
        withGlobal6 ip6 = tentative_info
            { nodeId = fromMaybe (nodeId tentative_info)
                     $ bep42 (toSockAddr ip6) (nodeId tentative_info)
            , nodeIP = IPv6 ip6
            }
    tentative_info6 <- maybe tentative_info withGlobal6 <$> global6
    addr4 <- atomically $ newTChan
    addr6 <- atomically $ newTChan
    mkrouting <- atomically $ do
        -- We defer initializing the refreshSearch and refreshPing until we
        -- have a client to send queries with.
        let nullPing = const $ return False
            nullSearch = mainlineSearch $ Left $ \_ _ -> return Nothing
            emptyTable self = R.nullTable (comparing nodeId)
                                          (\s -> hashWithSalt s . nodeId)
                                          self
                                          R.defaultBucketCount
        tbl4 <- newTVar $ emptyTable tentative_info
        refresher4 <- newBucketRefresher tbl4 nullSearch nullPing
        tbl6 <- newTVar $ emptyTable tentative_info6
        refresher6 <- newBucketRefresher tbl6 nullSearch nullPing
        -- On an accepted external address: reset the table around a node
        -- id derived from that address and publish the address together
        -- with the nodes of the old table.
        let updateIPVote tblvar addrvar a = do
                bkts <- readTVar tblvar
                case bep42 a (nodeId $ R.thisNode bkts) of
                    Just nid' -> do
                        writeTVar tblvar $ emptyTable (mkNodeInfo nid' a)
                        writeTChan addrvar (a, map fst $ concat $ R.toList bkts)
                    Nothing -> return ()
        committee4 <- newTriadCommittee $ updateIPVote tbl4 addr4
        committee6 <- newTriadCommittee $ updateIPVote tbl6 addr6
        return $ \client ->
            -- Now we have a client, so tell the BucketRefresher how to search and ping.
            let updIO r = updateRefresherIO (nodeSearch client) (ping client) r
            in Routing tentative_info committee4 committee6 (updIO refresher4) (updIO refresher6)
    map_var <- atomically $ newTVar (0, mempty)

    let routing = mkrouting outgoingClient

        net = onInbound (updateRouting outgoingClient routing)
                $ layerTransport parsePacket encodePacket
                $ udp

        -- Paranoid: It's safe to define /net/ and /client/ to be mutually
        -- recursive since 'updateRouting' does not invoke 'awaitMessage'
        -- which was modified by 'onInbound'.  However, I'm going to avoid the
        -- mutual reference just to be safe.
        outgoingClient = client { clientNet = net { awaitMessage = ($ Nothing) } }

        dispatch = DispatchMethods
            { classifyInbound = classify -- :: x -> MessageClass err meth tid addr x
            , lookupHandler   = handlers -- :: meth -> Maybe (MethodHandler err tid addr x)
            , tableMethods    = mapT     -- :: TransactionMethods tbl tid x
            }

        handlers :: Method -> Maybe Handler
        handlers ( Method "ping"          ) = handler pingH
        handlers ( Method "find_node"     ) = handler $ findNodeH routing
        handlers ( Method "get_peers"     ) = handler $ getPeersH routing swarms
        handlers ( Method "announce_peer" ) = handlerE $ announceH swarms
        handlers ( Method meth            ) = Just $ defaultHandler meth

        mapT = transactionMethods mapMethods gen

        gen :: Word16 -> (TransactionId, Word16)
        gen cnt = (TransactionId $ S.encode cnt, cnt+1)

        ignoreParseError :: String -> IO ()
        ignoreParseError _ = return ()

        client = Client
            { clientNet = addHandler ignoreParseError (handleMessage client) net
            , clientDispatcher = dispatch
            , clientErrorReporter = ignoreErrors -- printErrors stderr
            , clientPending = map_var
            , clientAddress = \maddr -> atomically $ do
                let var = case flip prefer4or6 Nothing <$> maddr of
                            Just Want_IP6 -> routing6 routing
                            _             -> routing4 routing
                R.thisNode <$> readTVar var
            , clientResponseId = return
            }

    -- TODO: Provide some means of shutting down these five auxiliary threads:

    -- Wait for external-address announcements on the channel and ping
    -- every node that was in the table before it was reset.
    let watchExternal label banner chan = fork $ fix $ \again -> do
            myThreadId >>= flip labelThread label
            (extaddr, ns) <- atomically $ readTChan chan
            dput XBitTorrent $ banner ++ show (extaddr, length ns)
            forM_ ns $ \n -> do
                dput XBitTorrent $ "Change IP, ping: " ++ show n
                ping outgoingClient n
            again
    -- TODO: trigger bootstrap ipv4
    _ <- watchExternal "addr4" "External IPv4: " addr4
    -- TODO: trigger bootstrap ipv6
    _ <- watchExternal "addr6" "External IPv6: " addr6

    _ <- forkPollForRefresh $ refresher4 routing
    _ <- forkPollForRefresh $ refresher6 routing

    _ <- forkAnnouncedInfohashesGC (contactInfo swarms)

    return (client, routing, bootstrap (refresher4 routing), bootstrap (refresher6 routing))
664
-- | Fork the peer-store garbage collector.
--
-- Note that you should call .put() every hour for content that you want to
-- keep alive, since nodes may discard data nodes older than 2 hours. (source:
-- https://www.npmjs.com/package/bittorrent-dht)
--
-- Each round records the current time, sleeps for three hours and then
-- deletes everything older than the recorded time, so discarded records
-- are between 3 and 6 hours old.
forkAnnouncedInfohashesGC :: TVar PeerStore -> IO ThreadId
forkAnnouncedInfohashesGC vpeers = fork $ do
    tid <- myThreadId
    labelThread tid "gc:bt-peers"
    let sweep = do
            cutoff <- getPOSIXTime
            threadDelay threeHours
            atomically $ modifyTVar' vpeers (deleteOlderThan cutoff)
            sweep
    sweep
  where
    -- In microseconds.  NOTE(review): exceeds 2^31, so this relies on a
    -- 64-bit 'Int'.
    threeHours = 10800000000
678
-- | Modifies a purely random 'NodeId' to one that is related to a given
-- routable address in accordance with BEP 42.  Yields 'Nothing' when no
-- IPv4 or IPv6 address can be extracted from the socket address.
--
-- Test vectors from the spec:
--
-- > IP            rand  example node ID
-- > ============  ===== ==========================================
-- > 124.31.75.21   1    5fbfbf f10c5d6a4ec8a88e4c6ab4c28b95eee4 01
-- > 21.75.31.124  86    5a3ce9 c14e7a08645677bbd1cfe7d8f956d532 56
-- > 65.23.51.170  22    a5d432 20bc8f112a3d426c84764f8c2a1150e6 16
-- > 84.124.73.14  65    1b0321 dd1bb1fe518101ceef99462b947a01ff 41
-- > 43.213.53.83  90    e56f6c bf5b7c4be0237986d5243b87aa6d5130 5a
bep42 :: SockAddr -> NodeId -> Maybe NodeId
bep42 addr0 (NodeId r) =
    case encodedIP of
        Just ip -> genBucketSample' retr (NodeId $ crc $ applyMask ip) (3, 0x07, 0)
        Nothing -> Nothing
  where
    -- unmap 4mapped SockAddrs
    addr = either id id (either4or6 addr0)

    encodedIP = fmap S.encode (fromSockAddr addr :: Maybe IPv4)
            <|> fmap S.encode (fromSockAddr addr :: Maybe IPv6)

    ip4mask = "\x03\x0f\x3f\xff" :: ByteString
    ip6mask = "\x01\x03\x07\x0f\x1f\x3f\x7f\xff" :: ByteString

    -- Low three bits of the last byte of the random id.
    nbhood_select = B.last r .&. 7

    -- Supplies the trailing @n@ bytes of the random id.
    retr n = pure $ B.drop (B.length r - n) r

    crc = S.encode . crc32c . B.pack

    -- Mask the address bytes and fold the selector into the top three
    -- bits of the first byte.
    applyMask ip = case B.zipWith (.&.) msk ip of
        (b:bs) -> (b .|. shiftL nbhood_select 5) : bs
        bs     -> bs
      where
        msk | B.length ip == 4 = ip4mask
            | otherwise        = ip6mask
710
711
712
-- | Handler for unrecognised methods: always answers with a
-- 'MethodUnknown' error naming the method.
defaultHandler :: ByteString -> Handler
defaultHandler meth = MethodHandler decodePayload errorPayload returnError
  where
    unknown = Error MethodUnknown ("Unknown method " <> meth)

    returnError :: NodeInfo -> BValue -> IO Error
    returnError _ _ = return unknown
718
-- | Assemble the Kademlia operations for one address family.  Routing
-- transitions are forwarded both to the committee (see
-- 'transitionCommittee') and to the bucket refresher ('touchBucket').
mainlineKademlia :: MainlineClient
                    -> TriadCommittee NodeId SockAddr
                    -> BucketRefresher NodeId NodeInfo
                    -> Kademlia NodeId NodeInfo
mainlineKademlia client committee refresher =
    Kademlia quietInsertions mainlineSpace tableIO
  where
    baseIO  = vanillaIO (refreshBuckets refresher) (ping client)
    tableIO = baseIO { tblTransition = onTransition }

    onTransition tr = do
        voteIO  <- transitionCommittee committee tr
        touchIO <- touchBucket refresher tr
        return $ do
            voteIO >> touchIO
            {- noisy (timestamp updates are currently reported as transitions to Accepted)
            dput XBitTorrent $ unwords
                [ show (transitionedTo tr)
                , show (transitioningNode tr)
                ] -}
738
739
-- | The Kademlia key space of the Mainline DHT: a node is located by its
-- 'nodeId', distance is 'xor', and samples come from 'genBucketSample''.
mainlineSpace :: R.KademliaSpace NodeId NodeInfo
mainlineSpace = R.KademliaSpace
    { R.kademliaLocation = nodeId
    , R.kademliaTestBit  = testIdBit
    , R.kademliaXor      = xor
    , R.kademliaSample   = genBucketSample'
    }
747
-- | React to a routing transition on behalf of the committee.  A node
-- becoming a 'Stranger' has its vote removed (and the returned IO action
-- logs that); every other transition is ignored.
transitionCommittee :: TriadCommittee NodeId SockAddr -> RoutingTransition NodeInfo -> STM (IO ())
transitionCommittee committee = \case
    RoutingTransition ni Stranger -> do
        let nid = nodeId ni
        delVote committee nid
        return $ dput XBitTorrent ("delVote " ++ show nid)
    _ -> return (return ())
754
-- | Update routing state from an inbound message.  The committee and
-- refresher matching the sender's address family are selected; when the
-- sender's IP differs from our own, a reflected address carried by a
-- response is recorded as a vote and the sender is offered to the routing
-- table.
--
-- NOTE(review): the nesting of 'insertNode' under the 'when' was
-- reconstructed from a rendering that had lost its indentation; confirm
-- against the repository.
updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO ()
updateRouting client routing naddr msg =
    case prefer4or6 naddr Nothing of
        Want_IP4 -> update (committee4 routing) (refresher4 routing)
        Want_IP6 -> update (committee6 routing) (refresher6 routing)
  where
    update committee refresher = do
        self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher)
        when (nodeIP self /= nodeIP naddr) $ do
            recordVote committee
            insertNode (mainlineKademlia client committee refresher) naddr

    recordVote committee = case msg of
        R { rspReflectedIP = Just sockaddr } ->
            -- dput XBitTorrent $ "External: "++show (nodeId naddr,sockaddr)
            atomically $ addVote committee (nodeId naddr) sockaddr
        _ -> return ()
771
-- | Payload of a @ping@ query.  It carries no fields.
data Ping = Ping deriving Show

-- | A pong has exactly the same (empty) shape as a ping.
type Pong = Ping
pattern Pong = Ping

-- Encodes as an empty dictionary; decoding accepts any value.
instance BEncode Ping where
    toBEncode Ping = toDict endDict
    fromBEncode    = const (pure Ping)
781
-- | Wire representation of a 'WantIP' as a list of family tags.
wantList :: WantIP -> [ByteString]
wantList = \case
    Want_IP4  -> ["ip4"]
    Want_IP6  -> ["ip6"]
    Want_Both -> ["ip4", "ip6"]
786
-- Encoded as the list produced by 'wantList'.  Decoding looks only for
-- the "ip4" and "ip6" tags; a list containing neither is rejected.
instance BEncode WantIP where
    toBEncode = toBEncode . wantList

    fromBEncode bval = fromBEncode bval >>= fromTags
      where
        fromTags :: [ByteString] -> Either String WantIP
        fromTags wants
            | has4 && has6 = Right Want_Both
            | has4         = Right Want_IP4
            | has6         = Right Want_IP6
            | otherwise    = Left "Unrecognized IP type."
          where
            has4 = "ip4" `elem` wants
            has6 = "ip6" `elem` wants
797
-- | Arguments of a @find_node@ query: the target id and, optionally, the
-- address families wanted in the reply.
data FindNode = FindNode NodeId (Maybe WantIP)

instance BEncode FindNode where
    toBEncode (FindNode nid iptyp) = toDict fields
      where
        fields = target_key .=! nid
              .: want_key   .=? iptyp
              .: endDict

    fromBEncode = fromDict (FindNode <$>! target_key <*>? want_key)
806
-- | Nodes returned by a @find_node@ query, split by address family.
data NodeFound = NodeFound
    { nodes4 :: [NodeInfo]
    , nodes6 :: [NodeInfo]
    }

-- Each list is stored as one packed binary string; an empty list omits
-- its key, and a missing (or undecodable) key decodes as an empty list.
instance BEncode NodeFound where
    toBEncode (NodeFound ns ns6) = toDict $
           nodes_key  .=? packed putNodeInfo4 ns
        .: nodes6_key .=? packed putNodeInfo6 ns6
        .: endDict
      where
        packed _      [] = Nothing
        packed putOne xs = Just (S.runPut (mapM_ putOne xs))

    fromBEncode bval = do
        ns4 <- listField getNodeInfo4 nodes_key
        ns6 <- listField getNodeInfo6 nodes6_key
        return (NodeFound ns4 ns6)
      where
        listField getter key =
            fromMaybe [] <$> optional (fromDict (binary getter key) bval)
827
-- | Read a required dictionary field and decode it as a sequence of items
-- using the given binary parser.  A decoding failure is reported with the
-- name of the offending key.
binary :: S.Get a -> BKey -> BE.Get [a]
binary get k = do
    raw <- field (req k)
    case S.runGet (many get) raw of
        Left err -> fail (format err)
        Right xs -> return xs
  where
    format str = "fail to deserialize " ++ show k ++ " field: " ++ str
833
-- | Handler for @ping@: answers every ping with a pong.
pingH :: NodeInfo -> Ping -> IO Pong
pingH _ Ping = pure Pong
836
-- | The explicitly requested address family if there is one, otherwise
-- the family of the node's own IP address.
prefer4or6 :: NodeInfo -> Maybe WantIP -> WantIP
prefer4or6 addr iptyp = case iptyp of
    Just wanted -> wanted
    Nothing     -> ipFamily (nodeIP addr)
839
-- | Handler for @find_node@.  For each address family that is wanted
-- (explicitly, or by default the querier's own family) the closest known
-- nodes to the target are returned.  Our own node of the family other than
-- the querier's is appended to that family's list.
findNodeH :: Routing -> NodeInfo -> FindNode -> IO NodeFound
findNodeH routing addr (FindNode node iptyp) = do
    (append4, append6) <- atomically $ do
        ni4 <- R.thisNode <$> readTVar (routing4 routing)
        ni6 <- R.thisNode <$> readTVar (routing6 routing)
        return $ case ipFamily (nodeIP addr) of
            Want_IP4 -> (id, (++ [ni6]))
            Want_IP6 -> ((++ [ni4]), id)
    ks  <- if preferred /= Want_IP6
              then closest append4 (routing4 routing)
              else return []
    ks6 <- if preferred /= Want_IP4
              then closest append6 (routing6 routing)
              else return []
    return (NodeFound ks ks6)
  where
    preferred = prefer4or6 addr iptyp

    closest extend var = do
        tbl <- atomically (readTVar var)
        return $ extend (R.kclosest mainlineSpace R.defaultK node tbl)
857
858
-- | Arguments of a @get_peers@ query: the infohash and, optionally, the
-- address families wanted in the reply.
data GetPeers = GetPeers InfoHash (Maybe WantIP)

instance BEncode GetPeers where
    toBEncode (GetPeers ih iptyp) = toDict fields
      where
        fields = info_hash_key .=! ih
              .: want_key      .=? iptyp
              .: endDict

    fromBEncode = fromDict (GetPeers <$>! info_hash_key <*>? want_key)
867
868
-- | Reply to a @get_peers@ query.
data GotPeers = GotPeers
    { -- | Peers known for the queried infohash.
      peers :: [PeerAddr]

      -- | If the queried node has no peers for the infohash, the K nodes
      -- in the queried node's routing table closest to the infohash
      -- supplied in the query.
    , nodes :: NodeFound

      -- | The token value is a required argument for a future
      -- announce_peer query.
    , grantedToken :: Token
    } -- deriving (Show, Eq, Typeable)
881
-- | True when the node's address is an IPv6 address.
nodeIsIPv6 :: NodeInfo -> Bool
nodeIsIPv6 ni = case ni of
    NodeInfo _ (IPv6 _) _ -> True
    _                     -> False
885
-- Wire format of a @get_peers@ reply.  The node lists are stored as packed
-- binary strings (keys omitted when empty), the token is mandatory, and
-- the peers are stored as a list of individually serialised addresses.
instance BEncode GotPeers where
    toBEncode GotPeers { nodes = NodeFound ns4 ns6, ..} = toDict $
           nodes_key  .=? packed putNodeInfo4 ns4
           -- The IPv6 list must use the IPv6 node codec, matching both
           -- the 'NodeFound' instance and 'getNodeInfo6' in the decoder
           -- below.
        .: nodes6_key .=? packed putNodeInfo6 ns6
        .: token_key  .=! grantedToken
        .: peers_key  .=! map S.encode peers
        .: endDict
      where
        -- An empty list omits its key altogether.
        packed _      [] = Nothing
        packed putOne xs = Just $ S.runPut (mapM_ putOne xs)

    fromBEncode = fromDict $ do
        ns4 <- fromMaybe [] <$> optional (binary getNodeInfo4 nodes_key) -- "nodes"
        ns6 <- fromMaybe [] <$> optional (binary getNodeInfo6 nodes6_key) -- "nodes6"
        -- TODO: BEP 42...
        --
        -- Once enforced, responses to get_peers requests whose node ID does not
        -- match its external IP should be considered to not contain a token and
        -- thus not be eligible as storage target.  Implementations should take
        -- care that they find the closest set of nodes which return a token and
        -- whose IDs matches their IPs before sending a store request to those
        -- nodes.
        --
        -- Sounds like something to take care of at peer-search time, so I'll
        -- ignore it for now.
        tok <- field (req token_key) -- "token"
        ps <- fromMaybe [] <$> optional (field (req peers_key) >>= decodePeers) -- "values"
        pure $ GotPeers ps (NodeFound ns4 ns6) tok
     where
        decodePeers = either fail pure . mapM S.decode
915
-- | Handler for @get_peers@.  Fetches the fresh peers for the infohash
-- (writing the updated store back), grants the querier a token, and
-- attaches the result of a @find_node@ on the infohash.
getPeersH :: Routing -> SwarmsDatabase -> NodeInfo -> GetPeers -> IO GotPeers
getPeersH routing (SwarmsDatabase peersVar toks _) naddr (GetPeers ih iptyp) = do
    tm <- getTimestamp
    found <- atomically $ do
        (fresh, store') <- Peers.freshPeers ih tm <$> readTVar peersVar
        writeTVar peersVar store'
        return fresh
    -- Filter peer results to only a single address family, IPv4 or IPv6, as
    -- per BEP 32.  A request for both families falls back to the querier's
    -- own family.
    let family = prefer4or6 naddr $ case iptyp of
            Just Want_Both -> Nothing
            other          -> other
        matching = filter ((== family) . ipFamily . peerHost) found
    tok <- grantToken toks naddr
    ns  <- findNodeH routing naddr (FindNode (coerce ih) iptyp)
    return $ GotPeers matching ns tok
933
-- | Announce that the peer, controlling the querying node, is
-- downloading a torrent on a port.
data Announce = Announce
    { -- | If set, the 'port' field should be ignored and the source
      -- port of the UDP packet should be used as the peer's port
      -- instead.  This is useful for peers behind a NAT that may not
      -- know their external port, and supporting uTP, they accept
      -- incoming connections on the same port as the DHT port.
      impliedPort :: Bool

      -- | Infohash of the torrent.
    , topic :: InfoHash

      -- | Some clients announce the friendly name of the torrent here.
    , announcedName :: Maybe ByteString

      -- | The port /this/ peer is listening on.
    , port :: PortNumber

      -- TODO: optional boolean "seed" key

      -- | Received in response to a previous get_peers query.
    , sessionToken :: Token

    } deriving (Show, Eq, Typeable)
959
-- | Build an 'Announce' for the given port, infohash and token, with no
-- friendly name and with the implied-port flag cleared.
mkAnnounce :: PortNumber -> InfoHash -> Token -> Announce
mkAnnounce portnum info token = Announce
    { impliedPort   = False
    , topic         = info
    , announcedName = Nothing
    , port          = portnum
    , sessionToken  = token
    }
968
969
-- Wire format of an @announce_peer@ query.  The implied-port flag is
-- written as the integer 1 when set and omitted otherwise; when reading,
-- a missing key or the value 0 means the flag is clear.
instance BEncode Announce where
    toBEncode Announce {..} = toDict $
           implied_port_key .=? bool Nothing (Just (1 :: Int)) impliedPort
        .: info_hash_key    .=! topic
        .: name_key         .=? announcedName
        .: port_key         .=! port
        .: token_key        .=! sessionToken
        .: endDict

    fromBEncode = fromDict $
        Announce <$> (isSet <$> optional (field (req implied_port_key)))
                 <*>! info_hash_key
                 <*>? name_key
                 <*>! port_key
                 <*>! token_key
      where
        isSet :: Maybe Int -> Bool
        isSet Nothing  = False
        isSet (Just n) = n /= 0
989
990
991
-- | Reply to an @announce_peer@ query.
--
-- The queried node must verify that the token was previously sent
-- to the same IP address as the querying node.  Then the queried node
-- should store the IP address of the querying node and the supplied
-- port number under the infohash in its store of peer contact
-- information.
data Announced = Announced
    deriving (Show, Eq, Typeable)

-- Encoded exactly like a 'Ping'; decoding accepts any value.
instance BEncode Announced where
    toBEncode   = const (toBEncode Ping)
    fromBEncode = const (pure Announced)
1003
-- | Handler for @announce_peer@.  The session token is checked first; an
-- invalid token yields a 'ProtocolError'.  Otherwise the announcing peer
-- is inserted into the peer store under the announced infohash.
announceH :: SwarmsDatabase -> NodeInfo -> Announce -> IO (Either Error Announced)
announceH (SwarmsDatabase peersVar toks _) naddr announcement = do
    tokenOk <- checkToken toks naddr (sessionToken announcement)
    if tokenOk
        then Right <$> record
        else return $ Left (Error ProtocolError "invalid parameter: token")
  where
    record = atomically $ do
        modifyTVar' peersVar $
            insertPeer (topic announcement) (announcedName announcement) announcer
        return Announced

    announcer = PeerAddr
        { peerId   = Nothing
          -- Avoid storing IPv4-mapped addresses.
        , peerHost = case nodeIP naddr of
            IPv6 ip6 | Just ip4 <- un4map ip6 -> IPv4 ip4
            a                                 -> a
          -- With an implied port, the querier's own port is used.
        , peerPort = if impliedPort announcement
                        then nodePort naddr
                        else port announcement
        }
1024
-- | Whether the client operates in read-only mode.  Currently always
-- 'False'.
isReadonlyClient :: MainlineClient -> Bool
isReadonlyClient _ = False -- TODO
1027
-- | Send a query and wait for the reply.  The result is 'Nothing' when
-- no reply was obtained and also when the reply was an error.
mainlineSend :: ( BEncode a
                , BEncode a2
                ) => Method
                  -> (a2 -> b)
                  -> (t -> a)
                  -> MainlineClient
                  -> t
                  -> NodeInfo
                  -> IO (Maybe b)
mainlineSend meth unwrap msg client nid addr = do
    reply <- sendQuery client (mainlineSerializeer meth unwrap client) (msg nid) addr
    -- sendQuery will return (Just (Left _)) on a parse error.  Such
    -- replies are collapsed to Nothing here.
    -- TODO: Do something with parse errors.
    return (reply >>= either (const Nothing) Just)
1043
-- | Asynchronous variant of 'mainlineSend': the callback receives
-- 'Nothing' when no reply was obtained and also when the reply was an
-- error.
mainlineAsync :: (BEncode a1, BEncode a2) =>
                 Method
                 -> (a2 -> a3)
                 -> (t -> a1)
                 -> Client String Method TransactionId NodeInfo (Message BValue)
                 -> t
                 -> NodeInfo
                 -> (Maybe a3 -> IO ())
                 -> IO ()
mainlineAsync meth unwrap msg client nid addr onresult =
    asyncQuery client (mainlineSerializeer meth unwrap client) (msg nid) addr deliver
  where
    -- A (Just (Left _)) reply signals a parse error.  Such replies are
    -- collapsed to Nothing here.
    -- TODO: Do something with parse errors.
    deliver reply = onresult (reply >>= either (const Nothing) Just)
1060
-- | Describe how to send a given method: a timeout value of 5000000, the
-- query wrapper, and the response decoder.  A payload that fails to decode
-- is turned into a 'GenericError' carrying the decoder's message.
mainlineSerializeer :: (BEncode a2, BEncode a1) =>
                       Method
                       -> (a2 -> b)
                       -> MainlineClient
                       -> MethodSerializer
                            TransactionId NodeInfo (Message BValue) Method a1 (Either Error b)
mainlineSerializeer meth unwrap client = MethodSerializer
    { methodTimeout  = \_ ni -> return (ni, 5000000)
    , method         = meth
    , wrapQuery      = encodeQueryPayload meth (isReadonlyClient client)
    , unwrapResponse = \msg -> rspPayload msg >>= decodeReply
    }
  where
    decodeReply payload = case BE.fromBEncode payload of
        Left err  -> Left (Error GenericError (C8.pack err))
        Right val -> Right (unwrap val)
1076
-- | Ping a node.  'True' when a pong came back, 'False' otherwise.
ping :: MainlineClient -> NodeInfo -> IO Bool
ping client addr = do
    reply <- mainlineSend (Method "ping") (\Pong -> True) (const Ping) client () addr
    return (fromMaybe False reply)
1081
-- searchQuery :: ni -> IO (Maybe [ni], [r], tok))

-- | Send a @find_node@ query for the given target, asking for both
-- address families.
getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[NodeInfo],Maybe ()))
getNodes client target dest =
    mainlineSend (Method "find_node") unwrapNodes toQuery client target dest
  where
    toQuery nid = FindNode nid (Just Want_Both)
1085
-- | Asynchronous @find_node@ query for the given target, asking for both
-- address families.
asyncGetNodes :: Client String Method TransactionId NodeInfo (Message BValue)
                 -> NodeId
                 -> NodeInfo
                 -> (Maybe ([NodeInfo], [NodeInfo], Maybe ()) -> IO ())
                 -> IO ()
asyncGetNodes client target dest onresult =
    mainlineAsync (Method "find_node") unwrapNodes toQuery client target dest onresult
  where
    toQuery nid = FindNode nid (Just Want_Both)
1092
-- | Convert a @find_node@ reply to search-result form.  The combined
-- IPv4+IPv6 node list is used both as the list of nodes and as the list
-- of results; the token slot is a unit.
unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ())
unwrapNodes (NodeFound ns4 ns6) = (found, found, Just ())
  where
    found = ns4 ++ ns6
1095
-- | Send a @get_peers@ query, using the given id as the infohash and
-- asking for both address families.
getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[PeerAddr],Maybe Token))
getPeers client target dest =
    mainlineSend (Method "get_peers") unwrapPeers toQuery client target dest
  where
    toQuery nid = GetPeers (coerce nid) (Just Want_Both)
1098
-- | Asynchronous @get_peers@ query, using the given id as the infohash
-- and asking for both address families.
asyncGetPeers :: Client String Method TransactionId NodeInfo (Message BValue)
                 -> NodeId
                 -> NodeInfo
                 -> (Maybe ([NodeInfo], [PeerAddr], Maybe Token) -> IO ())
                 -> IO ()
asyncGetPeers client target dest onresult =
    mainlineAsync (Method "get_peers") unwrapPeers toQuery client target dest onresult
  where
    toQuery nid = GetPeers (coerce nid) (Just Want_Both)
1105
-- | Convert a @get_peers@ reply to search-result form: the combined
-- IPv4+IPv6 node list, the peers, and the granted token.
unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token)
unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (closer, ps, Just tok)
  where
    closer = ns4 ++ ns6
1108
-- | Search parameters for the Mainline key space, built around the given
-- query (either the blocking or the callback-based form).  Nodes are
-- addressed by their IP and port; alpha is 8 and K is 16.
mainlineSearch :: Either (NodeId -> NodeInfo -> IO (Maybe ([NodeInfo], [r], Maybe tok)))
                         (NodeId -> NodeInfo -> (Maybe ([NodeInfo], [r], Maybe tok) -> IO ()) -> IO ())
                  -> Search NodeId (IP, PortNumber) tok NodeInfo r
mainlineSearch qry = Search
    { searchSpace       = mainlineSpace
    , searchNodeAddress = nodeIP &&& nodePort
    , searchQuery       = qry
    , searchAlpha       = 8
    , searchK           = 16
    }
1119
-- | Search for nodes using asynchronous @find_node@ queries.
nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo
nodeSearch = mainlineSearch . Right . asyncGetNodes
1122
-- | Search for peers using asynchronous @get_peers@ queries.
peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr
peerSearch = mainlineSearch . Right . asyncGetPeers
1125
-- | List of bootstrap nodes maintained by different bittorrent
-- software authors.  Hosts that fail to resolve are silently left out.
-- The lookup is deferred with 'unsafeInterleaveIO' until the result is
-- demanded.
bootstrapNodes :: WantIP -> IO [NodeInfo]
bootstrapNodes want = unsafeInterleaveIO $
    concat <$> mapM lookupNode wellknowns
  where
    wellknowns :: [String]
    wellknowns =
        [ "router.bittorrent.com:6881"  -- by BitTorrent Inc.

        -- reported as not working at the time of the original commit
        -- (use git blame)
        , "dht.transmissionbt.com:6881" -- by Transmission project

        , "router.utorrent.com:6881"
        ]

    lookupNode hostAndPort = do
        resolved <- resolve want hostAndPort
        return $ case resolved of
            Left _         -> []
            Right sockaddr -> either (const []) (: []) (nodeInfo zeroID sockaddr)
1146
-- | Resolve either a numeric network address or a hostname to a
-- numeric IP address of the node.
--
-- The input is split at its last colon into host and port; without a
-- colon the whole string is the host and no port is passed on.  'Want_IP4'
-- resolves in the IPv4 family; anything else resolves in the IPv6 family.
resolve :: WantIP -> String -> IO (Either IOError SockAddr)
resolve want hostAndPort = tryIOError $ do
    -- getAddrInfo throws exception on empty list, so this
    -- pattern matching never fails.
    info : _ <- getAddrInfo (Just hints) (Just host) port
    return (addrAddress info)
  where
    family = case want of
        Want_IP4 -> AF_INET
        _        -> AF_INET6

    hints = defaultHints { addrSocketType = Datagram
                         , addrFamily     = family
                         }

    -- Work on the reversed string so that the split happens at the
    -- last colon.
    (rport, rhost) = break (== ':') (reverse hostAndPort)

    (host, port) = case rhost of
        []       -> (hostAndPort, Nothing)
        (_ : hs) -> (reverse hs, Just (reverse rport))
1165
1166
-- | Send an @announce_peer@ query carrying the given announcement.
announce :: MainlineClient -> Announce -> NodeInfo -> IO (Maybe Announced)
announce client msg addr =
    mainlineSend (Method "announce_peer") id toQuery client () addr
  where
    toQuery () = msg
diff --git a/dht/src/Network/BitTorrent/MainlineDHT/Symbols.hs b/dht/src/Network/BitTorrent/MainlineDHT/Symbols.hs
new file mode 100644
index 00000000..05a64014
--- /dev/null
+++ b/dht/src/Network/BitTorrent/MainlineDHT/Symbols.hs
@@ -0,0 +1,24 @@
1{-# OPTIONS_GHC -fno-warn-missing-signatures #-}
2module Network.BitTorrent.MainlineDHT.Symbols where
3
4import Data.BEncode.BDict
5
-- Dictionary keys used in the bencoded messages.

peer_ip_key :: BKey
peer_ip_key = "ip"

peer_id_key :: BKey
peer_id_key = "peer id"

peer_port_key :: BKey
peer_port_key = "port"

msg_type_key :: BKey
msg_type_key = "msg_type"

piece_key :: BKey
piece_key = "piece"

total_size_key :: BKey
total_size_key = "total_size"

node_id_key :: BKey
node_id_key = "id"

read_only_key :: BKey
read_only_key = "ro"

want_key :: BKey
want_key = "want"

target_key :: BKey
target_key = "target"

nodes_key :: BKey
nodes_key = "nodes"

nodes6_key :: BKey
nodes6_key = "nodes6"

info_hash_key :: BKey
info_hash_key = "info_hash"

peers_key :: BKey
peers_key = "values"

token_key :: BKey
token_key = "token"

name_key :: BKey
name_key = "name"

port_key :: BKey
port_key = "port"

implied_port_key :: BKey
implied_port_key = "implied_port"
24