Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3895 allow xattrs to be deleted at the same time as they are updated #6791

Merged
merged 14 commits into from
Jun 10, 2024
284 changes: 231 additions & 53 deletions base/bucket_gocb_test.go

Large diffs are not rendered by default.

100 changes: 74 additions & 26 deletions base/collection_xattr.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *Collection) DeleteWithXattrs(ctx context.Context, k string, xattrKeys [
}

func (c *Collection) GetXattrs(ctx context.Context, k string, xattrKeys []string) (xattrs map[string][]byte, casOut uint64, err error) {
_, xattrs, casOut, err = c.subdocGetBodyAndXattrs(ctx, k, xattrKeys, false)
_, _, xattrs, casOut, err = c.subdocGetBodyAndXattrs(ctx, k, xattrKeys, false)
return xattrs, casOut, err
}

Expand All @@ -95,7 +95,8 @@ func (c *Collection) WriteSubDoc(ctx context.Context, k string, subdocKey string
}

func (c *Collection) GetWithXattrs(ctx context.Context, k string, xattrKeys []string) ([]byte, map[string][]byte, uint64, error) {
return c.subdocGetBodyAndXattrs(ctx, k, xattrKeys, true)
_, body, xattrs, cas, err := c.subdocGetBodyAndXattrs(ctx, k, xattrKeys, true)
return body, xattrs, cas, err
}

func (c *Collection) SetXattrs(ctx context.Context, k string, xattrs map[string][]byte) (casOut uint64, err error) {
Expand Down Expand Up @@ -188,13 +189,13 @@ func (c *Collection) SubdocWrite(ctx context.Context, k string, subdocKey string
return casOut, err
}

// subdocGetBodyAndXattr retrieves the document body and xattrs in a single LookupIn subdoc operation. Does not require both to exist.
func (c *Collection) subdocGetBodyAndXattrs(ctx context.Context, k string, xattrKeys []string, fetchBody bool) (rawBody []byte, xattrs map[string][]byte, cas uint64, err error) {
// subdocGetBodyAndXattrs retrieves the document body and xattrs in a single LookupIn subdoc operation. Does not require both to exist.
func (c *Collection) subdocGetBodyAndXattrs(ctx context.Context, k string, xattrKeys []string, fetchBody bool) (isTombstone bool, rawBody []byte, xattrs map[string][]byte, cas uint64, err error) {
torcolvin marked this conversation as resolved.
Show resolved Hide resolved
xattrKey2 := ""
// Backward compatibility for one system xattr and one user xattr support.
if !c.IsSupported(sgbucket.BucketStoreFeatureMultiXattrSubdocOperations) {
if len(xattrKeys) > 2 {
return nil, nil, 0, fmt.Errorf("subdocGetBodyAndXattrs: more than 2 xattrKeys %+v not supported in this version of Couchbase Server", xattrKeys)
return false, nil, nil, 0, fmt.Errorf("subdocGetBodyAndXattrs: more than 2 xattrKeys %+v not supported in this version of Couchbase Server", xattrKeys)
}
if len(xattrKeys) == 2 {
xattrKey2 = xattrKeys[1]
Expand Down Expand Up @@ -224,6 +225,8 @@ func (c *Collection) subdocGetBodyAndXattrs(ctx context.Context, k string, xattr
var docContentErr error
if fetchBody {
docContentErr = res.ContentAt(uint(len(xattrKeys)), &rawBody)
// check for tombstone here. Usually rawBody == nil would be a tombstone, with the exception of empty raw binary docs
isTombstone = isKVError(docContentErr, memd.StatusSubDocMultiPathFailureDeleted)
}
cas = uint64(res.Cas())
var xattrErrors []error
Expand Down Expand Up @@ -320,7 +323,7 @@ func (c *Collection) subdocGetBodyAndXattrs(ctx context.Context, k string, xattr
err = pkgerrors.Wrapf(err, "subdocGetBodyAndXattrs %v", UD(k).Redact())
}

return rawBody, xattrs, cas, err
return isTombstone, rawBody, xattrs, cas, err
}

// createTombstone inserts a new server tombstone with associated xattrs. Writes cas and crc32c to the xattr using macro expansion.
Expand All @@ -337,9 +340,9 @@ func (c *Collection) createTombstone(_ context.Context, k string, exp uint32, ca
docFlags = gocb.SubdocDocFlagMkDoc
}

mutateOps := make([]gocb.MutateInSpec, 0, len(xattrs))
for xattrKey, xattrVal := range xattrs {
mutateOps = append(mutateOps, gocb.UpsertSpec(xattrKey, bytesToRawMessage(xattrVal), UpsertSpecXattr))
mutateOps, err := getUpsertSpecsForXattrs(xattrs)
if err != nil {
return 0, err
}
mutateOps = appendMacroExpansions(mutateOps, opts)
options := &gocb.MutateInOptions{
Expand All @@ -360,9 +363,9 @@ func (c *Collection) insertBodyAndXattrs(_ context.Context, k string, exp uint32
c.Bucket.waitForAvailKvOp()
defer c.Bucket.releaseKvOp()

mutateOps := make([]gocb.MutateInSpec, 0, len(xattrs)+1)
for xattrKey, xv := range xattrs {
mutateOps = append(mutateOps, gocb.UpsertSpec(xattrKey, bytesToRawMessage(xv), UpsertSpecXattr))
mutateOps, err := getUpsertSpecsForXattrs(xattrs)
if err != nil {
return 0, err
}
mutateOps = append(mutateOps, gocb.ReplaceSpec("", bytesToRawMessage(v), nil))
mutateOps = appendMacroExpansions(mutateOps, opts)
Expand Down Expand Up @@ -428,23 +431,37 @@ func (c *Collection) SubdocSetXattrs(k string, xvs map[string][]byte) (casOut ui

// UpdateXattrs updates the xattrs on an existing document. Writes cas and crc32c to the xattr using macro expansion.
func (c *Collection) UpdateXattrs(ctx context.Context, k string, exp uint32, cas uint64, xattrs map[string][]byte, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {
return c.updateXattrs(ctx, k, exp, cas, xattrs, nil, opts)
}

func (c *Collection) updateXattrs(ctx context.Context, k string, exp uint32, cas uint64, xattrs map[string][]byte, xattrsToDelete []string, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {
if !c.IsSupported(sgbucket.BucketStoreFeatureMultiXattrSubdocOperations) && len(xattrs) >= 2 {
return 0, fmt.Errorf("UpdateXattrs: more than 1 xattr %v not supported in UpdateXattrs in this version of Couchbase Server", xattrs)
}
if cas == 0 && len(xattrsToDelete) > 0 {
return 0, sgbucket.ErrDeleteXattrOnDocumentInsert
}
c.Bucket.waitForAvailKvOp()
defer c.Bucket.releaseKvOp()

mutateOps := make([]gocb.MutateInSpec, 0, len(xattrs))
for xattrKey, xattrVal := range xattrs {
mutateOps = append(mutateOps, gocb.UpsertSpec(xattrKey, bytesToRawMessage(xattrVal), UpsertSpecXattr))
mutateOps, err := getUpsertSpecsForXattrs(xattrs)
if err != nil {
return 0, err
}
for _, xattrKey := range xattrsToDelete {
if _, ok := xattrs[xattrKey]; ok {
return 0, fmt.Errorf("%s: %w", xattrKey, sgbucket.ErrUpsertAndDeleteSameXattr)
}
mutateOps = append(mutateOps, gocb.RemoveSpec(xattrKey, RemoveSpecXattr))
}
mutateOps = appendMacroExpansions(mutateOps, opts)

options := &gocb.MutateInOptions{
Expiry: CbsExpiryToDuration(exp),
StoreSemantic: gocb.StoreSemanticsUpsert,
StoreSemantic: gocb.StoreSemanticsReplace,
Cas: gocb.Cas(cas),
}

options.Internal.DocFlags = gocb.SubdocDocFlagAccessDeleted
fillMutateInOptions(ctx, options, opts)

Expand All @@ -455,21 +472,28 @@ func (c *Collection) UpdateXattrs(ctx context.Context, k string, exp uint32, cas
return uint64(result.Cas()), nil
}

// updateBodyAndXattr updates the document body and xattrs of an existing document. Writes cas and crc32c to the xattr using macro expansion.
func (c *Collection) updateBodyAndXattrs(ctx context.Context, k string, exp uint32, cas uint64, opts *sgbucket.MutateInOptions, v interface{}, xattrs map[string][]byte) (casOut uint64, err error) {
// updateBodyAndXattrs updates the document body and xattrs of an existing document. Writes cas and crc32c to the xattr using macro expansion.
func (c *Collection) updateBodyAndXattrs(ctx context.Context, k string, exp uint32, cas uint64, opts *sgbucket.MutateInOptions, v interface{}, xattrs map[string][]byte, xattrsToDelete []string) (casOut uint64, err error) {
c.Bucket.waitForAvailKvOp()
defer c.Bucket.releaseKvOp()

mutateOps := make([]gocb.MutateInSpec, 0, len(xattrs)+1)
for xattrKey, xattrVal := range xattrs {
mutateOps = append(mutateOps, gocb.UpsertSpec(xattrKey, bytesToRawMessage(xattrVal), UpsertSpecXattr))
mutateOps, err := getUpsertSpecsForXattrs(xattrs)
if err != nil {
return 0, err
}
for _, xattrKey := range xattrsToDelete {
if _, ok := xattrs[xattrKey]; ok {
return 0, fmt.Errorf("%s: %w", xattrKey, sgbucket.ErrUpsertAndDeleteSameXattr)
}
mutateOps = append(mutateOps, gocb.RemoveSpec(xattrKey, RemoveSpecXattr))

}
mutateOps = append(mutateOps, gocb.ReplaceSpec("", bytesToRawMessage(v), nil))
mutateOps = appendMacroExpansions(mutateOps, opts)

options := &gocb.MutateInOptions{
Expiry: CbsExpiryToDuration(exp),
StoreSemantic: gocb.StoreSemanticsUpsert,
StoreSemantic: gocb.StoreSemanticsReplace,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as above - do the xattrs have to already exist to use StoreSemanticsReplace?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace implies that the body has to exist, it does not care about the presence of the xattr. This is covered by the test TestWriteWithXattrs but you should make sure to confirm.

Cas: gocb.Cas(cas),
}
fillMutateInOptions(ctx, options, opts)
Expand All @@ -481,15 +505,27 @@ func (c *Collection) updateBodyAndXattrs(ctx context.Context, k string, exp uint
}

// updateXattrDeleteBody deletes the document body and updates the xattrs of an existing document. Writes cas and crc32c to the xattr using macro expansion.
func (c *Collection) updateXattrsDeleteBody(_ context.Context, k string, exp uint32, cas uint64, xattrs map[string][]byte, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {
func (c *Collection) updateXattrsDeleteBody(_ context.Context, k string, exp uint32, cas uint64, xattrs map[string][]byte, xattrsToDelete []string, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {
c.Bucket.waitForAvailKvOp()
defer c.Bucket.releaseKvOp()

mutateOps := make([]gocb.MutateInSpec, 0, len(xattrs)+1)
for xattrKey, xattrVal := range xattrs {
mutateOps = append(mutateOps, gocb.UpsertSpec(xattrKey, bytesToRawMessage(xattrVal), UpsertSpecXattr))
mutateOps, err := getUpsertSpecsForXattrs(xattrs)
if err != nil {
return 0, err
}
if cas == 0 && len(xattrsToDelete) > 0 {
return 0, sgbucket.ErrDeleteXattrOnDocumentInsert
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't do this check in other methods that attempt to delete xattrs (like updateXattrs). Is it needed there also?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this question is still unresolved - any details on this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updateXattrs is a private function and this is checked in the callers to this function in WriteTombstoneWithXattrs and WriteWithXattrs. UpdateXattrs doesn't allow you to delete xattrs since that's not yet needed. I think this has the potential to go wrong if the code is refactored and I like the idea to put this check in updateXattrs as well, though it will not presently be hit.


for _, xattrKey := range xattrsToDelete {
if _, ok := xattrs[xattrKey]; ok {
return 0, fmt.Errorf("%s: %w", xattrKey, sgbucket.ErrUpsertAndDeleteSameXattr)
}
mutateOps = append(mutateOps, gocb.RemoveSpec(xattrKey, RemoveSpecXattr))

}
mutateOps = append(mutateOps, gocb.RemoveSpec("", nil))

mutateOps = appendMacroExpansions(mutateOps, opts)
options := &gocb.MutateInOptions{
StoreSemantic: gocb.StoreSemanticsReplace,
Expand Down Expand Up @@ -677,3 +713,15 @@ func gocbMutationMacro(meType sgbucket.MacroExpansionType) gocb.MutationMacro {
return gocb.MutationMacroCAS
}
}

// getUpsertSpecsForXattrs returns a slice of gocb.MutateInSpec for the given xattrs, or returns an error if any values are nil.
func getUpsertSpecsForXattrs(xattrs map[string][]byte) ([]gocb.MutateInSpec, error) {
mutateOps := make([]gocb.MutateInSpec, 0, len(xattrs))
for xattrKey, xattrVal := range xattrs {
if xattrVal == nil {
return nil, fmt.Errorf("%s: %w", xattrKey, sgbucket.ErrNilXattrValue)
}
mutateOps = append(mutateOps, gocb.UpsertSpec(xattrKey, bytesToRawMessage(xattrVal), UpsertSpecXattr))
}
return mutateOps, nil
}
Loading
Loading