Skip to content

Commit

Permalink
Merge pull request #8462 from dolthub/aaron/dolt-gc-full
Browse files Browse the repository at this point in the history
dolt gc --full: Implement a flag for GC which will collect everything, including the old gen.
  • Loading branch information
reltuk authored Oct 17, 2024
2 parents 5bf1898 + 0a53805 commit d58a69a
Show file tree
Hide file tree
Showing 20 changed files with 436 additions and 96 deletions.
1 change: 1 addition & 0 deletions go/cmd/dolt/cli/arg_parser_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func CreateLogArgParser(isTableFunction bool) *argparser.ArgParser {
// CreateGCArgParser builds the argument parser for the dolt gc command. It
// accepts no positional arguments and supports the --shallow and --full flags.
func CreateGCArgParser() *argparser.ArgParser {
	parser := argparser.NewArgParserWithMaxArgs("gc", 0)
	parser.SupportsFlag(ShallowFlag, "s", "perform a fast, but incomplete garbage collection pass")
	parser.SupportsFlag(FullFlag, "f", "perform a full garbage collection, including the old generation")
	return parser
}

Expand Down
1 change: 1 addition & 0 deletions go/cmd/dolt/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
DryRunFlag = "dry-run"
EmptyParam = "empty"
ForceFlag = "force"
FullFlag = "full"
GraphFlag = "graph"
HardResetParam = "hard"
HostFlag = "host"
Expand Down
19 changes: 17 additions & 2 deletions go/cmd/dolt/commands/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,17 @@ var gcDocs = cli.CommandDocumentationContent{
ShortDesc: "Cleans up unreferenced data from the repository.",
LongDesc: `Searches the repository for data that is no longer referenced and no longer needed.
If the {{.EmphasisLeft}}--shallow{{.EmphasisRight}} flag is supplied, a faster but less thorough garbage collection will be performed.`,
Dolt GC is generational. When a GC is run, everything reachable from any commit on any branch
is put into the old generation. Data which is only reachable from uncommitted branch HEADs is kept in
the new generation. By default, Dolt GC will only visit data in the new generation, and so will never
collect data from deleted branches which has previously made its way to the old generation from being
copied during a prior garbage collection.
If the {{.EmphasisLeft}}--shallow{{.EmphasisRight}} flag is supplied, a faster but less thorough garbage collection will be performed.
If the {{.EmphasisLeft}}--full{{.EmphasisRight}} flag is supplied, a more thorough garbage collection, fully collecting the old gen and new gen, will be performed.`,
Synopsis: []string{
"[--shallow]",
"[--shallow|--full]",
},
}

Expand Down Expand Up @@ -83,6 +91,10 @@ func (cmd GarbageCollectionCmd) Exec(ctx context.Context, commandStr string, arg
help, usage := cli.HelpAndUsagePrinters(cli.CommandDocsForCommandString(commandStr, gcDocs, ap))
apr := cli.ParseArgsOrDie(ap, args, help)

if apr.Contains(cli.ShallowFlag) && apr.Contains(cli.FullFlag) {
return HandleVErrAndExitCode(errhand.BuildDError("Invalid Argument: --shallow is not compatible with --full").SetPrintUsage().Build(), usage)
}

queryist, sqlCtx, closeFunc, err := cliCtx.QueryEngine(ctx)
if err != nil {
return HandleVErrAndExitCode(errhand.VerboseErrorFromError(err), usage)
Expand Down Expand Up @@ -110,6 +122,9 @@ func constructDoltGCQuery(apr *argparser.ArgParseResults) (string, error) {
if apr.Contains(cli.ShallowFlag) {
query += "'--shallow'"
}
if apr.Contains(cli.FullFlag) {
query += "'--full'"
}
query += ")"
return query, nil
}
Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/doltdb/doltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1633,7 +1633,7 @@ func (ddb *DoltDB) Rebase(ctx context.Context) error {
// until no possibly-stale ChunkStore state is retained in memory, or failing
// certain in-progress operations which cannot be finalized in a timely manner,
// etc.
func (ddb *DoltDB) GC(ctx context.Context, safepointF func() error) error {
func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointF func() error) error {
collector, ok := ddb.db.Database.(datas.GarbageCollector)
if !ok {
return fmt.Errorf("this database does not support garbage collection")
Expand Down Expand Up @@ -1677,7 +1677,7 @@ func (ddb *DoltDB) GC(ctx context.Context, safepointF func() error) error {
return err
}

return collector.GC(ctx, oldGen, newGen, safepointF)
return collector.GC(ctx, mode, oldGen, newGen, safepointF)
}

func (ddb *DoltDB) ShallowGC(ctx context.Context) error {
Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/doltdb/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func testGarbageCollection(t *testing.T, test gcTest) {
}
}

err := dEnv.DoltDB.GC(ctx, nil)
err := dEnv.DoltDB.GC(ctx, types.GCModeDefault, nil)
require.NoError(t, err)
test.postGCFunc(ctx, t, dEnv.DoltDB, res)

Expand Down Expand Up @@ -208,7 +208,7 @@ func testGarbageCollectionHasCacheDataCorruptionBugFix(t *testing.T) {
_, err = ns.Write(ctx, c1.Node())
require.NoError(t, err)

err = ddb.GC(ctx, nil)
err = ddb.GC(ctx, types.GCModeDefault, nil)
require.NoError(t, err)

c2 := newIntMap(t, ctx, ns, 2, 2)
Expand Down
12 changes: 11 additions & 1 deletion go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/store/types"
)

const (
Expand Down Expand Up @@ -81,6 +82,10 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
return cmdFailure, fmt.Errorf("Could not load database %s", dbName)
}

if apr.Contains(cli.ShallowFlag) && apr.Contains(cli.FullFlag) {
return cmdFailure, fmt.Errorf("cannot supply both --shallow and --full to dolt_gc: %w", InvalidArgErr)
}

if apr.Contains(cli.ShallowFlag) {
err = ddb.ShallowGC(ctx)
if err != nil {
Expand All @@ -106,10 +111,15 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
origepoch = epoch.(int)
}

var mode types.GCMode = types.GCModeDefault
if apr.Contains(cli.FullFlag) {
mode = types.GCModeFull
}

// TODO: If we got a callback at the beginning and an
// (allowed-to-block) callback at the end, we could more
// gracefully tear things down.
err = ddb.GC(ctx, func() error {
err = ddb.GC(ctx, mode, func() error {
if origepoch != -1 {
// Here we need to sanity check role and epoch.
if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok {
Expand Down
30 changes: 29 additions & 1 deletion go/store/chunks/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,34 @@ type LoggingChunkStore interface {

var ErrAddChunkMustBlock = errors.New("chunk keeper: add chunk must block")

// The function type for ChunkStore.HasMany. Used as a return value in the
// GCFinalizer interface.
type HasManyFunc func(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error)

// A GCFinalizer is returned from MarkAndSweepChunks after the keep hashes channel is closed.
//
// A GCFinalizer is a handle to one or more table files which has been
// constructed as part of the GC process. It can be used to add the table files
// to the existing store, as we do in the case of a default-mode collection
// into the old gen, and it can be used to replace all existing table files in
// the store with the new table files, as we do in the collection into the new
// gen.
//
// In addition, adding the table files to an existing store exposes a HasMany
// implementation which inspects only the table files that were added, not all
// the table files in the resulting store. This is an important part of the
// full gc protocol, which works as follows:
//
// * Collect everything reachable from old gen refs into a new table file in the old gen.
// * Add the new table file to the old gen.
// * Collect everything reachable from new gen refs into the new gen, skipping stuff that is in the new old gen table file.
// * Swap to the new gen table file.
// * Swap to the old gen table file.
type GCFinalizer interface {
	// AddChunksToStore adds the GC-constructed table files to the existing
	// store, and returns a HasManyFunc which inspects only the table files
	// that were added, not all the table files in the resulting store.
	AddChunksToStore(ctx context.Context) (HasManyFunc, error)
	// SwapChunksInStore replaces all existing table files in the store with
	// the GC-constructed table files.
	SwapChunksInStore(ctx context.Context) error
}

// ChunkStoreGarbageCollector is a ChunkStore that supports garbage collection.
type ChunkStoreGarbageCollector interface {
ChunkStore
Expand Down Expand Up @@ -185,7 +213,7 @@ type ChunkStoreGarbageCollector interface {
// This behavior is a little different for ValueStore.GC()'s
// interactions with generational stores. See ValueStore and
// NomsBlockStore/GenerationalNBS for details.
MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) error
MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error)

// Count returns the number of chunks in the store.
Count() (uint32, error)
Expand Down
29 changes: 21 additions & 8 deletions go/store/chunks/memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,24 @@ func (ms *MemoryStoreView) EndGC() {
ms.transitionToNoGC()
}

func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) error {
// msvGcFinalizer is the GCFinalizer returned by
// MemoryStoreView.MarkAndSweepChunks. It carries the set of chunks which
// survived the mark phase, to be installed as the store's new contents.
type msvGcFinalizer struct {
	ms      *MemoryStoreView
	keepers map[hash.Hash]Chunk
}

// AddChunksToStore implements GCFinalizer. The in-memory store only supports
// collecting into itself (see MarkAndSweepChunks' dest check), so an add-style
// finalize is not supported and panics.
func (mgcf msvGcFinalizer) AddChunksToStore(ctx context.Context) (HasManyFunc, error) {
	panic("unsupported")
}

// SwapChunksInStore implements GCFinalizer. It replaces the memory store's
// backing storage with exactly the kept chunks and discards any pending
// (unflushed) writes, all under the store's lock.
func (mgcf msvGcFinalizer) SwapChunksInStore(ctx context.Context) error {
	mgcf.ms.mu.Lock()
	defer mgcf.ms.mu.Unlock()
	mgcf.ms.storage = &MemoryStorage{rootHash: mgcf.ms.rootHash, data: mgcf.keepers}
	mgcf.ms.pending = map[hash.Hash]Chunk{}
	return nil
}

func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error) {
if dest != ms {
panic("unsupported")
}
Expand All @@ -366,20 +383,16 @@ LOOP:
for _, h := range hs {
c, err := ms.Get(ctx, h)
if err != nil {
return err
return nil, err
}
keepers[h] = c
}
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
}
}

ms.mu.Lock()
defer ms.mu.Unlock()
ms.storage = &MemoryStorage{rootHash: ms.rootHash, data: keepers}
ms.pending = map[hash.Hash]Chunk{}
return nil
return msvGcFinalizer{ms, keepers}, nil
}

func (ms *MemoryStoreView) Count() (uint32, error) {
Expand Down
4 changes: 2 additions & 2 deletions go/store/chunks/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ func (s *TestStoreView) EndGC() {
collector.EndGC()
}

func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) error {
func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error) {
collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
if !ok || dest != s {
return ErrUnsupportedOperation
return nil, ErrUnsupportedOperation
}
return collector.MarkAndSweepChunks(ctx, hashes, collector)
}
Expand Down
2 changes: 1 addition & 1 deletion go/store/datas/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ type GarbageCollector interface {

// GC traverses the database starting at the Root and removes
// all unreferenced data from persistent storage.
GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error
GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error
}

// CanUsePuller returns true if a datas.Puller can be used to pull data from one Database into another. Not all
Expand Down
4 changes: 2 additions & 2 deletions go/store/datas/database_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,8 +1148,8 @@ func (db *database) doDelete(ctx context.Context, datasetIDstr string, workingse
}

// GC traverses the database starting at the Root and removes all unreferenced data from persistent storage.
func (db *database) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error {
return db.ValueStore.GC(ctx, oldGenRefs, newGenRefs, safepointF)
func (db *database) GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error {
return db.ValueStore.GC(ctx, mode, oldGenRefs, newGenRefs, safepointF)
}

func (db *database) tryCommitChunks(ctx context.Context, newRootHash hash.Hash, currentRootHash hash.Hash) error {
Expand Down
4 changes: 3 additions & 1 deletion go/store/nbs/archive_chunk_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ func (acs archiveChunkSource) getRecordRanges(_ context.Context, _ []getRecord)
}

// getManyCompressed serves compressed chunk requests from an archive source by
// reading the uncompressed chunks via getMany and converting each one to a
// CompressedChunk before handing it to found. (Previously this returned an
// "unsupported" error; the diff's stale line is dropped here.)
func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
	return acs.getMany(ctx, eg, reqs, func(ctx context.Context, chk *chunks.Chunk) {
		found(ctx, ChunkToCompressedChunk(*chk))
	}, stats)
}

func (acs archiveChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error {
Expand Down
38 changes: 38 additions & 0 deletions go/store/nbs/generational_chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
// Compile-time checks that *GenerationalNBS satisfies the chunk store
// interfaces it is used as. The duplicate GenerationalCS assertion present in
// the original has been removed.
var _ chunks.ChunkStore = (*GenerationalNBS)(nil)
var _ chunks.GenerationalCS = (*GenerationalNBS)(nil)
var _ chunks.TableFileStore = (*GenerationalNBS)(nil)
var _ chunks.ChunkStoreGarbageCollector = (*GenerationalNBS)(nil)

type GenerationalNBS struct {
oldGen *NomsBlockStore
Expand Down Expand Up @@ -492,3 +494,39 @@ func (gcs *GenerationalNBS) Path() (string, bool) {
// UpdateManifest applies the given manifest updates to the new gen store and
// returns the resulting manifest info.
func (gcs *GenerationalNBS) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (mi ManifestInfo, err error) {
	return gcs.newGen.UpdateManifest(ctx, updates)
}

// BeginGC starts a garbage collection pass, delegating to the new gen store.
// The keeper callback decides whether a hash must be retained.
func (gcs *GenerationalNBS) BeginGC(keeper func(hash.Hash) bool) error {
	return gcs.newGen.BeginGC(keeper)
}

// EndGC finishes the garbage collection pass begun with BeginGC, delegating to
// the new gen store.
func (gcs *GenerationalNBS) EndGC() {
	gcs.newGen.EndGC()
}

// MarkAndSweepChunks implements chunks.ChunkStoreGarbageCollector for the
// generational store, running the shared mark-and-sweep over the new gen with
// this generational store and the caller-supplied dest in context.
func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
	return markAndSweepChunks(ctx, hashes, gcs.newGen, gcs, dest)
}

// IterateAllChunks calls cb for every chunk in the store, visiting the new
// generation first and then the old generation.
func (gcs *GenerationalNBS) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
	if err := gcs.newGen.IterateAllChunks(ctx, cb); err != nil {
		return err
	}
	return gcs.oldGen.IterateAllChunks(ctx, cb)
}

// Count returns the total number of chunks across both generations.
func (gcs *GenerationalNBS) Count() (uint32, error) {
	fromNew, err := gcs.newGen.Count()
	if err != nil {
		return 0, err
	}
	fromOld, err := gcs.oldGen.Count()
	if err != nil {
		return 0, err
	}
	return fromNew + fromOld, nil
}
2 changes: 1 addition & 1 deletion go/store/nbs/nbs_metrics_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (nbsMW *NBSMetricWrapper) EndGC() {
nbsMW.nbs.EndGC()
}

func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) error {
func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
return nbsMW.nbs.MarkAndSweepChunks(ctx, hashes, dest)
}

Expand Down
Loading

0 comments on commit d58a69a

Please sign in to comment.