Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(executor): added MsgUpsertSequencer consensus message #1120

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions block/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

proto2 "github.com/gogo/protobuf/proto"
proto "github.com/gogo/protobuf/types"

abci "github.com/tendermint/tendermint/abci/types"
tmcrypto "github.com/tendermint/tendermint/crypto/encoding"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
Expand Down Expand Up @@ -104,25 +103,25 @@ func (e *Executor) InitChain(genesis *tmtypes.GenesisDoc, valset []*tmtypes.Vali
})
}

// CreateBlock reaps transactions from mempool and builds a block.
// CreateBlock reaps transactions from mempool and builds a block. Optionally, executes consensus messages that
// gets from the consensus messages stream or from the method args.
func (e *Executor) CreateBlock(
height uint64,
lastCommit *types.Commit,
lastHeaderHash, nextSeqHash [32]byte,
state *types.State,
maxBlockDataSizeBytes uint64,
consensusMsgs ...proto2.Message,
) *types.Block {
maxBlockDataSizeBytes = min(maxBlockDataSizeBytes, uint64(max(minBlockMaxBytes, state.ConsensusParams.Block.MaxBytes)))
mempoolTxs := e.mempool.ReapMaxBytesMaxGas(int64(maxBlockDataSizeBytes), state.ConsensusParams.Block.MaxGas)

var consensusAnyMessages []*proto.Any
if e.consensusMessagesStream != nil {
consensusMessages, err := e.consensusMessagesStream.GetConsensusMessages()
if err != nil {
e.logger.Error("Failed to get consensus messages", "error", err)
}

consensusAnyMessages = fromProtoMsgSliceToAnySlice(consensusMessages)
consensusMsgs = append(consensusMsgs, consensusMessages...)
}

block := &types.Block{
Expand All @@ -145,7 +144,7 @@ func (e *Executor) CreateBlock(
Txs: toDymintTxs(mempoolTxs),
IntermediateStateRoots: types.IntermediateStateRoots{RawRootsList: nil},
Evidence: types.EvidenceData{Evidence: nil},
ConsensusMessages: consensusAnyMessages,
ConsensusMessages: fromProtoMsgSliceToAnySlice(consensusMsgs...),
},
LastCommit: *lastCommit,
}
Expand Down Expand Up @@ -328,7 +327,7 @@ func fromProtoMsgToAny(msg proto2.Message) *proto.Any {
}
}

func fromProtoMsgSliceToAnySlice(msgs []proto2.Message) []*proto.Any {
func fromProtoMsgSliceToAnySlice(msgs ...proto2.Message) []*proto.Any {
result := make([]*proto.Any, len(msgs))
for i, msg := range msgs {
result[i] = fromProtoMsgToAny(msg)
Expand Down
2 changes: 1 addition & 1 deletion block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func NewManager(
mempool,
proxyApp,
eventBus,
nil, // TODO add ConsensusMessagesStream
nil, // TODO add ConsensusMessagesStream: https://github.com/dymensionxyz/dymint/issues/1125
logger,
)
if err != nil {
Expand Down
101 changes: 85 additions & 16 deletions block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@
"fmt"
"time"

codectypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/types/bech32"
sequencertypes "github.com/dymensionxyz/dymension-rdk/x/sequencers/types"

Check failure on line 11 in block/produce.go

View workflow job for this annotation

GitHub Actions / Analyze

github.com/dymensionxyz/[email protected]: replacement directory ../dymension-rdk does not exist

Check failure on line 11 in block/produce.go

View workflow job for this annotation

GitHub Actions / build

github.com/dymensionxyz/[email protected]: replacement directory ../dymension-rdk does not exist
"github.com/dymensionxyz/gerr-cosmos/gerrc"

"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/store"
uevent "github.com/dymensionxyz/dymint/utils/event"

"github.com/gogo/protobuf/proto"
tmed25519 "github.com/tendermint/tendermint/crypto/ed25519"
tmcrypto "github.com/tendermint/tendermint/crypto/encoding"
cmtproto "github.com/tendermint/tendermint/proto/tendermint/types"
tmtypes "github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"

"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/store"
"github.com/dymensionxyz/dymint/types"
uevent "github.com/dymensionxyz/dymint/utils/event"
)

// ProduceBlockLoop is calling publishBlock in a loop as long as we're synced.
Expand Down Expand Up @@ -96,18 +99,26 @@
}
}

// nextProposerInfo holds information about the next proposer.
type nextProposerInfo struct {
// nextProposerHash is a tendermint-compatible hash of the sequencer.
nextProposerHash [32]byte
// nextProposerAddr is a sequencer's settlement address.
nextProposerAddr string
}

// ProduceApplyGossipLastBlock produces and applies a block with the given nextProposerHash.
func (m *Manager) ProduceApplyGossipLastBlock(ctx context.Context, nextProposerHash [32]byte) (err error) {
_, _, err = m.produceApplyGossip(ctx, true, &nextProposerHash)
func (m *Manager) ProduceApplyGossipLastBlock(ctx context.Context, nextProposerInfo nextProposerInfo) (err error) {
_, _, err = m.produceApplyGossip(ctx, true, &nextProposerInfo)
return err
}

func (m *Manager) ProduceApplyGossipBlock(ctx context.Context, allowEmpty bool) (block *types.Block, commit *types.Commit, err error) {
return m.produceApplyGossip(ctx, allowEmpty, nil)
}

func (m *Manager) produceApplyGossip(ctx context.Context, allowEmpty bool, nextProposerHash *[32]byte) (block *types.Block, commit *types.Commit, err error) {
block, commit, err = m.produceBlock(allowEmpty, nextProposerHash)
func (m *Manager) produceApplyGossip(ctx context.Context, allowEmpty bool, nextProposerInfo *nextProposerInfo) (block *types.Block, commit *types.Commit, err error) {
block, commit, err = m.produceBlock(allowEmpty, nextProposerInfo)
if err != nil {
return nil, nil, fmt.Errorf("produce block: %w", err)
}
Expand All @@ -123,7 +134,7 @@
return block, commit, nil
}

func (m *Manager) produceBlock(allowEmpty bool, nextProposerHash *[32]byte) (*types.Block, *types.Commit, error) {
func (m *Manager) produceBlock(allowEmpty bool, nextProposerInfo *nextProposerInfo) (*types.Block, *types.Commit, error) {
newHeight := m.State.NextHeight()
lastHeaderHash, lastCommit, err := m.GetPreviousBlockHashes(newHeight)
if err != nil {
Expand All @@ -148,14 +159,28 @@
return nil, nil, fmt.Errorf("load block: height: %d: %w: %w", newHeight, err, ErrNonRecoverable)
}

maxBlockDataSize := uint64(float64(m.Conf.BatchSubmitBytes) * types.MaxBlockSizeAdjustment)
proposerHashForBlock := [32]byte(m.State.Sequencers.ProposerHash())
// if nextProposerHash is set, we create a last block
if nextProposerHash != nil {
var (
maxBlockDataSize = uint64(float64(m.Conf.BatchSubmitBytes) * types.MaxBlockSizeAdjustment)
proposerHashForBlock = [32]byte(m.State.Sequencers.ProposerHash())
nextProposerAddr = m.State.Sequencers.Proposer.SettlementAddress
lastProposerBlock = false // Indicates that the block is the last for the current seq. True during the rotation.
)
// if nextProposerInfo is set, we create a last block
if nextProposerInfo != nil {
maxBlockDataSize = 0
proposerHashForBlock = *nextProposerHash
proposerHashForBlock = nextProposerInfo.nextProposerHash
nextProposerAddr = nextProposerInfo.nextProposerAddr
lastProposerBlock = true
}
block = m.Executor.CreateBlock(newHeight, lastCommit, lastHeaderHash, proposerHashForBlock, m.State, maxBlockDataSize)
// TODO: Ideally, there should be only one point for adding consensus messages. Given that they come from
// ConsensusMessagesStream, this should send them there instead of having to ways of sending consensusMessages.
// There is no implementation of the stream as of now. Unify the approach of adding consensus messages when
// the stream is implemented! https://github.com/dymensionxyz/dymint/issues/1125
consensusMsgs, err := m.consensusMsgsOnCreateBlock(nextProposerAddr, lastProposerBlock)
if err != nil {
return nil, nil, fmt.Errorf("create consensus msgs for create block: last proposer block: %v, height: %d, next proposer addr: %s: %w: %w", lastProposerBlock, newHeight, nextProposerAddr, err, ErrNonRecoverable)
}
block = m.Executor.CreateBlock(newHeight, lastCommit, lastHeaderHash, proposerHashForBlock, m.State, maxBlockDataSize, consensusMsgs...)
keruch marked this conversation as resolved.
Show resolved Hide resolved
if !allowEmpty && len(block.Data.Txs) == 0 {
return nil, nil, fmt.Errorf("%w: %w", types.ErrEmptyBlock, ErrRecoverable)
}
Expand All @@ -171,6 +196,50 @@
return block, commit, nil
}

// consensusMsgsOnCreateBlock forms a list of consensus messages that need execution on rollapp's BeginBlock.
// Currently, we need to create a sequencer in the rollapp if it doesn't exist in the following cases:
// - On the very first block after the genesis or
// - On the last block of the current sequencer (eg, during the rotation).
func (m *Manager) consensusMsgsOnCreateBlock(
nextProposerSettlementAddr string,
lastSeqBlock bool, // Indicates that the block is the last for the current seq. True during the rotation.
) ([]proto.Message, error) {
if m.State.IsGenesis() || lastSeqBlock {
nextSeq := m.State.Sequencers.GetByAddress(nextProposerSettlementAddr)
// Sanity check. Must never happen in practice. The sequencer's existence is verified beforehand in Manager.CompleteRotation.
if nextSeq == nil {
return nil, fmt.Errorf("no sequencer found for address while creating a new block: %s", nextProposerSettlementAddr)
}

// Get proposer's consensus public key and convert it to proto.Any
val, err := nextSeq.TMValidator()
if err != nil {
return nil, fmt.Errorf("convert next squencer to tendermint validator: %w", err)
}
pubKey, err := tmcrypto.PubKeyToProto(val.PubKey)
if err != nil {
return nil, fmt.Errorf("next squencer pub key to proto: %w", err)
}
anyPubKey, err := codectypes.NewAnyWithValue(&pubKey)
if err != nil {
return nil, fmt.Errorf("next squencer pubkey to proto any: %w", err)
}

// Get raw bytes of the proposer's settlement address. These bytes will to be converted to the rollapp format in the app.
_, addrBytes, err := bech32.DecodeAndConvert(nextProposerSettlementAddr)
if err != nil {
return nil, fmt.Errorf("next squencer settlement addr to bech32: %w", err)
}

return []proto.Message{&sequencertypes.MsgUpsertSequencer{
Operator: nextProposerSettlementAddr,
ConsPubKey: anyPubKey,
RewardAddrBytes: addrBytes,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the initial RewardAddress, the settlementAddr is used. Do we need to include both in the consensus message? Is there a scenario where this message can carry two distinct addresses, one Operator and the other Reward address?
Do I understand this correctly, to have a reward address different from the settlement addr, the custom reward address can be set on the rdk manually, in which case the RewardAddrBytes will not override it?
I realize keeping the reward address in the Hub was decided against, but if we recognize the Hub's x/sequencer record to be the source of truth for the sequencer, then we could maybe consider that besides the settlement address, the (custom) reward address could also originate from the Hub's sequencer record. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding your last point. Yes, you are right. That's what we finally decided to do with Omri. We'll enrich the rotation_started event with all the information needed for the sequencer, ie RewardAddr and WhitelistedRelayers so far. And the hub will be the source of truth.

Do we need to include both in the consensus message?

Good point, probably we don't. But as i said above, this will be change anyway soon.

Do I understand this correctly, to have a reward address different from the settlement addr, the custom reward address can be set on the rdk manually, in which case the RewardAddrBytes will not override it?

Currently, RewardAddrBytes overwrites the manually set value. Again, this will change once we accept that the Hub is the source of truce.

}}, nil
}
return nil, nil
}

// create commit for block
func (m *Manager) createCommit(block *types.Block) (*types.Commit, error) {
abciHeaderPb := types.ToABCIHeaderPB(&block.Header)
Expand Down
11 changes: 7 additions & 4 deletions block/sequencers.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ func (m *Manager) CompleteRotation(ctx context.Context, nextSeqAddr string) erro
copy(nextSeqHash[:], seq.Hash())
}

err := m.CreateAndPostLastBatch(ctx, nextSeqHash)
err := m.CreateAndPostLastBatch(ctx, nextProposerInfo{
nextProposerHash: nextSeqHash,
nextProposerAddr: nextSeqAddr,
})
if err != nil {
return fmt.Errorf("create and post last batch: %w", err)
}
Expand All @@ -131,18 +134,18 @@ func (m *Manager) CompleteRotation(ctx context.Context, nextSeqAddr string) erro

// CreateAndPostLastBatch creates and posts the last batch to the hub
// this called after manager shuts down the block producer and submitter
func (m *Manager) CreateAndPostLastBatch(ctx context.Context, nextSeqHash [32]byte) error {
func (m *Manager) CreateAndPostLastBatch(ctx context.Context, nextProposerInfo nextProposerInfo) error {
h := m.State.Height()
block, err := m.Store.LoadBlock(h)
if err != nil {
return fmt.Errorf("load block: height: %d: %w", h, err)
}

// check if the last block already produced with nextProposerHash set
if bytes.Equal(block.Header.NextSequencersHash[:], nextSeqHash[:]) {
if bytes.Equal(block.Header.NextSequencersHash[:], nextProposerInfo.nextProposerHash[:]) {
m.logger.Debug("Last block already produced and applied.")
} else {
err := m.ProduceApplyGossipLastBlock(ctx, nextSeqHash)
err := m.ProduceApplyGossipLastBlock(ctx, nextProposerInfo)
if err != nil {
return fmt.Errorf("produce apply gossip last block: %w", err)
}
Expand Down
19 changes: 10 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/cosmos/cosmos-sdk v0.46.16
github.com/dgraph-io/badger/v4 v4.3.0
github.com/dymensionxyz/cosmosclient v0.4.2-beta.0.20240821081230-b4018b2bac13
github.com/dymensionxyz/dymension-rdk v1.6.1
github.com/dymensionxyz/gerr-cosmos v1.0.0
github.com/go-kit/kit v0.12.0
github.com/gofrs/uuid v4.3.0+incompatible
Expand Down Expand Up @@ -42,18 +43,14 @@ require (
)

require (
cloud.google.com/go v0.112.1 // indirect
cloud.google.com/go/storage v1.38.0 // indirect
github.com/celestiaorg/go-square v1.0.1 // indirect
github.com/celestiaorg/go-square/merkle v0.0.0-20240429192549-dea967e1533b // indirect
github.com/cskr/pubsub v1.0.2 // indirect
github.com/dgraph-io/badger/v3 v3.2103.3 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/hashicorp/go-getter v1.7.5 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/ipfs/go-block-format v0.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.11 // indirect
google.golang.org/api v0.169.0 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
)

require (
Expand Down Expand Up @@ -257,7 +254,7 @@ require (

require (
cosmossdk.io/math v1.3.0 // indirect
github.com/DataDog/zstd v1.5.2 // indirect
github.com/DataDog/zstd v1.5.5 // indirect
github.com/Jorropo/jsync v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/btcsuite/btcd/btcutil v1.1.3 // indirect
Expand All @@ -267,10 +264,9 @@ require (
github.com/cockroachdb/pebble v1.1.0 // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/cosmos/ibc-go/v6 v6.2.1 // indirect
github.com/danwt/gerr v1.0.0 // indirect
github.com/evmos/evmos/v12 v12.1.6 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/getsentry/sentry-go v0.23.0 // indirect
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect
github.com/holiman/uint256 v1.2.2 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
Expand All @@ -296,8 +292,13 @@ require (
)

replace (
// This replacement is needed in order to import dymension-rdk properly. It's inherited from
// https://github.com/dymensionxyz/dymension-rdk/blob/82c4d5f8c09365b20b4378c0cc459b414fd306e8/go.mod#L315.
github.com/CosmWasm/wasmd => github.com/decentrio/wasmd v0.33.0-sdk46.2
github.com/centrifuge/go-substrate-rpc-client/v4 => github.com/availproject/go-substrate-rpc-client/v4 v4.0.12-avail-1.4.0-rc1-5e286e3
github.com/dymensionxyz/dymension-rdk => github.com/dymensionxyz/dymension-rdk v1.6.1-0.20240827102903-08636e7ab3f8
// TODO: uncomment after https://github.com/dymensionxyz/dymension-rdk/pull/563 is merged
// github.com/dymensionxyz/dymension-rdk => github.com/dymensionxyz/dymension-rdk v1.6.1-0.20240827102903-08636e7ab3f8
github.com/dymensionxyz/dymension-rdk => ../dymension-rdk
github.com/evmos/evmos/v12 => github.com/dymensionxyz/evmos/v12 v12.1.6-dymension-v0.3
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.2-alpha.regen.4
github.com/gorilla/rpc => github.com/dymensionxyz/rpc v1.3.1
Expand Down
Loading
Loading