Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Danwt/patch srene p2p block sync protocol merge main resolve conflicts #1001

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
}
// Prune old heights, if requested by ABCI app.
if 0 < retainHeight {
err = m.pruneBlocks(uint64(retainHeight))
err = m.PruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
}
Expand Down
2 changes: 1 addition & 1 deletion block/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestApplyBlock(t *testing.T) {
Validators: tmtypes.NewValidatorSet(nil),
}
state.InitialHeight = 1
state.LastBlockHeight.Store(0)
state.SetHeight(0)
maxBytes := uint64(100)
state.ConsensusParams.Block.MaxBytes = int64(maxBytes)
state.ConsensusParams.Block.MaxGas = 100000
Expand Down
16 changes: 10 additions & 6 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"golang.org/x/sync/errgroup"

"github.com/dymensionxyz/dymint/store"
uerrors "github.com/dymensionxyz/dymint/utils/errors"
uevent "github.com/dymensionxyz/dymint/utils/event"

"github.com/libp2p/go-libp2p/core/crypto"
Expand Down Expand Up @@ -152,17 +153,15 @@
<-m.DAClient.Synced()
nBytes := m.GetUnsubmittedBytes()
bytesProducedC := make(chan int)
go func() {
bytesProducedC <- nBytes
}()
err = m.syncFromSettlement()
if err != nil {
return fmt.Errorf("sync block manager from settlement: %w", err)
}
eg.Go(func() error {
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SubmitLoop(ctx, bytesProducedC)
})
eg.Go(func() error {
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
bytesProducedC <- nBytes
return m.ProduceBlockLoop(ctx, bytesProducedC)
})

Expand All @@ -180,8 +179,13 @@
// P2P Sync. Subscribe to P2P received blocks events
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger)

}

go func() {
_ = eg.Wait() // errors are already logged
m.logger.Info("Block manager err group finished.")
}()
Comment on lines +184 to +187

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestInitialState(t *testing.T) {
store: fullStore,
genesis: genesis,
expectedInitialHeight: sampleState.InitialHeight,
expectedLastBlockHeight: sampleState.LastBlockHeight.Load(),
expectedLastBlockHeight: sampleState.Height(),
expectedChainID: sampleState.ChainID,
},
}
Expand All @@ -105,7 +105,7 @@ func TestInitialState(t *testing.T) {
assert.NotNil(agg)
assert.Equal(c.expectedChainID, agg.State.ChainID)
assert.Equal(c.expectedInitialHeight, agg.State.InitialHeight)
assert.Equal(c.expectedLastBlockHeight, agg.State.LastBlockHeight.Load())
assert.Equal(c.expectedLastBlockHeight, agg.State.Height())
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
default:
evt := &events.DataHealthStatus{Error: fmt.Errorf("bytes produced channel is full: %w", gerrc.ErrResourceExhausted)}
uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
m.logger.Error("Enough bytes to build a batch have been accumulated, but too many batches are pending submission." +
m.logger.Error("Enough bytes to build a batch have been accumulated, but too many batches are pending submission. " +
"Pausing block production until a signal is consumed.")
select {
case <-ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion block/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/dymensionxyz/gerr-cosmos/gerrc"
)

func (m *Manager) pruneBlocks(retainHeight uint64) error {
func (m *Manager) PruneBlocks(retainHeight uint64) error {
if m.IsSequencer() && m.NextHeightToSubmit() < retainHeight { // do not delete anything that we might submit in future
return fmt.Errorf("cannot prune blocks before they have been submitted: retain height %d: next height to submit: %d: %w",
retainHeight,
Expand Down
57 changes: 57 additions & 0 deletions block/pruning_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package block_test

import (
"context"
"testing"

"github.com/dymensionxyz/dymint/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/proxy"
)

func TestPruningRetainHeight(t *testing.T) {
require := require.New(t)
app := testutil.GetAppMock()
ctx := context.Background()
// Create proxy app
clientCreator := proxy.NewLocalClientCreator(app)
proxyApp := proxy.NewAppConns(clientCreator)
err := proxyApp.Start()
require.NoError(err)

manager, err := testutil.GetManager(testutil.GetManagerConfig(), nil, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)

// Check initial assertions
require.Zero(manager.State.Height())
require.Zero(manager.LastSubmittedHeight.Load())

batchSize := 10

// Produce blocks
for i := 0; i < batchSize; i++ {
_, _, err = manager.ProduceAndGossipBlock(ctx, true)
require.NoError(err)
}
// submit and validate sync target
manager.CreateAndSubmitBatch(100000000)
lastSubmitted := manager.LastSubmittedHeight.Load()
assert.EqualValues(t, manager.State.Height(), lastSubmitted)
assert.Equal(t, lastSubmitted, uint64(batchSize))

// Produce new blocks
for i := 0; i < batchSize; i++ {
_, _, err = manager.ProduceAndGossipBlock(ctx, true)
require.NoError(err)
}

validRetainHeight := lastSubmitted + 1 // the max possible valid retain height
for i := validRetainHeight + 1; i < manager.State.Height(); i++ {
err = manager.PruneBlocks(i)
require.Error(err) // cannot prune blocks before they have been submitted
}

err = manager.PruneBlocks(validRetainHeight)
require.NoError(err)
}
2 changes: 1 addition & 1 deletion block/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewStateFromGenesis(genDoc *tmtypes.GenesisDoc) (*types.State, error) {
ConsensusParams: *genDoc.ConsensusParams,
LastHeightConsensusParamsChanged: genDoc.InitialHeight,
}
s.LastBlockHeight.Store(0)
s.SetHeight(0)
copy(s.AppHash[:], genDoc.AppHash)

return &s, nil
Expand Down
39 changes: 33 additions & 6 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/types"
uatomic "github.com/dymensionxyz/dymint/utils/atomic"
uchannel "github.com/dymensionxyz/dymint/utils/channel"
)

Expand All @@ -23,7 +24,9 @@ import (
func (m *Manager) SubmitLoop(ctx context.Context,
bytesProduced chan int,
) (err error) {
return SubmitLoopInner(ctx,
return SubmitLoopInner(
ctx,
m.logger,
bytesProduced,
m.Conf.MaxBatchSkew,
m.Conf.BatchSubmitMaxTime,
Expand All @@ -33,7 +36,9 @@ func (m *Manager) SubmitLoop(ctx context.Context,
}

// SubmitLoopInner is a unit testable impl of SubmitLoop
func SubmitLoopInner(ctx context.Context,
func SubmitLoopInner(
ctx context.Context,
logger types.Logger,
bytesProduced chan int, // a channel of block and commit bytes produced
maxBatchSkew uint64, // max number of batches that submitter is allowed to have pending
maxBatchTime time.Duration, // max time to allow between batches
Expand Down Expand Up @@ -72,6 +77,7 @@ func SubmitLoopInner(ctx context.Context,
return ctx.Err()
case n := <-bytesProduced:
pendingBytes.Add(uint64(n))
logger.Info("Added bytes produced to bytes pending submission counter.", "n", n)
case <-ticker.C:
}
}
Expand Down Expand Up @@ -103,10 +109,16 @@ func SubmitLoopInner(ctx context.Context,
}
nConsumed, err := createAndSubmitBatch(min(pending, maxBatchBytes))
if err != nil {
return fmt.Errorf("create and submit batch: %w", err)
err = fmt.Errorf("create and submit batch: %w", err)
if errors.Is(err, gerrc.ErrInternal) {
logger.Error("Create and submit batch", "err", err, "pending", pending)
panic(err)
}
return err
}
timeLastSubmission = time.Now()
pending = pendingBytes.Add(^(nConsumed - 1)) // subtract
pending = uatomic.Uint64Sub(&pendingBytes, nConsumed)
logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level
}
trigger.Nudge()
}
Expand All @@ -129,11 +141,26 @@ func (m *Manager) CreateAndSubmitBatchGetSizeBlocksCommits(maxSize uint64) (uint
// CreateAndSubmitBatch creates and submits a batch to the DA and SL.
// max size bytes is the maximum size of the serialized batch type
func (m *Manager) CreateAndSubmitBatch(maxSizeBytes uint64) (*types.Batch, error) {
b, err := m.CreateBatch(maxSizeBytes, m.NextHeightToSubmit(), m.State.Height())
startHeight := m.NextHeightToSubmit()
endHeightInclusive := m.State.Height()

if endHeightInclusive < startHeight {
// TODO: https://github.com/dymensionxyz/dymint/issues/999
return nil, fmt.Errorf(
"next height to submit is greater than last block height, create and submit batch should not have been called: start height: %d: end height inclusive: %d: %w",
startHeight,
endHeightInclusive,
gerrc.ErrInternal,
)
}

b, err := m.CreateBatch(maxSizeBytes, startHeight, endHeightInclusive)
if err != nil {
return nil, fmt.Errorf("create batch: %w", err)
}

m.logger.Info("Created batch.", "start height", startHeight, "end height", endHeightInclusive)

if err := m.SubmitBatch(b); err != nil {
return nil, fmt.Errorf("submit batch: %w", err)
}
Expand Down Expand Up @@ -182,7 +209,7 @@ func (m *Manager) CreateBatch(maxBatchSize uint64, startHeight uint64, endHeight
func (m *Manager) SubmitBatch(batch *types.Batch) error {
resultSubmitToDA := m.DAClient.SubmitBatch(batch)
if resultSubmitToDA.Code != da.StatusSuccess {
return fmt.Errorf("da client submit batch: %s", resultSubmitToDA.Message)
return fmt.Errorf("da client submit batch: %s: %w", resultSubmitToDA.Message, resultSubmitToDA.Error)
}
m.logger.Info("Submitted batch to DA.", "start height", batch.StartHeight(), "end height", batch.EndHeight())

Expand Down
10 changes: 2 additions & 8 deletions block/submit_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/dymensionxyz/dymint/block"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
)

type testArgs struct {
Expand Down Expand Up @@ -104,14 +105,7 @@ func testSubmitLoopInner(
return uint64(consumed), nil
}

block.SubmitLoopInner(
ctx,
producedBytesC,
args.batchSkew,
args.maxTime,
args.batchBytes,
submitBatch,
)
block.SubmitLoopInner(ctx, log.TestingLogger(), producedBytesC, args.batchSkew, args.maxTime, args.batchBytes, submitBatch)
}

// Make sure the producer does not get too far ahead
Expand Down
16 changes: 14 additions & 2 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,18 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS

// TODO(srene): Split batch in multiple blobs if necessary if supported
height, commitment, err := c.submit(data)
if errors.Is(err, gerrc.ErrInternal) {
// no point retrying if it's because of our code being wrong
err = fmt.Errorf("submit: %w", err)
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
Error: err,
},
}
}

if err != nil {
c.logger.Error("Submit blob.", "error", err)
types.RollappConsecutiveFailedDASubmission.Inc()
Expand Down Expand Up @@ -521,11 +533,11 @@ func (c *DataAvailabilityLayerClient) checkBatchAvailability(daMetaData *da.DASu
func (c *DataAvailabilityLayerClient) submit(daBlob da.Blob) (uint64, da.Commitment, error) {
blobs, commitments, err := c.blobsAndCommitments(daBlob)
if err != nil {
return 0, nil, fmt.Errorf("blobs and commitments: %w", err)
return 0, nil, fmt.Errorf("blobs and commitments: %w: %w", err, gerrc.ErrInternal)
}

if len(commitments) == 0 {
return 0, nil, fmt.Errorf("zero commitments: %w", gerrc.ErrNotFound)
return 0, nil, fmt.Errorf("zero commitments: %w: %w", gerrc.ErrNotFound, gerrc.ErrInternal)
}

blobSizes := make([]uint32, len(blobs))
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ require (
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect
go.uber.org/mock v0.4.0 // indirect
golang.org/x/tools v0.18.0 // indirect
pgregory.net/rapid v1.1.0 // indirect
)

replace (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1561,6 +1561,8 @@ lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI=
lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k=
nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g=
nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0=
pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw=
pgregory.net/rapid v1.1.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
Expand Down
4 changes: 2 additions & 2 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ func TestLoadState(t *testing.T) {
NextValidators: validatorSet,
Validators: validatorSet,
}
s.LastBlockHeight.Store(expectedHeight)
s.SetHeight(expectedHeight)
_, err := s1.SaveState(s, nil)
assert.NoError(err)

s2 := store.New(kv)
state, err := s2.LoadState()
assert.NoError(err)

assert.Equal(expectedHeight, state.LastBlockHeight.Load())
assert.Equal(expectedHeight, state.Height())
}

func TestBlockResponses(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion test/loadtime/cmd/report/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func newBlockStore(kvstore store.KV, baseHeight uint64) *BlockStore {
return &BlockStore{
DefaultStore: store,
base: state.BaseHeight,
height: state.LastBlockHeight.Load(),
height: state.Height(),
}
}

Expand Down
3 changes: 2 additions & 1 deletion testutil/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func GenerateState(initialHeight int64, lastBlockHeight int64) *types.State {
s := &types.State{
ChainID: "test-chain",
InitialHeight: uint64(initialHeight),
BaseHeight: uint64(initialHeight),
AppHash: [32]byte{},
LastResultsHash: GetEmptyLastResultsHash(),
Version: tmstate.Version{
Expand All @@ -227,7 +228,7 @@ func GenerateState(initialHeight int64, lastBlockHeight int64) *types.State {
Validators: GenerateRandomValidatorSet(),
NextValidators: GenerateRandomValidatorSet(),
}
s.LastBlockHeight.Store(uint64(lastBlockHeight))
s.SetHeight(uint64(lastBlockHeight))
return s
}

Expand Down
14 changes: 14 additions & 0 deletions types/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package types

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestBatchSerialization(t *testing.T) {
batch := Batch{}
bz, err := batch.MarshalBinary()
require.Nil(t, err)
require.Empty(t, bz)
}
Loading
Loading