summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Network/BitTorrent/MainlineDHT.hs1081
1 files changed, 1081 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/MainlineDHT.hs b/src/Network/BitTorrent/MainlineDHT.hs
new file mode 100644
index 00000000..9d48c67b
--- /dev/null
+++ b/src/Network/BitTorrent/MainlineDHT.hs
@@ -0,0 +1,1081 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE DeriveDataTypeable #-}
3{-# LANGUAGE DeriveFoldable #-}
4{-# LANGUAGE DeriveFunctor #-}
5{-# LANGUAGE DeriveTraversable #-}
6{-# LANGUAGE FlexibleInstances #-}
7{-# LANGUAGE GeneralizedNewtypeDeriving #-}
8{-# LANGUAGE LambdaCase #-}
9{-# LANGUAGE PatternSynonyms #-}
10{-# LANGUAGE StandaloneDeriving #-}
11{-# LANGUAGE TupleSections #-}
12module Network.BitTorrent.MainlineDHT where
13
14import Control.Applicative
15import Control.Arrow
16import Control.Concurrent.STM
17import Control.Monad
18import Crypto.Random
19import Data.BEncode as BE
20import qualified Data.BEncode.BDict as BE
21 ;import Data.BEncode.BDict (BKey)
22import Data.BEncode.Pretty
23import Data.BEncode.Types (BDict)
24import Data.Bits
25import Data.Bits.ByteString
26import Data.Bool
27import qualified Data.ByteArray as BA
28 ;import Data.ByteArray (ByteArrayAccess)
29import qualified Data.ByteString as B
30 ;import Data.ByteString (ByteString)
31import qualified Data.ByteString.Base16 as Base16
32import qualified Data.ByteString.Char8 as C8
33import Data.ByteString.Lazy (toStrict)
34import qualified Data.ByteString.Lazy.Char8 as L8
35import Data.Char
36import Data.Coerce
37import Data.Data
38import Data.Default
39import Data.Digest.CRC32C
40import Data.Function (fix)
41import Data.Hashable
42import Data.IP
43import Data.List
44import Data.Maybe
45import Data.Monoid
46import Data.Ord
47import qualified Data.Serialize as S
48import Data.Set (Set)
49import Data.Time.Clock.POSIX (POSIXTime)
50import Data.Torrent
51import Data.Typeable
52import Data.Word
53import qualified Data.Wrapper.PSQInt as Int
54import Debug.Trace
55import Network.Kademlia
56import Network.Address (Address, fromAddr, fromSockAddr,
57 setPort, sockAddrPort, testIdBit,
58 toSockAddr, genBucketSample', WantIP(..),
59 un4map,either4or6,ipFamily)
60import Network.BitTorrent.DHT.ContactInfo as Peers
61import Network.BitTorrent.DHT.Search (Search (..))
62import Network.BitTorrent.DHT.Token as Token
63import qualified Network.DHT.Routing as R
64 ;import Network.DHT.Routing (Timestamp, getTimestamp)
65import Network.QueryResponse
66import Network.Socket
67import System.IO
68import System.IO.Error
69import System.IO.Unsafe (unsafeInterleaveIO)
70import qualified Text.ParserCombinators.ReadP as RP
71#ifdef THREAD_DEBUG
72import Control.Concurrent.Lifted.Instrument
73#else
74import Control.Concurrent.Lifted
75import GHC.Conc (labelThread)
76#endif
77import Control.Exception (SomeException (..), handle)
78import qualified Data.Aeson as JSON
79 ;import Data.Aeson (FromJSON, ToJSON, (.=))
80import Text.Read
81import System.Global6
82import Control.TriadCommittee
83
-- | Identifier of a DHT node, kept as its raw bytes.  The decoders in this
-- module only ever build values from 20-byte strings.
newtype NodeId = NodeId ByteString
    deriving (Eq, Ord, ByteArrayAccess, Bits, Hashable)
86
-- | A node id travels as a plain bencoded byte string.  Decoding rejects any
-- string that is not exactly 20 bytes long.
instance BEncode NodeId where
    fromBEncode bval = fromBEncode bval >>= checked
      where
        checked raw
            | B.length raw == 20 = Right (NodeId raw)
            | otherwise          = Left "Invalid length node id."

    toBEncode (NodeId raw) = toBEncode raw
95
-- | Shown as the Base16 (hexadecimal) encoding of the raw bytes.
instance Show NodeId where
    show (NodeId raw) = C8.unpack (Base16.encode raw)

-- | Binary form: the raw bytes on output, exactly 20 bytes on input.
instance S.Serialize NodeId where
    get = fmap NodeId (S.getBytes 20)
    put (NodeId raw) = S.putByteString raw

-- | Node ids are reported as 160 bits wide.
instance FiniteBits NodeId where
    finiteBitSize = const 160
105
-- | Reads a node id from exactly 40 hexadecimal characters and returns
-- whatever follows them as the unparsed remainder.
--
-- Only the first 40 characters are handed to the Base16 decoder.  Decoding
-- the whole input made the parse fail whenever the id was directly followed
-- by more hexadecimal characters, even though the remainder was always
-- computed as @drop 40@.  The characters are also checked with 'isHexDigit'
-- before packing, because 'C8.pack' truncates code points above 255 and
-- could otherwise turn a non-ASCII character into a hex digit.
instance Read NodeId where
    readsPrec _ str
        | length hex == 40
        , all isHexDigit hex
        , (bs, _) <- Base16.decode (C8.pack hex)
        , B.length bs == 20
        = [ (NodeId bs, rest) ]
        | otherwise = []
      where
        (hex, rest) = splitAt 40 str
112
-- | The node id made of twenty zero bytes.  This module uses it as a
-- placeholder wherever the real id of a sender is not known.
zeroID :: NodeId
zeroID = NodeId (B.replicate 20 0)
115
-- | Contact information for a DHT node: its id plus the IP address and port
-- it is reached at.
data NodeInfo = NodeInfo
    { nodeId   :: NodeId      -- ^ Identifier of the node.
    , nodeIP   :: IP          -- ^ IPv4 or IPv6 address of the node.
    , nodePort :: PortNumber  -- ^ Port of the node.
    }
    deriving (Eq, Ord)
122
-- | Serialised as an object with a @node-id@ (hex string), a @port@ and one
-- address field.  The address field is @ipv4@ for IPv4 addresses and for
-- IPv6 addresses that 'un4map' can convert to IPv4; otherwise it is @ipv6@.
instance ToJSON NodeInfo where
    toJSON (NodeInfo nid ip port) = JSON.object
        [ "node-id" .= show nid
        , addressField
        , "port"    .= (fromIntegral port :: Int)
        ]
      where
        addressField = case ip of
            IPv4 ip4 -> "ipv4" .= show ip4
            IPv6 ip6
                | Just ip4 <- un4map ip6 -> "ipv4" .= show ip4
                | otherwise              -> "ipv6" .= show ip6
-- | Parses the object layout written by the 'ToJSON' instance.
--
-- @node-id@ and @port@ are required.  The address is taken from @ipv6@ when
-- that field is present and readable, otherwise from @ipv4@; the parse fails
-- if neither yields an address or if the id does not decode to 20 bytes.
--
-- 'JSON.withObject' is used so that a non-object JSON value produces an
-- ordinary parser failure.  Matching on the 'JSON.Object' constructor alone
-- left the method partial and raised a pattern-match exception instead.
instance FromJSON NodeInfo where
    parseJSON = JSON.withObject "NodeInfo" $ \v -> do
        nidstr  <- v JSON..:  "node-id"
        ip6str  <- v JSON..:? "ipv6"
        ip4str  <- v JSON..:? "ipv4"
        portnum <- v JSON..:  "port"
        ip <- maybe empty (return . IPv6) (ip6str >>= readMaybe)
              <|> maybe empty (return . IPv4) (ip4str >>= readMaybe)
        let (bs, _) = Base16.decode (C8.pack nidstr)
        guard (B.length bs == 20)
        return $ NodeInfo (NodeId bs) ip (fromIntegral (portnum :: Word16))
151
-- | True for the ASCII hexadecimal digits @0-9@, @a-f@ and @A-F@.
hexdigit :: Char -> Bool
hexdigit = isHexDigit
154
-- | Reads @\<40 hex digits\>\@\<address\>@ (a whitespace character is also
-- accepted in place of the @\@@) or a bare @\<address\>@, in which case
-- 'zeroID' is used as the node id.
--
-- The address text is either wrapped in parentheses or runs up to the next
-- whitespace.  It must have the form @ip:port@, where an IPv6 address is
-- written inside square brackets.
instance Read NodeInfo where
    readsPrec i = RP.readP_to_S $ do
        RP.skipSpaces
        (nid, addrstr) <- withNodeId RP.+++ fmap (zeroID,) addrText
        case RP.readP_to_S socketAddr addrstr of
            []                  -> fail "Bad address."
            ((ip, port), _) : _ -> return (NodeInfo nid ip port)
      where
        -- Number of hex characters in a node id.
        idLength = 40

        addrText = RP.between (RP.char '(') (RP.char ')') (RP.munch (/= ')'))
                   RP.+++ RP.munch (not . isSpace)

        withNodeId = do
            hexhash <- replicateM idLength (RP.satisfy hexdigit)
            _ <- RP.char '@' RP.+++ RP.satisfy isSpace
            addrstr <- addrText
            nid <- case Base16.decode (C8.pack hexhash) of
                (bs, _) | B.length bs == 20 -> return (NodeId bs)
                _                           -> fail "Bad node id."
            return (nid, addrstr)

        socketAddr = do
            ip <- RP.between (RP.char '[') (RP.char ']')
                             (IPv6 <$> RP.readS_to_P (readsPrec i))
                  RP.+++ (IPv4 <$> RP.readS_to_P (readsPrec i))
            _ <- RP.char ':'
            port <- toEnum <$> RP.readS_to_P (readsPrec i)
            return (ip, port)
181
182
183
-- Only the IP address and the port number take part in the hash; the node id
-- is ignored.  The hash is used to compute the announce token.
instance Hashable NodeInfo where
    hashWithSalt salt (NodeInfo _ ip port) = hashWithSalt salt (ip, port)
    {-# INLINE hashWithSalt #-}
189
190
-- | Shown as @id\@ip:port@.  IPv4 addresses, and IPv6 addresses that
-- 'un4map' converts to IPv4, are printed bare; any other IPv6 address is
-- wrapped in square brackets.
instance Show NodeInfo where
    showsPrec _ (NodeInfo nid ip port) =
        shows nid . showChar '@' . showsAddr . showChar ':' . shows port
      where
        showsAddr = case ip of
            IPv4 ip4 -> shows ip4
            IPv6 ip6 -> case un4map ip6 of
                Just ip4 -> shows ip4
                Nothing  -> showChar '[' . shows ip . showChar ']'
199
200{-
201
202-- | KRPC 'compact list' compatible encoding: contact information for
203-- nodes is encoded as a 26-byte string. Also known as "Compact node
204-- info" the 20-byte Node ID in network byte order has the compact
205-- IP-address/port info concatenated to the end.
206 get = NodeInfo <$> (NodeId <$> S.getBytes 20 ) <*> S.get <*> S.get
207-}
208
-- | Reads one node entry for the IPv4 list: a 20-byte node id followed by an
-- IPv4 address and a port.
getNodeInfo4 :: S.Get NodeInfo
getNodeInfo4 = do
    nid  <- NodeId <$> S.getBytes 20
    ip4  <- S.get
    port <- S.get
    return (NodeInfo nid (IPv4 ip4) port)

-- | Writes one node entry for the IPv4 list.  Nodes whose address is neither
-- IPv4 nor convertible to IPv4 by 'un4map' produce no output at all.
putNodeInfo4 :: NodeInfo -> S.Put
putNodeInfo4 (NodeInfo (NodeId nid) ip port) = maybe (return ()) emit asIPv4
  where
    asIPv4 = case ip of
        IPv4 ip4 -> Just ip4
        IPv6 ip6 -> un4map ip6
    emit ip4 = S.putByteString nid >> S.put ip4 >> S.put port
221
-- | Reads one node entry for the IPv6 list: a 20-byte node id followed by an
-- IPv6 address and a port.
getNodeInfo6 :: S.Get NodeInfo
getNodeInfo6 = do
    nid  <- NodeId <$> S.getBytes 20
    ip6  <- S.get
    port <- S.get
    return (NodeInfo nid (IPv6 ip6) port)

-- | Writes one node entry for the IPv6 list.  Nodes carrying an IPv4 address
-- produce no output at all.
putNodeInfo6 :: NodeInfo -> S.Put
putNodeInfo6 ni = case ni of
    NodeInfo (NodeId nid) (IPv6 ip6) port -> do
        S.putByteString nid
        S.put ip6
        S.put port
    _ -> return ()
231
232
-- | Socket address of a node.  The result is always built from an IPv6
-- address: IPv4 addresses are first mapped with 'ipv4ToIPv6'.
--
-- TODO: This should depend on the bind address to support IPv4-only.  For
-- now, in order to support dual-stack listen, we assume IPv6 is wanted and
-- map IPv4 addresses accordingly.
nodeAddr :: NodeInfo -> SockAddr
nodeAddr (NodeInfo _ ip port) = setPort port (toSockAddr ip6)
  where
    ip6 = case ip of
        IPv4 v4 -> ipv4ToIPv6 v4
        IPv6 v6 -> v6
241
-- | Pairs a node id with the IP address and port taken from a socket
-- address.  Returns 'Left' when either cannot be extracted from the address.
nodeInfo :: NodeId -> SockAddr -> Either String NodeInfo
nodeInfo nid saddr =
    maybe (Left "Address family not supported.") Right $
        NodeInfo nid <$> fromSockAddr saddr <*> sockAddrPort saddr
247
-- | Types of RPC errors.
data ErrorCode
      -- | Some error doesn't fit in any other category.
    = GenericError

      -- | Occurs when server fail to process procedure call.
    | ServerError

      -- | Malformed packet, invalid arguments or bad token.
    | ProtocolError

      -- | Occurs when client trying to call method server don't know.
    | MethodUnknown
    deriving (Show, Read, Eq, Ord, Bounded, Typeable, Data)

-- | Numeric codes according to the table:
-- <http://bittorrent.org/beps/bep_0005.html#errors>
--
-- Any number outside 201..204 is converted to 'GenericError'.
instance Enum ErrorCode where
    fromEnum code = case code of
        GenericError  -> 201
        ServerError   -> 202
        ProtocolError -> 203
        MethodUnknown -> 204
    {-# INLINE fromEnum #-}

    toEnum n = case n of
        201 -> GenericError
        202 -> ServerError
        203 -> ProtocolError
        204 -> MethodUnknown
        _   -> GenericError
    {-# INLINE toEnum #-}
277
-- | Error codes are bencoded as their numeric value (see the 'Enum'
-- instance).
instance BEncode ErrorCode where
    toBEncode code = toBEncode (fromEnum code)
    {-# INLINE toBEncode #-}
    fromBEncode = fmap toEnum . fromBEncode
    {-# INLINE fromBEncode #-}
283
-- | An RPC error: a code together with a text message.
data Error = Error
    { errorCode    :: !ErrorCode   -- ^ The type of error.
    , errorMessage :: !ByteString  -- ^ Human-readable text message.
    } deriving (Show, Eq, Ord, Typeable, Data, Read)

-- | Transaction identifier of a message, as raw bytes.
newtype TransactionId = TransactionId ByteString
    deriving (Eq, Ord, Show, BEncode)

-- | Name of a query method, as raw bytes.
newtype Method = Method ByteString
    deriving (Eq, Ord, Show, BEncode)
294
-- | A message with payload type @a@: either a query ('Q') or a response
-- ('R').  Error replies are represented as an 'R' whose payload is 'Left'.
--
-- Note that 'qryPayload', 'qryMethod' and 'qryReadOnly' exist only on 'Q',
-- and 'rspPayload' and 'rspReflectedIP' only on 'R'; applying one of these
-- selectors to the other constructor is a runtime error.
data Message a
    = Q { msgOrigin      :: NodeId
        , msgID          :: TransactionId
        , qryPayload     :: a
        , qryMethod      :: Method
        , qryReadOnly    :: Bool
        }

    | R { msgOrigin      :: NodeId
        , msgID          :: TransactionId
        , rspPayload     :: Either Error a
        , rspReflectedIP :: Maybe SockAddr
        }
305
-- | Renders a value with 'showBEncode' and converts the result to a
-- 'String'.
showBE bval = L8.unpack $ showBEncode bval
307
-- | Thin wrapper around 'encodeMessage' and 'decodeMessage'.
--
-- For debugging, either direction can be wrapped in 'trace' together with
-- 'showBE' to print each encoded or decoded message.
instance BE.BEncode (Message BValue) where
    toBEncode   = encodeMessage
    fromBEncode = decodeMessage
320
-- | Decodes a top-level message dictionary.  The @y@ key is peeked first to
-- select between a query (@q@), a response (@r@) and an error (@e@); the
-- transaction id @t@ is read last.
--
-- The order of the 'field' reads below is kept exactly as it was.
-- NOTE(review): presumably the dictionary parser consumes keys in order, so
-- do not reorder them without checking the bencoding library.
decodeMessage :: BValue -> Either String (Message BValue)
decodeMessage = fromDict $ do
    key <- lookAhead (field (req "y"))
    mk  <- case (key :: BKey) of
        "q" -> query
        "r" -> response
        "e" -> failure
        _   -> fail $ "Mainline message is not a query, response, or an error: "
                        ++ show key
    tid <- field (req "t")
    return $ mk (tid :: TransactionId)
  where
    -- Optional "ip" entry, decoded as a socket address.
    reflectedIP = do
        ipstr <- optional (field (req "ip"))
        mapM (either fail return . decodeAddr) ipstr

    query = do
        a <- field (req "a")
        g <- either fail return $ flip fromDict a $ do
            who <- field (req "id")
            ro  <- fromMaybe False <$> optional (field (req "ro"))
            return $ \meth tid -> Q who tid a meth ro
        meth <- field (req "q")
        return $ g meth

    response = do
        ip   <- reflectedIP
        vals <- field (req "r")
        either fail return $ flip fromDict vals $ do
            who <- field (req "id")
            return $ \tid -> R who tid (Right vals) ip

    failure = do
        (ecode, emsg) <- field (req "e")
        ip <- reflectedIP
        -- FIXME: Spec does not give us the NodeId of the sender.
        -- Using 'zeroID' as place holder.
        -- We should ignore the msgOrigin for errors in 'updateRouting'.
        -- We should consider making msgOrigin a Maybe value.
        return $ \tid -> R zeroID tid (Left (Error ecode emsg)) ip
353
354
-- | Encodes a message as a bencoded value.
--
-- When the payload is a dictionary, the generic arguments (the sender id,
-- plus the read-only flag for queries) are merged into it.  A payload that
-- is not a dictionary is sent unchanged.  An error response is encoded with
-- 'encodeError', which does not include the reflected address.
--
-- The response branch used to have no case for a successful payload that is
-- not a dictionary, so such a message raised a pattern-match failure at
-- runtime.  It now falls back the same way the query branch does.
encodeMessage :: Message BValue -> BValue
encodeMessage (Q origin tid a meth ro)
    = case a of
        BDict args -> encodeQuery tid meth (BDict $ genericArgs origin ro `BE.union` args)
        _          -> encodeQuery tid meth a -- XXX: Not really a valid query.
encodeMessage (R origin tid v ip)
    = case v of
        Right (BDict vals) -> encodeResponse tid (BDict $ genericArgs origin False `BE.union` vals) ip
        Right other        -> encodeResponse tid other ip -- XXX: Not really a valid response.
        Left err           -> encodeError tid err
364
365
-- | Compact binary form of a socket address: the address bytes followed by
-- the port as a big-endian 16-bit word.  'either4or6' decides which of the
-- two encoders is used.
--
-- Both encoders now return an empty string for a constructor they do not
-- handle.  Previously only 'encode6' had that fallback, so 'encode4' would
-- raise a pattern-match failure if it were ever handed anything other than
-- a 'SockAddrInet'.
encodeAddr :: SockAddr -> ByteString
encodeAddr = either encode4 encode6 . either4or6
  where
    encode4 (SockAddrInet port addr)
        = S.runPut (S.putWord32host addr >> S.putWord16be (fromIntegral port))
    encode4 _ = B.empty

    encode6 (SockAddrInet6 port _ addr _)
        = S.runPut (S.put addr >> S.putWord16be (fromIntegral port))
    encode6 _ = B.empty
375
-- | Inverse of 'encodeAddr'.  A 6-byte input is read as an IPv4 address and
-- port; any other length is read as an IPv6 address and port, with flow
-- info and scope id set to 0.
decodeAddr :: ByteString -> Either String SockAddr
decodeAddr bs = S.runGet parser bs
  where
    getPort = fromIntegral <$> S.getWord16be

    parser
        | B.length bs == 6 = do
            host <- S.getWord32host
            port <- getPort
            return (SockAddrInet port host)
        | otherwise = do
            host <- S.get -- TODO: Is this right?
            port <- getPort
            return (SockAddrInet6 port 0 host 0)
383
-- | Dictionary entries shared by queries and responses: the sender's @id@
-- and, only when the flag is set, @ro@ with the value 1.
genericArgs :: BEncode a => a -> Bool -> BDict
genericArgs nodeid ro =
       "id" .=! nodeid
    .: "ro" .=? (if ro then Just (1 :: Int) else Nothing)
    .: endDict
389
-- | Builds an error message (@y = "e"@) whose @e@ entry is the pair of
-- error code and error text.
encodeError :: BEncode a => a -> Error -> BValue
encodeError tid err = encodeAny tid "e" (errorCode err, errorMessage err) id
392
-- | Builds a response message (@y = "r"@).  When a reflected address is
-- supplied, its compact encoding is added under the @ip@ key.
encodeResponse :: (BEncode tid, BEncode vals) =>
                  tid -> vals -> Maybe SockAddr -> BValue
encodeResponse tid rvals rip = encodeAny tid "r" rvals withIP
  where
    withIP rest = "ip" .=? fmap (BString . encodeAddr) rip .: rest
397
-- | Builds a query message (@y = "q"@): the method name goes under @q@ and
-- the arguments under @a@.
encodeQuery :: (BEncode args, BEncode tid, BEncode method) =>
               tid -> method -> args -> BValue
encodeQuery tid qmeth qargs = encodeAny tid "q" qmeth withArgs
  where
    withArgs rest = "a" .=! qargs .: rest
401
-- | Common message skeleton.  The value is stored under the given key, the
-- transaction id under @t@, and the key itself under @y@.  The supplied
-- function may add further entries to the resulting dictionary.
encodeAny ::
    (BEncode tid, BEncode a) =>
    tid -> BKey -> a -> (BDict -> BDict) -> BValue
encodeAny tid key val aux = toDict (aux skeleton)
  where
    skeleton = key .=! val
            .: "t" .=! tid
            .: "y" .=! key
            .: endDict
410
411
-- | Pretty-prints a raw packet for logging.  The packet is decoded and
-- rendered with 'showBEncode' (or the decode error is shown instead), the
-- resulting lines are passed through @f@, and every line is prefixed with
-- the peer address followed by @flow@.
showPacket f addr flow bs = L8.unpack . L8.unlines $ map (prefix <>) shown
  where
    prefix = L8.pack (either show show $ either4or6 addr) <> flow
    pretty = either L8.pack showBEncode $ BE.decode bs
    shown  = f (L8.lines pretty)
419
-- Add detailed printouts for every packet: each received and each sent
-- packet is written to stderr with 'showPacket' before being passed on to
-- the wrapped transport.
addVerbosity tr = tr
    { awaitMessage = \kont -> awaitMessage tr $ \m -> do
        forM_ m $ mapM_ $ \(msg, addr) -> dump " --> " addr msg
        kont m
    , sendMessage = \addr msg -> do
        dump " <-- " addr msg
        sendMessage tr addr msg
    }
  where
    dump arrow addr msg = hPutStrLn stderr (showPacket id addr arrow msg)
430
431
432
-- | Renders an inbound packet like 'showPacket', with the parse error text
-- placed on a line of its own before the packet dump.
showParseError bs addr err = showPacket (\ls -> L8.pack err : ls) addr " --> " bs
434
-- | Decodes an inbound packet and derives the sender's 'NodeInfo' from the
-- message origin and the source address.  Failures are reported as the text
-- produced by 'showParseError'.
parsePacket :: ByteString -> SockAddr -> Either String (Message BValue, NodeInfo)
parsePacket bs addr = either (Left . showParseError bs addr) Right $ do
    pkt <- BE.decode bs
    -- TODO: Error packets do not include a valid msgOrigin.
    --       The BE.decode method is using 'zeroID' as a placeholder.
    ni  <- nodeInfo (msgOrigin pkt) addr
    return (pkt, ni)
442
-- | Serialises a message to strict bytes and pairs it with the socket
-- address of the destination node.
encodePacket :: Message BValue -> NodeInfo -> (ByteString, SockAddr)
encodePacket msg ni = (toStrict (BE.encode msg), nodeAddr ni)
446
-- | Tells the dispatcher whether a message is a query (with its method and
-- transaction id) or a response (with its transaction id).
classify :: Message BValue -> MessageClass String Method TransactionId
classify msg = case msg of
    Q { msgID = tid, qryMethod = meth } -> IsQuery meth tid
    R { msgID = tid }                   -> IsResponse tid
450
-- | Wraps a successful result in a response sent from @self@.  The address
-- of @dest@ is included as the reflected address.
encodeResponsePayload :: BEncode a => TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue
encodeResponsePayload tid self dest b =
    R { msgOrigin      = nodeId self
      , msgID          = tid
      , rspPayload     = Right (BE.toBEncode b)
      , rspReflectedIP = Just (nodeAddr dest)
      }
453
-- | Wraps query arguments in a query sent from @self@.  The destination
-- node is accepted but not used.
encodeQueryPayload :: BEncode a =>
    Method -> Bool -> TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue
encodeQueryPayload meth isReadonly tid self _dest b =
    Q { msgOrigin   = nodeId self
      , msgID       = tid
      , qryPayload  = BE.toBEncode b
      , qryMethod   = meth
      , qryReadOnly = isReadonly
      }
457
-- | Wraps an error in a response sent from @self@.  The address of @dest@
-- is included as the reflected address.
errorPayload :: TransactionId -> NodeInfo -> NodeInfo -> Error -> Message a
errorPayload tid self dest e =
    R { msgOrigin      = nodeId self
      , msgID          = tid
      , rspPayload     = Left e
      , rspReflectedIP = Just (nodeAddr dest)
      }
460
-- | Decodes the arguments of a query into the type a handler expects.
--
-- Returns 'Left' when the arguments do not decode, and also when the
-- message is a response rather than a query.  The latter case used to go
-- through the partial record selector 'qryPayload' and raised a runtime
-- exception instead of returning an error.
decodePayload :: BEncode a => Message BValue -> Either String a
decodePayload msg = case msg of
    Q { qryPayload = args } -> BE.fromBEncode args
    R {}                    -> Left "decodePayload: message is a response, not a query."
463
-- | A 'MethodHandler' specialised to the types used by this module.
type Handler =
    MethodHandler String TransactionId NodeInfo (Message BValue)
465
-- | Builds a handler from an action that always succeeds.  Arguments are
-- decoded with 'decodePayload' and the result is wrapped with
-- 'encodeResponsePayload'.
handler :: (BEncode a, BEncode b)
        => (NodeInfo -> a -> IO b) -> Maybe Handler
handler action = Just (MethodHandler decodePayload encodeResponsePayload action)
471
472
-- | Like 'handler', for actions that may fail.  A 'Left' result is sent
-- with 'errorPayload', a 'Right' result with 'encodeResponsePayload'.
handlerE :: (BEncode a, BEncode b)
         => (NodeInfo -> a -> IO (Either Error b)) -> Maybe Handler
handlerE action = Just (MethodHandler decodePayload reply action)
  where
    reply tid self dest =
        either (errorPayload tid self dest) (encodeResponsePayload tid self dest)
481
-- | Set of (info hash, port) pairs.
type AnnounceSet = Set (InfoHash, PortNumber)

-- | Shared mutable state about swarms, held in three transactional
-- variables.
data SwarmsDatabase = SwarmsDatabase
    { contactInfo   :: !(TVar PeerStore)      -- ^ Published by other nodes.
    , sessionTokens :: !(TVar SessionTokens)  -- ^ Query session IDs.
    , announceInfo  :: !(TVar AnnounceSet)    -- ^ To publish by this node.
    }
489
-- | Creates a database with default ('def') peer store and announce set and
-- session tokens obtained from 'nullSessionTokens'.
newSwarmsDatabase :: IO SwarmsDatabase
newSwarmsDatabase = do
    toks <- nullSessionTokens
    atomically $ do
        peersVar    <- newTVar def
        tokensVar   <- newTVar toks
        announceVar <- newTVar def
        return (SwarmsDatabase peersVar tokensVar announceVar)
497
-- | Routing state, kept separately for IPv4 (@*4@ fields) and IPv6 (@*6@
-- fields).  Each family has a refresh schedule, a bucket list, and a
-- committee that collects address votes (see 'updateRouting').
data Routing = Routing
    { tentativeId :: NodeInfo
    , sched4      :: !(TVar (Int.PSQ POSIXTime))
    , routing4    :: !(TVar (R.BucketList NodeInfo))
    , committee4  :: TriadCommittee NodeId SockAddr
    , sched6      :: !(TVar (Int.PSQ POSIXTime))
    , routing6    :: !(TVar (R.BucketList NodeInfo))
    , committee6  :: TriadCommittee NodeId SockAddr
    }
507
-- | Debugging aid: wraps the three table operations so that each call
-- emits a 'trace' line naming the operation and the transaction id.
traced :: Show tid => TableMethods t tid -> TableMethods t tid
traced (TableMethods ins del lkup) = TableMethods ins' del' lkup'
  where
    note label tid = trace (label ++ show tid)

    ins'  tid mvar t = note "insert " tid $ ins tid mvar t
    del'  tid t      = note "del "    tid $ del tid t
    lkup' tid t      = note "lookup " tid $ lkup tid t
513
514
-- | A 'Client' specialised to the types used by this module.
type MainlineClient =
    Client String Method TransactionId NodeInfo (Message BValue)
516
-- | Creates a client bound to the given address, together with its routing
-- state.
--
-- A random 20-byte node id is generated.  The IPv4 table starts with that
-- identity; the IPv6 table starts with an identity derived through 'bep42'
-- when 'global6' reports an address, and with the same identity otherwise.
-- When a committee calls back with an address for which 'bep42' yields an
-- id, the matching table is replaced by an empty one under the new identity
-- and the previously known nodes are queued to be pinged again.
--
-- Four threads are forked and never stopped: one per address family that
-- reacts to those address changes, and one per family polling for bucket
-- refreshes.
newClient :: SwarmsDatabase -> SockAddr -> IO (MainlineClient, Routing)
newClient swarms addr = do
    udp <- udpTransport addr
    nid <- NodeId <$> getRandomBytes 20
    let tentative_info = NodeInfo
            { nodeId   = nid
            , nodeIP   = fromMaybe (toEnum 0) $ fromSockAddr addr
            , nodePort = fromMaybe 0 $ sockAddrPort addr
            }
        -- The tentative identity moved onto a global IPv6 address.
        onGlobal6 ip6 = tentative_info
            { nodeId = fromMaybe nid $ bep42 (toSockAddr ip6) nid
            , nodeIP = IPv6 ip6
            }
    tentative_info6 <- maybe tentative_info onGlobal6 <$> global6
    addr4 <- atomically newTChan
    addr6 <- atomically newTChan
    routing <- atomically $ do
        let nobkts = R.defaultBucketCount :: Int
            emptyTable self = R.nullTable (comparing nodeId)
                                          (\s -> hashWithSalt s . nodeId)
                                          self
                                          nobkts
        tbl4 <- newTVar $ emptyTable tentative_info
        tbl6 <- newTVar $ emptyTable tentative_info6
        let updateIPVote tblvar addrvar a = do
                bkts <- readTVar tblvar
                case bep42 a (nodeId $ R.thisNode bkts) of
                    Nothing     -> return ()
                    Just newNid -> do
                        writeTVar tblvar $ emptyTable $
                            NodeInfo newNid
                                     (fromMaybe (toEnum 0) $ fromSockAddr a)
                                     (fromMaybe 0 $ sockAddrPort a)
                        writeTChan addrvar (a, map fst $ concat $ R.toList bkts)
        c4 <- newTriadCommittee $ updateIPVote tbl4 addr4
        c6 <- newTriadCommittee $ updateIPVote tbl6 addr6
        s4 <- newTVar Int.empty
        s6 <- newTVar Int.empty
        return $ Routing tentative_info s4 tbl4 c4 s6 tbl6 c6
    map_var <- atomically $ newTVar (0, mempty)
    let net = onInbound (updateRouting outgoingClient routing)
                $ layerTransport parsePacket encodePacket
                -- $ addVerbosity
                $ udp

        -- Paranoid: It's safe to define /net/ and /client/ to be mutually
        -- recursive since 'updateRouting' does not invoke 'awaitMessage',
        -- which was modified by 'onInbound'.  However, I'm going to avoid
        -- the mutual reference just to be safe.
        outgoingClient = client { clientNet = net { awaitMessage = ($ Nothing) } }

        dispatch = DispatchMethods
            { classifyInbound = classify -- :: x -> MessageClass err meth tid
            , lookupHandler   = handlers -- :: meth -> Maybe (MethodHandler err tid addr x)
            , tableMethods    = mapT     -- :: TransactionMethods tbl tid x
            }

        handlers :: Method -> Maybe Handler
        handlers (Method "ping")          = handler pingH
        handlers (Method "find_node")     = handler $ findNodeH routing
        handlers (Method "get_peers")     = handler $ getPeersH routing swarms
        handlers (Method "announce_peer") = handlerE $ announceH swarms
        handlers (Method meth)            = Just $ defaultHandler meth

        mapT = transactionMethods mapMethods gen

        gen :: Word16 -> (TransactionId, Word16)
        gen cnt = (TransactionId $ S.encode cnt, cnt+1)

        -- Bucket list whose own entry answers for the given address family;
        -- anything other than an explicit IPv6 preference uses the IPv4 one.
        tableFor maddr = case flip prefer4or6 Nothing <$> maddr of
            Just Want_IP6 -> routing6 routing
            _             -> routing4 routing

        client = Client
            { clientNet           = addHandler (handleMessage client) net
            , clientDispatcher    = dispatch
            , clientErrorReporter = ignoreErrors -- printErrors stderr
            , clientPending       = map_var
            , clientAddress       = \maddr -> atomically $
                                        R.thisNode <$> readTVar (tableFor maddr)
            , clientResponseId    = return
            }

        -- Waits for external-address changes on a channel and pings every
        -- node that was in the replaced table.
        addrChangeLoop label family chan = fork $ fix $ \again -> do
            myThreadId >>= flip labelThread label
            (extaddr, ns) <- atomically $ readTChan chan
            hPutStrLn stderr $ "External " ++ family ++ ": " ++ show (extaddr, length ns)
            forM_ ns $ \n -> do
                hPutStrLn stderr $ "Change IP, ping: " ++ show n
                ping outgoingClient n
            -- TODO: trigger bootstrap for this address family
            again

    -- TODO: Provide some means of shutting down these four auxiliary threads:

    _ <- addrChangeLoop "addr4" "IPv4" addr4
    _ <- addrChangeLoop "addr6" "IPv6" addr6

    _ <- forkPollForRefresh
            (15*60)
            (sched4 routing)
            (refreshBucket (nodeSearch client) (routing4 routing))
    _ <- forkPollForRefresh
            (15*60)
            (sched6 routing)
            (refreshBucket (nodeSearch client) (routing6 routing))

    return (client, routing)
631
-- | Modifies a purely random 'NodeId' to one that is related to a given
-- routable address in accordance with BEP 42.  Returns 'Nothing' when no
-- IPv4 or IPv6 address can be extracted from the socket address.
--
-- Test vectors from the spec:
--
-- > IP           rand  example node ID
-- > ============ ===== ==========================================
-- > 124.31.75.21   1   5fbfbf f10c5d6a4ec8a88e4c6ab4c28b95eee4 01
-- > 21.75.31.124  86   5a3ce9 c14e7a08645677bbd1cfe7d8f956d532 56
-- > 65.23.51.170  22   a5d432 20bc8f112a3d426c84764f8c2a1150e6 16
-- > 84.124.73.14  65   1b0321 dd1bb1fe518101ceef99462b947a01ff 41
-- > 43.213.53.83  90   e56f6c bf5b7c4be0237986d5243b87aa6d5130 5a
bep42 :: SockAddr -> NodeId -> Maybe NodeId
bep42 addr0 (NodeId r) = do
    ip <- fmap S.encode (fromSockAddr addr :: Maybe IPv4)
          <|> fmap S.encode (fromSockAddr addr :: Maybe IPv6)
    genBucketSample' keepTail (NodeId $ crc $ applyMask ip) (3, 0x07, 0)
  where
    addr = either id id $ either4or6 addr0 -- unmap 4mapped SockAddrs

    ip4mask = "\x03\x0f\x3f\xff" :: ByteString
    ip6mask = "\x01\x03\x07\x0f\x1f\x3f\x7f\xff" :: ByteString

    -- Low three bits of the last byte of the random id.
    nbhoodSelect = B.last r .&. 7

    -- The last @n@ bytes of the random id.
    keepTail n = pure (B.drop (B.length r - n) r)

    crc bytes = S.encode (crc32c (B.pack bytes))

    -- Mask the address bytes, then put the selector into the top three
    -- bits of the first byte.
    applyMask ip = case B.zipWith (.&.) mask ip of
        (b : bs) -> (b .|. shiftL nbhoodSelect 5) : bs
        []       -> []
      where
        mask | B.length ip == 4 = ip4mask
             | otherwise        = ip6mask
663
664
665
-- | Handler for methods without a specific implementation: it always
-- answers with a 'MethodUnknown' error that names the method.
defaultHandler :: ByteString -> Handler
defaultHandler meth = MethodHandler decodePayload errorPayload reject
  where
    reject :: NodeInfo -> BValue -> IO Error
    reject _ _ = return (Error MethodUnknown ("Unknown method " <> meth))
671
-- | Assembles the 'Kademlia' record for one address family.  Table IO is
-- 'vanillaIO' with 'ping' as the liveness check, and its transition hook is
-- replaced so that every transition goes to both 'transitionCommittee' and
-- 'touchBucket' (with a 15-minute interval).
mainlineKademlia :: MainlineClient -> TriadCommittee NodeId SockAddr -> TVar (R.BucketList NodeInfo) -> TVar (Int.PSQ POSIXTime) -> Kademlia NodeId NodeInfo
mainlineKademlia client committee var sched =
    Kademlia quietInsertions mainlineSpace tableIO
  where
    tableIO = (vanillaIO var $ ping client) { tblTransition = onTransition }

    onTransition tr = do
        io1 <- transitionCommittee committee tr
        io2 <- touchBucket mainlineSpace (15*60) var sched tr
        -- Printing the transition here is noisy: timestamp updates are
        -- currently reported as transitions to Accepted.
        return (io1 >> io2)
688
689
-- | The Kademlia key space used by this module: a node is located by its
-- 'nodeId', distance is 'xor', bits are tested with 'testIdBit', and
-- samples are drawn with 'genBucketSample''.
mainlineSpace :: R.KademliaSpace NodeId NodeInfo
mainlineSpace = R.KademliaSpace
    { R.kademliaLocation = nodeId
    , R.kademliaTestBit  = testIdBit
    , R.kademliaXor      = xor
    , R.kademliaSample   = genBucketSample'
    }
697
-- | Reacts to a routing-table transition.  When a node becomes a
-- 'Stranger', its vote is removed from the committee and the returned
-- action logs that to stderr.  Any other transition does nothing.
transitionCommittee :: TriadCommittee NodeId SockAddr -> RoutingTransition NodeInfo -> STM (IO ())
transitionCommittee committee tr = case tr of
    RoutingTransition ni Stranger -> do
        delVote committee (nodeId ni)
        return $ hPutStrLn stderr ("delVote " ++ show (nodeId ni))
    _ -> return (return ())
704
-- | Inbound-message hook that keeps the routing state up to date.
--
-- The sender's address family (via 'prefer4or6') selects the IPv4 or the
-- IPv6 table, committee and schedule.  When the sender's IP differs from
-- our own entry in that table, a response that carries a reflected address
-- is recorded with 'addVote' under the sender's node id.  The sender is
-- handed to 'insertNode'.
--
-- NOTE(review): the indentation of this listing has been lost, so it cannot
-- be told from here whether the 'insertNode' call sits inside the 'when'
-- guard (skipping messages that come from our own IP) or after it.  Confirm
-- against the original file before restructuring.
--
-- NOTE(review): only 'Want_IP4' and 'Want_IP6' are matched; this presumably
-- relies on 'ipFamily' never returning 'Want_Both' -- verify.
705updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO ()
706updateRouting client routing naddr msg = do
707 case prefer4or6 naddr Nothing of
708 Want_IP4 -> go (routing4 routing) (committee4 routing) (sched4 routing)
709 Want_IP6 -> go (routing6 routing) (committee6 routing) (sched6 routing)
710 where
711 go tbl committee sched = do
712 self <- atomically $ R.thisNode <$> readTVar tbl
713 when (nodeIP self /= nodeIP naddr) $ do
714 case msg of
715 R { rspReflectedIP = Just sockaddr }
716 -> do
717 -- hPutStrLn stderr $ "External: "++show (nodeId naddr,sockaddr)
718 atomically $ addVote committee (nodeId naddr) sockaddr
719 _ -> return ()
720 insertNode (mainlineKademlia client committee tbl sched) naddr
721
-- | Argument of the @ping@ query; it carries no data.
data Ping = Ping deriving Show

-- Pong is the same as Ping.
type Pong = Ping
pattern Pong = Ping

-- | Encoded as an empty dictionary; any value decodes successfully.
instance BEncode Ping where
    toBEncode Ping = toDict endDict
    fromBEncode    = const (pure Ping)
731
-- | The strings that represent an address-family preference on the wire.
wantList :: WantIP -> [ByteString]
wantList want = case want of
    Want_IP4  -> ["ip4"]
    Want_IP6  -> ["ip6"]
    Want_Both -> ["ip4", "ip6"]
736
-- | Encoded as the list from 'wantList'.  Decoding looks only for the
-- strings @ip4@ and @ip6@ in the list and fails when neither is present.
instance BEncode WantIP where
    toBEncode = toBEncode . wantList

    fromBEncode bval = do
        wants <- fromBEncode bval
        let has family = family `elem` (wants :: [ByteString])
        case (has "ip4", has "ip6") of
            (True,  True)  -> Right Want_Both
            (True,  False) -> Right Want_IP4
            (False, True)  -> Right Want_IP6
            _              -> Left "Unrecognized IP type."
747
-- | Arguments of the @find_node@ query: the target id and an optional
-- address-family preference.
data FindNode = FindNode NodeId (Maybe WantIP)

-- | The target is required under 'target_key'; the preference is optional
-- under 'want_key'.
instance BEncode FindNode where
    toBEncode (FindNode nid iptyp) = toDict $
           target_key .=! nid
        .: want_key   .=? iptyp
        .: endDict

    fromBEncode = fromDict $
        FindNode <$>! target_key
                 <*>? want_key
756
-- | Result of a node lookup: one list per address family.
data NodeFound = NodeFound
    { nodes4 :: [NodeInfo]
    , nodes6 :: [NodeInfo]
    }

-- | Each list is packed into a single byte string ('putNodeInfo4' under
-- 'nodes_key', 'putNodeInfo6' under 'nodes6_key').  An empty list is left
-- out of the dictionary, and a list that cannot be read decodes as empty.
instance BEncode NodeFound where
    toBEncode (NodeFound ns ns6) = toDict $
           nodes_key  .=? packed putNodeInfo4 ns
        .: nodes6_key .=? packed putNodeInfo6 ns6
        .: endDict
      where
        packed putOne xs
            | Prelude.null xs = Nothing
            | otherwise       = Just (S.runPut (mapM_ putOne xs))

    fromBEncode bval =
        NodeFound <$> compactList getNodeInfo4 nodes_key
                  <*> compactList getNodeInfo6 nodes6_key
      where
        compactList getter key =
            fromMaybe [] <$> optional (fromDict (binary getter key) bval)
777
-- | Reads the byte string stored under a required key and decodes it as a
-- sequence of items by running the given getter repeatedly ('many').  A
-- deserialisation failure is reported with the key name.
binary :: S.Get a -> BKey -> BE.Get [a]
binary get k = do
    raw <- field (req k)
    either (fail . format) return (S.runGet (many get) raw)
  where
    format str = "fail to deserialize " ++ show k ++ " field: " ++ str
783
-- | Handler for @ping@: always replies with 'Pong'.
pingH :: NodeInfo -> Ping -> IO Pong
pingH _ Ping = pure Pong
786
-- | The explicitly requested address family when there is one, otherwise
-- the family of the node's own IP address.
prefer4or6 :: NodeInfo -> Maybe WantIP -> WantIP
prefer4or6 addr iptyp = case iptyp of
    Just wanted -> wanted
    Nothing     -> ipFamily (nodeIP addr)
789
-- | Handler for @find_node@.  Returns the 'R.defaultK' closest known nodes
-- to the target from each table the preference allows: the IPv4 table
-- unless the preference is 'Want_IP6', and the IPv6 table unless it is
-- 'Want_IP4'.
--
-- Our own entry for the address family the querying node is /not/ using is
-- appended to the list of that family.
findNodeH :: Routing -> NodeInfo -> FindNode -> IO NodeFound
findNodeH routing addr (FindNode node iptyp) = do
    (append4, append6) <- atomically $ do
        ni4 <- R.thisNode <$> readTVar (routing4 routing)
        ni6 <- R.thisNode <$> readTVar (routing6 routing)
        return $ case ipFamily (nodeIP addr) of
            Want_IP4 -> (id, (++ [ni6]))
            Want_IP6 -> ((++ [ni4]), id)
    ks  <- if preferred /= Want_IP6
               then closest append4 (routing4 routing)
               else return []
    ks6 <- if preferred /= Want_IP4
               then closest append6 (routing6 routing)
               else return []
    return $ NodeFound ks ks6
  where
    preferred = prefer4or6 addr iptyp

    closest f var =
        f . R.kclosest mainlineSpace R.defaultK node <$> atomically (readTVar var)
807
808
-- | Arguments of the @get_peers@ query: the info hash and an optional
-- address-family preference.
data GetPeers = GetPeers InfoHash (Maybe WantIP)

-- | The info hash is required under 'info_hash_key'; the preference is
-- optional under 'want_key'.
instance BEncode GetPeers where
    toBEncode (GetPeers ih iptyp) = toDict $
           info_hash_key .=! ih
        .: want_key      .=? iptyp
        .: endDict

    fromBEncode = fromDict $
        GetPeers <$>! info_hash_key
                 <*>? want_key
817
818
-- | Result of a @get_peers@ query.
data GotPeers = GotPeers
    { -- | If the queried node has no peers for the infohash, returned
      -- the K nodes in the queried nodes routing table closest to the
      -- infohash supplied in the query.
      peers        :: [PeerAddr]

    , nodes        :: NodeFound

      -- | The token value is a required argument for a future
      -- announce_peer query.
    , grantedToken :: Token
    } -- deriving (Show, Eq, Typeable)
831
-- | True when the node's address is stored with the 'IPv6' constructor.
nodeIsIPv6 :: NodeInfo -> Bool
nodeIsIPv6 ni = case nodeIP ni of
    IPv6 _ -> True
    _      -> False
835
-- | Wire format of a @get_peers@ result: optional packed node lists under
-- 'nodes_key' and 'nodes6_key', the token under 'token_key', and the peers
-- as a list of individually serialised addresses under 'peers_key'.
--
-- The IPv6 node list is written with 'putNodeInfo6'.  It used to be written
-- with 'putNodeInfo4', which emits nothing for a genuine IPv6 node, so the
-- @nodes6@ entry went out as an empty string.  This now matches the
-- 'NodeFound' instance.
instance BEncode GotPeers where
    toBEncode (GotPeers ps (NodeFound ns4 ns6) tok) = toDict $
           nodes_key  .=? packed putNodeInfo4 ns4
        .: nodes6_key .=? packed putNodeInfo6 ns6
        .: token_key  .=! tok
        .: peers_key  .=! map S.encode ps
        .: endDict
      where
        -- An empty list is left out of the dictionary altogether.
        packed putOne ns
            | null ns   = Nothing
            | otherwise = Just $ S.runPut (mapM_ putOne ns)

    fromBEncode = fromDict $ do
        ns4 <- fromMaybe [] <$> optional (binary getNodeInfo4 nodes_key)  -- "nodes"
        ns6 <- fromMaybe [] <$> optional (binary getNodeInfo6 nodes6_key) -- "nodes6"
        -- TODO: BEP 42...
        --
        -- Once enforced, responses to get_peers requests whose node ID does not
        -- match its external IP should be considered to not contain a token and
        -- thus not be eligible as storage target.  Implementations should take
        -- care that they find the closest set of nodes which return a token and
        -- whose IDs matches their IPs before sending a store request to those
        -- nodes.
        --
        -- Sounds like something to take care of at peer-search time, so I'll
        -- ignore it for now.
        tok <- field (req token_key)                                       -- "token"
        ps  <- fromMaybe [] <$> optional (field (req peers_key) >>= decodePeers) -- "values"
        pure $ GotPeers ps (NodeFound ns4 ns6) tok
      where
        decodePeers = either fail pure . mapM S.decode
865
-- | Handler for @get_peers@.  Fetches the fresh peers stored for the info
-- hash (writing back the updated store), grants the querying node a token,
-- and adds the closest nodes as computed by 'findNodeH'.
getPeersH :: Routing -> SwarmsDatabase -> NodeInfo -> GetPeers -> IO GotPeers
getPeersH routing (SwarmsDatabase peersVar toks _) naddr (GetPeers ih iptyp) = do
    tm <- getTimestamp
    ps <- atomically $ do
        (found, store') <- Peers.freshPeers ih tm <$> readTVar peersVar
        writeTVar peersVar store'
        return found
    -- Filter peer results to only a single address family, IPv4 or IPv6, as
    -- per BEP 32.  A request for both families falls back to the family of
    -- the querying node.
    let selected = prefer4or6 naddr (iptyp >>= singleFamily)
        ps'      = filter ((== selected) . ipFamily . peerHost) ps
    tok <- grantToken toks naddr
    ns  <- findNodeH routing naddr (FindNode (coerce ih) iptyp)
    return $ GotPeers ps' ns tok
  where
    singleFamily Want_Both = Nothing
    singleFamily specific  = Just specific
883
-- | Announce that the peer, controlling the querying node, is
-- downloading a torrent on a port.
data Announce = Announce
    { -- | If set, the 'port' field should be ignored and the source
      -- port of the UDP packet should be used as the peer's port
      -- instead.  This is useful for peers behind a NAT that may not
      -- know their external port, and supporting uTP, they accept
      -- incoming connections on the same port as the DHT port.
      impliedPort   :: Bool

      -- | infohash of the torrent;
    , topic         :: InfoHash

      -- | some clients announce the friendly name of the torrent here.
    , announcedName :: Maybe ByteString

      -- | the port /this/ peer is listening;
    , port          :: PortNumber

      -- TODO: optional boolean "seed" key

      -- | received in response to a previous get_peers query.
    , sessionToken  :: Token

    } deriving (Show, Eq, Typeable)
909
-- Dictionary keys used when encoding and decoding messages.
--
-- NOTE(review): the first six keys carry no type signature, so their type
-- is whatever inference picks from their use sites; presumably 'BKey' like
-- the rest -- confirm before adding signatures.
peer_ip_key     = "ip"
peer_id_key     = "peer id"
peer_port_key   = "port"
msg_type_key    = "msg_type"
piece_key       = "piece"
total_size_key  = "total_size"

-- | Key @"id"@.
node_id_key :: BKey
node_id_key = "id"

-- | Key @"ro"@.
read_only_key :: BKey
read_only_key = "ro"

-- | Key @"want"@.
want_key :: BKey
want_key = "want"

-- | Key @"target"@.
target_key :: BKey
target_key = "target"

-- | Key @"nodes"@: the IPv4 node list of a reply.
nodes_key :: BKey
nodes_key = "nodes"

-- | Key @"nodes6"@: the IPv6 node list of a reply.
nodes6_key :: BKey
nodes6_key = "nodes6"

-- | Key @"info_hash"@: the torrent an 'Announce' refers to.
info_hash_key :: BKey
info_hash_key = "info_hash"

-- | Key @"values"@: the peer list of a get_peers reply.
peers_key :: BKey
peers_key = "values"

-- | Key @"token"@: the token granted by get_peers and echoed back in an
-- 'Announce'.
token_key :: BKey
token_key = "token"

-- | Key @"name"@: the optional torrent name of an 'Announce'.
name_key :: BKey
name_key = "name"

-- | Key @"port"@: the listening port of an 'Announce'.
port_key :: BKey
port_key = "port"

-- | Key @"implied_port"@: the optional implied-port flag of an 'Announce'.
implied_port_key :: BKey
implied_port_key = "implied_port"
940
-- | Wire format of an 'Announce' query.
--
-- The implied-port flag is written as the integer @1@ when set and omitted
-- otherwise; on input a missing flag means 'False' and any non-zero
-- integer means 'True'.
instance BEncode Announce where
    toBEncode ann = toDict $
           implied_port_key .=? impliedFlag
        .: info_hash_key    .=! topic ann
        .: name_key         .=? announcedName ann
        .: port_key         .=! port ann
        .: token_key        .=! sessionToken ann
        .: endDict
      where
        impliedFlag :: Maybe Int
        impliedFlag
            | impliedPort ann = Just 1
            | otherwise       = Nothing

    fromBEncode = fromDict $
        Announce . isSet <$> optional (field (req implied_port_key))
                 <*>! info_hash_key
                 <*>? name_key
                 <*>! port_key
                 <*>! token_key
      where
        isSet :: Maybe Int -> Bool
        isSet = maybe False (/= 0)
960
961
962
-- | Acknowledgement of an 'Announce' query; it carries no data.
--
-- Before acknowledging, the queried node must verify that the token was
-- previously sent to the same IP address as the querying node.  It should
-- then store the IP address of the querying node and the supplied port
-- number under the infohash in its store of peer contact information.
data Announced = Announced
    deriving (Show, Eq, Typeable)
970
-- | 'Announced' is encoded exactly like 'Ping'; decoding ignores its input
-- and always succeeds.
instance BEncode Announced where
    toBEncode   = const (toBEncode Ping)
    fromBEncode = const (pure Announced)
974
-- | Handle an 'Announce' query from the node @naddr@.
--
-- The session token is checked with 'checkToken' first.  If it is rejected
-- a 'ProtocolError' is returned and nothing is stored; otherwise the
-- announcing peer is inserted into the swarm database under the announced
-- topic and 'Announced' is returned.
announceH :: SwarmsDatabase -> NodeInfo -> Announce -> IO (Either Error Announced)
announceH (SwarmsDatabase peers toks _) naddr announcement = do
    tokenOk <- checkToken toks naddr (sessionToken announcement)
    if tokenOk
        then Right <$> storePeer
        else return $ Left $ Error ProtocolError "invalid parameter: token"
  where
    storePeer = atomically $ do
        modifyTVar' peers $
            insertPeer (topic announcement) (announcedName announcement) announcer
        return Announced

    announcer = PeerAddr
        { peerId   = Nothing
        , peerHost = storedHost
        , peerPort = storedPort
        }

    -- Avoid storing IPv4-mapped addresses.
    storedHost = case nodeIP naddr of
        IPv6 ip6 | Just ip4 <- un4map ip6 -> IPv4 ip4
        a                                 -> a

    -- With an implied port, the node's own port is used instead of the
    -- announced one.
    storedPort
        | impliedPort announcement = nodePort naddr
        | otherwise                = port announcement
995
-- | Whether the given client operates in read-only mode.
--
-- TODO: not implemented yet; the argument is ignored and the answer is
-- always 'False'.
isReadonlyClient _ = False
997
-- | Send one query and decode its reply.
--
-- @meth@ is the method being called, @msg@ builds the query payload from
-- @nid@, and @unwrap@ post-processes the decoded response.  The query is
-- sent to @addr@ through @client@ with 'sendQuery'.
--
-- The result is 'Nothing' both when 'sendQuery' yields 'Nothing' and when
-- it yields @Just (Left _)@ (which it does on a parse error).
--
-- TODO: Do something with parse errors instead of discarding them.
mainlineSend meth unwrap msg client nid addr = do
    reply <- sendQuery client serializer (msg nid) addr
    -- Collapse @Just (Left _)@ into 'Nothing'.
    return $ reply >>= either (const Nothing) Just
  where
    serializer = MethodSerializer
        { methodTimeout  = 5
        , method         = meth
        , wrapQuery      = encodeQueryPayload meth (isReadonlyClient client)
        , unwrapResponse = decodeReply . rspPayload
        }

    -- An error payload is passed through untouched; a bencode decoding
    -- failure is turned into a 'GenericError' carrying the parser message.
    decodeReply payload = payload >>= \bval ->
        case BE.fromBEncode bval of
            Left err -> Left (Error GenericError (C8.pack err))
            Right v  -> Right (unwrap v)
1014
-- | Send a ping query to a node.  The result is 'True' only when a 'Pong'
-- reply was received and decoded; any failure of 'mainlineSend' gives
-- 'False'.
ping :: MainlineClient -> NodeInfo -> IO Bool
ping client addr = do
    answer <- mainlineSend (Method "ping") (\Pong -> True) (const Ping) client () addr
    return $ fromMaybe False answer
1019
-- | Send a find_node query for the given target, asking for both IPv4 and
-- IPv6 nodes.
--
-- The result has the shape expected of a search query,
-- @ni -> IO (Maybe ([ni], [r], tok))@, with no token (@()@).
getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[NodeInfo],()))
getNodes client target addr =
    mainlineSend (Method "find_node") unwrapNodes mkQuery client target addr
  where
    mkQuery nid = FindNode nid (Just Want_Both)
1023
-- | Flatten a 'NodeFound' reply into a search result: the combined
-- IPv4 ++ IPv6 node list is returned both as the nodes to visit next and
-- as the results, together with a unit token.
unwrapNodes (NodeFound ns4 ns6) = (found, found, ())
  where
    found = ns4 ++ ns6
1025
-- | Send a get_peers query, asking for both IPv4 and IPv6 results.  The
-- 'NodeId' argument is coerced into the info-hash to look up.
getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[PeerAddr],Token))
getPeers client target addr =
    mainlineSend (Method "get_peers") unwrapPeers mkQuery client target addr
  where
    mkQuery nid = GetPeers (coerce nid) (Just Want_Both)
1028
-- | Flatten a 'GotPeers' reply into a search result: the combined
-- IPv4 ++ IPv6 node list, the peers, and the granted token.
unwrapPeers (GotPeers found (NodeFound v4 v6) granted) =
    (v4 ++ v6, found, granted)
1030
-- | Build a 'Search' over 'mainlineSpace' that runs the given query; a
-- node's address is its (IP, port) pair.
mainlineSearch qry = Search
    { searchSpace       = mainlineSpace
    , searchNodeAddress = \ni -> (nodeIP ni, nodePort ni)
    , searchQuery       = qry
    }
1036
-- | A 'mainlineSearch' driven by find_node queries ('getNodes').
nodeSearch client = mainlineSearch $ getNodes client
1038
-- | A 'mainlineSearch' driven by get_peers queries ('getPeers').
peerSearch client = mainlineSearch $ getPeers client
1040
-- | List of bootstrap nodes maintained by different bittorrent
-- software authors.
--
-- Each well-known host is resolved with 'resolve'; hosts that fail to
-- resolve, or whose address 'nodeInfo' rejects, are silently dropped.  The
-- whole lookup is wrapped in 'unsafeInterleaveIO', so it is deferred until
-- the resulting list is demanded.
bootstrapNodes :: WantIP -> IO [NodeInfo]
bootstrapNodes want = unsafeInterleaveIO $ concat <$> mapM lookupNode wellknowns
  where
    wellknowns :: [String]
    wellknowns =
        [ "router.bittorrent.com:6881"   -- by BitTorrent Inc.

        -- Reported as not working at the time this entry was committed
        -- (see git blame for the date).
        , "dht.transmissionbt.com:6881"  -- by Transmission project

        , "router.utorrent.com:6881"
        ]

    -- Zero or one node per host name; the node ID is left as 'zeroID'.
    lookupNode hostAndPort = resolve want hostAndPort >>= \case
        Left _         -> return []
        Right sockaddr -> case nodeInfo zeroID sockaddr of
            Left _   -> return []
            Right ni -> return [ni]
1061
-- | Resolve either a numeric network address or a hostname to a
-- numeric IP address of the node.
--
-- Accepted input forms:
--
-- * @host@ or @host:port@ -- the string is split at its /last/ colon;
--
-- * @[addr]@ or @[addr]:port@ -- a bracketed IPv6 literal; the brackets
--   are stripped before the lookup.  (Previously the brackets were passed
--   on to 'getAddrInfo', so this form could never resolve.)
--
-- The lookup asks for datagram sockets, using @AF_INET@ when @want@ is
-- 'Want_IP4' and @AF_INET6@ for every other value.  Any 'IOError' raised
-- by the lookup is returned as 'Left'.
resolve :: WantIP -> String -> IO (Either IOError SockAddr)
resolve want hostAndPort = tryIOError $ do
    -- getAddrInfo throws an exception instead of returning an empty list,
    -- so this pattern match is not expected to fail; if it ever did, the
    -- resulting IOError would still be caught by tryIOError.
    info : _ <- getAddrInfo (Just hints) (Just host) service
    return $ addrAddress info
  where
    hints = defaultHints { addrSocketType = Datagram
                         , addrFamily     = case want of
                                              Want_IP4 -> AF_INET
                                              _        -> AF_INET6
                         }

    (host, service) =
        fromMaybe (splitAtLastColon hostAndPort) (splitBracketed hostAndPort)

    -- Recognise "[addr]" and "[addr]:port".  Anything else, including
    -- malformed bracket syntax, falls back to the last-colon split.
    splitBracketed ('[' : rest) = case break (== ']') rest of
        (addr, [']'])         -> Just (addr, Nothing)
        (addr, ']' : ':' : p) -> Just (addr, Just p)
        _                     -> Nothing
    splitBracketed _ = Nothing

    -- Legacy behaviour: everything after the last ':' is the port; with no
    -- ':' at all the whole string is the host.
    splitAtLastColon s = case span (/= ':') (reverse s) of
        (_, [])          -> (s, Nothing)
        (rport, _ : rhs) -> (reverse rhs, Just (reverse rport))
1080
1081