
Merge pull request #6364 from onflow/fxamacker/add-util-to-check-storage-health

Add check-storage command to util program to check storage health of execution state
fxamacker authored Aug 20, 2024
2 parents 2266069 + bb5d146 commit caa362a
Showing 3 changed files with 334 additions and 2 deletions.
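For orientation, here is a minimal sketch of driving the new command programmatically, assuming only the exported check_storage.Cmd and the flags registered in cmd.go below; the standalone main, file paths, and state commitment value are hypothetical placeholders (in the repository the command is instead registered on the util root command, see cmd/util/cmd/root.go):

package main

import (
	check_storage "github.com/onflow/flow-go/cmd/util/cmd/check-storage"
)

func main() {
	// Hypothetical standalone invocation; in practice this runs as a subcommand
	// of the util program once root.go registers check_storage.Cmd.
	cmd := check_storage.Cmd
	cmd.SetArgs([]string{
		"--state", "/path/to/execution-state",          // placeholder state/checkpoint file
		"--state-commitment", "<hex-state-commitment>", // placeholder commitment
		"--output-directory", "/path/to/report-dir",    // required flag
		"--n-workers", "10",
	})
	if err := cmd.Execute(); err != nil {
		panic(err)
	}
}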
329 changes: 329 additions & 0 deletions cmd/util/cmd/check-storage/cmd.go
@@ -0,0 +1,329 @@
package check_storage

import (
	"context"

	"github.com/rs/zerolog/log"
	"github.com/spf13/cobra"
	"golang.org/x/sync/errgroup"

	"github.com/onflow/cadence/runtime"
	"github.com/onflow/cadence/runtime/common"

	"github.com/onflow/flow-go/cmd/util/ledger/migrations"
	"github.com/onflow/flow-go/cmd/util/ledger/reporters"
	"github.com/onflow/flow-go/cmd/util/ledger/util"
	"github.com/onflow/flow-go/cmd/util/ledger/util/registers"
	"github.com/onflow/flow-go/ledger"
	moduleUtil "github.com/onflow/flow-go/module/util"
)

var (
	flagPayloads        string
	flagState           string
	flagStateCommitment string
	flagOutputDirectory string
	flagNWorker         int
)

var Cmd = &cobra.Command{
	Use:   "check-storage",
	Short: "Check storage health",
	Run:   run,
}

const (
	ReporterName = "storage-health"
)

func init() {

	Cmd.Flags().StringVar(
		&flagPayloads,
		"payloads",
		"",
		"Input payload file name",
	)

	Cmd.Flags().StringVar(
		&flagState,
		"state",
		"",
		"Input state file name",
	)

	Cmd.Flags().StringVar(
		&flagStateCommitment,
		"state-commitment",
		"",
		"Input state commitment",
	)

	Cmd.Flags().StringVar(
		&flagOutputDirectory,
		"output-directory",
		"",
		"Output directory",
	)

	_ = Cmd.MarkFlagRequired("output-directory")

	Cmd.Flags().IntVar(
		&flagNWorker,
		"n-workers",
		10,
		"number of workers to use",
	)
}

func run(*cobra.Command, []string) {

if flagPayloads == "" && flagState == "" {
log.Fatal().Msg("Either --payloads or --state must be provided")
} else if flagPayloads != "" && flagState != "" {
log.Fatal().Msg("Only one of --payloads or --state must be provided")
}
if flagState != "" && flagStateCommitment == "" {
log.Fatal().Msg("--state-commitment must be provided when --state is provided")
}

// Create report in JSONL format
rw := reporters.NewReportFileWriterFactoryWithFormat(flagOutputDirectory, log.Logger, reporters.ReportFormatJSONL).
ReportWriter(ReporterName)
defer rw.Close()

var payloads []*ledger.Payload
var err error

// Read payloads from payload file or checkpoint file

if flagPayloads != "" {
log.Info().Msgf("Reading payloads from %s", flagPayloads)

_, payloads, err = util.ReadPayloadFile(log.Logger, flagPayloads)
if err != nil {
log.Fatal().Err(err).Msg("failed to read payloads")
}
} else {
log.Info().Msgf("Reading trie %s", flagStateCommitment)

stateCommitment := util.ParseStateCommitment(flagStateCommitment)
payloads, err = util.ReadTrie(flagState, stateCommitment)
if err != nil {
log.Fatal().Err(err).Msg("failed to read state")
}
}

log.Info().Msgf("Grouping %d payloads by accounts ...", len(payloads))

// Group payloads by accounts

payloadAccountGrouping := util.GroupPayloadsByAccount(log.Logger, payloads, flagNWorker)

log.Info().Msgf(
"Creating registers from grouped payloads (%d) ...",
len(payloads),
)

registersByAccount, err := util.NewByAccountRegistersFromPayloadAccountGrouping(payloadAccountGrouping, flagNWorker)
if err != nil {
log.Fatal().Err(err).Msg("failed to create ByAccount registers from payload")
}

accountCount := registersByAccount.AccountCount()

log.Info().Msgf(
"Created registers from payloads (%d accounts, %d payloads)",
accountCount,
len(payloads),
)

failedAccountAddresses, err := checkStorageHealth(registersByAccount, flagNWorker, rw)
if err != nil {
log.Fatal().Err(err).Msg("failed to check storage health")
}

	if len(failedAccountAddresses) == 0 {
		log.Info().Msgf("All %d accounts are healthy", accountCount)
		return
	}

	log.Info().Msgf(
		"%d out of %d accounts reported storage health check issues. See report %s for more details.",
		len(failedAccountAddresses),
		accountCount,
		ReporterName,
	)

	log.Info().Msgf("Accounts with storage health issues:")
	for _, address := range failedAccountAddresses {
		log.Info().Msgf(" %x", []byte(address))
	}
}

func checkStorageHealth(
	registersByAccount *registers.ByAccount,
	nWorkers int,
	rw reporters.ReportWriter,
) (failedAccountAddresses []string, err error) {

	accountCount := registersByAccount.AccountCount()

	nWorkers = min(accountCount, nWorkers)

	log.Info().Msgf("Checking storage health of %d accounts using %d workers ...", accountCount, nWorkers)

	logAccount := moduleUtil.LogProgress(
		log.Logger,
		moduleUtil.DefaultLogProgressConfig(
			"processing account group",
			accountCount,
		),
	)

	if nWorkers <= 1 {
		// Skip goroutine to avoid overhead
		err = registersByAccount.ForEachAccount(
			func(accountRegisters *registers.AccountRegisters) error {
				accountStorageIssues := checkAccountStorageHealth(accountRegisters, nWorkers)

				if len(accountStorageIssues) > 0 {
					failedAccountAddresses = append(failedAccountAddresses, accountRegisters.Owner())

					for _, issue := range accountStorageIssues {
						rw.Write(issue)
					}
				}

				logAccount(1)

				return nil
			})

		return failedAccountAddresses, err
	}

	type job struct {
		accountRegisters *registers.AccountRegisters
	}

	type result struct {
		owner  string
		issues []accountStorageIssue
	}

	jobs := make(chan job, nWorkers)

	results := make(chan result, nWorkers)

	g, ctx := errgroup.WithContext(context.Background())

	// Launch goroutine to check account storage health
	for i := 0; i < nWorkers; i++ {
		g.Go(func() error {
			for job := range jobs {
				issues := checkAccountStorageHealth(job.accountRegisters, nWorkers)

				result := result{
					owner:  job.accountRegisters.Owner(),
					issues: issues,
				}

				select {
				case results <- result:
				case <-ctx.Done():
					return ctx.Err()
				}

				logAccount(1)
			}
			return nil
		})
	}

	// Launch goroutine to wait for workers to finish and close results (output) channel
	go func() {
		defer close(results)
		err = g.Wait()
	}()

	// Launch goroutine to send job to jobs channel and close jobs (input) channel
	go func() {
		defer close(jobs)

		err = registersByAccount.ForEachAccount(
			func(accountRegisters *registers.AccountRegisters) error {
				jobs <- job{accountRegisters: accountRegisters}
				return nil
			})
		if err != nil {
			log.Err(err).Msgf("failed to iterate accounts by registersByAccount")
		}
	}()

	// Gather results
	for result := range results {
		if len(result.issues) > 0 {
			failedAccountAddresses = append(failedAccountAddresses, result.owner)
			for _, issue := range result.issues {
				rw.Write(issue)
			}
		}
	}

	return failedAccountAddresses, err
}

func checkAccountStorageHealth(accountRegisters *registers.AccountRegisters, nWorkers int) []accountStorageIssue {
	owner := accountRegisters.Owner()

	address, err := common.BytesToAddress([]byte(owner))
	if err != nil {
		return []accountStorageIssue{
			{
				Address: address.Hex(),
				Kind:    storageErrorKindString[otherErrorKind],
				Msg:     err.Error(),
			}}
	}

	var issues []accountStorageIssue

	// Check atree storage health

	ledger := &registers.ReadOnlyLedger{Registers: accountRegisters}
	storage := runtime.NewStorage(ledger, nil)

	err = util.CheckStorageHealth(address, storage, accountRegisters, migrations.AllStorageMapDomains, nWorkers)
	if err != nil {
		issues = append(
			issues,
			accountStorageIssue{
				Address: address.Hex(),
				Kind:    storageErrorKindString[atreeStorageErrorKind],
				Msg:     err.Error(),
			})
	}

	// TODO: check health of non-atree registers

	return issues
}

type storageErrorKind int

const (
	otherErrorKind storageErrorKind = iota
	atreeStorageErrorKind
)

var storageErrorKindString = map[storageErrorKind]string{
	otherErrorKind:        "error_check_storage_failed",
	atreeStorageErrorKind: "error_atree_storage",
}

type accountStorageIssue struct {
	Address string
	Kind    string
	Msg     string
}
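For reference, a sketch of what a single report entry could look like, assuming the JSONL report writer emits each accountStorageIssue as one JSON object per line using the exported field names above (the exact envelope produced by reporters.NewReportFileWriterFactoryWithFormat may differ; the address and message below are made up):

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the accountStorageIssue struct from cmd.go above.
type accountStorageIssue struct {
	Address string
	Kind    string
	Msg     string
}

func main() {
	issue := accountStorageIssue{
		Address: "0000000000000001",    // hypothetical account address (hex)
		Kind:    "error_atree_storage", // from storageErrorKindString
		Msg:     "atree storage health check failed",
	}

	// One JSON object per line is the JSONL convention.
	line, err := json.Marshal(issue)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(line))
	// {"Address":"0000000000000001","Kind":"error_atree_storage","Msg":"atree storage health check failed"}
}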
2 changes: 2 additions & 0 deletions cmd/util/cmd/root.go
@@ -13,6 +13,7 @@ import (
"github.com/onflow/flow-go/cmd/util/cmd/addresses"
"github.com/onflow/flow-go/cmd/util/cmd/atree_inlined_status"
bootstrap_execution_state_payloads "github.com/onflow/flow-go/cmd/util/cmd/bootstrap-execution-state-payloads"
check_storage "github.com/onflow/flow-go/cmd/util/cmd/check-storage"
checkpoint_collect_stats "github.com/onflow/flow-go/cmd/util/cmd/checkpoint-collect-stats"
checkpoint_list_tries "github.com/onflow/flow-go/cmd/util/cmd/checkpoint-list-tries"
checkpoint_trie_stats "github.com/onflow/flow-go/cmd/util/cmd/checkpoint-trie-stats"
@@ -116,6 +117,7 @@ func addCommands() {
	rootCmd.AddCommand(find_trie_root.Cmd)
	rootCmd.AddCommand(run_script.Cmd)
	rootCmd.AddCommand(system_addresses.Cmd)
	rootCmd.AddCommand(check_storage.Cmd)
}

func initConfig() {
5 changes: 3 additions & 2 deletions cmd/util/ledger/util/registers/registers.go
@@ -332,8 +332,9 @@ type ReadOnlyLedger struct {

var _ atree.Ledger = ReadOnlyLedger{}

-func (l ReadOnlyLedger) GetValue(owner, key []byte) (value []byte, err error) {
-	return l.Registers.Get(string(owner), string(key))
+func (l ReadOnlyLedger) GetValue(address, key []byte) (value []byte, err error) {
+	owner := flow.AddressToRegisterOwner(flow.BytesToAddress(address))
+	return l.Registers.Get(owner, string(key))
}

func (l ReadOnlyLedger) ValueExists(owner, key []byte) (exists bool, err error) {
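The GetValue change above normalizes the raw address bytes into a register owner before the lookup, instead of using the bytes directly. A small sketch of the intent, assuming flow.AddressToRegisterOwner maps an ordinary address to its owner string and the zero address to the empty owner used for global registers (that mapping is an assumption here, not shown in this diff):

package main

import (
	"fmt"

	"github.com/onflow/flow-go/model/flow"
)

func main() {
	// Ordinary account address: the register owner is derived from the address bytes.
	addr := flow.HexToAddress("0000000000000001")
	fmt.Printf("owner=%x\n", flow.AddressToRegisterOwner(addr))

	// Assumed behavior: the zero address maps to the empty-string owner used by
	// global registers, which passing the raw bytes through string(owner) would miss.
	fmt.Printf("owner=%q\n", flow.AddressToRegisterOwner(flow.EmptyAddress))
}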
