Skip to content

Commit

Permalink
CBG-3725: [3.1.4 backport] Import DCP rollback unsuccessful in data m…
Browse files Browse the repository at this point in the history
…igration cases (#6676)

* CBG-3724 ensure import rollback works correctly (#6657)

* CBG-3724 ensure import rollback works correctly

Major fix: pass a function to call cbgt.JANITOR_ROLLBACK_PINDEX when
creating a pindex. This is necessary to ensure cbgt.Mgr will actually
generate the index.

- fix the TestImportRollback test which was giving a false pass before
  because the cbgt.Mgr was getting a kick op when the nodes were
  rebalanced. In order for rollback to work, the cbgt.Dest RollbackEx
  function has to force cgbt.Mgr to actually update the PIndex. A
  cbgt.JANITOR_ROLLBACK_PINDEX op will force the recreation of a PIndex.
  This test was broken before adding code to it.
- Enhance the TestImportRollback to detect a rollback with invalid FailoverLogs.
  This simulates a situation where the checkpoints have been migrated
  from a separate bucket.

- modify TestContinuousRollback to also test with bad vbuuid FailoverEntry.
  This test actually would pass, because Sync Gateway, unlike cbgt, did
  not pass DcpStreamAddFlagStrictVBUUID. DcpStreamAddFlagStrictVBUUID
  forces a rollback in the case where StartSeqNo=0 vbuuid=invalid
  whereas without the flag it is treated like StartSeqNo=0 vbuuid=0.
  Changed the code to match cbgt behavior adding
  DcpStreamAddFlagStrictVBUUID.
- Added a separate flag in DCPClient to detect active vBuckets, which
  are the number of vBuckets that actually have a connection, and are
  not stuck in a connecting state. This differs from the existing
  activeVbuckets which is set from the time of NewDCPClient to when the
  streams end. In a continuous feed, the streams should never end.

Cleanup:

- move RollbackEx function from DCPDest to DCPCommon. DCPCommon is an
  embedded struct in DCPDest.
- panic if DCPCommon.Rollback is called. This should never happen given
  the design of cbgt.Dest, but if it did, the rollback would not work.
  This is different from the old behavior where it would not rollback
  and potentially enter a rollback loop if this function was called.
- remove maxVbNo from NewDCPDest in effort to shorten the arguments
  since the flag can be inferred from base.Bucket.
- Create some public structs to make tests easier:
	- ShardedImportDCPMetadata
	- CBGCfgIndexDefs, CBGTIndexNodeDefsKnown, CBGTIndexNodeDefsWanted, CBGTCfgPlanPIndexes

* work around MB-60564

* Change message

* Make remove raceproof and improve comment

* Reduce scope of changes for easier backport

* Address comments to make code simpler

* updates to remove problematic code

* Update base/dcp_dest.go

Co-authored-by: Tor Colvin <[email protected]>

---------

Co-authored-by: Tor Colvin <[email protected]>
  • Loading branch information
gregns1 and torcolvin authored Feb 9, 2024
1 parent 732a692 commit d3b758c
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 237 deletions.
20 changes: 6 additions & 14 deletions base/dcp_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type DCPCommon struct {
checkpointPrefix string // DCP checkpoint key prefix
}

// NewDCPCommon creates a new DCPCommon which manages updates coming from a cbgt-based DCP feed. The callback function will receive events from a DCP feed. The bucket is the gocb bucket to stream events from. It stores checkpoints in the metaStore collection prefixes from metaKeys + checkpointPrefix. The feed name will start with feedID and DCPCommon will add unique string. Specific stats for DCP are stored in expvars rather than SgwStats. The janitorRollback function is supplied by the global cbgt.PIndexImplType.New function, for initial opening of a partition index, and cbgt.PIndexImplType.OpenUsing for reopening of a partition index. The rollback function provides a way to pass cbgt.JANITOR_ROLLBACK_PINDEX to cbgt.Mgr and is supplied.
func NewDCPCommon(ctx context.Context, callback sgbucket.FeedEventCallbackFunc, bucket Bucket, metaStore DataStore,
maxVbNo uint16, persistCheckpoints bool, dbStats *expvar.Map, feedID, checkpointPrefix string, metaKeys *MetadataKeys) (*DCPCommon, error) {
newBackfillStatus := backfillStatus{
Expand Down Expand Up @@ -184,23 +185,14 @@ func (c *DCPCommon) getMetaData(vbucketId uint16) (
return value, lastSeq, nil
}

// RollbackEx should be called by cbdatasource - Rollback required to maintain the interface. In the event
// it's called, logs warning and does a hard reset on metadata for the vbucket. Returns error if metadata
// persistence fails
func (c *DCPCommon) rollback(vbucketId uint16, rollbackSeq uint64) error {
WarnfCtx(c.loggingCtx, "DCP Rollback request. Expected RollbackEx call - resetting vbucket %d to 0.", vbucketId)
c.dbStatsExpvars.Add("dcp_rollback_count", 1)
c.updateSeq(vbucketId, 0, false)
err := c.setMetaData(vbucketId, nil, true)
return err
}

// RollbackEx includes the vbucketUUID needed to reset the metadata correctly
func (c *DCPCommon) rollbackEx(vbucketId uint16, vbucketUUID uint64, rollbackSeq uint64, rollbackMetaData []byte) error {
// rollbackEx is called when a DCP stream issues a rollback. The metadata persisted for a given uuid and sequence number and then cbgt.Mgr JANITOR_ROLLBACK_PINDEX is issued via janitorRollback function.
func (c *DCPCommon) rollbackEx(vbucketId uint16, vbucketUUID uint64, rollbackSeq uint64, rollbackMetaData []byte, janitorRollback func()) error {
WarnfCtx(c.loggingCtx, "DCP RollbackEx request - rolling back DCP feed for: vbucketId: %d, rollbackSeq: %x.", vbucketId, rollbackSeq)
c.dbStatsExpvars.Add("dcp_rollback_count", 1)
c.updateSeq(vbucketId, rollbackSeq, false)
err := c.setMetaData(vbucketId, rollbackMetaData, true)
// if we fail to persist the metadata, we still want to rollback to keep retrying to reconnect. Returning the error will log in cbgt.
janitorRollback()
return err
}

Expand Down Expand Up @@ -228,7 +220,7 @@ func (c *DCPCommon) loadCheckpoint(vbNo uint16) (vbMetadata []byte, snapshotStar
}
}

var snapshotMetadata cbdatasource.VBucketMetaData
var snapshotMetadata ShardedImportDCPMetadata
unmarshalErr := JSONUnmarshal(rawValue, &snapshotMetadata)
if unmarshalErr != nil {
return []byte{}, 0, 0, err
Expand Down
17 changes: 14 additions & 3 deletions base/dcp_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,12 @@ type DCPDest struct {
stats *expvar.Map // DCP feed stats (rollback, backfill)
partitionCountStat *SgwIntStat // Stat for partition count. Stored outside the DCP feed stats map
metaInitComplete []bool // Whether metadata initialization has been completed, per vbNo
janitorRollback func() // This function will trigger a janitor_pindex_rollback
}

// NewDCPDest creates a new DCPDest which manages updates coming from a cbgt-based DCP feed. The callback function will receive events from a DCP feed. The bucket is the gocb bucket to stream events from. It optionally stores checkpoints in the _default._default collection if persistentCheckpoints is true with prefixes from metaKeys + checkpointPrefix. The feed name will start with feedID have a unique string appended. Specific stats for DCP are stored in expvars rather than SgwStats, except for importPartitionStat representing the number of import partitions. Each import partition will have a DCPDest object. The rollback function is supplied by the global cbgt.PIndexImplType.New function, for initial opening of a partition index, and cbgt.PIndexImplType.OpenUsing for reopening of a partition index. The rollback function provides a way to pass cbgt.JANITOR_ROLLBACK_PINDEX to cbgt.Mgr.
func NewDCPDest(ctx context.Context, callback sgbucket.FeedEventCallbackFunc, bucket Bucket, maxVbNo uint16, persistCheckpoints bool,
dcpStats *expvar.Map, feedID string, importPartitionStat *SgwIntStat, checkpointPrefix string, metaKeys *MetadataKeys) (SGDest, context.Context, error) {
dcpStats *expvar.Map, feedID string, importPartitionStat *SgwIntStat, checkpointPrefix string, metaKeys *MetadataKeys, rollback func()) (SGDest, context.Context, error) {

// TODO: Metadata store?
metadataStore := bucket.DefaultDataStore()
Expand All @@ -81,6 +83,7 @@ func NewDCPDest(ctx context.Context, callback sgbucket.FeedEventCallbackFunc, bu
stats: dcpStats,
partitionCountStat: importPartitionStat,
metaInitComplete: make([]bool, maxVbNo),
janitorRollback: rollback,
}

if d.partitionCountStat != nil {
Expand Down Expand Up @@ -212,13 +215,21 @@ func (d *DCPDest) OpaqueSet(partition string, value []byte) error {
return nil
}

// Rollback is required by cbgt.Dest interface but will not work when called by Sync Gateway as we need additional information to perform a rollback. Due to the design of cbgt.Dest this will not be called without a programming error.
func (d *DCPDest) Rollback(partition string, rollbackSeq uint64) error {
return d.rollback(partitionToVbNo(d.loggingCtx, partition), rollbackSeq)
err := errors.New("DCPDest.Rollback called but only RollbackEx should be called, this function is required to be implmented by cbgt.Dest interface. This function does not provide Sync Gateway with enough information to rollback and this DCP stream will not longer be running.")
WarnfCtx(d.loggingCtx, "%s", err)
return err
}

// RollbackEx is called when a DCP stream request return as error. This function persists the metadata and will issue a command to cbgt.GocbcoreDCPFeed to restart.
func (d *DCPDest) RollbackEx(partition string, vbucketUUID uint64, rollbackSeq uint64) error {
// MB-60564 would fix this in cbgt, if sequence is zero, don't perform vbucketUUID check, in case it is mismatched
if rollbackSeq == 0 {
vbucketUUID = 0
}
cbgtMeta := makeVbucketMetadataForSequence(vbucketUUID, rollbackSeq)
return d.rollbackEx(partitionToVbNo(d.loggingCtx, partition), vbucketUUID, rollbackSeq, cbgtMeta)
return d.rollbackEx(partitionToVbNo(d.loggingCtx, partition), vbucketUUID, rollbackSeq, cbgtMeta, d.janitorRollback)
}

// TODO: Not implemented, review potential usage
Expand Down
135 changes: 8 additions & 127 deletions base/dcp_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@
package base

import (
"context"
"expvar"

"github.com/couchbase/go-couchbase/cbdatasource"
"github.com/couchbase/gomemcached"
sgbucket "github.com/couchbase/sg-bucket"
)
Expand All @@ -35,102 +31,26 @@ type DCPReceiver struct {
*DCPCommon
}

func NewDCPReceiver(ctx context.Context, callback sgbucket.FeedEventCallbackFunc, bucket Bucket, maxVbNo uint16, persistCheckpoints bool, dbStats *expvar.Map, feedID string, checkpointPrefix string, metaKeys *MetadataKeys) (cbdatasource.Receiver, context.Context, error) {

metadataStore := bucket.DefaultDataStore()
dcpCommon, err := NewDCPCommon(ctx, callback, bucket, metadataStore, maxVbNo, persistCheckpoints, dbStats, feedID, checkpointPrefix, metaKeys)
if err != nil {
return nil, nil, err
}

r := &DCPReceiver{
DCPCommon: dcpCommon,
}

if LogDebugEnabled(KeyDCP) {
InfofCtx(r.loggingCtx, KeyDCP, "Using DCP Logging Receiver")
logRec := &DCPLoggingReceiver{rec: r}
return logRec, r.loggingCtx, nil
}

return r, r.loggingCtx, nil
}

func (r *DCPReceiver) OnError(err error) {
WarnfCtx(r.loggingCtx, "Error processing DCP stream - will attempt to restart/reconnect if appropriate: %v.", err)
// From cbdatasource:
// Invoked in advisory fashion by the BucketDataSource when it
// encounters an error. The BucketDataSource will continue to try
// to "heal" and restart connections, etc, as necessary. The
// Receiver has a recourse during these error notifications of
// simply Close()'ing the BucketDataSource.

// Given this, we don't need to restart the feed/take the
// database offline, particularly since this only represents an error for a single
// vbucket stream, not the entire feed.
// bucketName := "unknown" // this is currently ignored anyway
// r.notify(bucketName, err)
}

func (r *DCPReceiver) DataUpdate(vbucketId uint16, key []byte, seq uint64,
req *gomemcached.MCRequest) error {
if !dcpKeyFilter(key, r.metaKeys) {
return nil
}
event := makeFeedEventForMCRequest(req, sgbucket.FeedOpMutation)
r.dataUpdate(seq, event)
return nil
}

func (r *DCPReceiver) DataDelete(vbucketId uint16, key []byte, seq uint64,
req *gomemcached.MCRequest) error {
if !dcpKeyFilter(key, r.metaKeys) {
return nil
}
event := makeFeedEventForMCRequest(req, sgbucket.FeedOpDeletion)
r.dataUpdate(seq, event)
return nil
}

// Make a feed event for a gomemcached request. Extracts expiry from extras
func makeFeedEventForMCRequest(rq *gomemcached.MCRequest, opcode sgbucket.FeedOpcode) sgbucket.FeedEvent {
return makeFeedEvent(rq.Key, rq.Body, rq.DataType, rq.Cas, ExtractExpiryFromDCPMutation(rq), rq.VBucket, 0, opcode)
}

func (r *DCPReceiver) SnapshotStart(vbNo uint16,
snapStart, snapEnd uint64, snapType uint32) error {
r.snapshotStart(vbNo, snapStart, snapEnd)
return nil
}

func (r *DCPReceiver) SetMetaData(vbucketId uint16, value []byte) error {
_ = r.setMetaData(vbucketId, value, false)
return nil
}

func (r *DCPReceiver) GetMetaData(vbNo uint16) (
value []byte, lastSeq uint64, err error) {

return r.getMetaData(vbNo)
}

// RollbackEx should be called by cbdatasource - Rollback required to maintain the interface. In the event
// it's called, logs warning and does a hard reset on metadata for the vbucket
func (r *DCPReceiver) Rollback(vbucketId uint16, rollbackSeq uint64) error {
return r.rollback(vbucketId, rollbackSeq)
}

// RollbackEx includes the vbucketUUID needed to reset the metadata correctly
func (r *DCPReceiver) RollbackEx(vbucketId uint16, vbucketUUID uint64, rollbackSeq uint64) error {
return r.rollbackEx(vbucketId, vbucketUUID, rollbackSeq, makeVbucketMetadataForSequence(vbucketUUID, rollbackSeq))
// ShardedImportDCPMetadata is an internal struct that is exposed to enable json marshaling, used by sharded import feed. It differs from DCPMetadata because it must match the private struct used by cbgt.metadata.
type ShardedImportDCPMetadata struct {
FailOverLog [][]uint64 `json:"failOverLog"`
SeqStart uint64 `json:"seqStart"`
SeqEnd uint64 `json:"seqEnd"`
SnapStart uint64 `json:"snapStart"`
SnapEnd uint64 `json:"snapEnd"`
}

// Generate cbdatasource's VBucketMetadata for a vbucket from underlying components
func makeVbucketMetadata(vbucketUUID uint64, sequence uint64, snapStart uint64, snapEnd uint64) []byte {
failOver := make([][]uint64, 1)
failOverEntry := []uint64{vbucketUUID, 0}
failOver[0] = failOverEntry
metadata := &cbdatasource.VBucketMetaData{
metadata := &ShardedImportDCPMetadata{
SeqStart: sequence,
SeqEnd: uint64(0xFFFFFFFFFFFFFFFF),
SnapStart: snapStart,
Expand All @@ -156,45 +76,6 @@ type DCPLoggingReceiver struct {
rec *DCPReceiver
}

func (r *DCPLoggingReceiver) OnError(err error) {
InfofCtx(r.rec.loggingCtx, KeyDCP, "OnError: %v", err)
r.rec.OnError(err)
}

func (r *DCPLoggingReceiver) DataUpdate(vbucketId uint16, key []byte, seq uint64,
req *gomemcached.MCRequest) error {
TracefCtx(r.rec.loggingCtx, KeyDCP, "DataUpdate:%d, %s, %d, %v", vbucketId, UD(string(key)), seq, UD(req))
return r.rec.DataUpdate(vbucketId, key, seq, req)
}

func (r *DCPLoggingReceiver) DataDelete(vbucketId uint16, key []byte, seq uint64,
req *gomemcached.MCRequest) error {
TracefCtx(r.rec.loggingCtx, KeyDCP, "DataDelete:%d, %s, %d, %v", vbucketId, UD(string(key)), seq, UD(req))
return r.rec.DataDelete(vbucketId, key, seq, req)
}

func (r *DCPLoggingReceiver) Rollback(vbucketId uint16, rollbackSeq uint64) error {
InfofCtx(r.rec.loggingCtx, KeyDCP, "Rollback:%d, %d", vbucketId, rollbackSeq)
return r.rec.Rollback(vbucketId, rollbackSeq)
}

func (r *DCPLoggingReceiver) SetMetaData(vbucketId uint16, value []byte) error {
TracefCtx(r.rec.loggingCtx, KeyDCP, "SetMetaData:%d, %s", vbucketId, value)
return r.rec.SetMetaData(vbucketId, value)
}

func (r *DCPLoggingReceiver) GetMetaData(vbucketId uint16) (
value []byte, lastSeq uint64, err error) {
TracefCtx(r.rec.loggingCtx, KeyDCP, "GetMetaData:%d", vbucketId)
return r.rec.GetMetaData(vbucketId)
}

func (r *DCPLoggingReceiver) SnapshotStart(vbucketId uint16,
snapStart, snapEnd uint64, snapType uint32) error {
TracefCtx(r.rec.loggingCtx, KeyDCP, "SnapshotStart:%d, %d, %d, %d", vbucketId, snapStart, snapEnd, snapType)
return r.rec.SnapshotStart(vbucketId, snapStart, snapEnd, snapType)
}

// NoPasswordAuthHandler is used for client cert-based auth by cbdatasource
type NoPasswordAuthHandler struct {
Handler AuthHandler
Expand Down
15 changes: 11 additions & 4 deletions base/dcp_sharded.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,15 @@ import (
"gopkg.in/couchbaselabs/gocbconnstr.v1"
)

const CBGTIndexTypeSyncGatewayImport = "syncGateway-import-"
const DefaultImportPartitions = 16
const DefaultImportPartitionsServerless = 6
const (
CBGTIndexTypeSyncGatewayImport = "syncGateway-import-"
DefaultImportPartitions = 16
DefaultImportPartitionsServerless = 6
CBGTCfgIndexDefs = SyncDocPrefix + "cfgindexDefs"
CBGTCfgNodeDefsKnown = SyncDocPrefix + "cfgnodeDefs-known"
CBGTCfgNodeDefsWanted = SyncDocPrefix + "cfgnodeDefs-wanted"
CBGTCfgPlanPIndexes = SyncDocPrefix + "cfgplanPIndexes"
)

// firstVersionToSupportCollections represents the earliest Sync Gateway release that supports collections.
var firstVersionToSupportCollections = &ComparableVersion{
Expand Down Expand Up @@ -684,11 +690,12 @@ func (l *importHeartbeatListener) Stop() {
// cbgtDestFactories map DCP feed keys (destKey) to a function that will generate cbgt.Dest. Need to be stored in a
// global map to avoid races between db creation and db addition to the server context database set

type CbgtDestFactoryFunc = func() (cbgt.Dest, error)
type CbgtDestFactoryFunc = func(rollback func()) (cbgt.Dest, error)

var cbgtDestFactories = make(map[string]CbgtDestFactoryFunc)
var cbgtDestFactoriesLock sync.Mutex

// StoreDestFactory stores a factory function to create a cgbt.Dest object.
func StoreDestFactory(ctx context.Context, destKey string, dest CbgtDestFactoryFunc) {
cbgtDestFactoriesLock.Lock()
_, ok := cbgtDestFactories[destKey]
Expand Down
1 change: 1 addition & 0 deletions db/attachment_compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,7 @@ func TestAttachmentDifferentVBUUIDsBetweenPhases(t *testing.T) {

_, err = attachmentCompactSweepPhase(ctx, dataStore, collectionID, testDB, t.Name(), vbUUIDs, false, terminator, &base.AtomicInt{})
require.Error(t, err)
require.ErrorAs(t, err, &base.ErrVbUUIDMismatch)
assert.Contains(t, err.Error(), "error opening stream for vb 0: VbUUID mismatch when failOnRollback set")
}

Expand Down
8 changes: 4 additions & 4 deletions db/dcp_sharded_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,11 @@ func TestShardedDCPUpgrade(t *testing.T) {
indexName = "db0x2d9928b7_index"
)

require.NoError(t, dataStore.SetRaw(base.SyncDocPrefix+"cfgindexDefs", 0, nil, []byte(fmt.Sprintf(indexDefs, tb.GetName(), bucketUUID))))
require.NoError(t, dataStore.SetRaw(base.SyncDocPrefix+"cfgnodeDefs-known", 0, nil, []byte(nodeDefs)))
require.NoError(t, dataStore.SetRaw(base.SyncDocPrefix+"cfgnodeDefs-wanted", 0, nil, []byte(nodeDefs)))
require.NoError(t, dataStore.SetRaw(base.CBGTCfgIndexDefs, 0, nil, []byte(fmt.Sprintf(indexDefs, tb.GetName(), bucketUUID))))
require.NoError(t, dataStore.SetRaw(base.CBGTCfgNodeDefsKnown, 0, nil, []byte(nodeDefs)))
require.NoError(t, dataStore.SetRaw(base.CBGTCfgNodeDefsWanted, 0, nil, []byte(nodeDefs)))
planPIndexesJSON := preparePlanPIndexesJSON(t, tb, numVBuckets, numPartitions)
require.NoError(t, dataStore.SetRaw(base.SyncDocPrefix+"cfgplanPIndexes", 0, nil, []byte(planPIndexesJSON)))
require.NoError(t, dataStore.SetRaw(base.CBGTCfgPlanPIndexes, 0, nil, []byte(planPIndexesJSON)))

// Write a doc before starting the dbContext to check that import works
const (
Expand Down
16 changes: 9 additions & 7 deletions db/import_pindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func RegisterImportPindexImpl(ctx context.Context, configGroup string) {
}

// getListenerForIndex looks up the importListener for the dbName specified in the index params
func getListenerImportDest(ctx context.Context, indexParams string) (cbgt.Dest, error) {
func getListenerImportDest(ctx context.Context, indexParams string, restart func()) (cbgt.Dest, error) {

var outerParams struct {
Params string `json:"params"`
Expand All @@ -61,7 +61,7 @@ func getListenerImportDest(ctx context.Context, indexParams string) (cbgt.Dest,
if fetchErr != nil {
return nil, fmt.Errorf("error retrieving listener for indexParams %v: %v", indexParams, fetchErr)
}
return destFactory()
return destFactory(restart)
}

func getNewPIndexImplType(ctx context.Context) func(indexType, indexParams, path string, restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) {
Expand All @@ -70,29 +70,31 @@ func getNewPIndexImplType(ctx context.Context) func(indexType, indexParams, path
newImportPIndexImpl := func(indexType, indexParams, path string, restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) {
defer base.FatalPanicHandler()

importDest, err := getListenerImportDest(ctx, indexParams)
importDest, err := getListenerImportDest(ctx, indexParams, restart)
if err != nil {
base.ErrorfCtx(ctx, "Error creating NewImportDest during NewImportPIndexImpl: %v", err)
}
return nil, importDest, err
}
return newImportPIndexImpl
}

// OpenImportPIndexImpl is required to have an implementation from cbgt.PIndexImplType.Open. When this function fails, PIndexImplType will fall back to using PIndexImplType.OpenUsing
func OpenImportPIndexImpl(indexType, path string, restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) {
return nil, nil, errors.New("Open PIndexImpl not supported for SG 3.0 databases - must provide index params")
}

func getOpenImportPIndexImplUsing(ctx context.Context) func(indexType, indexParams, path string, restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) {

openImportPIndexImplUsing := func(indexType, path, indexParams string, restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) {
importDest, err := getListenerImportDest(ctx, indexParams)
importDest, err := getListenerImportDest(ctx, indexParams, restart)
return nil, importDest, err
}
return openImportPIndexImplUsing
}

// Returns a cbgt.Dest targeting the importListener's ProcessFeedEvent
func (il *importListener) NewImportDest() (cbgt.Dest, error) {
// NewImportDest returns a cbgt.Dest targeting the importListener's ProcessFeedEvent
func (il *importListener) NewImportDest(janitorRollback func()) (cbgt.Dest, error) {
callback := il.ProcessFeedEvent

maxVbNo, err := il.bucket.GetMaxVbno() // can safely assume that all collections on the same bucket will have the same vbNo
Expand All @@ -103,7 +105,7 @@ func (il *importListener) NewImportDest() (cbgt.Dest, error) {
importFeedStatsMap := il.dbStats.ImportFeedMapStats
importPartitionStat := il.importStats.ImportPartitions

importDest, _, err := base.NewDCPDest(il.loggingCtx, callback, il.bucket, maxVbNo, true, importFeedStatsMap.Map, base.DCPImportFeedID, importPartitionStat, il.checkpointPrefix, il.metadataKeys)
importDest, _, err := base.NewDCPDest(il.loggingCtx, callback, il.bucket, maxVbNo, true, importFeedStatsMap.Map, base.DCPImportFeedID, importPartitionStat, il.checkpointPrefix, il.metadataKeys, janitorRollback)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit d3b758c

Please sign in to comment.