Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eval: split TestTransactionGroup from BlockEvaluator using TransactionGroupTester #5818

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
50 changes: 44 additions & 6 deletions data/pools/transactionPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/data/transactions/logic"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/ledger/eval"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/logging/telemetryspec"
Expand Down Expand Up @@ -97,6 +98,9 @@
// exceed the txPoolMaxSize. This flag is reset to false OnNewBlock
stateproofOverflowed bool

testBlockEvaluator *eval.TestBlockEvaluator
testBlockEvaluatorMu deadlock.RWMutex

// shutdown is set to true when the pool is being shut down. It is checked in exported methods
// to prevent pool operations like remember and recomputing the block evaluator
// from using down stream resources like ledger that may be shutting down.
Expand All @@ -105,7 +109,6 @@

// BlockEvaluator defines the block evaluator interface exposed by the ledger package.
type BlockEvaluator interface {
TestTransactionGroup(txgroup []transactions.SignedTxn) error
Round() basics.Round
PaySetSize() int
TransactionGroup(txads []transactions.SignedTxnWithAD) error
Expand All @@ -114,6 +117,36 @@
ResetTxnBytes()
}

// testEvalContext implements the eval.TestEvalContext interface. It allows for concurrent
// calls to TestTransactionGroup for candidate transactions calling transactionPool.Test.
type testEvalContext struct {
ledger *ledger.Ledger
block bookkeeping.Block
proto config.ConsensusParams
specials transactions.SpecialAddresses
}

func newTestBlockEvaluator(ledger *ledger.Ledger, block bookkeeping.Block) *eval.TestBlockEvaluator {
return &eval.TestBlockEvaluator{
jannotti marked this conversation as resolved.
Show resolved Hide resolved
TestEvalContext: &testEvalContext{
ledger: ledger,
block: block,
proto: config.Consensus[block.CurrentProtocol],
specials: transactions.SpecialAddresses{
FeeSink: block.FeeSink,
RewardsPool: block.RewardsPool,
},
}}
}

func (c *testEvalContext) Proto() config.ConsensusParams { return c.proto }
func (c *testEvalContext) Specials() transactions.SpecialAddresses { return c.specials }
func (c *testEvalContext) TxnContext() transactions.TxnContext { return c.block }
func (c *testEvalContext) CheckDup(firstValid, lastValid basics.Round, txid transactions.Txid, txl ledgercore.Txlease) error {

Check warning on line 145 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L142-L145

Added lines #L142 - L145 were not covered by tests
// will call txTail.checkDup, which uses an RLock for concurrent access.
return c.ledger.CheckDup(c.proto, c.block.BlockHeader.Round, firstValid, lastValid, txid, txl)

Check warning on line 147 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L147

Added line #L147 was not covered by tests
}

// VotingAccountSupplier provides a list of possible participating account addresses valid for a given round.
type VotingAccountSupplier interface {
VotingAccountsForRound(basics.Round) []basics.Address
Expand Down Expand Up @@ -390,19 +423,20 @@

// Test performs basic duplicate detection and well-formedness checks
// on a transaction group without storing the group.
// It may be called concurrently.
func (pool *TransactionPool) Test(txgroup []transactions.SignedTxn) error {
if err := pool.checkPendingQueueSize(txgroup); err != nil {
return err
}

pool.mu.Lock()
defer pool.mu.Unlock()
pool.testBlockEvaluatorMu.RLock()
defer pool.testBlockEvaluatorMu.RUnlock()

Check warning on line 433 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L432-L433

Added lines #L432 - L433 were not covered by tests

if pool.pendingBlockEvaluator == nil {
return fmt.Errorf("Test: pendingBlockEvaluator is nil")
if pool.testBlockEvaluator == nil {
return fmt.Errorf("Test: testEvalCtx is nil")

Check warning on line 436 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L435-L436

Added lines #L435 - L436 were not covered by tests
}

return pool.pendingBlockEvaluator.TestTransactionGroup(txgroup)
return pool.testBlockEvaluator.TestTransactionGroup(txgroup)

Check warning on line 439 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L439

Added line #L439 was not covered by tests
}

type poolIngestParams struct {
Expand Down Expand Up @@ -749,6 +783,10 @@
return
}

pool.testBlockEvaluatorMu.Lock()
pool.testBlockEvaluator = newTestBlockEvaluator(pool.ledger, next)
pool.testBlockEvaluatorMu.Unlock()

var asmStats telemetryspec.AssembleBlockMetrics
asmStats.StartCount = len(txgroups)
asmStats.StopReason = telemetryspec.AssembleBlockEmpty
Expand Down
3 changes: 2 additions & 1 deletion data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ func (handler *TxHandler) Start() {
},
})

handler.backlogWg.Add(2)
handler.backlogWg.Add(3)
go handler.backlogWorker()
gmalouf marked this conversation as resolved.
Show resolved Hide resolved
go handler.backlogWorker()
go handler.backlogGaugeThread()
handler.streamVerifier.Start(handler.ctx)
Expand Down
75 changes: 53 additions & 22 deletions data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2204,21 +2204,46 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { //nolint:parall
handler.postProcessCheckedTxn(&wi)
require.Equal(t, 1, getMetricCounter(txPoolRememberTagTxnDead))

txn1 := transactions.Transaction{
Type: protocol.PaymentTx,
Header: transactions.Header{
Sender: addresses[0],
Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2},
FirstValid: 0,
LastValid: basics.Round(proto.MaxTxnLife),
GenesisHash: genesisHash,
},
PaymentTxnFields: transactions.PaymentTxnFields{
Receiver: poolAddr,
Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)},
},
makeTxn := func() transactions.Transaction {
return transactions.Transaction{
Type: protocol.PaymentTx,
Header: transactions.Header{
Sender: addresses[0],
Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2},
FirstValid: 0,
LastValid: basics.Round(proto.MaxTxnLife),
GenesisHash: genesisHash,
},
PaymentTxnFields: transactions.PaymentTxnFields{
Receiver: poolAddr,
Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)},
},
}
}
txn1 := makeTxn()

// add a round 1 block with oldTxn1 and oldTxn2 in it (for txID and lease checking later)
oldTxn1 := makeTxn()
oldTxn2 := oldTxn1
crypto.RandBytes(oldTxn2.Lease[:])
prev, err := ledger.BlockHdr(ledger.Latest())
require.NoError(t, err)
next := bookkeeping.MakeBlock(prev)
blockEval, err := ledger.StartEvaluator(next.BlockHeader, 0, 0, nil)
require.NoError(t, err)
err = blockEval.Transaction(oldTxn1.Sign(secrets[0]), transactions.ApplyData{})
require.NoError(t, err)
err = blockEval.Transaction(oldTxn2.Sign(secrets[0]), transactions.ApplyData{})
require.NoError(t, err)

// simulate this transaction was applied
ufblk, err := blockEval.GenerateBlock(nil)
require.NoError(t, err)
block := ledgercore.MakeValidatedBlock(ufblk.UnfinishedBlock(), ufblk.UnfinishedDeltas())
err = ledger.AddValidatedBlock(block, agreement.Certificate{})
require.NoError(t, err)

// trigger hitting pending queue max
wi.unverifiedTxGroup = []transactions.SignedTxn{txn1.Sign(secrets[0])}
for i := 0; i <= cfg.TxPoolSize; i++ {
txn := txn1
Expand Down Expand Up @@ -2288,24 +2313,30 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { //nolint:parall
// trigger TransactionInLedgerError (txid) error
wi.unverifiedTxGroup = []transactions.SignedTxn{txn1.Sign(secrets[0])}
wi.rawmsg = &network.IncomingMessage{}
handler.postProcessCheckedTxn(&wi)
handler.postProcessCheckedTxn(&wi)
handler.postProcessCheckedTxn(&wi) // calls Remember
handler.postProcessCheckedTxn(&wi) // calls Remember again
require.Equal(t, 1, getMetricCounter(txPoolRememberTagTxIDEval))
handler.checkAlreadyCommitted(&wi)
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagTxIDEval))
// check transaction committed in round 1 (calls Ledger.CheckDup)
wi.unverifiedTxGroup = []transactions.SignedTxn{oldTxn1.Sign(secrets[0])}
handler.checkAlreadyCommitted(&wi) // calls Test
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagTxID))

// trigger LeaseInLedgerError (lease) error
txn2 = txn1
crypto.RandBytes(txn2.Lease[:])
txn3 := txn2
txn3.Receiver = addr
wi.unverifiedTxGroup = []transactions.SignedTxn{txn2.Sign(secrets[0])}
handler.postProcessCheckedTxn(&wi)
handler.postProcessCheckedTxn(&wi) // calls Remember
wi.unverifiedTxGroup = []transactions.SignedTxn{txn3.Sign(secrets[0])}
handler.postProcessCheckedTxn(&wi)
handler.postProcessCheckedTxn(&wi) // calls Remember again
require.Equal(t, 1, getMetricCounter(txPoolRememberTagLeaseEval))
handler.checkAlreadyCommitted(&wi)
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagLeaseEval))
// check transaction lease conflict with round 1 txn (calls Ledger.CheckDup)
oldTxn3 := oldTxn2
oldTxn3.Receiver = addr
wi.unverifiedTxGroup = []transactions.SignedTxn{oldTxn3.Sign(secrets[0])}
handler.checkAlreadyCommitted(&wi) // calls Test
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagLease))

// TODO: not sure how to trigger fee error - need to return ErrNoSpace from ledger
// trigger pool fee error
Expand All @@ -2323,7 +2354,7 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { //nolint:parall
ledger.RegisterBlockListeners(blockListeners)

// add few blocks: on ci sometimes blockTicker is not fired in time in case of a single block
for i := basics.Round(1); i <= 3; i++ {
for i := basics.Round(2); i <= 4; i++ {
hdr := bookkeeping.BlockHeader{
Round: i,
UpgradeState: bookkeeping.UpgradeState{
Expand Down
48 changes: 41 additions & 7 deletions ledger/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,18 +904,50 @@ func (eval *BlockEvaluator) ResetTxnBytes() {
eval.blockTxBytes = 0
}

// TestEvalContext defines the evaluation context required by TestBlockEvaluator
// to check for well-formedness and duplicate detection.
type TestEvalContext interface {
Proto() config.ConsensusParams
Specials() transactions.SpecialAddresses
TxnContext() transactions.TxnContext
CheckDup(firstValid, lastValid basics.Round, txid transactions.Txid, txl ledgercore.Txlease) error
}

// Proto implements the TestEvalContext interface.
func (eval *BlockEvaluator) Proto() config.ConsensusParams { return eval.proto }

// Specials implements the TestEvalContext interface.
func (eval *BlockEvaluator) Specials() transactions.SpecialAddresses { return eval.specials }

// TxnContext implements the TestEvalContext interface.
func (eval *BlockEvaluator) TxnContext() transactions.TxnContext { return eval.block }

// CheckDup implements the TestEvalContext interface.
func (eval *BlockEvaluator) CheckDup(firstValid, lastValid basics.Round, txid transactions.Txid, txl ledgercore.Txlease) error {
return eval.state.checkDup(firstValid, lastValid, txid, txl)
}

// TestTransactionGroup is only called by tests.
func (eval *BlockEvaluator) TestTransactionGroup(txgroup []transactions.SignedTxn) error {
return TestBlockEvaluator{eval}.TestTransactionGroup(txgroup)
}

// TestBlockEvaluator uses a TestEvalContext to perform basic transaction checks.
type TestBlockEvaluator struct{ TestEvalContext }

// TestTransactionGroup performs basic duplicate detection and well-formedness checks
// on a transaction group, but does not actually add the transactions to the block
// evaluator, or modify the block evaluator state in any other visible way.
func (eval *BlockEvaluator) TestTransactionGroup(txgroup []transactions.SignedTxn) error {
// It uses a TestEvalContext to access needed recent ledger state.
func (eval TestBlockEvaluator) TestTransactionGroup(txgroup []transactions.SignedTxn) error {
// Nothing to do if there are no transactions.
if len(txgroup) == 0 {
return nil
}

if len(txgroup) > eval.proto.MaxTxGroupSize {
if len(txgroup) > eval.Proto().MaxTxGroupSize {
return &ledgercore.TxGroupMalformedError{
Msg: fmt.Sprintf("group size %d exceeds maximum %d", len(txgroup), eval.proto.MaxTxGroupSize),
Msg: fmt.Sprintf("group size %d exceeds maximum %d", len(txgroup), eval.Proto().MaxTxGroupSize),
Reason: ledgercore.TxGroupMalformedErrorReasonExceedMaxSize,
}
}
Expand Down Expand Up @@ -966,22 +998,23 @@ func (eval *BlockEvaluator) TestTransactionGroup(txgroup []transactions.SignedTx
// TestTransaction performs basic duplicate detection and well-formedness checks
// on a single transaction, but does not actually add the transaction to the block
// evaluator, or modify the block evaluator state in any other visible way.
func (eval *BlockEvaluator) TestTransaction(txn transactions.SignedTxn) error {
func (eval TestBlockEvaluator) TestTransaction(txn transactions.SignedTxn) error {
// Transaction valid (not expired)?
err := txn.Txn.Alive(eval.block)
err := txn.Txn.Alive(eval.TxnContext())
if err != nil {
return err
}

err = txn.Txn.WellFormed(eval.specials, eval.proto)
err = txn.Txn.WellFormed(eval.Specials(), eval.Proto())
if err != nil {
txnErr := ledgercore.TxnNotWellFormedError(fmt.Sprintf("transaction %v: malformed: %v", txn.ID(), err))
return &txnErr
}

// Transaction already in the ledger?
txid := txn.ID()
err = eval.state.checkDup(txn.Txn.First(), txn.Txn.Last(), txid, ledgercore.Txlease{Sender: txn.Txn.Sender, Lease: txn.Txn.Lease})
// BlockEvaluator.transaction will check again using cow.checkDup later, if the pool tries to add this transaction to the block.
err = eval.CheckDup(txn.Txn.First(), txn.Txn.Last(), txid, ledgercore.Txlease{Sender: txn.Txn.Sender, Lease: txn.Txn.Lease})
if err != nil {
return err
}
Expand Down Expand Up @@ -1163,6 +1196,7 @@ func (eval *BlockEvaluator) transaction(txn transactions.SignedTxn, evalParams *
}

// Transaction already in the ledger?
// this checks against the txns added to this evaluator; testTransaction currently only checks against committed txns.
err = cow.checkDup(txn.Txn.First(), txn.Txn.Last(), txid, ledgercore.Txlease{Sender: txn.Txn.Sender, Lease: txn.Txn.Lease})
if err != nil {
return err
Expand Down
Loading