From d3b758c959abb1df681eedbc188761706e8e1432 Mon Sep 17 00:00:00 2001
From: Gregory Newman-Smith <109068393+gregns1@users.noreply.github.com>
Date: Fri, 9 Feb 2024 15:00:50 +0000
Subject: [PATCH] CBG-3725: [3.1.4 backport] Import DCP rollback unsuccessful
 in data migration cases (#6676)

* CBG-3724 ensure import rollback works correctly (#6657)

* CBG-3724 ensure import rollback works correctly

Major fix: pass a function to call cbgt.JANITOR_ROLLBACK_PINDEX when
creating a pindex. This is necessary to ensure cbgt.Mgr will actually
regenerate the index.

- fix the TestImportRollback test, which was giving a false pass before
  because cbgt.Mgr was getting a kick op when the nodes were rebalanced.
  In order for rollback to work, the cbgt.Dest RollbackEx function has to
  force cbgt.Mgr to actually update the PIndex. A
  cbgt.JANITOR_ROLLBACK_PINDEX op will force the recreation of a PIndex.
  The test was broken before this change added code to it.
- Enhance TestImportRollback to detect a rollback with invalid
  FailoverLogs. This simulates a situation where the checkpoints have
  been migrated from a separate bucket.
- modify TestContinuousRollback to also test with a bad vbuuid
  FailoverEntry. This test would actually pass, because Sync Gateway,
  unlike cbgt, did not pass DcpStreamAddFlagStrictVBUUID.
  DcpStreamAddFlagStrictVBUUID forces a rollback in the case where
  StartSeqNo=0, vbuuid=invalid, whereas without the flag it is treated
  like StartSeqNo=0, vbuuid=0. Changed the code to match cbgt behavior
  by adding DcpStreamAddFlagStrictVBUUID.
- Added a separate flag in DCPClient to detect active vBuckets: the
  number of vBuckets that actually have a connection and are not stuck
  in a connecting state. This differs from the existing activeVbuckets,
  which is set from the time of NewDCPClient to when the streams end. In
  a continuous feed, the streams should never end.

Cleanup:

- move RollbackEx function from DCPDest to DCPCommon. DCPCommon is an
  embedded struct in DCPDest.
- panic if DCPCommon.Rollback is called. This should never happen given
  the design of cbgt.Dest, but if it did, the rollback would not work.
  This is different from the old behavior, where it would not roll back
  and could potentially enter a rollback loop if this function was
  called.
- remove maxVbNo from NewDCPDest in an effort to shorten the argument
  list, since the value can be inferred from base.Bucket.
- Create some public identifiers to make tests easier:
  - ShardedImportDCPMetadata
  - CBGTCfgIndexDefs, CBGTCfgNodeDefsKnown, CBGTCfgNodeDefsWanted,
    CBGTCfgPlanPIndexes

* work around MB-60564

* Change message

* Make remove raceproof and improve comment

* Reduce scope of changes for easier backport

* Address comments to make code simpler

* updates to remove problematic code

* Update base/dcp_dest.go

Co-authored-by: Tor Colvin

---------

Co-authored-by: Tor Colvin
---
 base/dcp_common.go               |  20 ++---
 base/dcp_dest.go                 |  17 +++-
 base/dcp_receiver.go             | 135 ++---------------------------
 base/dcp_sharded.go              |  15 +++-
 db/attachment_compaction_test.go |   1 +
 db/dcp_sharded_upgrade_test.go   |   8 +-
 db/import_pindex.go              |  16 ++--
 rest/importtest/import_test.go   | 144 ++++++++++++++-----------------
 8 files changed, 119 insertions(+), 237 deletions(-)

diff --git a/base/dcp_common.go b/base/dcp_common.go
index 8f3954f0eb..ea3e7b3b14 100644
--- a/base/dcp_common.go
+++ b/base/dcp_common.go
@@ -84,6 +84,7 @@ type DCPCommon struct {
 	checkpointPrefix string // DCP checkpoint key prefix
 }
 
+// NewDCPCommon creates a new DCPCommon, which manages updates coming from a cbgt-based DCP feed. The callback function will receive events from the DCP feed. The bucket is the gocb bucket to stream events from. Checkpoints are stored in the metaStore collection, with keys prefixed by metaKeys + checkpointPrefix. The feed name will start with feedID, and DCPCommon will append a unique string. Specific stats for DCP are stored in expvars rather than SgwStats. The janitorRollback function is supplied by the global cbgt.PIndexImplType.New function for the initial opening of a partition index, and by cbgt.PIndexImplType.OpenUsing for the reopening of a partition index. The rollback function provides a way to pass cbgt.JANITOR_ROLLBACK_PINDEX to cbgt.Mgr.
 func NewDCPCommon(ctx context.Context, callback sgbucket.FeedEventCallbackFunc, bucket Bucket, metaStore DataStore, maxVbNo uint16, persistCheckpoints bool, dbStats *expvar.Map, feedID, checkpointPrefix string, metaKeys *MetadataKeys) (*DCPCommon, error) {
 	newBackfillStatus := backfillStatus{
@@ -184,23 +185,14 @@ func (c *DCPCommon) getMetaData(vbucketId uint16) (
 	return value, lastSeq, nil
 }
 
-// RollbackEx should be called by cbdatasource - Rollback required to maintain the interface. In the event
-// it's called, logs warning and does a hard reset on metadata for the vbucket. Returns error if metadata
-// persistence fails
-func (c *DCPCommon) rollback(vbucketId uint16, rollbackSeq uint64) error {
-	WarnfCtx(c.loggingCtx, "DCP Rollback request. Expected RollbackEx call - resetting vbucket %d to 0.", vbucketId)
-	c.dbStatsExpvars.Add("dcp_rollback_count", 1)
-	c.updateSeq(vbucketId, 0, false)
-	err := c.setMetaData(vbucketId, nil, true)
-	return err
-}
-
-// RollbackEx includes the vbucketUUID needed to reset the metadata correctly
-func (c *DCPCommon) rollbackEx(vbucketId uint16, vbucketUUID uint64, rollbackSeq uint64, rollbackMetaData []byte) error {
+// rollbackEx is called when a DCP stream issues a rollback. The metadata is persisted for the given uuid and sequence number, and then a cbgt.Mgr JANITOR_ROLLBACK_PINDEX op is issued via the janitorRollback function.
+func (c *DCPCommon) rollbackEx(vbucketId uint16, vbucketUUID uint64, rollbackSeq uint64, rollbackMetaData []byte, janitorRollback func()) error {
 	WarnfCtx(c.loggingCtx, "DCP RollbackEx request - rolling back DCP feed for: vbucketId: %d, rollbackSeq: %x.", vbucketId, rollbackSeq)
 	c.dbStatsExpvars.Add("dcp_rollback_count", 1)
 	c.updateSeq(vbucketId, rollbackSeq, false)
 	err := c.setMetaData(vbucketId, rollbackMetaData, true)
+	// if we fail to persist the metadata, we still want to roll back so the feed keeps retrying to reconnect. The returned error will be logged by cbgt.
+	janitorRollback()
 	return err
 }
 
@@ -228,7 +220,7 @@ func (c *DCPCommon) loadCheckpoint(vbNo uint16) (vbMetadata []byte, snapshotStar
 		}
 	}
 
-	var snapshotMetadata cbdatasource.VBucketMetaData
+	var snapshotMetadata ShardedImportDCPMetadata
 	unmarshalErr := JSONUnmarshal(rawValue, &snapshotMetadata)
 	if unmarshalErr != nil {
 		return []byte{}, 0, 0, err
diff --git a/base/dcp_dest.go b/base/dcp_dest.go
index f9d10a56c5..d1acbcc895 100644
--- a/base/dcp_dest.go
+++ b/base/dcp_dest.go
@@ -64,10 +64,12 @@ type DCPDest struct {
 	stats              *expvar.Map // DCP feed stats (rollback, backfill)
 	partitionCountStat *SgwIntStat // Stat for partition count. Stored outside the DCP feed stats map
 	metaInitComplete   []bool      // Whether metadata initialization has been completed, per vbNo
+	janitorRollback    func()      // This function will trigger a janitor_pindex_rollback
 }
 
+// NewDCPDest creates a new DCPDest, which manages updates coming from a cbgt-based DCP feed. The callback function will receive events from the DCP feed. The bucket is the gocb bucket to stream events from. Checkpoints are optionally stored in the _default._default collection, if persistentCheckpoints is true, with keys prefixed by metaKeys + checkpointPrefix. The feed name will start with feedID and have a unique string appended. Specific stats for DCP are stored in expvars rather than SgwStats, except for importPartitionStat, which represents the number of import partitions. Each import partition will have a DCPDest object. The rollback function is supplied by the global cbgt.PIndexImplType.New function for the initial opening of a partition index, and by cbgt.PIndexImplType.OpenUsing for the reopening of a partition index. The rollback function provides a way to pass cbgt.JANITOR_ROLLBACK_PINDEX to cbgt.Mgr.
 func NewDCPDest(ctx context.Context, callback sgbucket.FeedEventCallbackFunc, bucket Bucket, maxVbNo uint16, persistCheckpoints bool,
-	dcpStats *expvar.Map, feedID string, importPartitionStat *SgwIntStat, checkpointPrefix string, metaKeys *MetadataKeys) (SGDest, context.Context, error) {
+	dcpStats *expvar.Map, feedID string, importPartitionStat *SgwIntStat, checkpointPrefix string, metaKeys *MetadataKeys, rollback func()) (SGDest, context.Context, error) {
 
 	// TODO: Metadata store?
 	metadataStore := bucket.DefaultDataStore()
@@ -81,6 +83,7 @@ func NewDCPDest(ctx context.Context, callback sgbucket.FeedEventCallbackFunc, bu
 		stats:              dcpStats,
 		partitionCountStat: importPartitionStat,
 		metaInitComplete:   make([]bool, maxVbNo),
+		janitorRollback:    rollback,
 	}
 
 	if d.partitionCountStat != nil {
@@ -212,13 +215,21 @@ func (d *DCPDest) OpaqueSet(partition string, value []byte) error {
 	return nil
 }
 
+// Rollback is required by the cbgt.Dest interface but will not work when called by Sync Gateway, since additional information is needed to perform a rollback. Due to the design of cbgt.Dest, this will not be called without a programming error.
 func (d *DCPDest) Rollback(partition string, rollbackSeq uint64) error {
-	return d.rollback(partitionToVbNo(d.loggingCtx, partition), rollbackSeq)
+	err := errors.New("DCPDest.Rollback called but only RollbackEx should be called; this function is implemented only because it is required by the cbgt.Dest interface. It does not provide Sync Gateway with enough information to roll back, and this DCP stream will no longer be running.")
+	WarnfCtx(d.loggingCtx, "%s", err)
+	return err
 }
 
+// RollbackEx is called when a DCP stream request returns an error. This function persists the metadata and issues a command to cbgt.GocbcoreDCPFeed to restart.
 func (d *DCPDest) RollbackEx(partition string, vbucketUUID uint64, rollbackSeq uint64) error {
+	// MB-60564 would fix this in cbgt: if the sequence is zero, don't perform the vbucketUUID check, in case it is mismatched
+	if rollbackSeq == 0 {
+		vbucketUUID = 0
+	}
 	cbgtMeta := makeVbucketMetadataForSequence(vbucketUUID, rollbackSeq)
-	return d.rollbackEx(partitionToVbNo(d.loggingCtx, partition), vbucketUUID, rollbackSeq, cbgtMeta)
+	return d.rollbackEx(partitionToVbNo(d.loggingCtx, partition), vbucketUUID, rollbackSeq, cbgtMeta, d.janitorRollback)
 }
 
 // TODO: Not implemented, review potential usage
diff --git a/base/dcp_receiver.go b/base/dcp_receiver.go
index 3142db98fe..df8aacda7a 100644
--- a/base/dcp_receiver.go
+++ b/base/dcp_receiver.go
@@ -9,10 +9,6 @@
 package base
 
 import (
-	"context"
-	"expvar"
-
-	"github.com/couchbase/go-couchbase/cbdatasource"
 	"github.com/couchbase/gomemcached"
 	sgbucket "github.com/couchbase/sg-bucket"
 )
@@ -35,94 +31,18 @@ type DCPReceiver struct {
 	*DCPCommon
 }
 
-func NewDCPReceiver(ctx context.Context, callback sgbucket.FeedEventCallbackFunc, bucket Bucket, maxVbNo uint16, persistCheckpoints bool, dbStats *expvar.Map, feedID string, checkpointPrefix string, metaKeys *MetadataKeys) (cbdatasource.Receiver, context.Context, error) {
-
-	metadataStore := bucket.DefaultDataStore()
-	dcpCommon, err := NewDCPCommon(ctx, callback, bucket, metadataStore, maxVbNo, persistCheckpoints, dbStats, feedID, checkpointPrefix, metaKeys)
-	if err != nil {
-		return nil, nil, err
-	}
-
-	r := &DCPReceiver{
-		DCPCommon: dcpCommon,
-	}
-
-	if LogDebugEnabled(KeyDCP) {
-		InfofCtx(r.loggingCtx, KeyDCP, "Using DCP Logging Receiver")
-		logRec := &DCPLoggingReceiver{rec: r}
-		return logRec, r.loggingCtx, nil
-	}
-
-	return r, r.loggingCtx, nil
-}
-
-func (r *DCPReceiver) OnError(err error) {
-	WarnfCtx(r.loggingCtx, "Error processing DCP stream - will attempt to restart/reconnect if appropriate: %v.", err)
-	// From cbdatasource:
-	// Invoked in advisory fashion by the BucketDataSource when it
-	// encounters an error. The BucketDataSource will continue to try
-	// to "heal" and restart connections, etc, as necessary. The
-	// Receiver has a recourse during these error notifications of
-	// simply Close()'ing the BucketDataSource.
-
-	// Given this, we don't need to restart the feed/take the
-	// database offline, particularly since this only represents an error for a single
-	// vbucket stream, not the entire feed.
-	// bucketName := "unknown" // this is currently ignored anyway
-	// r.notify(bucketName, err)
-}
-
-func (r *DCPReceiver) DataUpdate(vbucketId uint16, key []byte, seq uint64,
-	req *gomemcached.MCRequest) error {
-	if !dcpKeyFilter(key, r.metaKeys) {
-		return nil
-	}
-	event := makeFeedEventForMCRequest(req, sgbucket.FeedOpMutation)
-	r.dataUpdate(seq, event)
-	return nil
-}
-
-func (r *DCPReceiver) DataDelete(vbucketId uint16, key []byte, seq uint64,
-	req *gomemcached.MCRequest) error {
-	if !dcpKeyFilter(key, r.metaKeys) {
-		return nil
-	}
-	event := makeFeedEventForMCRequest(req, sgbucket.FeedOpDeletion)
-	r.dataUpdate(seq, event)
-	return nil
-}
-
 // Make a feed event for a gomemcached request. Extracts expiry from extras
 func makeFeedEventForMCRequest(rq *gomemcached.MCRequest, opcode sgbucket.FeedOpcode) sgbucket.FeedEvent {
 	return makeFeedEvent(rq.Key, rq.Body, rq.DataType, rq.Cas, ExtractExpiryFromDCPMutation(rq), rq.VBucket, 0, opcode)
 }
 
-func (r *DCPReceiver) SnapshotStart(vbNo uint16,
-	snapStart, snapEnd uint64, snapType uint32) error {
-	r.snapshotStart(vbNo, snapStart, snapEnd)
-	return nil
-}
-
-func (r *DCPReceiver) SetMetaData(vbucketId uint16, value []byte) error {
-	_ = r.setMetaData(vbucketId, value, false)
-	return nil
-}
-
-func (r *DCPReceiver) GetMetaData(vbNo uint16) (
-	value []byte, lastSeq uint64, err error) {
-
-	return r.getMetaData(vbNo)
-}
-
-// RollbackEx should be called by cbdatasource - Rollback required to maintain the interface. In the event
-// it's called, logs warning and does a hard reset on metadata for the vbucket
-func (r *DCPReceiver) Rollback(vbucketId uint16, rollbackSeq uint64) error {
-	return r.rollback(vbucketId, rollbackSeq)
-}
-
-// RollbackEx includes the vbucketUUID needed to reset the metadata correctly
-func (r *DCPReceiver) RollbackEx(vbucketId uint16, vbucketUUID uint64, rollbackSeq uint64) error {
-	return r.rollbackEx(vbucketId, vbucketUUID, rollbackSeq, makeVbucketMetadataForSequence(vbucketUUID, rollbackSeq))
+// ShardedImportDCPMetadata is an internal struct that is exposed to enable JSON marshaling; it is used by the sharded import feed. It differs from DCPMetadata because it must match the private metadata struct used by cbgt.
+type ShardedImportDCPMetadata struct {
+	FailOverLog [][]uint64 `json:"failOverLog"`
+	SeqStart    uint64     `json:"seqStart"`
+	SeqEnd      uint64     `json:"seqEnd"`
+	SnapStart   uint64     `json:"snapStart"`
+	SnapEnd     uint64     `json:"snapEnd"`
 }
 
 // Generate cbdatasource's VBucketMetadata for a vbucket from underlying components
@@ -130,7 +50,7 @@
 	failOver := make([][]uint64, 1)
 	failOverEntry := []uint64{vbucketUUID, 0}
 	failOver[0] = failOverEntry
-	metadata := &cbdatasource.VBucketMetaData{
+	metadata := &ShardedImportDCPMetadata{
 		SeqStart:  sequence,
 		SeqEnd:    uint64(0xFFFFFFFFFFFFFFFF),
 		SnapStart: snapStart,
@@ -156,45 +76,6 @@ type DCPLoggingReceiver struct {
 	rec *DCPReceiver
 }
 
-func (r *DCPLoggingReceiver) OnError(err error) {
-	InfofCtx(r.rec.loggingCtx, KeyDCP, "OnError: %v", err)
-	r.rec.OnError(err)
-}
-
-func (r *DCPLoggingReceiver) DataUpdate(vbucketId uint16, key []byte, seq uint64,
-	req *gomemcached.MCRequest) error {
-	TracefCtx(r.rec.loggingCtx, KeyDCP, "DataUpdate:%d, %s, %d, %v", vbucketId, UD(string(key)), seq, UD(req))
-	return r.rec.DataUpdate(vbucketId, key, seq, req)
-}
-
-func (r *DCPLoggingReceiver) DataDelete(vbucketId uint16, key []byte, seq uint64,
-	req *gomemcached.MCRequest) error {
-	TracefCtx(r.rec.loggingCtx, KeyDCP, "DataDelete:%d, %s, %d, %v", vbucketId, UD(string(key)), seq, UD(req))
-	return r.rec.DataDelete(vbucketId, key, seq, req)
-}
-
-func (r *DCPLoggingReceiver) Rollback(vbucketId uint16, rollbackSeq uint64) error {
-	InfofCtx(r.rec.loggingCtx, KeyDCP, "Rollback:%d, %d", vbucketId, rollbackSeq)
-	return r.rec.Rollback(vbucketId, rollbackSeq)
-}
-
-func (r *DCPLoggingReceiver) SetMetaData(vbucketId uint16, value []byte) error {
-	TracefCtx(r.rec.loggingCtx, KeyDCP, "SetMetaData:%d, %s", vbucketId, value)
-	return r.rec.SetMetaData(vbucketId, value)
-}
-
-func (r *DCPLoggingReceiver) GetMetaData(vbucketId uint16) (
-	value []byte, lastSeq uint64, err error) {
-	TracefCtx(r.rec.loggingCtx, KeyDCP, "GetMetaData:%d", vbucketId)
-	return r.rec.GetMetaData(vbucketId)
-}
-
-func (r *DCPLoggingReceiver) SnapshotStart(vbucketId uint16,
-	snapStart, snapEnd uint64, snapType uint32) error {
-	TracefCtx(r.rec.loggingCtx, KeyDCP, "SnapshotStart:%d, %d, %d, %d", vbucketId, snapStart, snapEnd, snapType)
-	return r.rec.SnapshotStart(vbucketId, snapStart, snapEnd, snapType)
-}
-
 // NoPasswordAuthHandler is used for client cert-based auth by cbdatasource
 type NoPasswordAuthHandler struct {
 	Handler AuthHandler
diff --git a/base/dcp_sharded.go b/base/dcp_sharded.go
index 5adef5fd1f..1c841b6a33 100644
--- a/base/dcp_sharded.go
+++ b/base/dcp_sharded.go
@@ -27,9 +27,15 @@ import (
 	"gopkg.in/couchbaselabs/gocbconnstr.v1"
 )
 
-const CBGTIndexTypeSyncGatewayImport = "syncGateway-import-"
-const DefaultImportPartitions = 16
-const DefaultImportPartitionsServerless = 6
+const (
+	CBGTIndexTypeSyncGatewayImport    = "syncGateway-import-"
+	DefaultImportPartitions           = 16
+	DefaultImportPartitionsServerless = 6
+	CBGTCfgIndexDefs                  = SyncDocPrefix + "cfgindexDefs"
+	CBGTCfgNodeDefsKnown              = SyncDocPrefix + "cfgnodeDefs-known"
+	CBGTCfgNodeDefsWanted             = SyncDocPrefix + "cfgnodeDefs-wanted"
+	CBGTCfgPlanPIndexes               = SyncDocPrefix + "cfgplanPIndexes"
+)
 
 // firstVersionToSupportCollections represents the earliest Sync Gateway release that supports collections.
 var firstVersionToSupportCollections = &ComparableVersion{
@@ -684,11 +690,12 @@ func (l *importHeartbeatListener) Stop() {
 
 // cbgtDestFactories map DCP feed keys (destKey) to a function that will generate cbgt.Dest. Need to be stored in a
 // global map to avoid races between db creation and db addition to the server context database set
-type CbgtDestFactoryFunc = func() (cbgt.Dest, error)
+type CbgtDestFactoryFunc = func(rollback func()) (cbgt.Dest, error)
 
 var cbgtDestFactories = make(map[string]CbgtDestFactoryFunc)
 var cbgtDestFactoriesLock sync.Mutex
 
+// StoreDestFactory stores a factory function used to create a cbgt.Dest object.
 func StoreDestFactory(ctx context.Context, destKey string, dest CbgtDestFactoryFunc) {
 	cbgtDestFactoriesLock.Lock()
 	_, ok := cbgtDestFactories[destKey]
diff --git a/db/attachment_compaction_test.go b/db/attachment_compaction_test.go
index 2d5bfd5a6e..639b938e0f 100644
--- a/db/attachment_compaction_test.go
+++ b/db/attachment_compaction_test.go
@@ -713,6 +713,7 @@ func TestAttachmentDifferentVBUUIDsBetweenPhases(t *testing.T) {
 
 	_, err = attachmentCompactSweepPhase(ctx, dataStore, collectionID, testDB, t.Name(), vbUUIDs, false, terminator, &base.AtomicInt{})
 	require.Error(t, err)
+	require.ErrorAs(t, err, &base.ErrVbUUIDMismatch)
 	assert.Contains(t, err.Error(), "error opening stream for vb 0: VbUUID mismatch when failOnRollback set")
 }
 
diff --git a/db/dcp_sharded_upgrade_test.go b/db/dcp_sharded_upgrade_test.go
index ebe5d3a73c..b3e648991f 100644
--- a/db/dcp_sharded_upgrade_test.go
+++ b/db/dcp_sharded_upgrade_test.go
@@ -203,11 +203,11 @@ func TestShardedDCPUpgrade(t *testing.T) {
 		indexName = "db0x2d9928b7_index"
 	)
 
-	require.NoError(t, dataStore.SetRaw(base.SyncDocPrefix+"cfgindexDefs", 0, nil, []byte(fmt.Sprintf(indexDefs, tb.GetName(), bucketUUID))))
-	require.NoError(t, dataStore.SetRaw(base.SyncDocPrefix+"cfgnodeDefs-known", 0, nil, []byte(nodeDefs)))
-	require.NoError(t, dataStore.SetRaw(base.SyncDocPrefix+"cfgnodeDefs-wanted", 0, nil, []byte(nodeDefs)))
+	require.NoError(t, dataStore.SetRaw(base.CBGTCfgIndexDefs, 0, nil, []byte(fmt.Sprintf(indexDefs, tb.GetName(), bucketUUID))))
+	require.NoError(t, dataStore.SetRaw(base.CBGTCfgNodeDefsKnown, 0, nil, []byte(nodeDefs)))
+	require.NoError(t, dataStore.SetRaw(base.CBGTCfgNodeDefsWanted, 0, nil, []byte(nodeDefs)))
 	planPIndexesJSON := preparePlanPIndexesJSON(t, tb, numVBuckets, numPartitions)
-	require.NoError(t, dataStore.SetRaw(base.SyncDocPrefix+"cfgplanPIndexes", 0, nil, []byte(planPIndexesJSON)))
+	require.NoError(t, dataStore.SetRaw(base.CBGTCfgPlanPIndexes, 0, nil, []byte(planPIndexesJSON)))
 
 	// Write a doc before starting the dbContext to check that import works
 	const (
diff --git a/db/import_pindex.go b/db/import_pindex.go
index 3ba2cee6df..4b796bd1f1 100644
--- a/db/import_pindex.go
+++ b/db/import_pindex.go
@@ -40,7 +40,7 @@ func RegisterImportPindexImpl(ctx context.Context, configGroup string) {
 }
 
 // getListenerForIndex looks up the importListener for the dbName specified in the index params
-func getListenerImportDest(ctx context.Context, indexParams string) (cbgt.Dest, error) {
+func getListenerImportDest(ctx context.Context, indexParams string, restart func()) (cbgt.Dest, error) {
 
 	var outerParams struct {
 		Params string `json:"params"`
@@ -61,7 +61,7 @@ func getListenerImportDest(ctx context.Context, indexParams string) (cbgt.Dest,
 	if fetchErr != nil {
 		return nil, fmt.Errorf("error retrieving listener for indexParams %v: %v", indexParams, fetchErr)
 	}
-	return destFactory()
+	return destFactory(restart)
 }
 
 func getNewPIndexImplType(ctx context.Context) func(indexType, indexParams, path string, restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) {
@@ -70,7 +70,7 @@ func getNewPIndexImplType(ctx context.Context) func(indexType, indexParams, path
 	newImportPIndexImpl := func(indexType, indexParams, path string, restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) {
 		defer base.FatalPanicHandler()
 
-		importDest, err := getListenerImportDest(ctx, indexParams)
+		importDest, err := getListenerImportDest(ctx, indexParams, restart)
 		if err != nil {
 			base.ErrorfCtx(ctx, "Error creating NewImportDest during NewImportPIndexImpl: %v", err)
 		}
@@ -78,6 +78,8 @@ func getNewPIndexImplType(ctx context.Context) func(indexType, indexParams, path
 	}
 	return newImportPIndexImpl
 }
+
+// OpenImportPIndexImpl is required to provide an implementation for cbgt.PIndexImplType.Open. When this function fails, PIndexImplType will fall back to using PIndexImplType.OpenUsing.
 func OpenImportPIndexImpl(indexType, path string, restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) {
 	return nil, nil, errors.New("Open PIndexImpl not supported for SG 3.0 databases - must provide index params")
 }
@@ -85,14 +87,14 @@ func OpenImportPIndexImpl(indexType, path string, restart func()) (cbgt.PIndexIm
 
 func getOpenImportPIndexImplUsing(ctx context.Context) func(indexType, indexParams, path string, restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) {
 	openImportPIndexImplUsing := func(indexType, path, indexParams string, restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) {
-		importDest, err := getListenerImportDest(ctx, indexParams)
+		importDest, err := getListenerImportDest(ctx, indexParams, restart)
 		return nil, importDest, err
 	}
 	return openImportPIndexImplUsing
 }
 
-// Returns a cbgt.Dest targeting the importListener's ProcessFeedEvent
-func (il *importListener) NewImportDest() (cbgt.Dest, error) {
+// NewImportDest returns a cbgt.Dest targeting the importListener's ProcessFeedEvent
+func (il *importListener) NewImportDest(janitorRollback func()) (cbgt.Dest, error) {
 	callback := il.ProcessFeedEvent
 
 	maxVbNo, err := il.bucket.GetMaxVbno() // can safely assume that all collections on the same bucket will have the same vbNo
@@ -103,7 +105,7 @@ func (il *importListener) NewImportDest() (cbgt.Dest, error) {
 	importFeedStatsMap := il.dbStats.ImportFeedMapStats
 	importPartitionStat := il.importStats.ImportPartitions
 
-	importDest, _, err := base.NewDCPDest(il.loggingCtx, callback, il.bucket, maxVbNo, true, importFeedStatsMap.Map, base.DCPImportFeedID, importPartitionStat, il.checkpointPrefix, il.metadataKeys)
+	importDest, _, err := base.NewDCPDest(il.loggingCtx, callback, il.bucket, maxVbNo, true, importFeedStatsMap.Map, base.DCPImportFeedID, importPartitionStat, il.checkpointPrefix, il.metadataKeys, janitorRollback)
 	if err != nil {
 		return nil, err
 	}
diff --git a/rest/importtest/import_test.go b/rest/importtest/import_test.go
index 2c0d4efefa..eeb2b04a1a 100644
--- a/rest/importtest/import_test.go
+++ b/rest/importtest/import_test.go
@@ -2562,94 +2562,82 @@ func TestImportRollback(t *testing.T) {
 	}
 	base.SetUpTestLogging(t, base.LevelDebug, base.KeyImport, base.KeyDCP)
 
+	const (
+		rollbackWithoutFailover = "rollbackWithoutFailover"
+		rollbackWithFailover    = "rollbackWithFailoverLogs"
+	)
+	for _, testType := range []string{rollbackWithoutFailover, rollbackWithFailover} {
+		t.Run(testType, func(t *testing.T) {
+			ctx := base.TestCtx(t)
+			bucket := base.GetTestBucket(t)
+			defer bucket.Close(ctx)
 
-	ctx := base.TestCtx(t)
-	bucket := base.GetPersistentTestBucket(t)
-	defer bucket.Close(ctx)
-
-	rt := rest.NewRestTester(t, &rest.RestTesterConfig{
-		CustomTestBucket: bucket.NoCloseClone(),
-		PersistentConfig: false,
-	})
-
-	key := "importRollbackTest"
-	lastSeq := "0"
-	var checkpointPrefix string
-	// do some setup work in an anonymous function so that we can be sure to close the rest tester on unexpected error
-	func() {
-		defer rt.Close()
-
-		// Create a document
-		added, err := rt.GetSingleDataStore().AddRaw(key, 0, []byte(fmt.Sprintf(`{"star": "6"}`)))
-		require.True(t, added)
-		require.NoError(t, err)
-
-		// wait for doc to be imported
-		changes, err := rt.WaitForChanges(1, "/{{.keyspace}}/_changes?since="+lastSeq, "", true)
-		require.NoError(t, err)
-		var ok bool
-		lastSeq, ok = changes.Last_Seq.(string)
-		require.True(t, ok)
+			rt := rest.NewRestTester(t, &rest.RestTesterConfig{
+				CustomTestBucket: bucket.NoCloseClone(),
+				PersistentConfig: false,
+			})
 
-		// Close db while we mess with checkpoints
-		db := rt.GetDatabase()
-		checkpointPrefix = rt.GetDatabase().MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID)
-	}()
+			key := "importRollbackTest"
+			var checkpointPrefix string
+			// Create a document
+			added, err := rt.GetSingleDataStore().AddRaw(key, 0, []byte(fmt.Sprintf(`{"star": "6"}`)))
+			require.True(t, added)
+			require.NoError(t, err)
 
-	// fetch the checkpoint for the document's vbucket, modify the checkpoint values to a higher sequence
-	vbNo, err := base.GetVbucketForKey(bucket, key)
-	require.NoError(t, err)
-	metaStore := bucket.GetMetadataStore()
-	checkpointKey := fmt.Sprintf("%s%d", checkpointPrefix, vbNo)
-	var checkpointData dcpMetaData
-	checkpointBytes, _, err := metaStore.GetRaw(checkpointKey)
-	require.NoError(t, err)
-	require.NoError(t, base.JSONUnmarshal(checkpointBytes, &checkpointData))
+			// wait for doc to be imported
+			changes, err := rt.WaitForChanges(1, "/{{.keyspace}}/_changes?since=0", "", true)
+			require.NoError(t, err)
+			lastSeq, ok := changes.Last_Seq.(string)
+			require.True(t, ok)
 
-	checkpointData.SnapStart = 3000 + checkpointData.SnapStart
-	checkpointData.SnapEnd = 3000 + checkpointData.SnapEnd
-	checkpointData.SeqStart = 3000 + checkpointData.SeqStart
-	checkpointData.SeqEnd = 3000 + checkpointData.SeqEnd
-	updatedBytes, err := base.JSONMarshal(checkpointData)
-	require.NoError(t, err)
+			// Close db while we mess with checkpoints
+			db := rt.GetDatabase()
+			checkpointPrefix = rt.GetDatabase().MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID)
 
-	err = metaStore.SetRaw(checkpointKey, 0, nil, updatedBytes)
-	require.NoError(t, err)
+			rt.Close()
 
-	// Reopen the db, expect DCP rollback
-	rt2 := rest.NewRestTester(t, &rest.RestTesterConfig{
-		CustomTestBucket: bucket.NoCloseClone(),
-		PersistentConfig: false,
-	})
-	defer rt2.Close()
+			metaStore := bucket.GetMetadataStore()
+			for _, docName := range []string{base.CBGTCfgIndexDefs, base.CBGTCfgNodeDefsKnown, base.CBGTCfgNodeDefsWanted, base.CBGTCfgPlanPIndexes} {
+				require.NoError(t, metaStore.Delete(docName), "Couldn't delete %s", docName)
+			}
 
-	err = rt2.GetSingleDataStore().SetRaw(key, 0, nil, []byte(fmt.Sprintf(`{"star": "7 8 9"}`)))
-	require.NoError(t, err)
+			// fetch the checkpoint for the document's vbucket, modify the checkpoint values to a higher sequence
+			vbNo, err := base.GetVbucketForKey(bucket, key)
+			require.NoError(t, err)
+			checkpointKey := fmt.Sprintf("%s%d", checkpointPrefix, vbNo)
+			var checkpointData base.ShardedImportDCPMetadata
+			checkpointBytes, _, err := metaStore.GetRaw(checkpointKey)
+			require.NoError(t, err)
+			require.NoError(t, base.JSONUnmarshal(checkpointBytes, &checkpointData))
+
+			checkpointData.SnapStart = 3000 + checkpointData.SnapStart
+			checkpointData.SnapEnd = 3000 + checkpointData.SnapEnd
+			checkpointData.SeqStart = 3000 + checkpointData.SeqStart
+			checkpointData.SeqEnd = 3000 + checkpointData.SeqEnd
+			if testType == rollbackWithFailover {
+				existingVbUUID := checkpointData.FailOverLog[0][0]
+				checkpointData.FailOverLog = [][]uint64{{existingVbUUID + 1, 0}}
+			}
+			updatedBytes, err := base.JSONMarshal(checkpointData)
+			require.NoError(t, err)
 
-	// New RT may have import latency until old RT heartbeat expires and partitions are assigned. Wait for all import partitions
-	// to be assigned to the new db before waiting for changes
-	err = rt2.WaitForCondition(func() bool {
-		database := rt2.GetDatabase()
-		expectedPartitions := int64(database.Options.ImportOptions.ImportPartitions)
-		partitions := database.DbStats.SharedBucketImportStats.ImportPartitions.Value()
-		if partitions < expectedPartitions {
-			log.Printf("waiting for %d partitions to be assigned to RT (have %d)", expectedPartitions, partitions)
-		}
-		return partitions >= expectedPartitions
-	})
-	require.NoError(t, err)
+			err = metaStore.SetRaw(checkpointKey, 0, nil, updatedBytes)
+			require.NoError(t, err)
+			// Reopen the db, expect DCP rollback
+			rt2 := rest.NewRestTester(t, &rest.RestTesterConfig{
+				CustomTestBucket: bucket.NoCloseClone(),
+				PersistentConfig: false,
+			})
+			defer rt2.Close()
 
-	// wait for doc update to be imported
-	_, err = rt2.WaitForChanges(1, "/{{.keyspace}}/_changes?since="+lastSeq, "", true)
-	require.NoError(t, err)
-}
+			err = rt2.GetSingleDataStore().SetRaw(key, 0, nil, []byte(fmt.Sprintf(`{"star": "7 8 9"}`)))
+			require.NoError(t, err)
 
-type dcpMetaData struct {
-	SeqStart    uint64     `json:"seqStart"`
-	SeqEnd      uint64     `json:"seqEnd"`
-	SnapStart   uint64     `json:"snapStart"`
-	SnapEnd     uint64     `json:"snapEnd"`
-	FailOverLog [][]uint64 `json:"failOverLog"`
+			// wait for doc update to be imported
+			_, err = rt2.WaitForChanges(1, "/{{.keyspace}}/_changes?since="+lastSeq, "", true)
+			require.NoError(t, err)
+		})
+	}
 }
 
 func TestImportUpdateExpiry(t *testing.T) {
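
For reviewers: the rollback wiring above is spread across four files, so here is a condensed, self-contained sketch of the pattern this patch introduces. It is illustrative only — the pindex factory receives a janitor-rollback closure and hands it down to the Dest, whose RollbackEx persists rolled-back checkpoint metadata and then invokes the closure so cbgt.Mgr recreates the PIndex. The names Dest, DestFactoryFunc, and janitorRollback mirror the patch's CbgtDestFactoryFunc and DCPDest fields in shape only; all types below are simplified stand-ins, not Sync Gateway or cbgt APIs.

// rollback_wiring_sketch.go
package main

import "fmt"

// Dest models the subset of cbgt.Dest behavior relevant here.
type Dest interface {
	RollbackEx(partition string, vbucketUUID uint64, rollbackSeq uint64) error
}

// DestFactoryFunc mirrors CbgtDestFactoryFunc: the factory now receives the
// rollback closure instead of creating a Dest with no way to kick cbgt.Mgr.
type DestFactoryFunc func(rollback func()) (Dest, error)

type dest struct {
	janitorRollback func()            // triggers a janitor_pindex_rollback in the real code
	checkpoints     map[string]uint64 // stand-in for persisted DCP checkpoint metadata
}

func (d *dest) RollbackEx(partition string, vbucketUUID uint64, rollbackSeq uint64) error {
	// Mirrors the MB-60564 workaround: at sequence 0 the vbucketUUID may be
	// mismatched, so don't pin the rolled-back metadata to it.
	if rollbackSeq == 0 {
		vbucketUUID = 0
	}
	_ = vbucketUUID
	d.checkpoints[partition] = rollbackSeq // persist rolled-back metadata first
	// Even if persistence failed, the janitor is still kicked so the feed keeps
	// retrying to reconnect — the key behavioral fix in this patch.
	d.janitorRollback()
	return nil
}

func main() {
	var factory DestFactoryFunc = func(rollback func()) (Dest, error) {
		return &dest{janitorRollback: rollback, checkpoints: map[string]uint64{}}, nil
	}
	// In the real code, cbgt supplies this closure and it queues a
	// cbgt.JANITOR_ROLLBACK_PINDEX op with cbgt.Mgr.
	d, err := factory(func() { fmt.Println("janitor rollback: recreate PIndex") })
	if err != nil {
		panic(err)
	}
	_ = d.RollbackEx("vb-12", 0xabc, 0)
}

Running the sketch prints the janitor message, illustrating that the rollback op is issued even at sequence 0, where the vbucketUUID is deliberately cleared.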