diff options
-rw-r--r-- | Mainline.hs | 145 |
1 files changed, 125 insertions, 20 deletions
diff --git a/Mainline.hs b/Mainline.hs index ef452f7d..84fe96bf 100644 --- a/Mainline.hs +++ b/Mainline.hs | |||
@@ -1,3 +1,4 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
1 | {-# LANGUAGE DeriveDataTypeable #-} | 2 | {-# LANGUAGE DeriveDataTypeable #-} |
2 | {-# LANGUAGE DeriveFoldable #-} | 3 | {-# LANGUAGE DeriveFoldable #-} |
3 | {-# LANGUAGE DeriveFunctor #-} | 4 | {-# LANGUAGE DeriveFunctor #-} |
@@ -7,11 +8,13 @@ | |||
7 | {-# LANGUAGE LambdaCase #-} | 8 | {-# LANGUAGE LambdaCase #-} |
8 | {-# LANGUAGE PatternSynonyms #-} | 9 | {-# LANGUAGE PatternSynonyms #-} |
9 | {-# LANGUAGE StandaloneDeriving #-} | 10 | {-# LANGUAGE StandaloneDeriving #-} |
11 | {-# LANGUAGE TupleSections #-} | ||
10 | module Mainline where | 12 | module Mainline where |
11 | 13 | ||
12 | import Control.Applicative | 14 | import Control.Applicative |
13 | import Control.Arrow | 15 | import Control.Arrow |
14 | import Control.Concurrent.STM | 16 | import Control.Concurrent.STM |
17 | import Control.Monad | ||
15 | import Crypto.Random | 18 | import Crypto.Random |
16 | import Data.BEncode as BE | 19 | import Data.BEncode as BE |
17 | import qualified Data.BEncode.BDict as BE | 20 | import qualified Data.BEncode.BDict as BE |
@@ -38,15 +41,15 @@ import Data.Set (Set) | |||
38 | import Data.Torrent | 41 | import Data.Torrent |
39 | import Data.Typeable | 42 | import Data.Typeable |
40 | import Data.Word | 43 | import Data.Word |
44 | import Kademlia | ||
41 | import Network.Address (Address, fromSockAddr, setPort, | 45 | import Network.Address (Address, fromSockAddr, setPort, |
42 | sockAddrPort, toSockAddr, testIdBit) | 46 | sockAddrPort, testIdBit, toSockAddr) |
43 | import Network.BitTorrent.DHT.ContactInfo as Peers | 47 | import Network.BitTorrent.DHT.ContactInfo as Peers |
44 | import Network.BitTorrent.DHT.Token as Token | 48 | import Network.BitTorrent.DHT.Token as Token |
45 | import qualified Network.DHT.Routing as R | 49 | import qualified Network.DHT.Routing as R |
46 | ;import Network.DHT.Routing (Info, Timestamp, getTimestamp) | 50 | ;import Network.DHT.Routing (Info, Timestamp, getTimestamp) |
47 | import Network.QueryResponse | 51 | import Network.QueryResponse |
48 | import Network.Socket | 52 | import Network.Socket |
49 | import Kademlia | ||
50 | 53 | ||
51 | newtype NodeId = NodeId ByteString | 54 | newtype NodeId = NodeId ByteString |
52 | deriving (Eq,Ord,Show,ByteArrayAccess, BEncode, Bits) | 55 | deriving (Eq,Ord,Show,ByteArrayAccess, BEncode, Bits) |
@@ -99,6 +102,9 @@ putNodeInfo6 (NodeInfo (NodeId nid) (IPv6 ip) port) | |||
99 | putNodeInfo6 _ = return () | 102 | putNodeInfo6 _ = return () |
100 | 103 | ||
101 | 104 | ||
105 | -- TODO: We should use a SocketAddrInet6 address for a dual-stack listen | ||
106 | -- socket. Therefore, the behavior of this method should depend on the bind | ||
107 | -- address for outbound packets. | ||
102 | nodeAddr :: NodeInfo -> SockAddr | 108 | nodeAddr :: NodeInfo -> SockAddr |
103 | nodeAddr (NodeInfo _ ip port) = setPort port $ toSockAddr ip | 109 | nodeAddr (NodeInfo _ ip port) = setPort port $ toSockAddr ip |
104 | 110 | ||
@@ -265,8 +271,9 @@ data Routing = Routing | |||
265 | , routing6 :: !( TVar (R.BucketList NodeInfo) ) | 271 | , routing6 :: !( TVar (R.BucketList NodeInfo) ) |
266 | } | 272 | } |
267 | 273 | ||
268 | newClient :: | 274 | type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue) |
269 | SockAddr -> IO (Client String Method TransactionId NodeInfo (Message BValue)) | 275 | |
276 | newClient :: SockAddr -> IO MainlineClient | ||
270 | newClient addr = do | 277 | newClient addr = do |
271 | udp <- udpTransport addr | 278 | udp <- udpTransport addr |
272 | nid <- NodeId <$> getRandomBytes 20 | 279 | nid <- NodeId <$> getRandomBytes 20 |
@@ -281,13 +288,21 @@ newClient addr = do | |||
281 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) tenative_info nobkts | 288 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) tenative_info nobkts |
282 | return $ Routing tenative_info tbl4 tbl6 | 289 | return $ Routing tenative_info tbl4 tbl6 |
283 | swarms <- newSwarmsDatabase | 290 | swarms <- newSwarmsDatabase |
284 | let net = onInbound (updateRouting routing) | 291 | map_var <- atomically $ newTVar (0, mempty) |
292 | let net = onInbound (updateRouting outgoingClient routing) | ||
285 | $ layerTransport parsePacket encodePacket | 293 | $ layerTransport parsePacket encodePacket |
286 | $ udp | 294 | $ udp |
287 | dispatch tbl = DispatchMethods | 295 | |
288 | { classifyInbound = classify | 296 | -- Paranoid: It's safe to define /net/ and /client/ to be mutually |
289 | , lookupHandler = handlers | 297 | -- recursive since 'updateRouting' does not invoke 'awaitMessage' which |
290 | , tableMethods = tbl | 298 | -- which was modified by 'onInbound'. However, I'm going to avoid the |
299 | -- mutual reference just to be safe. | ||
300 | outgoingClient = client { clientNet = net { awaitMessage = return Nothing } } | ||
301 | |||
302 | dispatch = DispatchMethods | ||
303 | { classifyInbound = classify -- :: x -> MessageClass err meth tid | ||
304 | , lookupHandler = handlers -- :: meth -> Maybe (MethodHandler err tid addr x) | ||
305 | , tableMethods = mapT -- :: TransactionMethods tbl tid x | ||
291 | } | 306 | } |
292 | 307 | ||
293 | handlers :: Method -> Maybe Handler | 308 | handlers :: Method -> Maybe Handler |
@@ -298,12 +313,13 @@ newClient addr = do | |||
298 | handlers ( Method meth ) = Just $ defaultHandler meth | 313 | handlers ( Method meth ) = Just $ defaultHandler meth |
299 | 314 | ||
300 | mapT = transactionMethods mapMethods gen | 315 | mapT = transactionMethods mapMethods gen |
316 | |||
301 | gen :: Word16 -> (TransactionId, Word16) | 317 | gen :: Word16 -> (TransactionId, Word16) |
302 | gen cnt = (TransactionId $ S.encode cnt, cnt+1) | 318 | gen cnt = (TransactionId $ S.encode cnt, cnt+1) |
303 | map_var <- atomically $ newTVar (0, mempty) | 319 | |
304 | return Client | 320 | client = Client |
305 | { clientNet = net | 321 | { clientNet = net |
306 | , clientDispatcher = dispatch mapT | 322 | , clientDispatcher = dispatch |
307 | , clientErrorReporter = ignoreErrors -- TODO | 323 | , clientErrorReporter = ignoreErrors -- TODO |
308 | , clientPending = map_var | 324 | , clientPending = map_var |
309 | , clientAddress = \maddr -> atomically $ do | 325 | , clientAddress = \maddr -> atomically $ do |
@@ -314,16 +330,18 @@ newClient addr = do | |||
314 | , clientResponseId = return | 330 | , clientResponseId = return |
315 | } | 331 | } |
316 | 332 | ||
333 | return client | ||
334 | |||
317 | defaultHandler :: ByteString -> Handler | 335 | defaultHandler :: ByteString -> Handler |
318 | defaultHandler meth = MethodHandler decodePayload errorPayload returnError | 336 | defaultHandler meth = MethodHandler decodePayload errorPayload returnError |
319 | where | 337 | where |
320 | returnError :: NodeInfo -> BValue -> IO Error | 338 | returnError :: NodeInfo -> BValue -> IO Error |
321 | returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth) | 339 | returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth) |
322 | 340 | ||
323 | mainlineKademlia :: Kademlia NodeId NodeInfo | 341 | mainlineKademlia :: MainlineClient -> TVar (R.BucketList NodeInfo) -> Kademlia NodeId NodeInfo |
324 | mainlineKademlia = Kademlia quietInsertions | 342 | mainlineKademlia client var = Kademlia quietInsertions |
325 | mainlineSpace | 343 | mainlineSpace |
326 | (vanillaIO (error "var") $ error "pingProbe") | 344 | (vanillaIO var $ ping client) |
327 | 345 | ||
328 | mainlineSpace :: R.KademliaSpace NodeId NodeInfo | 346 | mainlineSpace :: R.KademliaSpace NodeId NodeInfo |
329 | mainlineSpace = R.KademliaSpace | 347 | mainlineSpace = R.KademliaSpace |
@@ -333,10 +351,11 @@ mainlineSpace = R.KademliaSpace | |||
333 | } | 351 | } |
334 | 352 | ||
335 | 353 | ||
336 | updateRouting :: Routing -> NodeInfo -> Message BValue -> IO () | 354 | updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () |
337 | updateRouting routing naddr _ = do | 355 | updateRouting client routing naddr _ = do |
338 | error "todo" insertNode | 356 | case prefer4or6 naddr Nothing of |
339 | -- TODO Update kademlia table. | 357 | Want_IP4 -> insertNode (mainlineKademlia client $ routing4 routing) naddr |
358 | Want_IP6 -> insertNode (mainlineKademlia client $ routing6 routing) naddr | ||
340 | -- TODO Update external ip address and update BEP-42 node id. | 359 | -- TODO Update external ip address and update BEP-42 node id. |
341 | return () | 360 | return () |
342 | 361 | ||
@@ -618,3 +637,89 @@ announceH (SwarmsDatabase peers toks _) naddr announcement = do | |||
618 | else port announcement | 637 | else port announcement |
619 | } | 638 | } |
620 | return Announced | 639 | return Announced |
640 | |||
641 | ping :: MainlineClient -> NodeInfo -> IO Bool | ||
642 | ping client addr = fromMaybe False <$> sendQuery client serializer Ping addr | ||
643 | where | ||
644 | serializer = MethodSerializer | ||
645 | { methodTimeout = 5 | ||
646 | , method = Method "ping" | ||
647 | , wrapQuery = encodePayload | ||
648 | , unwrapResponse = const True | ||
649 | } | ||
650 | |||
651 | data TriadSlot = SlotA | SlotB | SlotC | ||
652 | deriving (Eq,Ord,Enum,Show,Read) | ||
653 | |||
654 | data TriadCommittee voter a = TriadCommittee | ||
655 | { triadDecider :: TVar TriadSlot | ||
656 | , triadA :: TVar (Maybe (voter,a)) | ||
657 | , triadB :: TVar (Maybe (voter,a)) | ||
658 | , triadC :: TVar (Maybe (voter,a)) | ||
659 | , triadNewDecision :: a -> STM () | ||
660 | } | ||
661 | |||
662 | triadSlot :: TriadSlot -> TriadCommittee voter a -> TVar (Maybe (voter,a)) | ||
663 | triadSlot SlotA = triadA | ||
664 | triadSlot SlotB = triadB | ||
665 | triadSlot SlotC = triadC | ||
666 | |||
667 | triadDecision :: a -> TriadCommittee voter a -> STM a | ||
668 | triadDecision fallback triad = do | ||
669 | slot <- readTVar (triadDecider triad) | ||
670 | maybe fallback snd <$> readTVar (triadSlot slot triad) | ||
671 | |||
672 | |||
673 | newTriadCommittee :: (a -> STM ()) -> STM (TriadCommittee voter a) | ||
674 | newTriadCommittee onChange = | ||
675 | TriadCommittee <$> newTVar SlotA | ||
676 | <*> newTVar Nothing | ||
677 | <*> newTVar Nothing | ||
678 | <*> newTVar Nothing | ||
679 | <*> pure onChange | ||
680 | |||
681 | triadCountVotes :: Eq a => TriadCommittee voter a -> STM () | ||
682 | triadCountVotes triad = do | ||
683 | prior <- do | ||
684 | slot <- readTVar (triadDecider triad) | ||
685 | fmap snd <$> readTVar (triadSlot slot triad) | ||
686 | a <- fmap ((SlotA,) . snd) <$> readTVar (triadA triad) | ||
687 | b <- fmap ((SlotB,) . snd) <$> readTVar (triadB triad) | ||
688 | c <- fmap ((SlotC,) . snd) <$> readTVar (triadC triad) | ||
689 | let (slot,vote) = case catMaybes [a,b,c] of | ||
690 | [ (x,xvote) | ||
691 | , (y,yvote) | ||
692 | , (z,zvote) ] -> if xvote == yvote then (x,Just xvote) | ||
693 | else (z,Just zvote) | ||
694 | [] -> (SlotA,Nothing) | ||
695 | ((slot,vote):_) -> (slot, Just vote) | ||
696 | writeTVar (triadDecider triad) slot | ||
697 | case vote of | ||
698 | Just v | vote /= prior -> triadNewDecision triad v | ||
699 | _ -> return () | ||
700 | |||
701 | |||
702 | addVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> a -> STM () | ||
703 | addVote triad voter vote = do | ||
704 | a <- (SlotA,) . fmap fst <$> readTVar (triadA triad) | ||
705 | b <- (SlotB,) . fmap fst <$> readTVar (triadB triad) | ||
706 | c <- (SlotC,) . fmap fst <$> readTVar (triadC triad) | ||
707 | let avail (_,Nothing) = True | ||
708 | avail (_,Just x ) = (x == voter) | ||
709 | slots = filter avail [a,b,c] | ||
710 | forM_ (take 1 slots) $ \(slot,_) -> do | ||
711 | writeTVar (triadSlot slot triad) | ||
712 | (Just (voter,vote)) | ||
713 | triadCountVotes triad | ||
714 | |||
715 | delVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> STM () | ||
716 | delVote triad voter = do | ||
717 | a <- (SlotA,) . fmap fst <$> readTVar (triadA triad) | ||
718 | b <- (SlotB,) . fmap fst <$> readTVar (triadB triad) | ||
719 | c <- (SlotC,) . fmap fst <$> readTVar (triadC triad) | ||
720 | let match (_,Just x ) = (x == voter) | ||
721 | slots = filter match [a,b,c] | ||
722 | forM_ (take 1 slots) $ \(slot,_) -> do | ||
723 | writeTVar (triadSlot slot triad) Nothing | ||
724 | triadCountVotes triad | ||
725 | |||