diff --git a/block/block.go b/block/block.go index fe961a854..a21ad6d22 100644 --- a/block/block.go +++ b/block/block.go @@ -92,7 +92,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta } // Prune old heights, if requested by ABCI app. if 0 < retainHeight { - err = m.pruneBlocks(uint64(retainHeight)) + err = m.PruneBlocks(uint64(retainHeight)) if err != nil { m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err) } diff --git a/block/executor_test.go b/block/executor_test.go index 536560cd0..86e42f58d 100644 --- a/block/executor_test.go +++ b/block/executor_test.go @@ -142,7 +142,7 @@ func TestApplyBlock(t *testing.T) { Validators: tmtypes.NewValidatorSet(nil), } state.InitialHeight = 1 - state.LastBlockHeight.Store(0) + state.SetHeight(0) maxBytes := uint64(100) state.ConsensusParams.Block.MaxBytes = int64(maxBytes) state.ConsensusParams.Block.MaxGas = 100000 diff --git a/block/manager.go b/block/manager.go index 95b1f6200..b1c6239a9 100644 --- a/block/manager.go +++ b/block/manager.go @@ -12,6 +12,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/dymensionxyz/dymint/store" + uerrors "github.com/dymensionxyz/dymint/utils/errors" uevent "github.com/dymensionxyz/dymint/utils/event" "github.com/libp2p/go-libp2p/core/crypto" @@ -152,17 +153,15 @@ func (m *Manager) Start(ctx context.Context) error { <-m.DAClient.Synced() nBytes := m.GetUnsubmittedBytes() bytesProducedC := make(chan int) - go func() { - bytesProducedC <- nBytes - }() err = m.syncFromSettlement() if err != nil { return fmt.Errorf("sync block manager from settlement: %w", err) } - eg.Go(func() error { + uerrors.ErrGroupGoLog(eg, m.logger, func() error { return m.SubmitLoop(ctx, bytesProducedC) }) - eg.Go(func() error { + uerrors.ErrGroupGoLog(eg, m.logger, func() error { + bytesProducedC <- nBytes return m.ProduceBlockLoop(ctx, bytesProducedC) }) @@ -180,8 +179,13 @@ func (m *Manager) Start(ctx context.Context) error { // P2P Sync. Subscribe to P2P received blocks events go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger) go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger) - } + + go func() { + _ = eg.Wait() // errors are already logged + m.logger.Info("Block manager err group finished.") + }() + return nil } diff --git a/block/manager_test.go b/block/manager_test.go index 5915b60b5..58bf17f98 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -91,7 +91,7 @@ func TestInitialState(t *testing.T) { store: fullStore, genesis: genesis, expectedInitialHeight: sampleState.InitialHeight, - expectedLastBlockHeight: sampleState.LastBlockHeight.Load(), + expectedLastBlockHeight: sampleState.Height(), expectedChainID: sampleState.ChainID, }, } @@ -105,7 +105,7 @@ func TestInitialState(t *testing.T) { assert.NotNil(agg) assert.Equal(c.expectedChainID, agg.State.ChainID) assert.Equal(c.expectedInitialHeight, agg.State.InitialHeight) - assert.Equal(c.expectedLastBlockHeight, agg.State.LastBlockHeight.Load()) + assert.Equal(c.expectedLastBlockHeight, agg.State.Height()) }) } } diff --git a/block/produce.go b/block/produce.go index ab7b30d43..4690f8d47 100644 --- a/block/produce.go +++ b/block/produce.go @@ -79,7 +79,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int) default: evt := &events.DataHealthStatus{Error: fmt.Errorf("bytes produced channel is full: %w", gerrc.ErrResourceExhausted)} uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList) - m.logger.Error("Enough bytes to build a batch have been accumulated, but too many batches are pending submission." + + m.logger.Error("Enough bytes to build a batch have been accumulated, but too many batches are pending submission. " + "Pausing block production until a signal is consumed.") select { case <-ctx.Done(): diff --git a/block/pruning.go b/block/pruning.go index 026f99262..bc222783d 100644 --- a/block/pruning.go +++ b/block/pruning.go @@ -7,7 +7,7 @@ import ( "github.com/dymensionxyz/gerr-cosmos/gerrc" ) -func (m *Manager) pruneBlocks(retainHeight uint64) error { +func (m *Manager) PruneBlocks(retainHeight uint64) error { if m.IsSequencer() && m.NextHeightToSubmit() < retainHeight { // do not delete anything that we might submit in future return fmt.Errorf("cannot prune blocks before they have been submitted: retain height %d: next height to submit: %d: %w", retainHeight, diff --git a/block/pruning_test.go b/block/pruning_test.go new file mode 100644 index 000000000..50346eb88 --- /dev/null +++ b/block/pruning_test.go @@ -0,0 +1,57 @@ +package block_test + +import ( + "context" + "testing" + + "github.com/dymensionxyz/dymint/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/proxy" +) + +func TestPruningRetainHeight(t *testing.T) { + require := require.New(t) + app := testutil.GetAppMock() + ctx := context.Background() + // Create proxy app + clientCreator := proxy.NewLocalClientCreator(app) + proxyApp := proxy.NewAppConns(clientCreator) + err := proxyApp.Start() + require.NoError(err) + + manager, err := testutil.GetManager(testutil.GetManagerConfig(), nil, nil, 1, 1, 0, proxyApp, nil) + require.NoError(err) + + // Check initial assertions + require.Zero(manager.State.Height()) + require.Zero(manager.LastSubmittedHeight.Load()) + + batchSize := 10 + + // Produce blocks + for i := 0; i < batchSize; i++ { + _, _, err = manager.ProduceAndGossipBlock(ctx, true) + require.NoError(err) + } + // submit and validate sync target + manager.CreateAndSubmitBatch(100000000) + lastSubmitted := manager.LastSubmittedHeight.Load() + assert.EqualValues(t, manager.State.Height(), lastSubmitted) + assert.Equal(t, lastSubmitted, uint64(batchSize)) + + // Produce new blocks + for i := 0; i < batchSize; i++ { + _, _, err = manager.ProduceAndGossipBlock(ctx, true) + require.NoError(err) + } + + validRetainHeight := lastSubmitted + 1 // the max possible valid retain height + for i := validRetainHeight + 1; i < manager.State.Height(); i++ { + err = manager.PruneBlocks(i) + require.Error(err) // cannot prune blocks before they have been submitted + } + + err = manager.PruneBlocks(validRetainHeight) + require.NoError(err) +} diff --git a/block/state.go b/block/state.go index c97e62a99..ef8e2a8c9 100644 --- a/block/state.go +++ b/block/state.go @@ -66,7 +66,7 @@ func NewStateFromGenesis(genDoc *tmtypes.GenesisDoc) (*types.State, error) { ConsensusParams: *genDoc.ConsensusParams, LastHeightConsensusParamsChanged: genDoc.InitialHeight, } - s.LastBlockHeight.Store(0) + s.SetHeight(0) copy(s.AppHash[:], genDoc.AppHash) return &s, nil diff --git a/block/submit.go b/block/submit.go index fdd13d6c5..4b714593f 100644 --- a/block/submit.go +++ b/block/submit.go @@ -12,6 +12,7 @@ import ( "github.com/dymensionxyz/dymint/da" "github.com/dymensionxyz/dymint/types" + uatomic "github.com/dymensionxyz/dymint/utils/atomic" uchannel "github.com/dymensionxyz/dymint/utils/channel" ) @@ -23,7 +24,9 @@ import ( func (m *Manager) SubmitLoop(ctx context.Context, bytesProduced chan int, ) (err error) { - return SubmitLoopInner(ctx, + return SubmitLoopInner( + ctx, + m.logger, bytesProduced, m.Conf.MaxBatchSkew, m.Conf.BatchSubmitMaxTime, @@ -33,7 +36,9 @@ func (m *Manager) SubmitLoop(ctx context.Context, } // SubmitLoopInner is a unit testable impl of SubmitLoop -func SubmitLoopInner(ctx context.Context, +func SubmitLoopInner( + ctx context.Context, + logger types.Logger, bytesProduced chan int, // a channel of block and commit bytes produced maxBatchSkew uint64, // max number of batches that submitter is allowed to have pending maxBatchTime time.Duration, // max time to allow between batches @@ -72,6 +77,7 @@ func SubmitLoopInner(ctx context.Context, return ctx.Err() case n := <-bytesProduced: pendingBytes.Add(uint64(n)) + logger.Info("Added bytes produced to bytes pending submission counter.", "n", n) case <-ticker.C: } } @@ -103,10 +109,16 @@ func SubmitLoopInner(ctx context.Context, } nConsumed, err := createAndSubmitBatch(min(pending, maxBatchBytes)) if err != nil { - return fmt.Errorf("create and submit batch: %w", err) + err = fmt.Errorf("create and submit batch: %w", err) + if errors.Is(err, gerrc.ErrInternal) { + logger.Error("Create and submit batch", "err", err, "pending", pending) + panic(err) + } + return err } timeLastSubmission = time.Now() - pending = pendingBytes.Add(^(nConsumed - 1)) // subtract + pending = uatomic.Uint64Sub(&pendingBytes, nConsumed) + logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level } trigger.Nudge() } @@ -129,11 +141,26 @@ func (m *Manager) CreateAndSubmitBatchGetSizeBlocksCommits(maxSize uint64) (uint // CreateAndSubmitBatch creates and submits a batch to the DA and SL. // max size bytes is the maximum size of the serialized batch type func (m *Manager) CreateAndSubmitBatch(maxSizeBytes uint64) (*types.Batch, error) { - b, err := m.CreateBatch(maxSizeBytes, m.NextHeightToSubmit(), m.State.Height()) + startHeight := m.NextHeightToSubmit() + endHeightInclusive := m.State.Height() + + if endHeightInclusive < startHeight { + // TODO: https://github.com/dymensionxyz/dymint/issues/999 + return nil, fmt.Errorf( + "next height to submit is greater than last block height, create and submit batch should not have been called: start height: %d: end height inclusive: %d: %w", + startHeight, + endHeightInclusive, + gerrc.ErrInternal, + ) + } + + b, err := m.CreateBatch(maxSizeBytes, startHeight, endHeightInclusive) if err != nil { return nil, fmt.Errorf("create batch: %w", err) } + m.logger.Info("Created batch.", "start height", startHeight, "end height", endHeightInclusive) + if err := m.SubmitBatch(b); err != nil { return nil, fmt.Errorf("submit batch: %w", err) } @@ -182,7 +209,7 @@ func (m *Manager) CreateBatch(maxBatchSize uint64, startHeight uint64, endHeight func (m *Manager) SubmitBatch(batch *types.Batch) error { resultSubmitToDA := m.DAClient.SubmitBatch(batch) if resultSubmitToDA.Code != da.StatusSuccess { - return fmt.Errorf("da client submit batch: %s", resultSubmitToDA.Message) + return fmt.Errorf("da client submit batch: %s: %w", resultSubmitToDA.Message, resultSubmitToDA.Error) } m.logger.Info("Submitted batch to DA.", "start height", batch.StartHeight(), "end height", batch.EndHeight()) diff --git a/block/submit_loop_test.go b/block/submit_loop_test.go index 9faba65c1..13afa6230 100644 --- a/block/submit_loop_test.go +++ b/block/submit_loop_test.go @@ -10,6 +10,7 @@ import ( "github.com/dymensionxyz/dymint/block" "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/libs/log" ) type testArgs struct { @@ -104,14 +105,7 @@ func testSubmitLoopInner( return uint64(consumed), nil } - block.SubmitLoopInner( - ctx, - producedBytesC, - args.batchSkew, - args.maxTime, - args.batchBytes, - submitBatch, - ) + block.SubmitLoopInner(ctx, log.TestingLogger(), producedBytesC, args.batchSkew, args.maxTime, args.batchBytes, submitBatch) } // Make sure the producer does not get too far ahead diff --git a/da/celestia/celestia.go b/da/celestia/celestia.go index a73bafbb9..7429b05e3 100644 --- a/da/celestia/celestia.go +++ b/da/celestia/celestia.go @@ -207,6 +207,18 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS // TODO(srene): Split batch in multiple blobs if necessary if supported height, commitment, err := c.submit(data) + if errors.Is(err, gerrc.ErrInternal) { + // no point retrying if it's because of our code being wrong + err = fmt.Errorf("submit: %w", err) + return da.ResultSubmitBatch{ + BaseResult: da.BaseResult{ + Code: da.StatusError, + Message: err.Error(), + Error: err, + }, + } + } + if err != nil { c.logger.Error("Submit blob.", "error", err) types.RollappConsecutiveFailedDASubmission.Inc() @@ -521,11 +533,11 @@ func (c *DataAvailabilityLayerClient) checkBatchAvailability(daMetaData *da.DASu func (c *DataAvailabilityLayerClient) submit(daBlob da.Blob) (uint64, da.Commitment, error) { blobs, commitments, err := c.blobsAndCommitments(daBlob) if err != nil { - return 0, nil, fmt.Errorf("blobs and commitments: %w", err) + return 0, nil, fmt.Errorf("blobs and commitments: %w: %w", err, gerrc.ErrInternal) } if len(commitments) == 0 { - return 0, nil, fmt.Errorf("zero commitments: %w", gerrc.ErrNotFound) + return 0, nil, fmt.Errorf("zero commitments: %w: %w", gerrc.ErrNotFound, gerrc.ErrInternal) } blobSizes := make([]uint32, len(blobs)) diff --git a/go.mod b/go.mod index 4a107f286..0976f51e9 100644 --- a/go.mod +++ b/go.mod @@ -288,6 +288,7 @@ require ( github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect go.uber.org/mock v0.4.0 // indirect golang.org/x/tools v0.18.0 // indirect + pgregory.net/rapid v1.1.0 // indirect ) replace ( diff --git a/go.sum b/go.sum index 67a7b8d1a..c4673db35 100644 --- a/go.sum +++ b/go.sum @@ -1561,6 +1561,8 @@ lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI= lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k= nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= +pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw= +pgregory.net/rapid v1.1.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/store/store_test.go b/store/store_test.go index 1b541c216..26db3190a 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -101,7 +101,7 @@ func TestLoadState(t *testing.T) { NextValidators: validatorSet, Validators: validatorSet, } - s.LastBlockHeight.Store(expectedHeight) + s.SetHeight(expectedHeight) _, err := s1.SaveState(s, nil) assert.NoError(err) @@ -109,7 +109,7 @@ func TestLoadState(t *testing.T) { state, err := s2.LoadState() assert.NoError(err) - assert.Equal(expectedHeight, state.LastBlockHeight.Load()) + assert.Equal(expectedHeight, state.Height()) } func TestBlockResponses(t *testing.T) { diff --git a/test/loadtime/cmd/report/main.go b/test/loadtime/cmd/report/main.go index cbb68ce41..c981a65e4 100644 --- a/test/loadtime/cmd/report/main.go +++ b/test/loadtime/cmd/report/main.go @@ -53,7 +53,7 @@ func newBlockStore(kvstore store.KV, baseHeight uint64) *BlockStore { return &BlockStore{ DefaultStore: store, base: state.BaseHeight, - height: state.LastBlockHeight.Load(), + height: state.Height(), } } diff --git a/testutil/types.go b/testutil/types.go index da7e70d94..09cee1bb6 100644 --- a/testutil/types.go +++ b/testutil/types.go @@ -216,6 +216,7 @@ func GenerateState(initialHeight int64, lastBlockHeight int64) *types.State { s := &types.State{ ChainID: "test-chain", InitialHeight: uint64(initialHeight), + BaseHeight: uint64(initialHeight), AppHash: [32]byte{}, LastResultsHash: GetEmptyLastResultsHash(), Version: tmstate.Version{ @@ -227,7 +228,7 @@ func GenerateState(initialHeight int64, lastBlockHeight int64) *types.State { Validators: GenerateRandomValidatorSet(), NextValidators: GenerateRandomValidatorSet(), } - s.LastBlockHeight.Store(uint64(lastBlockHeight)) + s.SetHeight(uint64(lastBlockHeight)) return s } diff --git a/types/batch_test.go b/types/batch_test.go new file mode 100644 index 000000000..1aa00f7e3 --- /dev/null +++ b/types/batch_test.go @@ -0,0 +1,14 @@ +package types + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBatchSerialization(t *testing.T) { + batch := Batch{} + bz, err := batch.MarshalBinary() + require.Nil(t, err) + require.Empty(t, bz) +} diff --git a/types/serialization.go b/types/serialization.go index 385bdde28..60baf29bf 100644 --- a/types/serialization.go +++ b/types/serialization.go @@ -261,7 +261,7 @@ func (s *State) ToProto() (*pb.State, error) { Version: &s.Version, ChainId: s.ChainID, InitialHeight: int64(s.InitialHeight), - LastBlockHeight: int64(s.LastBlockHeight.Load()), + LastBlockHeight: int64(s.Height()), NextValidators: nextValidators, Validators: validators, BaseHeight: s.BaseHeight, @@ -279,7 +279,7 @@ func (s *State) FromProto(other *pb.State) error { s.Version = *other.Version s.ChainID = other.ChainId s.InitialHeight = uint64(other.InitialHeight) - s.LastBlockHeight.Store(uint64(other.LastBlockHeight)) + s.SetHeight(uint64(other.LastBlockHeight)) s.BaseHeight = other.BaseHeight s.NextValidators, err = types.ValidatorSetFromProto(other.NextValidators) diff --git a/types/serialization_test.go b/types/serialization_test.go index ece4be054..c2cf676cc 100644 --- a/types/serialization_test.go +++ b/types/serialization_test.go @@ -154,7 +154,7 @@ func TestStateRoundTrip(t *testing.T) { assert := assert.New(t) if c.state.InitialHeight != 0 { - c.state.LastBlockHeight.Store(986321) + c.state.SetHeight(986321) } pState, err := c.state.ToProto() diff --git a/utils/atomic/funcs.go b/utils/atomic/funcs.go new file mode 100644 index 000000000..1812d0959 --- /dev/null +++ b/utils/atomic/funcs.go @@ -0,0 +1,15 @@ +package atomic + +import ( + "sync/atomic" +) + +/* +TODO: move to sdk-utils +*/ + +// Uint64Sub does x := x-y and returns the new value of x +func Uint64Sub(x *atomic.Uint64, y uint64) uint64 { + // Uses math + return x.Add(^(y - 1)) +} diff --git a/utils/atomic/funcs_test.go b/utils/atomic/funcs_test.go new file mode 100644 index 000000000..f0155893f --- /dev/null +++ b/utils/atomic/funcs_test.go @@ -0,0 +1,35 @@ +package atomic + +import ( + "flag" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" + "pgregory.net/rapid" +) + +func TestUint64Sub(t *testing.T) { + _ = flag.Set("rapid.checks", "50") + _ = flag.Set("rapid.steps", "50") + + rapid.Check(t, func(r *rapid.T) { + exp := uint64(0) + got := atomic.Uint64{} + r.Repeat(map[string]func(r *rapid.T){ + "": func(r *rapid.T) { + require.Equal(t, exp, got.Load()) + }, + "add": func(r *rapid.T) { + d := rapid.Uint64().Draw(r, "d") + exp += d + got.Add(d) + }, + "sub": func(r *rapid.T) { + d := rapid.Uint64().Draw(r, "d") + exp -= d + Uint64Sub(&got, d) + }, + }) + }) +} diff --git a/utils/errors/err_group.go b/utils/errors/err_group.go new file mode 100644 index 000000000..c4d82409a --- /dev/null +++ b/utils/errors/err_group.go @@ -0,0 +1,24 @@ +package errors + +import ( + "github.com/dymensionxyz/dymint/types" + "golang.org/x/sync/errgroup" +) + +/* +TODO: move to sdk-utils +*/ + +// ErrGroupGoLog calls eg.Go on the errgroup but it will log the error immediately when it occurs +// instead of waiting for all goroutines in the group to finish first. This has the advantage of making sure all +// errors are logged, not just the first one, and it is more immediate. Also, it is guaranteed, in case that +// of the goroutines is not properly context aware. +func ErrGroupGoLog(eg *errgroup.Group, logger types.Logger, fn func() error) { + eg.Go(func() error { + err := fn() + if err != nil { + logger.Error("ErrGroup goroutine.", "err", err) + } + return err + }) +}