From 5e141d57322658725a42b6573af2481c2063865e Mon Sep 17 00:00:00 2001 From: Daniel T <30197399+danwt@users.noreply.github.com> Date: Thu, 8 Aug 2024 11:35:58 +0100 Subject: [PATCH] fix(submit loop): add more logging around skew calculation (#1000) --- block/executor_test.go | 2 +- block/manager.go | 17 ++++++------- block/manager_test.go | 4 +-- block/produce.go | 2 +- block/state.go | 2 +- block/submit.go | 43 ++++++++++++++++++++++---------- block/submit_loop_test.go | 11 ++------ go.mod | 1 + go.sum | 2 ++ store/store_test.go | 4 +-- test/loadtime/cmd/report/main.go | 2 +- testutil/types.go | 2 +- types/serialization.go | 4 +-- types/serialization_test.go | 2 +- utils/atomic/funcs.go | 15 +++++++++++ utils/atomic/funcs_test.go | 35 ++++++++++++++++++++++++++ utils/errors/err_group.go | 24 ++++++++++++++++++ 17 files changed, 129 insertions(+), 43 deletions(-) create mode 100644 utils/atomic/funcs.go create mode 100644 utils/atomic/funcs_test.go create mode 100644 utils/errors/err_group.go diff --git a/block/executor_test.go b/block/executor_test.go index 536560cd0..86e42f58d 100644 --- a/block/executor_test.go +++ b/block/executor_test.go @@ -142,7 +142,7 @@ func TestApplyBlock(t *testing.T) { Validators: tmtypes.NewValidatorSet(nil), } state.InitialHeight = 1 - state.LastBlockHeight.Store(0) + state.SetHeight(0) maxBytes := uint64(100) state.ConsensusParams.Block.MaxBytes = int64(maxBytes) state.ConsensusParams.Block.MaxGas = 100000 diff --git a/block/manager.go b/block/manager.go index edced5f02..38491e93a 100644 --- a/block/manager.go +++ b/block/manager.go @@ -13,6 +13,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/dymensionxyz/dymint/store" + uerrors "github.com/dymensionxyz/dymint/utils/errors" uevent "github.com/dymensionxyz/dymint/utils/event" "github.com/libp2p/go-libp2p/core/crypto" @@ -164,27 +165,25 @@ func (m *Manager) Start(ctx context.Context) error { <-m.DAClient.Synced() nBytes := m.GetUnsubmittedBytes() bytesProducedC := make(chan int) - go func() { - bytesProducedC <- nBytes - }() - eg.Go(func() error { + uerrors.ErrGroupGoLog(eg, m.logger, func() error { return m.SubmitLoop(ctx, bytesProducedC) }) - eg.Go(func() error { + uerrors.ErrGroupGoLog(eg, m.logger, func() error { + bytesProducedC <- nBytes return m.ProduceBlockLoop(ctx, bytesProducedC) }) } else { - eg.Go(func() error { + uerrors.ErrGroupGoLog(eg, m.logger, func() error { return m.RetrieveLoop(ctx) }) - eg.Go(func() error { + uerrors.ErrGroupGoLog(eg, m.logger, func() error { return m.SyncToTargetHeightLoop(ctx) }) } go func() { - err := eg.Wait() - m.logger.Info("Block manager err group finished.", "err", err) + _ = eg.Wait() // errors are already logged + m.logger.Info("Block manager err group finished.") }() return nil diff --git a/block/manager_test.go b/block/manager_test.go index 51f7f9f3c..f38a07fe1 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -89,7 +89,7 @@ func TestInitialState(t *testing.T) { store: fullStore, genesis: genesis, expectedInitialHeight: sampleState.InitialHeight, - expectedLastBlockHeight: sampleState.LastBlockHeight.Load(), + expectedLastBlockHeight: sampleState.Height(), expectedChainID: sampleState.ChainID, }, } @@ -103,7 +103,7 @@ func TestInitialState(t *testing.T) { assert.NotNil(agg) assert.Equal(c.expectedChainID, agg.State.ChainID) assert.Equal(c.expectedInitialHeight, agg.State.InitialHeight) - assert.Equal(c.expectedLastBlockHeight, agg.State.LastBlockHeight.Load()) + assert.Equal(c.expectedLastBlockHeight, agg.State.Height()) }) } } diff --git a/block/produce.go b/block/produce.go index 8fe960c03..3266fbaf2 100644 --- a/block/produce.go +++ b/block/produce.go @@ -78,7 +78,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int) default: evt := &events.DataHealthStatus{Error: fmt.Errorf("bytes produced channel is full: %w", gerrc.ErrResourceExhausted)} uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList) - m.logger.Error("Enough bytes to build a batch have been accumulated, but too many batches are pending submission." + + m.logger.Error("Enough bytes to build a batch have been accumulated, but too many batches are pending submission. " + "Pausing block production until a signal is consumed.") select { case <-ctx.Done(): diff --git a/block/state.go b/block/state.go index ea49dbacd..fa2a7e0ad 100644 --- a/block/state.go +++ b/block/state.go @@ -66,7 +66,7 @@ func NewStateFromGenesis(genDoc *tmtypes.GenesisDoc) (*types.State, error) { ConsensusParams: *genDoc.ConsensusParams, LastHeightConsensusParamsChanged: genDoc.InitialHeight, } - s.LastBlockHeight.Store(0) + s.SetHeight(0) copy(s.AppHash[:], genDoc.AppHash) return &s, nil diff --git a/block/submit.go b/block/submit.go index 758a4b0b4..4b714593f 100644 --- a/block/submit.go +++ b/block/submit.go @@ -12,6 +12,7 @@ import ( "github.com/dymensionxyz/dymint/da" "github.com/dymensionxyz/dymint/types" + uatomic "github.com/dymensionxyz/dymint/utils/atomic" uchannel "github.com/dymensionxyz/dymint/utils/channel" ) @@ -23,7 +24,8 @@ import ( func (m *Manager) SubmitLoop(ctx context.Context, bytesProduced chan int, ) (err error) { - return SubmitLoopInner(ctx, + return SubmitLoopInner( + ctx, m.logger, bytesProduced, m.Conf.MaxBatchSkew, @@ -34,7 +36,8 @@ func (m *Manager) SubmitLoop(ctx context.Context, } // SubmitLoopInner is a unit testable impl of SubmitLoop -func SubmitLoopInner(ctx context.Context, +func SubmitLoopInner( + ctx context.Context, logger types.Logger, bytesProduced chan int, // a channel of block and commit bytes produced maxBatchSkew uint64, // max number of batches that submitter is allowed to have pending @@ -74,6 +77,7 @@ func SubmitLoopInner(ctx context.Context, return ctx.Err() case n := <-bytesProduced: pendingBytes.Add(uint64(n)) + logger.Info("Added bytes produced to bytes pending submission counter.", "n", n) case <-ticker.C: } } @@ -104,14 +108,17 @@ func SubmitLoopInner(ctx context.Context, break } nConsumed, err := createAndSubmitBatch(min(pending, maxBatchBytes)) - if errors.Is(err, gerrc.ErrInternal) { - panic(fmt.Sprintf("create and submit batch: %v", err)) - } if err != nil { - return fmt.Errorf("create and submit batch: %w", err) + err = fmt.Errorf("create and submit batch: %w", err) + if errors.Is(err, gerrc.ErrInternal) { + logger.Error("Create and submit batch", "err", err, "pending", pending) + panic(err) + } + return err } timeLastSubmission = time.Now() - pending = pendingBytes.Add(^(nConsumed - 1)) // subtract + pending = uatomic.Uint64Sub(&pendingBytes, nConsumed) + logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level } trigger.Nudge() } @@ -134,11 +141,26 @@ func (m *Manager) CreateAndSubmitBatchGetSizeBlocksCommits(maxSize uint64) (uint // CreateAndSubmitBatch creates and submits a batch to the DA and SL. // max size bytes is the maximum size of the serialized batch type func (m *Manager) CreateAndSubmitBatch(maxSizeBytes uint64) (*types.Batch, error) { - b, err := m.CreateBatch(maxSizeBytes, m.NextHeightToSubmit(), m.State.Height()) + startHeight := m.NextHeightToSubmit() + endHeightInclusive := m.State.Height() + + if endHeightInclusive < startHeight { + // TODO: https://github.com/dymensionxyz/dymint/issues/999 + return nil, fmt.Errorf( + "next height to submit is greater than last block height, create and submit batch should not have been called: start height: %d: end height inclusive: %d: %w", + startHeight, + endHeightInclusive, + gerrc.ErrInternal, + ) + } + + b, err := m.CreateBatch(maxSizeBytes, startHeight, endHeightInclusive) if err != nil { return nil, fmt.Errorf("create batch: %w", err) } + m.logger.Info("Created batch.", "start height", startHeight, "end height", endHeightInclusive) + if err := m.SubmitBatch(b); err != nil { return nil, fmt.Errorf("submit batch: %w", err) } @@ -148,7 +170,6 @@ func (m *Manager) CreateAndSubmitBatch(maxSizeBytes uint64) (*types.Batch, error // CreateBatch looks through the store for any unsubmitted blocks and commits and bundles them into a batch // max size bytes is the maximum size of the serialized batch type func (m *Manager) CreateBatch(maxBatchSize uint64, startHeight uint64, endHeightInclusive uint64) (*types.Batch, error) { - m.logger.Info("Creating batch", "start height", startHeight, "end height", endHeightInclusive) batchSize := endHeightInclusive - startHeight + 1 batch := &types.Batch{ Blocks: make([]*types.Block, 0, batchSize), @@ -182,10 +203,6 @@ func (m *Manager) CreateBatch(maxBatchSize uint64, startHeight uint64, endHeight } } - if batch.NumBlocks() == 0 { - return nil, fmt.Errorf("empty batch: %w", gerrc.ErrInternal) - } - return batch, nil } diff --git a/block/submit_loop_test.go b/block/submit_loop_test.go index be6dc7e73..13afa6230 100644 --- a/block/submit_loop_test.go +++ b/block/submit_loop_test.go @@ -10,6 +10,7 @@ import ( "github.com/dymensionxyz/dymint/block" "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/libs/log" ) type testArgs struct { @@ -104,15 +105,7 @@ func testSubmitLoopInner( return uint64(consumed), nil } - block.SubmitLoopInner( - ctx, - nil, - producedBytesC, - args.batchSkew, - args.maxTime, - args.batchBytes, - submitBatch, - ) + block.SubmitLoopInner(ctx, log.TestingLogger(), producedBytesC, args.batchSkew, args.maxTime, args.batchBytes, submitBatch) } // Make sure the producer does not get too far ahead diff --git a/go.mod b/go.mod index 7016f5bf5..0686207fd 100644 --- a/go.mod +++ b/go.mod @@ -272,6 +272,7 @@ require ( github.com/rogpeppe/go-internal v1.11.0 // indirect go.uber.org/mock v0.4.0 // indirect golang.org/x/tools v0.18.0 // indirect + pgregory.net/rapid v1.1.0 // indirect ) replace ( diff --git a/go.sum b/go.sum index 23ba0f16f..bf884d367 100644 --- a/go.sum +++ b/go.sum @@ -1527,6 +1527,8 @@ lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI= lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k= nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= +pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw= +pgregory.net/rapid v1.1.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/store/store_test.go b/store/store_test.go index fba5db3b9..1c6080fb6 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -98,7 +98,7 @@ func TestLoadState(t *testing.T) { NextValidators: validatorSet, Validators: validatorSet, } - s.LastBlockHeight.Store(expectedHeight) + s.SetHeight(expectedHeight) _, err := s1.SaveState(s, nil) assert.NoError(err) @@ -106,7 +106,7 @@ func TestLoadState(t *testing.T) { state, err := s2.LoadState() assert.NoError(err) - assert.Equal(expectedHeight, state.LastBlockHeight.Load()) + assert.Equal(expectedHeight, state.Height()) } func TestBlockResponses(t *testing.T) { diff --git a/test/loadtime/cmd/report/main.go b/test/loadtime/cmd/report/main.go index cbb68ce41..c981a65e4 100644 --- a/test/loadtime/cmd/report/main.go +++ b/test/loadtime/cmd/report/main.go @@ -53,7 +53,7 @@ func newBlockStore(kvstore store.KV, baseHeight uint64) *BlockStore { return &BlockStore{ DefaultStore: store, base: state.BaseHeight, - height: state.LastBlockHeight.Load(), + height: state.Height(), } } diff --git a/testutil/types.go b/testutil/types.go index 4c499ade4..622ec4667 100644 --- a/testutil/types.go +++ b/testutil/types.go @@ -229,7 +229,7 @@ func GenerateState(initialHeight int64, lastBlockHeight int64) *types.State { Validators: GenerateRandomValidatorSet(), NextValidators: GenerateRandomValidatorSet(), } - s.LastBlockHeight.Store(uint64(lastBlockHeight)) + s.SetHeight(uint64(lastBlockHeight)) return s } diff --git a/types/serialization.go b/types/serialization.go index 385bdde28..60baf29bf 100644 --- a/types/serialization.go +++ b/types/serialization.go @@ -261,7 +261,7 @@ func (s *State) ToProto() (*pb.State, error) { Version: &s.Version, ChainId: s.ChainID, InitialHeight: int64(s.InitialHeight), - LastBlockHeight: int64(s.LastBlockHeight.Load()), + LastBlockHeight: int64(s.Height()), NextValidators: nextValidators, Validators: validators, BaseHeight: s.BaseHeight, @@ -279,7 +279,7 @@ func (s *State) FromProto(other *pb.State) error { s.Version = *other.Version s.ChainID = other.ChainId s.InitialHeight = uint64(other.InitialHeight) - s.LastBlockHeight.Store(uint64(other.LastBlockHeight)) + s.SetHeight(uint64(other.LastBlockHeight)) s.BaseHeight = other.BaseHeight s.NextValidators, err = types.ValidatorSetFromProto(other.NextValidators) diff --git a/types/serialization_test.go b/types/serialization_test.go index ece4be054..c2cf676cc 100644 --- a/types/serialization_test.go +++ b/types/serialization_test.go @@ -154,7 +154,7 @@ func TestStateRoundTrip(t *testing.T) { assert := assert.New(t) if c.state.InitialHeight != 0 { - c.state.LastBlockHeight.Store(986321) + c.state.SetHeight(986321) } pState, err := c.state.ToProto() diff --git a/utils/atomic/funcs.go b/utils/atomic/funcs.go new file mode 100644 index 000000000..1812d0959 --- /dev/null +++ b/utils/atomic/funcs.go @@ -0,0 +1,15 @@ +package atomic + +import ( + "sync/atomic" +) + +/* +TODO: move to sdk-utils +*/ + +// Uint64Sub does x := x-y and returns the new value of x +func Uint64Sub(x *atomic.Uint64, y uint64) uint64 { + // Uses math + return x.Add(^(y - 1)) +} diff --git a/utils/atomic/funcs_test.go b/utils/atomic/funcs_test.go new file mode 100644 index 000000000..f0155893f --- /dev/null +++ b/utils/atomic/funcs_test.go @@ -0,0 +1,35 @@ +package atomic + +import ( + "flag" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" + "pgregory.net/rapid" +) + +func TestUint64Sub(t *testing.T) { + _ = flag.Set("rapid.checks", "50") + _ = flag.Set("rapid.steps", "50") + + rapid.Check(t, func(r *rapid.T) { + exp := uint64(0) + got := atomic.Uint64{} + r.Repeat(map[string]func(r *rapid.T){ + "": func(r *rapid.T) { + require.Equal(t, exp, got.Load()) + }, + "add": func(r *rapid.T) { + d := rapid.Uint64().Draw(r, "d") + exp += d + got.Add(d) + }, + "sub": func(r *rapid.T) { + d := rapid.Uint64().Draw(r, "d") + exp -= d + Uint64Sub(&got, d) + }, + }) + }) +} diff --git a/utils/errors/err_group.go b/utils/errors/err_group.go new file mode 100644 index 000000000..c4d82409a --- /dev/null +++ b/utils/errors/err_group.go @@ -0,0 +1,24 @@ +package errors + +import ( + "github.com/dymensionxyz/dymint/types" + "golang.org/x/sync/errgroup" +) + +/* +TODO: move to sdk-utils +*/ + +// ErrGroupGoLog calls eg.Go on the errgroup but it will log the error immediately when it occurs +// instead of waiting for all goroutines in the group to finish first. This has the advantage of making sure all +// errors are logged, not just the first one, and it is more immediate. Also, it is guaranteed, in case that +// of the goroutines is not properly context aware. +func ErrGroupGoLog(eg *errgroup.Group, logger types.Logger, fn func() error) { + eg.Go(func() error { + err := fn() + if err != nil { + logger.Error("ErrGroup goroutine.", "err", err) + } + return err + }) +}