Merge pull request #6744 from dolthub/aaron/fix-hasCache-dangling-references-bug-2

go/store/nbs: store.go: Clean up how we update hasCache so that we only update it after successfully writing the memtable.
reltuk authored Sep 29, 2023
2 parents 6aba08d + d6b8936 commit 2c89075
Showing 4 changed files with 165 additions and 83 deletions.
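
The fix moves the hasCache update out of tableSet.append and into NomsBlockStore itself, so that presence results for pending refs are cached only after the memtable has been successfully persisted. Below is a minimal sketch of that ordering invariant; toyStore, flush, and errDanglingRef are illustrative names, not the actual dolt API:

package main

import (
	"errors"
	"fmt"
)

var errDanglingRef = errors.New("dangling ref")

// toyStore models the relevant state: a pending write set (the memtable),
// the set of persisted chunks, and a presence cache that must only learn
// about chunks after they are durably written.
type toyStore struct {
	persisted map[string]bool
	hasCache  map[string]bool
	pending   []string
}

// flush mirrors the corrected ordering: on a dangling reference the pending
// set is discarded and the cache is left untouched; on success the pending
// chunks are persisted first and only then recorded in the cache.
func (s *toyStore) flush(dangling bool) error {
	if dangling {
		s.pending = nil // throw away the memtable; do NOT touch hasCache
		return errDanglingRef
	}
	for _, c := range s.pending {
		s.persisted[c] = true
	}
	for _, c := range s.pending {
		s.hasCache[c] = true // cache presence only after the write succeeded
	}
	s.pending = nil
	return nil
}

func main() {
	s := &toyStore{persisted: map[string]bool{}, hasCache: map[string]bool{}}
	s.pending = []string{"C2"}
	_ = s.flush(true)             // the ErrDanglingRef path
	fmt.Println(s.hasCache["C2"]) // false: C2 was never cached as present
}

In the old code, tableSet.append added pending refs to the cache before Persist ran, and NomsBlockStore.errorIfDangling cached pending refs before the memtable was written; both call sites are changed in the diff below.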
125 changes: 125 additions & 0 deletions go/libraries/doltcore/doltdb/gc_test.go
@@ -16,6 +16,8 @@ package doltdb_test

import (
"context"
"errors"
"os"
"testing"

"github.com/dolthub/go-mysql-server/sql"
@@ -28,7 +30,13 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/nbs"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/val"
)

func TestGarbageCollection(t *testing.T) {
@@ -40,6 +48,8 @@ func TestGarbageCollection(t *testing.T) {
testGarbageCollection(t, gct)
})
}

t.Run("HasCacheDataCorruption", testGarbageCollectionHasCacheDataCorruptionBugFix)
}

type stage struct {
@@ -140,3 +150,118 @@ func testGarbageCollection(t *testing.T, test gcTest) {
require.NoError(t, err)
assert.Equal(t, test.expected, actual)
}

// In September 2023, we found that a failure to handle the `hasCache` in
// `*NomsBlockStore` appropriately while cleaning up a memtable into which
// dangling references had been written could result in writing chunks to a
// database which referenced non-existent chunks.
//
// The general pattern was to get new chunk addresses into the hasCache
// without writing them to the store, and then to have an incoming chunk add a
// reference to a missing chunk. At that point, we would clear the memtable,
// since it had invalid chunks in it, but we wouldn't purge the hasCache.
// Later writes which attempted to reference the chunks which had made it into
// the hasCache would then incorrectly succeed.
//
// One such concrete pattern for doing this is implemented below. We do:
//
// 1) Put a new chunk to the database -- C1.
//
// 2) Run a GC.
//
// 3) Put a new chunk to the database -- C2.
//
// 4) Call NBS.Commit() with a stale last hash.Hash. This causes us to cache C2
// as present in the store, but it does not get written to disk, because the
// optimistic concurrency control on the value of the current root hash fails.
//
// 5) Put a chunk referencing C1 to the database -- R1.
//
// 6) Call NBS.Commit(). This causes ErrDanglingRef. C1 was written before the
// GC and is no longer in the store. C2 is also cleared from the pending write
// set.
//
// 7) Put a chunk referencing C2 to the database -- R2.
//
// 8) Call NBS.Commit(). This should fail, since R2 references C2 and C2 is not
// in the store. However, C2 is in the cache as a result of step #4, and so
// this does not fail. R2 gets written to disk with a dangling reference to C2.
func testGarbageCollectionHasCacheDataCorruptionBugFix(t *testing.T) {
ctx := context.Background()

d, err := os.MkdirTemp(t.TempDir(), "hascachetest-")
require.NoError(t, err)

ddb, err := doltdb.LoadDoltDB(ctx, types.Format_DOLT, "file://"+d, filesys.LocalFS)
require.NoError(t, err)
defer ddb.Close()

err = ddb.WriteEmptyRepo(ctx, "main", "Aaron Son", "[email protected]")
require.NoError(t, err)

root, err := ddb.NomsRoot(ctx)
require.NoError(t, err)

ns := ddb.NodeStore()

c1 := newIntMap(t, ctx, ns, 1, 1)
_, err = ns.Write(ctx, c1.Node())
require.NoError(t, err)

err = ddb.GC(ctx, nil)
require.NoError(t, err)

c2 := newIntMap(t, ctx, ns, 2, 2)
_, err = ns.Write(ctx, c2.Node())
require.NoError(t, err)

success, err := ddb.CommitRoot(ctx, c2.HashOf(), c2.HashOf())
require.NoError(t, err)
require.False(t, success, "committing the root with a last hash which does not match the current root must fail")

r1 := newAddrMap(t, ctx, ns, "r1", c1.HashOf())
_, err = ns.Write(ctx, r1.Node())
require.NoError(t, err)

_, err = ddb.CommitRoot(ctx, root, root)
require.True(t, errors.Is(err, nbs.ErrDanglingRef), "committing a reference to just-collected c1 must fail with ErrDanglingRef")

r2 := newAddrMap(t, ctx, ns, "r2", c2.HashOf())
_, err = ns.Write(ctx, r2.Node())
require.NoError(t, err)

_, err = ddb.CommitRoot(ctx, root, root)
require.True(t, errors.Is(err, nbs.ErrDanglingRef), "committing a reference to c2, which was erased with the ErrDanglingRef above, must also fail with ErrDanglingRef")
}

func newIntMap(t *testing.T, ctx context.Context, ns tree.NodeStore, k, v int8) prolly.Map {
desc := val.NewTupleDescriptor(val.Type{
Enc: val.Int8Enc,
Nullable: false,
})

tb := val.NewTupleBuilder(desc)
tb.PutInt8(0, k)
keyTuple := tb.Build(ns.Pool())

tb.PutInt8(0, v)
valueTuple := tb.Build(ns.Pool())

m, err := prolly.NewMapFromTuples(ctx, ns, desc, desc, keyTuple, valueTuple)
require.NoError(t, err)
return m
}

func newAddrMap(t *testing.T, ctx context.Context, ns tree.NodeStore, key string, h hash.Hash) prolly.AddressMap {
m, err := prolly.NewEmptyAddressMap(ns)
require.NoError(t, err)

editor := m.Editor()
err = editor.Add(ctx, key, h)
require.NoError(t, err)

m, err = editor.Flush(ctx)
require.NoError(t, err)

return m
}
13 changes: 0 additions & 13 deletions go/store/nbs/generational_chunk_store.go
@@ -16,7 +16,6 @@ package nbs

import (
"context"
"fmt"
"io"
"path/filepath"
"strings"
@@ -154,18 +153,6 @@ func (gcs *GenerationalNBS) hasMany(recs []hasRecord) (absent hash.HashSet, err
return gcs.oldGen.hasMany(recs)
}

func (gcs *GenerationalNBS) errorIfDangling(ctx context.Context, addrs hash.HashSet) error {
absent, err := gcs.HasMany(ctx, addrs)
if err != nil {
return err
}
if len(absent) != 0 {
s := absent.String()
return fmt.Errorf("Found dangling references to %s", s)
}
return nil
}

// Put caches c in the ChunkSource. Upon return, c must be visible to
// subsequent Get and Has calls, but must not be persistent until a call
// to Flush(). Put may be called concurrently with other calls to Put(),
106 changes: 40 additions & 66 deletions go/store/nbs/store.go
@@ -705,6 +705,36 @@ func (nbs *NomsBlockStore) putChunk(ctx context.Context, c chunks.Chunk, getAddr
return nil
}

// When we have chunks with dangling references in our memtable, we have to
// throw away the entire memtable.
func (nbs *NomsBlockStore) handlePossibleDanglingRefError(err error) {
if errors.Is(err, ErrDanglingRef) {
nbs.mt = nil
}
}

// Writes to a Dolt database typically involve mutating some tuple maps and
// then mutating the top-level address map which points to all the branch heads
// and working sets. Each internal node of the address map can have many
// references and many of them typically change quite slowly. We keep a cache
// of recently written references which we know are in the database so that we
// don't have to check the table file indexes for these chunks when we write
// references to them again in the near future.
//
// This cache needs to be treated in a principled manner. The integrity checks
// that we run against a set of chunks we are attempting to write consider
// the to-be-written chunks themselves as also being in the database. This is
// correct, assuming that all the chunks are written at the same time. However,
// we should not add the results of those presence checks to the cache until
// those chunks actually land in the database.
func (nbs *NomsBlockStore) addPendingRefsToHasCache() {
for _, e := range nbs.mt.pendingRefs {
if e.has {
nbs.hasCache.Add(*e.a, struct{}{})
}
}
}

func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs hash.HashSet, checker refCheck) (bool, error) {
if err := ctx.Err(); err != nil {
return false, err
@@ -725,11 +755,10 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs
if addChunkRes == chunkNotAdded {
ts, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.hasCache, nbs.stats)
if err != nil {
if errors.Is(err, ErrDanglingRef) {
nbs.mt = nil
}
nbs.handlePossibleDanglingRefError(err)
return false, err
}
nbs.addPendingRefsToHasCache()
nbs.tables = ts
nbs.mt = newMemTable(nbs.mtSize)
addChunkRes = nbs.mt.addChunk(a, ch.Data())
@@ -757,7 +786,6 @@ type refCheck func(reqs []hasRecord) (hash.HashSet, error)
func (nbs *NomsBlockStore) errorIfDangling(root hash.Hash, checker refCheck) error {
if !root.IsEmpty() {
a := addr(root)
// We use |Get| here, since it updates recency of the entry.
if _, ok := nbs.hasCache.Get(a); !ok {
var hr [1]hasRecord
hr[0].a = &a
@@ -771,32 +799,6 @@ func (nbs *NomsBlockStore) errorIfDangling(root hash.Hash, checker refCheck) err
nbs.hasCache.Add(a, struct{}{})
}
}

if nbs.mt == nil || nbs.mt.pendingRefs == nil {
return nil // no pending refs to check
}

for i := range nbs.mt.pendingRefs {
// All of these are going to be |Add|ed after the call. We use
// |Contains| to check here so the frequency count only gets
// bumped once.
if nbs.hasCache.Contains(*nbs.mt.pendingRefs[i].a) {
nbs.mt.pendingRefs[i].has = true
}
}

sort.Sort(hasRecordByPrefix(nbs.mt.pendingRefs))
absent, err := checker(nbs.mt.pendingRefs)
if err != nil {
return err
} else if absent.Size() > 0 {
return fmt.Errorf("%w: found dangling references to %s", ErrDanglingRef, absent.String())
}

for _, e := range nbs.mt.pendingRefs {
nbs.hasCache.Add(*e.a, struct{}{})
}

return nil
}

@@ -1130,39 +1132,6 @@ func (nbs *NomsBlockStore) commit(ctx context.Context, current, last hash.Hash,
return true, nil
}

// check for dangling references in |nbs.mt|
if err = nbs.errorIfDangling(current, checker); err != nil {
if errors.Is(err, ErrDanglingRef) {
nbs.mt = nil
}
return false, err
}

// This is unfortunate. We want to serialize commits to the same store
// so that we avoid writing a bunch of unreachable small tables which result
// from optimistic lock failures. However, this means that the time to
// write tables is included in "commit" time and if all commits are
// serialized, it means a lot more waiting.
// "non-trivial" tables are persisted here, outside of the commit-lock.
// all other tables are persisted in updateManifest()
if nbs.mt != nil {
cnt, err := nbs.mt.count()
if err != nil {
return false, err
}

if cnt > preflushChunkCount {
ts, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.hasCache, nbs.stats)
if err != nil {
if errors.Is(err, ErrDanglingRef) {
nbs.mt = nil
}
return false, err
}
nbs.tables, nbs.mt = ts, nil
}
}

nbs.mm.LockForUpdate()
defer func() {
unlockErr := nbs.mm.UnlockForUpdate()
@@ -1233,11 +1202,10 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
if cnt > 0 {
ts, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.hasCache, nbs.stats)
if err != nil {
if errors.Is(err, ErrDanglingRef) {
nbs.mt = nil
}
nbs.handlePossibleDanglingRefError(err)
return err
}
nbs.addPendingRefsToHasCache()
nbs.tables, nbs.mt = ts, nil
}
}
@@ -1250,6 +1218,12 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
return errOptimisticLockFailedTables
}

// check for dangling reference to the new root
if err = nbs.errorIfDangling(current, checker); err != nil {
nbs.handlePossibleDanglingRefError(err)
return err
}

specs, err := nbs.tables.toSpecs()
if err != nil {
return err
4 changes: 0 additions & 4 deletions go/store/nbs/table_set.go
@@ -302,10 +302,6 @@ func (ts tableSet) append(ctx context.Context, mt *memTable, checker refCheck, h
return tableSet{}, fmt.Errorf("%w: found dangling references to %s", ErrDanglingRef, absent.String())
}

for _, e := range mt.pendingRefs {
hasCache.Add(*e.a, struct{}{})
}

cs, err := ts.p.Persist(ctx, mt, ts, stats)
if err != nil {
return tableSet{}, err
