Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix blockhash caching #44

Merged
merged 1 commit into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 53 additions & 53 deletions stubchain/genesis.json

Large diffs are not rendered by default.

24 changes: 2 additions & 22 deletions symapp/app_di.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@ package symapp

import (
"cosmossdk.io/x/symStaking/abci"
stakingtypes "cosmossdk.io/x/symStaking/types"
_ "embed"
"fmt"
"io"
"path/filepath"
"strings"

dbm "github.com/cosmos/cosmos-db"
"github.com/spf13/cast"
"io"
"path/filepath"

clienthelpers "cosmossdk.io/client/v2/helpers"
"cosmossdk.io/core/appmodule"
Expand Down Expand Up @@ -228,7 +225,6 @@ func NewSymApp(
ba.SetVerifyVoteExtensionHandler(voteExtensionHandler.VerifyVoteExtension())
ba.SetPrepareProposal(abciPropHandler.PrepareProposal())
ba.SetPreBlocker(abciPropHandler.PreBlocker())
ba.AddRunTxRecoveryHandler(RecoverHandler)
})

app.App = appBuilder.Build(db, traceStore, baseAppOptions...)
Expand Down Expand Up @@ -437,19 +433,3 @@ func BlockedAddresses() map[string]bool {

return result
}

// we use this handler to restart on failure on our errors
func RecoverHandler(recoveryObj interface{}) error {
err, ok := recoveryObj.(error)
if !ok || err == nil {
return nil
}
if stakingtypes.ErrSymbioticValUpdate.Is(err) {
panic(fmt.Errorf("symbiotic panic: %w", err))
}
// IDK after this error node stops receiving blocks
if strings.Contains(err.Error(), "peer is not sending us data fast enough") {
panic(fmt.Errorf("Consensus peer error: %w", err))
}
return err
}
4 changes: 3 additions & 1 deletion x/symGenutil/gentx.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ func DeliverGenTxs(
}
}

stakingKeeper.CacheBlockHash(initBlockHash, 0)
if err := stakingKeeper.CacheBlockHash(ctx, stakingtypes.CachedBlockHash{BlockHash: initBlockHash, Height: 0}); err != nil {
return nil, fmt.Errorf("failed to cache block hash %w", err)
}

return stakingKeeper.BlockValidatorUpdates(ctx)
}
3 changes: 2 additions & 1 deletion x/symGenutil/types/expected_keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"cosmossdk.io/core/appmodule"
bankexported "cosmossdk.io/x/bank/exported"
stakingtypes "cosmossdk.io/x/symStaking/types"
"encoding/json"

"github.com/cosmos/cosmos-sdk/codec"
Expand All @@ -12,7 +13,7 @@ import (

// StakingKeeper defines the expected staking keeper (noalias)
type StakingKeeper interface {
CacheBlockHash(blockHash string, height int64)
CacheBlockHash(ctx context.Context, blockHash stakingtypes.CachedBlockHash) error
BlockValidatorUpdates(ctx context.Context) ([]appmodule.ValidatorUpdate, error)
}

Expand Down
98 changes: 42 additions & 56 deletions x/symStaking/abci/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package abci
import (
"cosmossdk.io/log"
keeper2 "cosmossdk.io/x/symStaking/keeper"
"cosmossdk.io/x/symStaking/types"
stakingtypes "cosmossdk.io/x/symStaking/types"
"encoding/json"
"errors"
abci "github.com/cometbft/cometbft/abci/types"
Expand All @@ -25,78 +25,64 @@ func NewProposalHandler(logger log.Logger, keeper *keeper2.Keeper) *ProposalHand

func (h *ProposalHandler) PrepareProposal() sdk.PrepareProposalHandler {
return func(ctx sdk.Context, req *abci.PrepareProposalRequest) (*abci.PrepareProposalResponse, error) {
resp, err := h.internalPrepareProposal(ctx, req)
if err != nil {
panic(errors.Join(types.ErrSymbioticValUpdate, err))
}
proposalTxs := req.Txs

return resp, nil
}
}
if req.Height%keeper2.SYMBIOTIC_SYNC_PERIOD != 0 {
return &abci.PrepareProposalResponse{
Txs: proposalTxs,
}, nil
}

func (h *ProposalHandler) PreBlocker() sdk.PreBlocker {
return func(ctx sdk.Context, req *abci.FinalizeBlockRequest) error {
if err := h.internalPreBlocker(ctx, req); err != nil {
panic(errors.Join(types.ErrSymbioticValUpdate, err))
blockHash, err := h.keeper.GetFinalizedBlockHash(ctx)
if err != nil {
// anyway recovers in baseapp.abci so just skip
blockHash = keeper2.INVALID_BLOCKHASH
}

return nil
}
}
// NOTE: We use stdlib JSON encoding, but an application may choose to use
// a performant mechanism. This is for demo purposes only.
bz, err := json.Marshal(blockHash)
if err != nil {
return nil, errors.New("failed to encode injected vote extension tx")
}

func (h *ProposalHandler) internalPrepareProposal(ctx sdk.Context, req *abci.PrepareProposalRequest) (*abci.PrepareProposalResponse, error) {
proposalTxs := req.Txs
// Inject a "fake" tx into the proposal s.t. validators can decode, verify,
// and store the canonical stake-weighted average prices.
proposalTxs = append([][]byte{bz}, proposalTxs...)

if req.Height%keeper2.SYMBIOTIC_SYNC_PERIOD != 0 {
return &abci.PrepareProposalResponse{
Txs: proposalTxs,
}, nil
}
}

blockHash, err := h.keeper.GetFinalizedBlockHash(ctx)
if err != nil {
return nil, err
}

// NOTE: We use stdlib JSON encoding, but an application may choose to use
// a performant mechanism. This is for demo purposes only.
bz, err := json.Marshal(blockHash)
if err != nil {
return nil, errors.New("failed to encode injected vote extension tx")
}
func (h *ProposalHandler) PreBlocker() sdk.PreBlocker {
return func(ctx sdk.Context, req *abci.FinalizeBlockRequest) error {
if req.Height%keeper2.SYMBIOTIC_SYNC_PERIOD != 0 || len(req.Txs) == 0 {
return nil
}

// Inject a "fake" tx into the proposal s.t. validators can decode, verify,
// and store the canonical stake-weighted average prices.
proposalTxs = append([][]byte{bz}, proposalTxs...)
var blockHash string
if err := json.Unmarshal(req.Txs[0], &blockHash); err != nil {
return err
}

return &abci.PrepareProposalResponse{
Txs: proposalTxs,
}, nil
}
block, err := h.keeper.GetBlockByHash(ctx, blockHash)
if err != nil {
return err
}

func (h *ProposalHandler) internalPreBlocker(context sdk.Context, req *abci.FinalizeBlockRequest) error {
if req.Height%keeper2.SYMBIOTIC_SYNC_PERIOD != 0 || len(req.Txs) == 0 {
return nil
}
if block.Time() < h.prevBlockTime || int64(block.Time()) >= ctx.HeaderInfo().Time.Unix() || block.Time() < h.keeper.GetMinBlockTimestamp(ctx) {
err := h.keeper.CacheBlockHash(ctx, stakingtypes.CachedBlockHash{BlockHash: blockHash, Height: req.Height})
return err
}

var blockHash string
if err := json.Unmarshal(req.Txs[0], &blockHash); err != nil {
return err
}
if err := h.keeper.CacheBlockHash(ctx, stakingtypes.CachedBlockHash{BlockHash: blockHash, Height: req.Height}); err != nil {
return err
}

block, err := h.keeper.GetBlockByHash(context, blockHash)
if err != nil {
return err
}
h.prevBlockTime = block.Time()

if block.Time() < h.prevBlockTime || int64(block.Time()) >= context.HeaderInfo().Time.Unix() || block.Time() < h.keeper.GetMinBlockTimestamp(context) {
h.keeper.CacheBlockHash(keeper2.INVALID_BLOCKHASH, req.Height)
return nil
}

h.keeper.CacheBlockHash(blockHash, req.Height)

h.prevBlockTime = block.Time()

return nil
}
5 changes: 3 additions & 2 deletions x/symStaking/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ type Keeper struct {
cometInfoService comet.Service
apiUrls types.ApiUrls
networkMiddlewareAddress string
cachedBlockHash CachedBlockHash

Schema collections.Schema

// CachedBlockHash value: CachedBlockHash
CachedBlockHash collections.Item[[]byte]
// HistoricalInfo key: Height | value: HistoricalInfo
HistoricalInfo collections.Map[uint64, types.HistoricalRecord]
// LastTotalPower value: LastTotalPower
Expand Down Expand Up @@ -111,8 +112,8 @@ func NewKeeper(
consensusAddressCodec: consensusAddressCodec,
cometInfoService: cometInfoService,
apiUrls: types.NewApiUrls(),
cachedBlockHash: CachedBlockHash{},
networkMiddlewareAddress: networkMiddlewareAddress,
CachedBlockHash: collections.NewItem(sb, types.CachedBlockHashKey, "cached_block_hash", collections.BytesValue),
LastTotalPower: collections.NewItem(sb, types.LastTotalPowerKey, "last_total_power", sdk.IntValue),
HistoricalInfo: collections.NewMap(sb, types.HistoricalInfoKey, "historical_info", collections.Uint64Key, HistoricalInfoCodec(cdc)),
UnbondingID: collections.NewSequence(sb, types.UnbondingIDKey, "unbonding_id"),
Expand Down
44 changes: 31 additions & 13 deletions x/symStaking/keeper/symbiotic_state_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ type Validator struct {
ConsAddr [32]byte
}

type CachedBlockHash struct {
BlockHash string
Height int64
}

const (
SYMBIOTIC_SYNC_PERIOD = 10
SLEEP_ON_RETRY = 200
Expand Down Expand Up @@ -120,9 +115,13 @@ const (
]`
)

func (k *Keeper) CacheBlockHash(blockHash string, height int64) {
k.cachedBlockHash.BlockHash = blockHash
k.cachedBlockHash.Height = height
func (k *Keeper) CacheBlockHash(ctx context.Context, blockHash stakingtypes.CachedBlockHash) error {
bz, err := json.Marshal(blockHash)
if err != nil {
return err
}
err = k.CachedBlockHash.Set(ctx, bz)
return err
}

func (k *Keeper) SymbioticUpdateValidatorsPower(ctx context.Context) error {
Expand All @@ -136,19 +135,38 @@ func (k *Keeper) SymbioticUpdateValidatorsPower(ctx context.Context) error {
return nil
}

if k.cachedBlockHash.Height != height {
return fmt.Errorf("symbiotic no blockhash cache, actual cached height %v, expected %v", k.cachedBlockHash.Height, height)
exist, err := k.CachedBlockHash.Has(ctx)
if err != nil {
return err
}

if k.cachedBlockHash.BlockHash == INVALID_BLOCKHASH {
if !exist {
return nil
}

data, err := k.CachedBlockHash.Get(ctx)
if err != nil {
return err
}

var cachedBlockHash stakingtypes.CachedBlockHash

if err := json.Unmarshal(data, &cachedBlockHash); err != nil {
return err
}

if cachedBlockHash.Height != height { // TODO need to research failures on processProposal, mb better to skip block if height is old
return fmt.Errorf("symbiotic no blockhash cache, actual cached height %v, expected %v", cachedBlockHash.Height, height)
}

if cachedBlockHash.BlockHash == INVALID_BLOCKHASH {
return nil
}

var validators []Validator
var err error

for i := 0; i < RETRIES; i++ {
validators, err = k.getSymbioticValidatorSet(ctx, k.cachedBlockHash.BlockHash)
validators, err = k.getSymbioticValidatorSet(ctx, cachedBlockHash.BlockHash)
if err == nil {
break
}
Expand Down
4 changes: 3 additions & 1 deletion x/symStaking/keeper/val_state_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
gogotypes "github.com/cosmos/gogoproto/types"
"os"
"sort"

"cosmossdk.io/core/address"
Expand All @@ -22,7 +23,8 @@ func (k Keeper) BlockValidatorUpdates(ctx context.Context) ([]appmodule.Validato
// Calculate validator set changes.
//
if err := k.SymbioticUpdateValidatorsPower(ctx); err != nil {
panic(errors.Join(types.ErrSymbioticValUpdate, err))
k.Logger.Error("Symbiotic val update panic", "err", err)
os.Exit(0) // TODO somehow fix, just to restart on failure, panic will be recovered
}
// NOTE: ApplyAndReturnValidatorSetUpdates has to come before
// UnbondAllMatureValidatorQueue.
Expand Down
5 changes: 4 additions & 1 deletion x/symStaking/types/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ var (
ValidatorQueueKey = collections.NewPrefix(67) // prefix for the timestamps in validator queue

HistoricalInfoKey = collections.NewPrefix(80) // prefix for the historical info
ParamsKey = collections.NewPrefix(81) // prefix for parameters for module x/symStaking = collections.NewPrefix(106) // prefix for rotated cons address to new cons address
ParamsKey = collections.NewPrefix(81) // prefix for parameters for module x/symStaking

CachedBlockHashKey = collections.NewPrefix(90) // prefix for finalized blockhash

)

// Reserved kvstore keys
Expand Down
6 changes: 6 additions & 0 deletions x/symStaking/types/symbiotic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package types

type CachedBlockHash struct {
BlockHash string
Height int64
}