diff --git a/db/util_testing.go b/db/util_testing.go index c392fa9962..68acb33268 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -240,6 +240,67 @@ WHERE META(ks).xattrs._sync.sequence >= 0 return purgedDocCount, nil } +// emptyBucketUsingChannelIndex ensures all docs are cleared (using Channel index) for the given bucket. Workes similarly to emptyAllDocsIndex but is used when AllDocs index isn't present +func emptyBucketUsingChannelIndex(ctx context.Context, dataStore sgbucket.DataStore, tbp *base.TestBucketPool) (numCompacted int, err error) { + purgedDocCount := 0 + purgeBody := Body{"_purged": true} + + n1qlStore, ok := base.AsN1QLStore(dataStore) + if !ok { + return 0, fmt.Errorf("bucket was not a n1ql store") + } + + // A stripped down version of db.Compact() that works on AllDocs instead of tombstones + statement := `SELECT [op.name, META(ks).xattrs._sync.sequence][1] AS seq, + META(ks).xattrs._sync.rev AS rev, + META(ks).xattrs._sync.flags AS flags, + META(ks).id AS id +FROM ` + base.KeyspaceQueryToken + ` AS ks USE INDEX (sg_channels_x1) +UNNEST OBJECT_PAIRS(META(ks).xattrs._sync.channels) AS op +WHERE ([op.name, LEAST(META(ks).xattrs._sync.sequence, op.val.seq), +IFMISSING(op.val.rev,NULL),IFMISSING(op.val.del,NULL)] BETWEEN ["", 0] AND ["*",9223372036854775807])` + + results, err := n1qlStore.Query(statement, nil, base.RequestPlus, true) + base.InfofCtx(ctx, base.KeyAll, "emptyBucketUsingChannelIndex failed to remove allDocsIndex %+v", err) + if err != nil { + return 0, err + } + + var tombstonesRow QueryIdRow + for results.Next(&tombstonesRow) { + // First, attempt to purge. + var purgeErr error + if base.TestUseXattrs() { + purgeErr = dataStore.DeleteWithXattr(tombstonesRow.Id, base.SyncXattrName) + } else { + purgeErr = dataStore.Delete(tombstonesRow.Id) + } + + if base.IsKeyNotFoundError(dataStore, purgeErr) { + // If key no longer exists, need to add and remove to trigger removal from view + _, addErr := dataStore.Add(tombstonesRow.Id, 0, purgeBody) + if addErr != nil { + tbp.Logf(ctx, "Error compacting key %s (add) - will not be compacted. %v", tombstonesRow.Id, addErr) + continue + } + + if delErr := dataStore.Delete(tombstonesRow.Id); delErr != nil { + tbp.Logf(ctx, "Error compacting key %s (delete) - will not be compacted. %v", tombstonesRow.Id, delErr) + } + purgedDocCount++ + } else if purgeErr != nil { + tbp.Logf(ctx, "Error compacting key %s (purge) - will not be compacted. %v", tombstonesRow.Id, purgeErr) + } + } + err = results.Close() + if err != nil { + return 0, err + } + + base.InfofCtx(ctx, base.KeyAll, "emptyBucketUsingChannelIndex Finished compaction ... Total docs purged: %d", purgedDocCount) + return purgedDocCount, nil +} + // viewsAndGSIBucketReadier empties the bucket, initializes Views, and waits until GSI indexes are empty. It is run asynchronously as soon as a test is finished with a bucket. var viewsAndGSIBucketReadier base.TBPBucketReadierFunc = func(ctx context.Context, b base.Bucket, tbp *base.TestBucketPool) error { if base.TestsDisableGSI() { @@ -267,7 +328,12 @@ var viewsAndGSIBucketReadier base.TBPBucketReadierFunc = func(ctx context.Contex } if _, err := emptyAllDocsIndex(ctx, dataStore, tbp); err != nil { base.InfofCtx(ctx, base.KeyAll, "emptyAllDocsIndex error %+v", err) + return err + } + // + if _, err := emptyBucketUsingChannelIndex(ctx, dataStore, tbp); err != nil { + base.InfofCtx(ctx, base.KeyAll, "emptyBucketUsingChannelIndex error %+v", err) return err }