Skip to content

Commit

Permalink
Merge pull request #6478 from onflow/fxamacker/update-checkpoint-coll…
Browse files Browse the repository at this point in the history
…ect-stats

Update `checkpoint-collect-stats` command to support payloads file and state commitment
  • Loading branch information
fxamacker authored Sep 27, 2024
2 parents 901e171 + 2bc8287 commit 2fa508e
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 58 deletions.
186 changes: 132 additions & 54 deletions cmd/util/cmd/checkpoint-collect-stats/cmd.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package checkpoint_collect_stats

import (
"bufio"
"encoding/json"
"math"
"os"
"path/filepath"
"strings"

"github.com/montanaflynn/stats"
Expand All @@ -17,31 +13,51 @@ import (

"github.com/onflow/atree"

"github.com/onflow/flow-go/cmd/util/ledger/reporters"
"github.com/onflow/flow-go/cmd/util/ledger/util"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/pathfinder"
"github.com/onflow/flow-go/ledger/complete"
"github.com/onflow/flow-go/ledger/complete/mtrie/trie"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/utils/debug"
)

// Command-line flag values for checkpoint-collect-stats.
// NOTE(review): flagCheckpointDir, flagOutputDir and flagMemProfile appear
// twice below; this looks like old and new sides of a diff shown together.
// Confirm only one declaration of each exists in the real file.
var (
flagCheckpointDir string
flagOutputDir string
flagMemProfile bool
// flagCheckpointDir is set by --checkpoint-dir: directory to load checkpoint files from.
flagCheckpointDir string
// flagStateCommitment is set by --state-commitment (optional): when provided,
// stats are gathered only on the trie with the matching state commitment.
flagStateCommitment string
// flagPayloads is set by --payload-filename: payloads file to load payloads from.
flagPayloads string
// flagOutputDir is set by --output-dir: directory to write checkpoint stats to.
flagOutputDir string
// flagMemProfile, when true, makes run start a memory profile for the duration of the command.
flagMemProfile bool
)

const (
// ledgerStatsReportName is the report name run passes to writeStats
// when emitting the collected stats.
ledgerStatsReportName = "ledger-stats"
)

// Cmd is the cobra command definition for "checkpoint-collect-stats".
// It collects stats either from checkpoint file(s) (--checkpoint-dir, with
// optional --state-commitment) or from a payloads file (--payload-filename),
// and dispatches to run.
// NOTE(review): Short and Run each appear twice in this literal; this looks
// like old and new sides of a diff shown together. Confirm against the real file.
var Cmd = &cobra.Command{
Use: "checkpoint-collect-stats",
Short: "collects stats on tries stored in a checkpoint",
Run: run,
Short: "collects stats on tries stored in a checkpoint, or payloads from a payloads file",
Long: `checkpoint-collect-stats collects stats on tries stored in a checkpoint, or payloads from a payloads file.
Two kinds of input data are supported:
- checkpoint file(s) ("--checkpoint-dir" with optional "--state-commitment"), or
- payloads file ("--payload-filename")`,
Run: run,
}

func init() {
Cmd.Flags().StringVar(&flagCheckpointDir, "checkpoint-dir", "",
"Directory to load checkpoint files from")
_ = Cmd.MarkFlagRequired("checkpoint-dir")

// state-commitment is optional.
// When provided, this program only gathers stats on trie with matching state commitment.
Cmd.Flags().StringVar(&flagStateCommitment, "state-commitment", "",
"Trie state commitment")

Cmd.Flags().StringVar(&flagPayloads, "payload-filename", "",
"Payloads file name to load payloads from")

Cmd.Flags().StringVar(&flagOutputDir, "output-dir", "",
"Directory to write checkpoint stats to")
Expand All @@ -52,7 +68,7 @@ func init() {
}

// Stats is the top-level report value written by run: trie-level ledger
// stats (only populated when reading from a checkpoint) plus payload stats.
// NOTE(review): LedgerStats appears twice below; this looks like old and new
// sides of a diff shown together. Confirm only the tagged field exists.
type Stats struct {
LedgerStats *complete.LedgerStats
// LedgerStats is omitted from JSON when nil; run leaves it nil when the
// input is a payloads file.
LedgerStats *complete.LedgerStats `json:",omitempty"`
// PayloadStats holds total payload sizes and per-type value size stats.
PayloadStats *PayloadStats
}

Expand All @@ -78,10 +94,79 @@ type sizesByType map[string][]float64

// run is the cobra entry point for checkpoint-collect-stats.
//
// It validates the input flags (exactly one of --payload-filename or
// --checkpoint-dir; --state-commitment only together with --checkpoint-dir),
// feeds every payload from the selected source through a callback that
// accumulates the total payload size, the total payload value size and the
// per-type value sizes, and finally writes the combined stats via writeStats
// under the report name ledgerStatsReportName.
//
// Any validation or processing failure is reported through log.Fatal.
func run(*cobra.Command, []string) {

	if flagPayloads == "" && flagCheckpointDir == "" {
		log.Fatal().Msg("Either --payload-filename or --checkpoint-dir must be provided")
	}
	if flagPayloads != "" && flagCheckpointDir != "" {
		log.Fatal().Msg("Only one of --payload-filename or --checkpoint-dir must be provided")
	}
	// A state commitment selects a trie inside a checkpoint, so it is
	// meaningless without a checkpoint directory.
	if flagCheckpointDir == "" && flagStateCommitment != "" {
		log.Fatal().Msg("--checkpoint-dir must be provided when --state-commitment is provided")
	}

	if flagMemProfile {
		// NOTE(review): the deferred Stop presumably does not run when a later
		// log.Fatal terminates the process — confirm if profiles are needed on failure.
		defer profile.Start(profile.MemProfile).Stop()
	}

	var totalPayloadSize, totalPayloadValueSize uint64

	valueSizesByType := make(sizesByType)

	// payloadCallback is invoked once per payload, regardless of input source,
	// and accumulates sizes into the variables captured above.
	payloadCallback := func(p *ledger.Payload) {
		key, err := p.Key()
		if err != nil {
			log.Fatal().Err(err).Msg("cannot load a key")
		}

		size := p.Size()
		valueSize := p.Value().Size()
		totalPayloadSize += uint64(size)
		totalPayloadValueSize += uint64(valueSize)

		// Classify the register once and reuse the result for lookup and store.
		registerType := getType(key)
		valueSizesByType[registerType] = append(valueSizesByType[registerType], float64(valueSize))
	}

	// ledgerStats stays nil when reading from a payloads file; only the
	// checkpoint path produces trie-level stats.
	var ledgerStats *complete.LedgerStats

	useCheckpointFile := flagPayloads == ""
	if useCheckpointFile {
		ledgerStats = getPayloadStatsFromCheckpoint(payloadCallback)
	} else {
		getPayloadStatsFromPayloadFile(payloadCallback)
	}

	statsByTypes := getStats(valueSizesByType)

	stats := &Stats{
		LedgerStats: ledgerStats,
		PayloadStats: &PayloadStats{
			TotalPayloadSize:      totalPayloadSize,
			TotalPayloadValueSize: totalPayloadValueSize,
			StatsByTypes:          statsByTypes,
		},
	}

	writeStats(ledgerStatsReportName, stats)
}

// getPayloadStatsFromPayloadFile reads all payloads from the file named by
// flagPayloads and invokes payloadCallBack once for each of them, in the
// order returned by util.ReadPayloadFile.
//
// It logs the heap allocation delta observed around the load. A read failure
// is reported through log.Fatal.
func getPayloadStatsFromPayloadFile(payloadCallBack func(payload *ledger.Payload)) {
	heapBefore := debug.GetHeapAllocsBytes()
	log.Info().Msgf("loading payloads from %v", flagPayloads)

	// The first return value of ReadPayloadFile is not needed here.
	_, loaded, readErr := util.ReadPayloadFile(log.Logger, flagPayloads)
	if readErr != nil {
		log.Fatal().Err(readErr).Msg("failed to read payloads")
	}

	heapDelta := debug.GetHeapAllocsBytes() - heapBefore
	log.Info().Msgf("%d payloads are loaded, mem usage: %d", len(loaded), heapDelta)

	for i := range loaded {
		payloadCallBack(loaded[i])
	}
}

func getPayloadStatsFromCheckpoint(payloadCallBack func(payload *ledger.Payload)) *complete.LedgerStats {
memAllocBefore := debug.GetHeapAllocsBytes()
log.Info().Msgf("loading checkpoint(s) from %v", flagCheckpointDir)

Expand All @@ -106,26 +191,40 @@ func run(*cobra.Command, []string) {
memAllocAfter := debug.GetHeapAllocsBytes()
log.Info().Msgf("the checkpoint is loaded, mem usage: %d", memAllocAfter-memAllocBefore)

var totalPayloadSize, totalPayloadValueSize uint64
var value ledger.Value
var key ledger.Key
var size, valueSize int
var tries []*trie.MTrie

valueSizesByType := make(sizesByType, 0)
ledgerStats, err := led.CollectStats(func(p *ledger.Payload) {
key, err = p.Key()
if flagStateCommitment != "" {
stateCommitment := util.ParseStateCommitment(flagStateCommitment)

t, err := led.FindTrieByStateCommit(stateCommitment)
if err != nil {
log.Fatal().Err(err).Msg("cannot load a key")
log.Fatal().Err(err).Msgf("failed to find trie with state commitment %x", stateCommitment)
}
if t == nil {
log.Fatal().Msgf("no trie with state commitment %x", stateCommitment)
}

size = p.Size()
value = p.Value()
valueSize = value.Size()
totalPayloadSize += uint64(size)
totalPayloadValueSize += uint64(valueSize)
valueSizesByType[getType(key)] = append(valueSizesByType[getType(key)], float64(valueSize))
})
tries = append(tries, t)
} else {
ts, err := led.Tries()
if err != nil {
log.Fatal().Err(err).Msg("failed to get tries")
}

tries = append(tries, ts...)
}

log.Info().Msgf("collecting stats on %d tries", len(tries))

ledgerStats, err := complete.CollectStats(tries, payloadCallBack)
if err != nil {
log.Fatal().Err(err).Msg("failed to collect stats")
}

return ledgerStats
}

func getStats(valueSizesByType sizesByType) []RegisterStatsByTypes {
statsByTypes := make([]RegisterStatsByTypes, 0)
for t, values := range valueSizesByType {

Expand Down Expand Up @@ -178,36 +277,15 @@ func run(*cobra.Command, []string) {
})
}

if err != nil {
log.Fatal().Err(err).Msg("failed to collect stats")
}

stats := &Stats{
LedgerStats: ledgerStats,
PayloadStats: &PayloadStats{
TotalPayloadSize: totalPayloadSize,
TotalPayloadValueSize: totalPayloadValueSize,
StatsByTypes: statsByTypes,
},
}

path := filepath.Join(flagOutputDir, "ledger.stats.json")

fi, err := os.Create(path)
if err != nil {
log.Fatal().Err(err).Msg("failed to create path")
}
defer fi.Close()

writer := bufio.NewWriter(fi)
defer writer.Flush()
return statsByTypes
}

encoder := json.NewEncoder(writer)
// writeStats writes stats to the report named reportName, using a report
// writer obtained from reporters.NewReportFileWriterFactory for flagOutputDir.
// The report writer is closed when the function returns.
func writeStats(reportName string, stats interface{}) {
rw := reporters.NewReportFileWriterFactory(flagOutputDir, log.Logger).
ReportWriter(reportName)
defer rw.Close()

// NOTE(review): the next four lines reference err and encoder, which are not
// declared in this function as shown; they look like removed-side diff lines.
// Confirm against the real file.
err = encoder.Encode(stats)
if err != nil {
log.Fatal().Err(err).Msg("could not json encode ledger stats")
}
rw.Write(stats)
}

func getType(key ledger.Key) string {
Expand Down
13 changes: 9 additions & 4 deletions ledger/complete/ledger_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/complete/mtrie/flattener"
"github.com/onflow/flow-go/ledger/complete/mtrie/node"
"github.com/onflow/flow-go/ledger/complete/mtrie/trie"
)

type LedgerStats struct {
Expand All @@ -16,14 +17,18 @@ type LedgerStats struct {
}

// CollectStats gathers stats over all tries returned by l.Tries(), passing
// payloadCallBack through to the package-level CollectStats.
// It returns the error from l.Tries() unchanged if fetching the tries fails.
func (l *Ledger) CollectStats(payloadCallBack func(payload *ledger.Payload)) (*LedgerStats, error) {
// NOTE(review): the two declarations below are not used in this method as
// shown; they look like removed-side diff lines. Confirm against the real file.
visitedNodes := make(map[*node.Node]uint64)
var interimNodeCounter, leafNodeCounter, totalNodeCounter uint64

tries, err := l.Tries()
if err != nil {
return nil, err
}

return CollectStats(tries, payloadCallBack)
}

func CollectStats(tries []*trie.MTrie, payloadCallBack func(payload *ledger.Payload)) (*LedgerStats, error) {
visitedNodes := make(map[*node.Node]uint64)
var interimNodeCounter, leafNodeCounter, totalNodeCounter uint64

bar := progressbar.Default(int64(len(tries)), "collecting ledger stats")
for _, trie := range tries {
for itr := flattener.NewUniqueNodeIterator(trie.RootNode(), visitedNodes); itr.Next(); {
Expand All @@ -38,7 +43,7 @@ func (l *Ledger) CollectStats(payloadCallBack func(payload *ledger.Payload)) (*L
visitedNodes[n] = totalNodeCounter
totalNodeCounter++
}
if err = bar.Add(1); err != nil {
if err := bar.Add(1); err != nil {
return nil, err
}
}
Expand Down

0 comments on commit 2fa508e

Please sign in to comment.