Skip to content

Commit

Permalink
CBG-3895 allow xattrs to be deleted at the same time as mutated
Browse files Browse the repository at this point in the history
- Update StoreSemantic which can only be Replace if xattrs are being deleted, it has to be Insert if the xattrs are being updated on a tombstone
  • Loading branch information
torcolvin committed May 14, 2024
1 parent 649ce2b commit 215ecf8
Show file tree
Hide file tree
Showing 16 changed files with 942 additions and 178 deletions.
330 changes: 277 additions & 53 deletions base/bucket_gocb_test.go

Large diffs are not rendered by default.

121 changes: 97 additions & 24 deletions base/collection_xattr.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *Collection) DeleteWithXattrs(ctx context.Context, k string, xattrKeys [
}

func (c *Collection) GetXattrs(ctx context.Context, k string, xattrKeys []string) (xattrs map[string][]byte, casOut uint64, err error) {
_, xattrs, casOut, err = c.subdocGetBodyAndXattrs(ctx, k, xattrKeys, false)
_, _, xattrs, casOut, err = c.subdocGetBodyAndXattrs(ctx, k, xattrKeys, false)
return xattrs, casOut, err
}

Expand All @@ -95,7 +95,8 @@ func (c *Collection) WriteSubDoc(ctx context.Context, k string, subdocKey string
}

func (c *Collection) GetWithXattrs(ctx context.Context, k string, xattrKeys []string) ([]byte, map[string][]byte, uint64, error) {
return c.subdocGetBodyAndXattrs(ctx, k, xattrKeys, true)
_, body, xattrs, cas, err := c.subdocGetBodyAndXattrs(ctx, k, xattrKeys, true)
return body, xattrs, cas, err
}

func (c *Collection) SetXattrs(ctx context.Context, k string, xattrs map[string][]byte) (casOut uint64, err error) {
Expand Down Expand Up @@ -189,12 +190,12 @@ func (c *Collection) SubdocWrite(ctx context.Context, k string, subdocKey string
}

// subdocGetBodyAndXattr retrieves the document body and xattrs in a single LookupIn subdoc operation. Does not require both to exist.
func (c *Collection) subdocGetBodyAndXattrs(ctx context.Context, k string, xattrKeys []string, fetchBody bool) (rawBody []byte, xattrs map[string][]byte, cas uint64, err error) {
func (c *Collection) subdocGetBodyAndXattrs(ctx context.Context, k string, xattrKeys []string, fetchBody bool) (isTombstone bool, rawBody []byte, xattrs map[string][]byte, cas uint64, err error) {
xattrKey2 := ""
// Backward compatibility for one system xattr and one user xattr support.
if !c.IsSupported(sgbucket.BucketStoreFeatureMultiXattrSubdocOperations) {
if len(xattrKeys) > 2 {
return nil, nil, 0, fmt.Errorf("subdocGetBodyAndXattrs: more than 2 xattrKeys %+v not supported in this version of Couchbase Server", xattrKeys)
return false, nil, nil, 0, fmt.Errorf("subdocGetBodyAndXattrs: more than 2 xattrKeys %+v not supported in this version of Couchbase Server", xattrKeys)
}
if len(xattrKeys) == 2 {
xattrKey2 = xattrKeys[1]
Expand Down Expand Up @@ -224,6 +225,7 @@ func (c *Collection) subdocGetBodyAndXattrs(ctx context.Context, k string, xattr
var docContentErr error
if fetchBody {
docContentErr = res.ContentAt(uint(len(xattrKeys)), &rawBody)
isTombstone = isKVError(docContentErr, memd.StatusSubDocMultiPathFailureDeleted)
}
cas = uint64(res.Cas())
var xattrErrors []error
Expand Down Expand Up @@ -320,7 +322,7 @@ func (c *Collection) subdocGetBodyAndXattrs(ctx context.Context, k string, xattr
err = pkgerrors.Wrapf(err, "subdocGetBodyAndXattrs %v", UD(k).Redact())
}

return rawBody, xattrs, cas, err
return isTombstone, rawBody, xattrs, cas, err
}

// createTombstone inserts a new server tombstone with associated xattrs. Writes cas and crc32c to the xattr using macro expansion.
Expand All @@ -337,9 +339,9 @@ func (c *Collection) createTombstone(_ context.Context, k string, exp uint32, ca
docFlags = gocb.SubdocDocFlagMkDoc
}

mutateOps := make([]gocb.MutateInSpec, 0, len(xattrs))
for xattrKey, xattrVal := range xattrs {
mutateOps = append(mutateOps, gocb.UpsertSpec(xattrKey, bytesToRawMessage(xattrVal), UpsertSpecXattr))
mutateOps, err := getUpsertSpecs(xattrs)
if err != nil {
return 0, err
}
mutateOps = appendMacroExpansions(mutateOps, opts)
options := &gocb.MutateInOptions{
Expand All @@ -360,9 +362,31 @@ func (c *Collection) insertBodyAndXattrs(_ context.Context, k string, exp uint32
c.Bucket.waitForAvailKvOp()
defer c.Bucket.releaseKvOp()

mutateOps := make([]gocb.MutateInSpec, 0, len(xattrs)+1)
for xattrKey, xv := range xattrs {
mutateOps = append(mutateOps, gocb.UpsertSpec(xattrKey, bytesToRawMessage(xv), UpsertSpecXattr))
mutateOps, err := getUpsertSpecs(xattrs)
if err != nil {
return 0, err
}
mutateOps = append(mutateOps, gocb.ReplaceSpec("", bytesToRawMessage(v), nil))
mutateOps = appendMacroExpansions(mutateOps, opts)
options := &gocb.MutateInOptions{
Expiry: CbsExpiryToDuration(exp),
StoreSemantic: gocb.StoreSemanticsInsert,
}
result, mutateErr := c.Collection.MutateIn(k, mutateOps, options)
if mutateErr != nil {
return 0, mutateErr
}
return uint64(result.Cas()), nil
}

// ressurectWithBodyAndXattrs inserts a document and associated xattrs in a single mutateIn operation.
func (c *Collection) resurrectWithBodyAndXattrs(_ context.Context, k string, exp uint32, v interface{}, xattrs map[string][]byte, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {
c.Bucket.waitForAvailKvOp()
defer c.Bucket.releaseKvOp()

mutateOps, err := getUpsertSpecs(xattrs)
if err != nil {
return 0, err
}
mutateOps = append(mutateOps, gocb.ReplaceSpec("", bytesToRawMessage(v), nil))
mutateOps = appendMacroExpansions(mutateOps, opts)
Expand Down Expand Up @@ -428,68 +452,105 @@ func (c *Collection) SubdocSetXattrs(k string, xvs map[string][]byte) (casOut ui

// UpdateXattrs updates the xattrs on an existing document. Writes cas and crc32c to the xattr using macro expansion.
func (c *Collection) UpdateXattrs(ctx context.Context, k string, exp uint32, cas uint64, xattrs map[string][]byte, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {
return c.updateXattrs(ctx, k, exp, cas, xattrs, nil, false, opts)
}

func (c *Collection) updateXattrs(ctx context.Context, k string, exp uint32, cas uint64, xattrs map[string][]byte, xattrsToDelete []string, tombstoneToTombstone bool, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {
if !c.IsSupported(sgbucket.BucketStoreFeatureMultiXattrSubdocOperations) && len(xattrs) >= 2 {
return 0, fmt.Errorf("UpdateXattrs: more than 1 xattr %v not supported in UpdateXattrs in this version of Couchbase Server", xattrs)
}
c.Bucket.waitForAvailKvOp()
defer c.Bucket.releaseKvOp()

mutateOps := make([]gocb.MutateInSpec, 0, len(xattrs))
for xattrKey, xattrVal := range xattrs {
mutateOps = append(mutateOps, gocb.UpsertSpec(xattrKey, bytesToRawMessage(xattrVal), UpsertSpecXattr))
mutateOps, err := getUpsertSpecs(xattrs)
if err != nil {
return 0, err
}
for _, xattrKey := range xattrsToDelete {
if _, ok := xattrs[xattrKey]; ok {
return 0, fmt.Errorf("%s: %w", xattrKey, sgbucket.ErrUpsertAndDeleteSameXattr)
}
mutateOps = append(mutateOps, gocb.RemoveSpec(xattrKey, RemoveSpecXattr))
}
mutateOps = appendMacroExpansions(mutateOps, opts)

options := &gocb.MutateInOptions{
Expiry: CbsExpiryToDuration(exp),
StoreSemantic: gocb.StoreSemanticsUpsert,
StoreSemantic: gocb.StoreSemanticsReplace,
Cas: gocb.Cas(cas),
}

options.Internal.DocFlags = gocb.SubdocDocFlagAccessDeleted
fillMutateInOptions(ctx, options, opts)

result, mutateErr := c.Collection.MutateIn(k, mutateOps, options)
if mutateErr != nil {
if isKVError(mutateErr, memd.StatusSubDocBadMulti) {
mutateErr = fmt.Errorf("%w: %w", ErrXattrNotFound, mutateErr)
}
return 0, mutateErr
}
return uint64(result.Cas()), nil
}

// updateBodyAndXattr updates the document body and xattrs of an existing document. Writes cas and crc32c to the xattr using macro expansion.
func (c *Collection) updateBodyAndXattrs(ctx context.Context, k string, exp uint32, cas uint64, opts *sgbucket.MutateInOptions, v interface{}, xattrs map[string][]byte) (casOut uint64, err error) {
func (c *Collection) updateBodyAndXattrs(ctx context.Context, k string, exp uint32, cas uint64, opts *sgbucket.MutateInOptions, v interface{}, xattrs map[string][]byte, xattrsToDelete []string) (casOut uint64, err error) {
c.Bucket.waitForAvailKvOp()
defer c.Bucket.releaseKvOp()

mutateOps := make([]gocb.MutateInSpec, 0, len(xattrs)+1)
for xattrKey, xattrVal := range xattrs {
mutateOps = append(mutateOps, gocb.UpsertSpec(xattrKey, bytesToRawMessage(xattrVal), UpsertSpecXattr))
mutateOps, err := getUpsertSpecs(xattrs)
if err != nil {
return 0, err
}
for _, xattrKey := range xattrsToDelete {
if _, ok := xattrs[xattrKey]; ok {
return 0, fmt.Errorf("%s: %w", xattrKey, sgbucket.ErrUpsertAndDeleteSameXattr)
}
mutateOps = append(mutateOps, gocb.RemoveSpec(xattrKey, RemoveSpecXattr))

}
mutateOps = append(mutateOps, gocb.ReplaceSpec("", bytesToRawMessage(v), nil))
mutateOps = appendMacroExpansions(mutateOps, opts)

options := &gocb.MutateInOptions{
Expiry: CbsExpiryToDuration(exp),
StoreSemantic: gocb.StoreSemanticsUpsert,
StoreSemantic: gocb.StoreSemanticsReplace,
Cas: gocb.Cas(cas),
}
fillMutateInOptions(ctx, options, opts)
result, mutateErr := c.Collection.MutateIn(k, mutateOps, options)
if mutateErr != nil {
if errors.Is(mutateErr, gocb.ErrDocumentNotFound) {
casMismatchErr := sgbucket.CasMismatchErr{Actual: 0, Expected: cas}
mutateErr = fmt.Errorf("%w, gocb err: %w", casMismatchErr, mutateErr)
}
return 0, mutateErr
}
return uint64(result.Cas()), nil
}

// updateXattrDeleteBody deletes the document body and updates the xattrs of an existing document. Writes cas and crc32c to the xattr using macro expansion.
func (c *Collection) updateXattrsDeleteBody(_ context.Context, k string, exp uint32, cas uint64, xattrs map[string][]byte, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {
func (c *Collection) updateXattrsDeleteBody(_ context.Context, k string, exp uint32, cas uint64, xattrs map[string][]byte, xattrsToDelete []string, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {
c.Bucket.waitForAvailKvOp()
defer c.Bucket.releaseKvOp()

mutateOps := make([]gocb.MutateInSpec, 0, len(xattrs)+1)
for xattrKey, xattrVal := range xattrs {
mutateOps = append(mutateOps, gocb.UpsertSpec(xattrKey, bytesToRawMessage(xattrVal), UpsertSpecXattr))
mutateOps, err := getUpsertSpecs(xattrs)
if err != nil {
return 0, err
}
if cas == 0 && len(xattrsToDelete) > 0 {
return 0, sgbucket.ErrDeleteXattrOnDocumentInsert
}

for _, xattrKey := range xattrsToDelete {
if _, ok := xattrs[xattrKey]; ok {
return 0, fmt.Errorf("%s: %w", xattrKey, sgbucket.ErrUpsertAndDeleteSameXattr)
}
mutateOps = append(mutateOps, gocb.RemoveSpec(xattrKey, RemoveSpecXattr))

}
mutateOps = append(mutateOps, gocb.RemoveSpec("", nil))

mutateOps = appendMacroExpansions(mutateOps, opts)
options := &gocb.MutateInOptions{
StoreSemantic: gocb.StoreSemanticsReplace,
Expand Down Expand Up @@ -677,3 +738,15 @@ func gocbMutationMacro(meType sgbucket.MacroExpansionType) gocb.MutationMacro {
return gocb.MutationMacroCAS
}
}

// getUpsertSpecs returns a slice of gocb.MutateInSpec for the given xattrs, or returns an error if any values are nil.
func getUpsertSpecs(xattrs map[string][]byte) ([]gocb.MutateInSpec, error) {
mutateOps := make([]gocb.MutateInSpec, 0, len(xattrs))
for xattrKey, xattrVal := range xattrs {
if xattrVal == nil {
return nil, fmt.Errorf("%s: %w", xattrKey, sgbucket.ErrNilXattrValue)
}
mutateOps = append(mutateOps, gocb.UpsertSpec(xattrKey, bytesToRawMessage(xattrVal), UpsertSpecXattr))
}
return mutateOps, nil
}
Loading

0 comments on commit 215ecf8

Please sign in to comment.