Skip to content

Commit

Permalink
network: handle p2p to ws messages propagation (#6156)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Nov 19, 2024
1 parent a6123b6 commit ce9b2b0
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 3 deletions.
10 changes: 10 additions & 0 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ type TxHandlerOpts struct {
Config config.Local
}

// HybridRelayer is an interface for relaying p2p transactions to WS network
type HybridRelayer interface {
BridgeP2PToWS(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except network.Peer) error
}

// MakeTxHandler makes a new handler for transaction messages
func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) {

Expand Down Expand Up @@ -868,6 +873,11 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa
if err != nil {
logging.Base().Infof("unable to pin transaction: %v", err)
}

if hybridNet, ok := handler.net.(HybridRelayer); ok {
_ = hybridNet.BridgeP2PToWS(handler.ctx, protocol.TxnTag, reencoded, false, wi.rawmsg.Sender)
}

return network.OutgoingMessage{
Action: network.Accept,
}
Expand Down
5 changes: 5 additions & 0 deletions network/hybridNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ func (n *HybridP2PNetwork) Relay(ctx context.Context, tag protocol.Tag, data []b
})
}

// BridgeP2PToWS skips Relay/Broadcast to both networks and only sends to WS
func (n *HybridP2PNetwork) BridgeP2PToWS(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error {
return n.wsNetwork.Relay(ctx, tag, data, wait, except)
}

// Disconnect implements GossipNode
func (n *HybridP2PNetwork) Disconnect(badnode DisconnectablePeer) {
net := badnode.GetNetwork()
Expand Down
156 changes: 153 additions & 3 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func setupFullNodesEx(
genesis[short] = data
}
genesis[poolAddr] = basics.AccountData{
Status: basics.Online,
Status: basics.Offline,
MicroAlgos: basics.MicroAlgos{Raw: uint64(100000)},
}

Expand Down Expand Up @@ -241,7 +241,7 @@ func setupFullNodesEx(
cfg, err := config.LoadConfigFromDisk(rootDirectory)
phonebook := phonebookHook(nodeInfos, i)
require.NoError(t, err)
node, err := MakeFull(logging.Base(), rootDirectory, cfg, phonebook, g)
node, err := MakeFull(logging.Base().With("net", fmt.Sprintf("node%d", i)), rootDirectory, cfg, phonebook, g)
nodes[i] = node
require.NoError(t, err)
}
Expand Down Expand Up @@ -861,7 +861,7 @@ func TestMaxSizesCorrect(t *testing.T) {
// N -- R -- A and ensures N can discover A and download blocks from it.
//
// N is a non-part node that joins the network later
// R is a non-archival relay node with block service disabled. It MUST NOT service blocks to force N to discover A.
// R is a non-archival relay node with block service disabled. It MUST NOT serve blocks to force N to discover A.
// A is a archival node that can only provide blocks.
// Nodes N and A have only R in their initial phonebook, and all nodes are in hybrid mode.
func TestNodeHybridTopology(t *testing.T) {
Expand Down Expand Up @@ -1112,3 +1112,153 @@ func TestNodeSetCatchpointCatchupMode(t *testing.T) {
})
}
}

// TestNodeHybridP2PGossipSend set ups 3 nodes network with the following topology:
// N0 -- R -- N2 where N0 is wsnet only, R is a relay hybrid node, and N2 is p2pnet only.
//
// N0 is the only blocks producer, and N2 is the only transaction supplier.
// Test ensures that a hybrid R relay can properly deliver transactions to N0.
func TestNodeHybridP2PGossipSend(t *testing.T) {
partitiontest.PartitionTest(t)

const consensusTest0 = protocol.ConsensusVersion("test0")

configurableConsensus := make(config.ConsensusProtocols)

testParams0 := config.Consensus[protocol.ConsensusCurrentVersion]
testParams0.AgreementFilterTimeoutPeriod0 = 500 * time.Millisecond
configurableConsensus[consensusTest0] = testParams0

// configure the stake to have R and A producing and confirming blocks
const totalStake = 100_000_000_000
const npnStake = 1_000_000
const nodeStake = totalStake - npnStake
const numAccounts = 3
acctStake := make([]basics.MicroAlgos, numAccounts)
acctStake[0] = basics.MicroAlgos{Raw: nodeStake}
acctStake[1] = basics.MicroAlgos{}
acctStake[2] = basics.MicroAlgos{Raw: npnStake}

configHook := func(ni nodeInfo, cfg config.Local) (nodeInfo, config.Local) {
cfg = config.GetDefaultLocal()
cfg.CatchpointInterval = 0
cfg.BaseLoggerDebugLevel = uint32(logging.Debug)
if ni.idx == 0 {
// node 0 is ws node only
cfg.EnableP2PHybridMode = false
cfg.EnableP2P = false
}

if ni.idx == 1 {
// node 1 is a hybrid relay
cfg.EnableBlockService = true
cfg.EnableGossipBlockService = true
cfg.NetAddress = ni.wsNetAddr()
cfg.EnableP2PHybridMode = true
cfg.PublicAddress = ni.wsNetAddr()
cfg.P2PPersistPeerID = true
privKey, err := p2p.GetPrivKey(cfg, ni.rootDir)
require.NoError(t, err)
ni.p2pID, err = p2p.PeerIDFromPublicKey(privKey.GetPublic())
require.NoError(t, err)

cfg.P2PHybridNetAddress = ni.p2pNetAddr()
}
if ni.idx == 2 {
// node 2 is p2p only
cfg.EnableP2PHybridMode = false
cfg.EnableP2P = true
}
return ni, cfg
}

phonebookHook := func(ni []nodeInfo, i int) []string {
switch i {
case 0:
// node 0 (N0) connects to R
t.Logf("Node%d phonebook: %s, %s", i, ni[1].wsNetAddr(), ni[1].p2pMultiAddr())
return []string{ni[1].wsNetAddr(), ni[1].p2pMultiAddr()}
case 1:
// node 1 (R) is a relay accepting connections from all
t.Logf("Node%d phonebook: empty", i)
return []string{}
case 2:
// node 2 (A) connects to R
t.Logf("Node%d phonebook: %s, %s", i, ni[1].wsNetAddr(), ni[1].p2pMultiAddr())
return []string{ni[1].wsNetAddr(), ni[1].p2pMultiAddr()}
default:
t.Errorf("not expected number of nodes: %d", i)
t.FailNow()
}
return nil
}

nodes, wallets := setupFullNodesEx(t, consensusTest0, configurableConsensus, acctStake, configHook, phonebookHook)
require.Len(t, nodes, 3)
require.Len(t, wallets, 3)
for i := 0; i < len(nodes); i++ {
defer os.Remove(wallets[i])
defer nodes[i].Stop()
}

startAndConnectNodes(nodes, nodelayFirstNodeStartDelay)

// ensure the initial connectivity topology
require.Eventually(t, func() bool {
node0Conn := len(nodes[0].net.GetPeers(network.PeersConnectedOut)) > 0 // connected to 1
node1Conn := len(nodes[1].net.GetPeers(network.PeersConnectedOut, network.PeersConnectedIn)) == 2 // connected from 0 and 2
node2Conn := len(nodes[2].net.GetPeers(network.PeersConnectedOut)) > 0 // connected to 1
return node0Conn && node1Conn && node2Conn
}, 60*time.Second, 500*time.Millisecond)

// now wait 2x heartbeat interval (GossipSubHeartbeatInterval) to ensure the meshsub is built
time.Sleep(2 * time.Second)

filename := filepath.Join(nodes[2].genesisDirs.RootGenesisDir, wallets[2])
access, err := db.MakeAccessor(filename, false, false)
require.NoError(t, err)
root, err := account.RestoreRoot(access)
access.Close()
require.NoError(t, err)

addr2 := root.Address()
secrets2 := root.Secrets()

txn := transactions.Transaction{
Type: protocol.PaymentTx,
Header: transactions.Header{
Sender: addr2,
FirstValid: 1,
LastValid: 100,
Fee: basics.MicroAlgos{Raw: 1000},
GenesisID: nodes[2].genesisID,
GenesisHash: nodes[2].genesisHash,
},
PaymentTxnFields: transactions.PaymentTxnFields{
Receiver: addr2,
Amount: basics.MicroAlgos{Raw: 0},
},
}
signature := secrets2.Sign(txn)
stxn := transactions.SignedTxn{
Sig: signature,
Txn: txn,
}

err = nodes[2].BroadcastSignedTxGroup([]transactions.SignedTxn{stxn})
require.NoError(t, err)

initialRound := nodes[0].ledger.NextRound()
targetRound := initialRound + 10
t.Logf("Waiting for round %d (initial %d)", targetRound, initialRound)

// ensure tx properly propagated to node 0
select {
case <-nodes[0].ledger.Wait(targetRound):
b, err := nodes[0].ledger.Block(targetRound)
require.NoError(t, err)
require.Greater(t, b.TxnCounter, uint64(1000)) // new initial value after AppForbidLowResources
case <-time.After(1 * time.Minute):
require.Fail(t, fmt.Sprintf("no block notification for wallet: %v.", wallets[0]))
}
}

0 comments on commit ce9b2b0

Please sign in to comment.