From ce9b2b0870043ef9d89be9ccf5cda0c42e3af70c Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> Date: Tue, 19 Nov 2024 07:43:50 -0500 Subject: [PATCH] network: handle p2p to ws messages propagation (#6156) --- data/txHandler.go | 10 +++ network/hybridNetwork.go | 5 ++ node/node_test.go | 156 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 168 insertions(+), 3 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index 11f2a7fe5a..ecea78f522 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -149,6 +149,11 @@ type TxHandlerOpts struct { Config config.Local } +// HybridRelayer is an interface for relaying p2p transactions to WS network +type HybridRelayer interface { + BridgeP2PToWS(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except network.Peer) error +} + // MakeTxHandler makes a new handler for transaction messages func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) { @@ -868,6 +873,11 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa if err != nil { logging.Base().Infof("unable to pin transaction: %v", err) } + + if hybridNet, ok := handler.net.(HybridRelayer); ok { + _ = hybridNet.BridgeP2PToWS(handler.ctx, protocol.TxnTag, reencoded, false, wi.rawmsg.Sender) + } + return network.OutgoingMessage{ Action: network.Accept, } diff --git a/network/hybridNetwork.go b/network/hybridNetwork.go index 5f31436fb8..c62c01c5d6 100644 --- a/network/hybridNetwork.go +++ b/network/hybridNetwork.go @@ -125,6 +125,11 @@ func (n *HybridP2PNetwork) Relay(ctx context.Context, tag protocol.Tag, data []b }) } +// BridgeP2PToWS skips Relay/Broadcast to both networks and only sends to WS +func (n *HybridP2PNetwork) BridgeP2PToWS(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error { + return n.wsNetwork.Relay(ctx, tag, data, wait, except) +} + // Disconnect implements GossipNode func (n *HybridP2PNetwork) Disconnect(badnode DisconnectablePeer) { net := badnode.GetNetwork() diff --git a/node/node_test.go b/node/node_test.go index 664f115482..cc4a795c82 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -207,7 +207,7 @@ func setupFullNodesEx( genesis[short] = data } genesis[poolAddr] = basics.AccountData{ - Status: basics.Online, + Status: basics.Offline, MicroAlgos: basics.MicroAlgos{Raw: uint64(100000)}, } @@ -241,7 +241,7 @@ func setupFullNodesEx( cfg, err := config.LoadConfigFromDisk(rootDirectory) phonebook := phonebookHook(nodeInfos, i) require.NoError(t, err) - node, err := MakeFull(logging.Base(), rootDirectory, cfg, phonebook, g) + node, err := MakeFull(logging.Base().With("net", fmt.Sprintf("node%d", i)), rootDirectory, cfg, phonebook, g) nodes[i] = node require.NoError(t, err) } @@ -861,7 +861,7 @@ func TestMaxSizesCorrect(t *testing.T) { // N -- R -- A and ensures N can discover A and download blocks from it. // // N is a non-part node that joins the network later -// R is a non-archival relay node with block service disabled. It MUST NOT service blocks to force N to discover A. +// R is a non-archival relay node with block service disabled. It MUST NOT serve blocks to force N to discover A. // A is a archival node that can only provide blocks. // Nodes N and A have only R in their initial phonebook, and all nodes are in hybrid mode. func TestNodeHybridTopology(t *testing.T) { @@ -1112,3 +1112,153 @@ func TestNodeSetCatchpointCatchupMode(t *testing.T) { }) } } + +// TestNodeHybridP2PGossipSend set ups 3 nodes network with the following topology: +// N0 -- R -- N2 where N0 is wsnet only, R is a relay hybrid node, and N2 is p2pnet only. +// +// N0 is the only blocks producer, and N2 is the only transaction supplier. +// Test ensures that a hybrid R relay can properly deliver transactions to N0. +func TestNodeHybridP2PGossipSend(t *testing.T) { + partitiontest.PartitionTest(t) + + const consensusTest0 = protocol.ConsensusVersion("test0") + + configurableConsensus := make(config.ConsensusProtocols) + + testParams0 := config.Consensus[protocol.ConsensusCurrentVersion] + testParams0.AgreementFilterTimeoutPeriod0 = 500 * time.Millisecond + configurableConsensus[consensusTest0] = testParams0 + + // configure the stake to have R and A producing and confirming blocks + const totalStake = 100_000_000_000 + const npnStake = 1_000_000 + const nodeStake = totalStake - npnStake + const numAccounts = 3 + acctStake := make([]basics.MicroAlgos, numAccounts) + acctStake[0] = basics.MicroAlgos{Raw: nodeStake} + acctStake[1] = basics.MicroAlgos{} + acctStake[2] = basics.MicroAlgos{Raw: npnStake} + + configHook := func(ni nodeInfo, cfg config.Local) (nodeInfo, config.Local) { + cfg = config.GetDefaultLocal() + cfg.CatchpointInterval = 0 + cfg.BaseLoggerDebugLevel = uint32(logging.Debug) + if ni.idx == 0 { + // node 0 is ws node only + cfg.EnableP2PHybridMode = false + cfg.EnableP2P = false + } + + if ni.idx == 1 { + // node 1 is a hybrid relay + cfg.EnableBlockService = true + cfg.EnableGossipBlockService = true + cfg.NetAddress = ni.wsNetAddr() + cfg.EnableP2PHybridMode = true + cfg.PublicAddress = ni.wsNetAddr() + cfg.P2PPersistPeerID = true + privKey, err := p2p.GetPrivKey(cfg, ni.rootDir) + require.NoError(t, err) + ni.p2pID, err = p2p.PeerIDFromPublicKey(privKey.GetPublic()) + require.NoError(t, err) + + cfg.P2PHybridNetAddress = ni.p2pNetAddr() + } + if ni.idx == 2 { + // node 2 is p2p only + cfg.EnableP2PHybridMode = false + cfg.EnableP2P = true + } + return ni, cfg + } + + phonebookHook := func(ni []nodeInfo, i int) []string { + switch i { + case 0: + // node 0 (N0) connects to R + t.Logf("Node%d phonebook: %s, %s", i, ni[1].wsNetAddr(), ni[1].p2pMultiAddr()) + return []string{ni[1].wsNetAddr(), ni[1].p2pMultiAddr()} + case 1: + // node 1 (R) is a relay accepting connections from all + t.Logf("Node%d phonebook: empty", i) + return []string{} + case 2: + // node 2 (A) connects to R + t.Logf("Node%d phonebook: %s, %s", i, ni[1].wsNetAddr(), ni[1].p2pMultiAddr()) + return []string{ni[1].wsNetAddr(), ni[1].p2pMultiAddr()} + default: + t.Errorf("not expected number of nodes: %d", i) + t.FailNow() + } + return nil + } + + nodes, wallets := setupFullNodesEx(t, consensusTest0, configurableConsensus, acctStake, configHook, phonebookHook) + require.Len(t, nodes, 3) + require.Len(t, wallets, 3) + for i := 0; i < len(nodes); i++ { + defer os.Remove(wallets[i]) + defer nodes[i].Stop() + } + + startAndConnectNodes(nodes, nodelayFirstNodeStartDelay) + + // ensure the initial connectivity topology + require.Eventually(t, func() bool { + node0Conn := len(nodes[0].net.GetPeers(network.PeersConnectedOut)) > 0 // connected to 1 + node1Conn := len(nodes[1].net.GetPeers(network.PeersConnectedOut, network.PeersConnectedIn)) == 2 // connected from 0 and 2 + node2Conn := len(nodes[2].net.GetPeers(network.PeersConnectedOut)) > 0 // connected to 1 + return node0Conn && node1Conn && node2Conn + }, 60*time.Second, 500*time.Millisecond) + + // now wait 2x heartbeat interval (GossipSubHeartbeatInterval) to ensure the meshsub is built + time.Sleep(2 * time.Second) + + filename := filepath.Join(nodes[2].genesisDirs.RootGenesisDir, wallets[2]) + access, err := db.MakeAccessor(filename, false, false) + require.NoError(t, err) + root, err := account.RestoreRoot(access) + access.Close() + require.NoError(t, err) + + addr2 := root.Address() + secrets2 := root.Secrets() + + txn := transactions.Transaction{ + Type: protocol.PaymentTx, + Header: transactions.Header{ + Sender: addr2, + FirstValid: 1, + LastValid: 100, + Fee: basics.MicroAlgos{Raw: 1000}, + GenesisID: nodes[2].genesisID, + GenesisHash: nodes[2].genesisHash, + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: addr2, + Amount: basics.MicroAlgos{Raw: 0}, + }, + } + signature := secrets2.Sign(txn) + stxn := transactions.SignedTxn{ + Sig: signature, + Txn: txn, + } + + err = nodes[2].BroadcastSignedTxGroup([]transactions.SignedTxn{stxn}) + require.NoError(t, err) + + initialRound := nodes[0].ledger.NextRound() + targetRound := initialRound + 10 + t.Logf("Waiting for round %d (initial %d)", targetRound, initialRound) + + // ensure tx properly propagated to node 0 + select { + case <-nodes[0].ledger.Wait(targetRound): + b, err := nodes[0].ledger.Block(targetRound) + require.NoError(t, err) + require.Greater(t, b.TxnCounter, uint64(1000)) // new initial value after AppForbidLowResources + case <-time.After(1 * time.Minute): + require.Fail(t, fmt.Sprintf("no block notification for wallet: %v.", wallets[0])) + } +}