Skip to content

Commit

Permalink
remove chunk data pack
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Oct 15, 2024
1 parent bfed72e commit 8b40598
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"

"github.com/cockroachdb/pebble"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

Expand All @@ -13,11 +14,13 @@ import (
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/badger"
storagepebble "github.com/onflow/flow-go/storage/pebble"
)

var (
flagHeight uint64
flagDataDir string
flagHeight uint64
flagDataDir string
flagChunkDataPackDir string
)

var Cmd = &cobra.Command{
Expand All @@ -36,6 +39,10 @@ func init() {
Cmd.Flags().StringVar(&flagDataDir, "datadir", "",
"directory that stores the protocol state")
_ = Cmd.MarkFlagRequired("datadir")

Cmd.Flags().StringVar(&flagChunkDataPackDir, "chunk-data-pack-dir", "",
"directory that stores the chunk data pack")
_ = Cmd.MarkFlagRequired("chunk-data-pack-dir")
}

func run(*cobra.Command, []string) {
Expand All @@ -51,16 +58,33 @@ func run(*cobra.Command, []string) {
}

db := common.InitStorage(flagDataDir)
defer func() {
err := db.Close()
if err != nil {
log.Fatal().Err(err).Msg("could not close db")
}
}()

storages := common.InitStorages(db)
state, err := common.InitProtocolState(db, storages)
if err != nil {
log.Fatal().Err(err).Msg("could not init protocol states")
}

chunkDataPackDB, err := storagepebble.OpenDefaultPebbleDB(flagChunkDataPackDir)
if err != nil {
log.Fatal().Err(err).Msg("could not open chunk data pack db")
}
defer func() {
err := chunkDataPackDB.Close()
if err != nil {
log.Fatal().Err(err).Msg("could not close chunk data pack db")
}
}()

metrics := &metrics.NoopCollector{}
transactionResults := badger.NewTransactionResults(metrics, db, badger.DefaultCacheSize)
commits := badger.NewCommits(metrics, db)
chunkDataPacks := badger.NewChunkDataPacks(metrics, db, badger.NewCollections(db, badger.NewTransactions(metrics, db)), badger.DefaultCacheSize)
results := badger.NewExecutionResults(metrics, db)
receipts := badger.NewExecutionReceipts(metrics, db, results, badger.DefaultCacheSize)
myReceipts := badger.NewMyExecutionReceipts(metrics, db, receipts)
Expand All @@ -70,8 +94,13 @@ func run(*cobra.Command, []string) {

writeBatch := badger.NewBatch(db)

chunkDataPacks := storagepebble.NewChunkDataPacks(metrics,
chunkDataPackDB, storages.Collections, 1000)
pebbleWriter := chunkDataPackDB.NewBatch()

err = removeExecutionResultsFromHeight(
writeBatch,
pebbleWriter,
state,
headers,
transactionResults,
Expand All @@ -86,11 +115,20 @@ func run(*cobra.Command, []string) {
if err != nil {
log.Fatal().Err(err).Msgf("could not remove result from height %v", flagHeight)
}

log.Info().Msgf("flushing badger write batch at %v", flagHeight)
err = writeBatch.Flush()
if err != nil {
log.Fatal().Err(err).Msgf("could not flush write batch at %v", flagHeight)
}

log.Info().Msgf("flushing pebble writer at %v", flagHeight)

err = pebbleWriter.Commit(pebble.Sync)
if err != nil {
log.Fatal().Err(err).Msgf("could not flush pebble writer at %v", flagHeight)
}

header, err := state.AtHeight(flagHeight).Head()
if err != nil {
log.Fatal().Err(err).Msgf("could not get block header at height %v", flagHeight)
Expand All @@ -109,11 +147,12 @@ func run(*cobra.Command, []string) {
// need to include the Remove methods
func removeExecutionResultsFromHeight(
writeBatch *badger.Batch,
pebbleWriter pebble.Writer,
protoState protocol.State,
headers *badger.Headers,
transactionResults *badger.TransactionResults,
commits *badger.Commits,
chunkDataPacks *badger.ChunkDataPacks,
chunkDataPacks *storagepebble.ChunkDataPacks,
results *badger.ExecutionResults,
myReceipts *badger.MyExecutionReceipts,
events *badger.Events,
Expand Down Expand Up @@ -148,7 +187,7 @@ func removeExecutionResultsFromHeight(

blockID := head.ID()

err = removeForBlockID(writeBatch, headers, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, blockID)
err = removeForBlockID(writeBatch, pebbleWriter, headers, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, blockID)
if err != nil {
return fmt.Errorf("could not remove result for finalized block: %v, %w", blockID, err)
}
Expand All @@ -167,7 +206,7 @@ func removeExecutionResultsFromHeight(
total = len(pendings)

for _, pending := range pendings {
err = removeForBlockID(writeBatch, headers, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, pending)
err = removeForBlockID(writeBatch, pebbleWriter, headers, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, pending)

if err != nil {
return fmt.Errorf("could not remove result for pending block %v: %w", pending, err)
Expand All @@ -188,11 +227,12 @@ func removeExecutionResultsFromHeight(
// It bubbles up any error encountered
func removeForBlockID(
writeBatch *badger.Batch,
pebbleWriter pebble.Writer,
headers *badger.Headers,
commits *badger.Commits,
transactionResults *badger.TransactionResults,
results *badger.ExecutionResults,
chunks *badger.ChunkDataPacks,
chunks *storagepebble.ChunkDataPacks,
myReceipts *badger.MyExecutionReceipts,
events *badger.Events,
serviceEvents *badger.ServiceEvents,
Expand All @@ -211,7 +251,7 @@ func removeForBlockID(
for _, chunk := range result.Chunks {
chunkID := chunk.ID()
// remove chunk data pack
err := chunks.BatchRemove(chunkID, writeBatch)
err := chunks.PebbleBatchRemove(chunkID, pebbleWriter)
if errors.Is(err, storage.ErrNotFound) {
log.Warn().Msgf("chunk %v not found for block %v", chunkID, blockID)
continue
Expand Down
4 changes: 4 additions & 0 deletions storage/pebble/chunk_data_packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func (ch *ChunkDataPacks) BatchRemove(chunkID flow.Identifier, batch storage.Bat
return fmt.Errorf("not implemented")
}

func (ch *ChunkDataPacks) PebbleBatchRemove(chunkID flow.Identifier, batch pebble.Writer) error {
return operation.RemoveChunkDataPack(chunkID)(batch)
}

func (ch *ChunkDataPacks) batchRemove(chunkID flow.Identifier, batch pebble.Writer) error {
return operation.RemoveChunkDataPack(chunkID)(batch)
}
Expand Down

0 comments on commit 8b40598

Please sign in to comment.