Skip to content

Commit

Permalink
fix(submit loop): add more logging around skew calculation (#1000)
Browse files Browse the repository at this point in the history
  • Loading branch information
danwt authored and omritoptix committed Aug 13, 2024
1 parent b744805 commit 5e141d5
Show file tree
Hide file tree
Showing 17 changed files with 129 additions and 43 deletions.
2 changes: 1 addition & 1 deletion block/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestApplyBlock(t *testing.T) {
Validators: tmtypes.NewValidatorSet(nil),
}
state.InitialHeight = 1
state.LastBlockHeight.Store(0)
state.SetHeight(0)
maxBytes := uint64(100)
state.ConsensusParams.Block.MaxBytes = int64(maxBytes)
state.ConsensusParams.Block.MaxGas = 100000
Expand Down
17 changes: 8 additions & 9 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/dymensionxyz/dymint/store"
uerrors "github.com/dymensionxyz/dymint/utils/errors"
uevent "github.com/dymensionxyz/dymint/utils/event"

"github.com/libp2p/go-libp2p/core/crypto"
Expand Down Expand Up @@ -164,27 +165,25 @@ func (m *Manager) Start(ctx context.Context) error {
<-m.DAClient.Synced()
nBytes := m.GetUnsubmittedBytes()
bytesProducedC := make(chan int)
go func() {
bytesProducedC <- nBytes
}()
eg.Go(func() error {
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SubmitLoop(ctx, bytesProducedC)
})
eg.Go(func() error {
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
bytesProducedC <- nBytes
return m.ProduceBlockLoop(ctx, bytesProducedC)
})
} else {
eg.Go(func() error {
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.RetrieveLoop(ctx)
})
eg.Go(func() error {
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SyncToTargetHeightLoop(ctx)
})
}

go func() {
err := eg.Wait()
m.logger.Info("Block manager err group finished.", "err", err)
_ = eg.Wait() // errors are already logged
m.logger.Info("Block manager err group finished.")
}()

return nil
Expand Down
4 changes: 2 additions & 2 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestInitialState(t *testing.T) {
store: fullStore,
genesis: genesis,
expectedInitialHeight: sampleState.InitialHeight,
expectedLastBlockHeight: sampleState.LastBlockHeight.Load(),
expectedLastBlockHeight: sampleState.Height(),
expectedChainID: sampleState.ChainID,
},
}
Expand All @@ -103,7 +103,7 @@ func TestInitialState(t *testing.T) {
assert.NotNil(agg)
assert.Equal(c.expectedChainID, agg.State.ChainID)
assert.Equal(c.expectedInitialHeight, agg.State.InitialHeight)
assert.Equal(c.expectedLastBlockHeight, agg.State.LastBlockHeight.Load())
assert.Equal(c.expectedLastBlockHeight, agg.State.Height())
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
default:
evt := &events.DataHealthStatus{Error: fmt.Errorf("bytes produced channel is full: %w", gerrc.ErrResourceExhausted)}
uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
m.logger.Error("Enough bytes to build a batch have been accumulated, but too many batches are pending submission." +
m.logger.Error("Enough bytes to build a batch have been accumulated, but too many batches are pending submission. " +
"Pausing block production until a signal is consumed.")
select {
case <-ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion block/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewStateFromGenesis(genDoc *tmtypes.GenesisDoc) (*types.State, error) {
ConsensusParams: *genDoc.ConsensusParams,
LastHeightConsensusParamsChanged: genDoc.InitialHeight,
}
s.LastBlockHeight.Store(0)
s.SetHeight(0)
copy(s.AppHash[:], genDoc.AppHash)

return &s, nil
Expand Down
43 changes: 30 additions & 13 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/types"
uatomic "github.com/dymensionxyz/dymint/utils/atomic"
uchannel "github.com/dymensionxyz/dymint/utils/channel"
)

Expand All @@ -23,7 +24,8 @@ import (
func (m *Manager) SubmitLoop(ctx context.Context,
bytesProduced chan int,
) (err error) {
return SubmitLoopInner(ctx,
return SubmitLoopInner(
ctx,
m.logger,
bytesProduced,
m.Conf.MaxBatchSkew,
Expand All @@ -34,7 +36,8 @@ func (m *Manager) SubmitLoop(ctx context.Context,
}

// SubmitLoopInner is a unit testable impl of SubmitLoop
func SubmitLoopInner(ctx context.Context,
func SubmitLoopInner(
ctx context.Context,
logger types.Logger,
bytesProduced chan int, // a channel of block and commit bytes produced
maxBatchSkew uint64, // max number of batches that submitter is allowed to have pending
Expand Down Expand Up @@ -74,6 +77,7 @@ func SubmitLoopInner(ctx context.Context,
return ctx.Err()
case n := <-bytesProduced:
pendingBytes.Add(uint64(n))
logger.Info("Added bytes produced to bytes pending submission counter.", "n", n)
case <-ticker.C:
}
}
Expand Down Expand Up @@ -104,14 +108,17 @@ func SubmitLoopInner(ctx context.Context,
break
}
nConsumed, err := createAndSubmitBatch(min(pending, maxBatchBytes))
if errors.Is(err, gerrc.ErrInternal) {
panic(fmt.Sprintf("create and submit batch: %v", err))
}
if err != nil {
return fmt.Errorf("create and submit batch: %w", err)
err = fmt.Errorf("create and submit batch: %w", err)
if errors.Is(err, gerrc.ErrInternal) {
logger.Error("Create and submit batch", "err", err, "pending", pending)
panic(err)
}
return err
}
timeLastSubmission = time.Now()
pending = pendingBytes.Add(^(nConsumed - 1)) // subtract
pending = uatomic.Uint64Sub(&pendingBytes, nConsumed)
logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level
}
trigger.Nudge()
}
Expand All @@ -134,11 +141,26 @@ func (m *Manager) CreateAndSubmitBatchGetSizeBlocksCommits(maxSize uint64) (uint
// CreateAndSubmitBatch creates and submits a batch to the DA and SL.
// max size bytes is the maximum size of the serialized batch type
func (m *Manager) CreateAndSubmitBatch(maxSizeBytes uint64) (*types.Batch, error) {
b, err := m.CreateBatch(maxSizeBytes, m.NextHeightToSubmit(), m.State.Height())
startHeight := m.NextHeightToSubmit()
endHeightInclusive := m.State.Height()

if endHeightInclusive < startHeight {
// TODO: https://github.com/dymensionxyz/dymint/issues/999
return nil, fmt.Errorf(
"next height to submit is greater than last block height, create and submit batch should not have been called: start height: %d: end height inclusive: %d: %w",
startHeight,
endHeightInclusive,
gerrc.ErrInternal,
)
}

b, err := m.CreateBatch(maxSizeBytes, startHeight, endHeightInclusive)
if err != nil {
return nil, fmt.Errorf("create batch: %w", err)
}

m.logger.Info("Created batch.", "start height", startHeight, "end height", endHeightInclusive)

if err := m.SubmitBatch(b); err != nil {
return nil, fmt.Errorf("submit batch: %w", err)
}
Expand All @@ -148,7 +170,6 @@ func (m *Manager) CreateAndSubmitBatch(maxSizeBytes uint64) (*types.Batch, error
// CreateBatch looks through the store for any unsubmitted blocks and commits and bundles them into a batch
// max size bytes is the maximum size of the serialized batch type
func (m *Manager) CreateBatch(maxBatchSize uint64, startHeight uint64, endHeightInclusive uint64) (*types.Batch, error) {
m.logger.Info("Creating batch", "start height", startHeight, "end height", endHeightInclusive)
batchSize := endHeightInclusive - startHeight + 1
batch := &types.Batch{
Blocks: make([]*types.Block, 0, batchSize),
Expand Down Expand Up @@ -182,10 +203,6 @@ func (m *Manager) CreateBatch(maxBatchSize uint64, startHeight uint64, endHeight
}
}

if batch.NumBlocks() == 0 {
return nil, fmt.Errorf("empty batch: %w", gerrc.ErrInternal)
}

return batch, nil
}

Expand Down
11 changes: 2 additions & 9 deletions block/submit_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/dymensionxyz/dymint/block"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
)

type testArgs struct {
Expand Down Expand Up @@ -104,15 +105,7 @@ func testSubmitLoopInner(
return uint64(consumed), nil
}

block.SubmitLoopInner(
ctx,
nil,
producedBytesC,
args.batchSkew,
args.maxTime,
args.batchBytes,
submitBatch,
)
block.SubmitLoopInner(ctx, log.TestingLogger(), producedBytesC, args.batchSkew, args.maxTime, args.batchBytes, submitBatch)
}

// Make sure the producer does not get too far ahead
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ require (
github.com/rogpeppe/go-internal v1.11.0 // indirect
go.uber.org/mock v0.4.0 // indirect
golang.org/x/tools v0.18.0 // indirect
pgregory.net/rapid v1.1.0 // indirect
)

replace (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1527,6 +1527,8 @@ lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI=
lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k=
nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g=
nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0=
pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw=
pgregory.net/rapid v1.1.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
Expand Down
4 changes: 2 additions & 2 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,15 @@ func TestLoadState(t *testing.T) {
NextValidators: validatorSet,
Validators: validatorSet,
}
s.LastBlockHeight.Store(expectedHeight)
s.SetHeight(expectedHeight)
_, err := s1.SaveState(s, nil)
assert.NoError(err)

s2 := store.New(kv)
state, err := s2.LoadState()
assert.NoError(err)

assert.Equal(expectedHeight, state.LastBlockHeight.Load())
assert.Equal(expectedHeight, state.Height())
}

func TestBlockResponses(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion test/loadtime/cmd/report/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func newBlockStore(kvstore store.KV, baseHeight uint64) *BlockStore {
return &BlockStore{
DefaultStore: store,
base: state.BaseHeight,
height: state.LastBlockHeight.Load(),
height: state.Height(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion testutil/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func GenerateState(initialHeight int64, lastBlockHeight int64) *types.State {
Validators: GenerateRandomValidatorSet(),
NextValidators: GenerateRandomValidatorSet(),
}
s.LastBlockHeight.Store(uint64(lastBlockHeight))
s.SetHeight(uint64(lastBlockHeight))
return s
}

Expand Down
4 changes: 2 additions & 2 deletions types/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (s *State) ToProto() (*pb.State, error) {
Version: &s.Version,
ChainId: s.ChainID,
InitialHeight: int64(s.InitialHeight),
LastBlockHeight: int64(s.LastBlockHeight.Load()),
LastBlockHeight: int64(s.Height()),
NextValidators: nextValidators,
Validators: validators,
BaseHeight: s.BaseHeight,
Expand All @@ -279,7 +279,7 @@ func (s *State) FromProto(other *pb.State) error {
s.Version = *other.Version
s.ChainID = other.ChainId
s.InitialHeight = uint64(other.InitialHeight)
s.LastBlockHeight.Store(uint64(other.LastBlockHeight))
s.SetHeight(uint64(other.LastBlockHeight))
s.BaseHeight = other.BaseHeight

s.NextValidators, err = types.ValidatorSetFromProto(other.NextValidators)
Expand Down
2 changes: 1 addition & 1 deletion types/serialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestStateRoundTrip(t *testing.T) {
assert := assert.New(t)

if c.state.InitialHeight != 0 {
c.state.LastBlockHeight.Store(986321)
c.state.SetHeight(986321)
}

pState, err := c.state.ToProto()
Expand Down
15 changes: 15 additions & 0 deletions utils/atomic/funcs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package atomic

import (
"sync/atomic"
)

/*
TODO: move to sdk-utils
*/

// Uint64Sub atomically subtracts y from x and returns the resulting value of x.
//
// The subtraction is expressed as an atomic addition of the two's complement
// of y (^y + 1), which is congruent to -y modulo 2^64. When y exceeds the
// current value of x the result wraps around, exactly like ordinary uint64
// arithmetic.
func Uint64Sub(x *atomic.Uint64, y uint64) uint64 {
	negated := ^y + 1 // two's complement of y: x + negated == x - y (mod 2^64)
	return x.Add(negated)
}
35 changes: 35 additions & 0 deletions utils/atomic/funcs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package atomic

import (
"flag"
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"
"pgregory.net/rapid"
)

// TestUint64Sub is a property-based test: it applies random sequences of
// additions and subtractions to both a plain uint64 model and an atomic.Uint64
// (subtracting via Uint64Sub) and checks that the two always agree, including
// when the arithmetic wraps around.
func TestUint64Sub(t *testing.T) {
	// Bound the amount of work rapid does. The Set errors are deliberately
	// ignored: presumably a failure only means rapid keeps its default limits.
	_ = flag.Set("rapid.checks", "50")
	_ = flag.Set("rapid.steps", "50")

	rapid.Check(t, func(r *rapid.T) {
		exp := uint64(0) // model value, using ordinary wrapping uint64 arithmetic
		var got atomic.Uint64
		r.Repeat(map[string]func(r *rapid.T){
			// The "" entry is the invariant rapid evaluates around the other actions.
			"": func(r *rapid.T) {
				// Assert through rapid's own T (not the outer *testing.T) so that a
				// failure is handled by rapid, which can then shrink the action
				// sequence and report how to reproduce it.
				require.Equal(r, exp, got.Load())
			},
			"add": func(r *rapid.T) {
				d := rapid.Uint64().Draw(r, "d")
				exp += d
				got.Add(d)
			},
			"sub": func(r *rapid.T) {
				d := rapid.Uint64().Draw(r, "d")
				exp -= d
				Uint64Sub(&got, d)
			},
		})
	})
}
24 changes: 24 additions & 0 deletions utils/errors/err_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package errors

import (
"github.com/dymensionxyz/dymint/types"
"golang.org/x/sync/errgroup"
)

/*
TODO: move to sdk-utils
*/

// ErrGroupGoLog schedules fn on the errgroup via eg.Go, logging any non-nil
// error through logger as soon as fn returns rather than only after the whole
// group has finished. As a result every failing goroutine gets its error
// logged (not just the first one the group keeps), and the log line is emitted
// even if some other goroutine in the group does not respect context
// cancellation and so delays or prevents the group from finishing.
// The error is still returned to the errgroup unchanged.
func ErrGroupGoLog(eg *errgroup.Group, logger types.Logger, fn func() error) {
	logged := func() error {
		if err := fn(); err != nil {
			logger.Error("ErrGroup goroutine.", "err", err)
			return err
		}
		return nil
	}
	eg.Go(logged)
}

0 comments on commit 5e141d5

Please sign in to comment.