diff --git a/gossip/common_test.go b/gossip/common_test.go index 3817f5036..a05671f24 100644 --- a/gossip/common_test.go +++ b/gossip/common_test.go @@ -8,7 +8,6 @@ import ( "math" "math/big" "sync" - "sync/atomic" "time" "github.com/Fantom-foundation/lachesis-base/abft" @@ -111,7 +110,10 @@ func (em testEmitterWorldExternal) Build(e *inter.MutableEventPayload, onIndexed return err } -func (em testEmitterWorldExternal) Broadcast(*inter.EventPayload) {} +func (em testEmitterWorldExternal) Broadcast(emitted *inter.EventPayload) { + // PM listens and will broadcast it + em.env.feed.newEmittedEvent.Send(emitted) +} type testConfirmedEventsProcessor struct { blockproc.ConfirmedEventsProcessor @@ -199,6 +201,7 @@ func newTestEnv(firstEpoch idx.Epoch, validatorsNum idx.Validator) *testEnv { env.RegisterEmitter(em) env.pubkeys = append(env.pubkeys, pubkey) em.Start() + em.Stop() // to control emitting manually } _ = env.store.GenerateSnapshotAt(common.Hash(store.GetBlockState().FinalizedStateRoot), false) @@ -221,35 +224,48 @@ func (env *testEnv) GetEvmStateReader() *EvmStateReader { } func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (types.Receipts, error) { - env.t = env.t.Add(spent) - - env.callback.onEventConfirmed = func(e inter.EventI) { - for _, em := range env.emitters { - em.OnEventConfirmed(e) - } - } + return env.applyTxs(spent, false, txs...) +} - rest := int32(len(txs)) - waitFor := make(map[common.Hash]struct{}, len(txs)) - for _, tx := range txs { +func (env *testEnv) BlockTxs(spent time.Duration, txs ...*types.Transaction) (types.Receipts, error) { + return env.applyTxs(spent, true, txs...) +} - g, err := tx.EffectiveGasTip(env.store.GetRules().Economy.MinGasPrice) +func (env *testEnv) applyTxs(spent time.Duration, singleBlock bool, txs ...*types.Transaction) (types.Receipts, error) { + env.t = env.t.Add(spent) - fmt.Printf("->> TX: %s, nonce: %d, g: %d, err: %s\n", tx.Hash().String(), tx.Nonce(), g, err) - waitFor[tx.Hash()] = struct{}{} + waitForInEvents := make(map[common.Hash]struct{}, len(txs)) + waitForInBlocks := make(map[common.Hash]struct{}, len(txs)) + for _, tx := range txs { + waitForInEvents[tx.Hash()] = struct{}{} + waitForInBlocks[tx.Hash()] = struct{}{} } - receipts := make(types.Receipts, 0, len(txs)) - wg := new(sync.WaitGroup) - wg.Add(1) + wg.Add(2) defer wg.Wait() + newEvents := make(chan *inter.EventPayload) + defer close(newEvents) + eventsSub := env.feed.SubscribeNewEmitted(newEvents) + defer eventsSub.Unsubscribe() + go func() { + defer wg.Done() + for e := range newEvents { + for _, tx := range e.Txs() { + h := tx.Hash() + delete(waitForInEvents, h) + // env.txpool.(*dummyTxPool).Delete(tx.Hash()) + } + } + }() + + receipts := make(types.Receipts, 0, len(txs)) + newBlocks := make(chan evmcore.ChainHeadNotify) defer close(newBlocks) - chainHeadSub := env.feed.SubscribeNewBlock(newBlocks) - defer chainHeadSub.Unsubscribe() - + blocksSub := env.feed.SubscribeNewBlock(newBlocks) + defer blocksSub.Unsubscribe() go func() { defer wg.Done() for b := range newBlocks { @@ -260,11 +276,9 @@ func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (ty rr := env.store.evm.GetReceipts(n, env.EthAPI.signer, b.Block.Hash, b.Block.Transactions) for _, r := range rr { env.txpool.(*dummyTxPool).Delete(r.TxHash) - if _, ok := waitFor[r.TxHash]; ok { - fmt.Printf("<<- TX: %s, g: %d\n", r.TxHash.String(), r.GasUsed) + if _, ok := waitForInBlocks[r.TxHash]; ok { receipts = append(receipts, r) - delete(waitFor, r.TxHash) - atomic.AddInt32(&rest, -1) + delete(waitForInBlocks, r.TxHash) } } } @@ -298,22 +312,32 @@ func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (ty } } env.txpool.(*dummyTxPool).Delete(tx) - if _, ok := waitFor[tx]; ok { - fmt.Printf("<<- TX: %s, err: skipped\n", tx.String()) - delete(waitFor, tx) - atomic.AddInt32(&rest, -1) + if _, ok := waitForInBlocks[tx]; ok { + delete(waitForInBlocks, tx) } } } } }() + byEmitters := func() []*emitter.Emitter { + // datarace does not matter + if singleBlock && len(waitForInEvents) > 0 { + count := len(env.emitters) * 70 / 100 + // prevent block creation + return env.emitters[:count] + } + if len(waitForInBlocks) > 0 { + // allow block creation + return env.emitters + } + // ready to stop + return nil + } + env.txpool.AddRemotes(txs) defer env.txpool.(*dummyTxPool).Clear() - - err := env.EmitUntil(func() bool { - return atomic.LoadInt32(&rest) == 0 - }) + err := env.EmitUntil(byEmitters) return receipts, err } @@ -346,16 +370,24 @@ func (env *testEnv) ApplyMPs(spent time.Duration, mps ...inter.MisbehaviourProof env.callback.buildEvent = nil }() - return env.EmitUntil(func() bool { - return confirmed - }) -} + byEmitters := func() []*emitter.Emitter { + if !confirmed { + return env.emitters + } + return nil + } -func (env *testEnv) EmitUntil(stop func() bool) error { - t := time.Now() + return env.EmitUntil(byEmitters) +} - for !stop() { - for _, em := range env.emitters { +func (env *testEnv) EmitUntil(by func() []*emitter.Emitter) error { + start := time.Now() + for { + emitters := by() + if len(emitters) < 1 { + break + } + for _, em := range emitters { _, err := em.EmitEvent() if err != nil { return err @@ -363,7 +395,7 @@ func (env *testEnv) EmitUntil(stop func() bool) error { } env.WaitBlockEnd() env.t = env.t.Add(time.Second) - if time.Since(t) > 30*time.Second { + if time.Since(start) > 30*time.Second { panic("block doesn't get processed") } } diff --git a/gossip/txposition_test.go b/gossip/txposition_test.go index 2eb62f56e..e277d7071 100644 --- a/gossip/txposition_test.go +++ b/gossip/txposition_test.go @@ -5,6 +5,7 @@ import ( "math/big" "testing" + "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/require" "github.com/Fantom-foundation/go-opera/gossip/contract/ballot" @@ -25,44 +26,49 @@ func TestTxIndexing(t *testing.T) { ballotOption("Option 3"), } - // valid tx - _, tx1ok, cBallot, err := ballot.DeployBallot(env.Pay(1), env, proposals) + // preparing + _, tx1pre, cBallot, err := ballot.DeployBallot(env.Pay(1), env, proposals) require.NoError(err) require.NotNil(cBallot) - require.NotNil(tx1ok) - - // invalid tx - tx2reverted, err := cBallot.Vote(env.Pay(2), big.NewInt(0)) + require.NotNil(tx1pre) + tx2pre, err := cBallot.GiveRightToVote(env.Pay(1), env.Address(3)) require.NoError(err) - require.NotNil(tx2reverted) - - // valid tx - tx3ok, err := cBallot.GiveRightToVote(env.Pay(1), env.Address(3)) + require.NotNil(tx2pre) + receipts, err := env.BlockTxs(nextEpoch, + tx1pre, + tx2pre, + ) require.NoError(err) - require.NotNil(tx3ok) - + require.Len(receipts, 2) + for i, r := range receipts { + require.Equal(types.ReceiptStatusSuccessful, r.Status, i) + } // invalid tx - tx4skipped, err := cBallot.Vote(withLowGas(env.Pay(2)), big.NewInt(0)) + tx1reverted, err := cBallot.Vote(env.Pay(2), big.NewInt(0)) require.NoError(err) - require.NotNil(tx4skipped) - + require.NotNil(tx1reverted) // valid tx - tx5ok, err := cBallot.Vote(env.Pay(3), big.NewInt(0)) + tx2ok, err := cBallot.Vote(env.Pay(3), big.NewInt(0)) require.NoError(err) - require.NotNil(tx5ok) + require.NotNil(tx2ok) + // skipped tx + _, tx3skipped, _, err := ballot.DeployBallot(withLowGas(env.Pay(1)), env, proposals) + require.NoError(err) + require.NotNil(tx3skipped) - receipts, err := env.ApplyTxs(nextEpoch, - tx1ok, - tx2reverted, - tx3ok, - tx4skipped, - tx5ok, + receipts, err = env.BlockTxs(nextEpoch, + tx1reverted, + tx2ok, + tx3skipped, ) require.NoError(err) - - for _, r := range receipts { - fmt.Printf(">>>>>>>>> tx[%s] status %d\n", r.TxHash.String(), r.Status) + require.Len(receipts, 3) + var block *big.Int + for i, r := range receipts { + if block == nil { + block = r.BlockNumber + } + require.Equal(block.Uint64(), r.BlockNumber.Uint64(), i) } - require.Len(receipts, 0) }