Skip to content

Commit

Permalink
[3.2.1 Backport] CBG-4150: Collection index creation improvements (#7169
Browse files Browse the repository at this point in the history
)

Co-authored-by: Tor Colvin <[email protected]>
  • Loading branch information
bbrks and torcolvin authored Oct 22, 2024
1 parent 3bb180e commit c1d99bd
Show file tree
Hide file tree
Showing 23 changed files with 816 additions and 870 deletions.
45 changes: 19 additions & 26 deletions base/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ type BootstrapConnection interface {
// Returns exists=false if key is not found, returns error for any other error.
GetDocument(ctx context.Context, bucket, docID string, rv interface{}) (exists bool, err error)

// Returns the bootstrap connection's cluster connection as N1QLStore for the specified bucket/scope/collection.
// Does NOT establish a bucket connection, the bucketName/scopeName/collectionName is for query scoping only
GetClusterN1QLStore(bucketName, scopeName, collectionName string) (*ClusterOnlyN1QLStore, error)
// Close releases any long-lived connections
Close()
}
Expand Down Expand Up @@ -149,8 +146,7 @@ var _ BootstrapConnection = &CouchbaseCluster{}
func NewCouchbaseCluster(ctx context.Context, server, username, password,
x509CertPath, x509KeyPath, caCertPath string,
forcePerBucketAuth bool, perBucketCreds PerBucketCredentialsConfig,
tlsSkipVerify *bool, useXattrConfig *bool, bucketMode BucketConnectionMode) (*CouchbaseCluster, error) {

tlsSkipVerify *bool, useXattrConfig bool, bucketMode BucketConnectionMode) (*CouchbaseCluster, error) {
securityConfig, err := GoCBv2SecurityConfig(ctx, tlsSkipVerify, caCertPath)
if err != nil {
return nil, err
Expand Down Expand Up @@ -197,7 +193,7 @@ func NewCouchbaseCluster(ctx context.Context, server, username, password,
}

cbCluster.configPersistence = &DocumentBootstrapPersistence{}
if useXattrConfig != nil && *useXattrConfig == true {
if useXattrConfig {
cbCluster.configPersistence = &XattrBootstrapPersistence{}
}

Expand Down Expand Up @@ -484,15 +480,6 @@ func (cc *CouchbaseCluster) Close() {
}
}

func (cc *CouchbaseCluster) GetClusterN1QLStore(bucketName, scopeName, collectionName string) (*ClusterOnlyN1QLStore, error) {
gocbCluster, err := cc.getClusterConnection()
if err != nil {
return nil, err
}

return NewClusterOnlyN1QLStore(gocbCluster, bucketName, scopeName, collectionName)
}

func (cc *CouchbaseCluster) getBucket(ctx context.Context, bucketName string) (b *gocb.Bucket, teardownFn func(), err error) {

if cc.bucketConnectionMode != CachedClusterConnections {
Expand Down Expand Up @@ -523,17 +510,30 @@ func (cc *CouchbaseCluster) getBucket(ctx context.Context, bucketName string) (b
return newBucket, teardownFn, nil
}

// connectToBucket establishes a new connection to a bucket, and returns the bucket after waiting for it to be ready.
func (cc *CouchbaseCluster) connectToBucket(ctx context.Context, bucketName string) (b *gocb.Bucket, teardownFn func(), err error) {
var connection *gocb.Cluster
func (cc *CouchbaseCluster) GetClusterConnectionForBucket(ctx context.Context, bucketName string) (connection *gocb.Cluster, teardownFn func(), err error) {
if bucketAuth, set := cc.perBucketAuth[bucketName]; set {
connection, err = cc.connect(bucketAuth)
} else if cc.forcePerBucketAuth {
return nil, nil, fmt.Errorf("unable to get bucket %q since credentials are not defined in bucket_credentials", MD(bucketName).Redact())
} else {
connection, err = cc.connect(nil)
}
if err != nil {
return nil, nil, err
}

teardownFn = func() {
err := connection.Close(&gocb.ClusterCloseOptions{})
if err != nil {
WarnfCtx(ctx, "Failed to close cluster connection: %v", err)
}
}
return connection, teardownFn, nil
}

// connectToBucket establishes a new connection to a bucket, and returns the bucket after waiting for it to be ready.
func (cc *CouchbaseCluster) connectToBucket(ctx context.Context, bucketName string) (b *gocb.Bucket, teardownFn func(), err error) {
connection, teardownFn, err := cc.GetClusterConnectionForBucket(ctx, bucketName)
if err != nil {
return nil, nil, err
}
Expand All @@ -545,7 +545,7 @@ func (cc *CouchbaseCluster) connectToBucket(ctx context.Context, bucketName stri
ServiceTypes: []gocb.ServiceType{gocb.ServiceTypeKeyValue},
})
if err != nil {
_ = connection.Close(&gocb.ClusterCloseOptions{})
teardownFn()

if errors.Is(err, gocb.ErrAuthenticationFailure) {
return nil, nil, ErrAuthError
Expand All @@ -554,13 +554,6 @@ func (cc *CouchbaseCluster) connectToBucket(ctx context.Context, bucketName stri
return nil, nil, err
}

teardownFn = func() {
err := connection.Close(&gocb.ClusterCloseOptions{})
if err != nil {
WarnfCtx(ctx, "Failed to close cluster connection: %v", err)
}
}

return b, teardownFn, nil
}

Expand Down
2 changes: 1 addition & 1 deletion base/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestBootstrapRefCounting(t *testing.T) {
tlsSkipVerify := BoolPtr(TestTLSSkipVerify())
var perBucketCredentialsConfig map[string]*CredentialsConfig
ctx := TestCtx(t)
cluster, err := NewCouchbaseCluster(ctx, UnitTestUrl(), TestClusterUsername(), TestClusterPassword(), x509CertPath, x509KeyPath, caCertPath, forcePerBucketAuth, perBucketCredentialsConfig, tlsSkipVerify, BoolPtr(TestUseXattrs()), CachedClusterConnections)
cluster, err := NewCouchbaseCluster(ctx, UnitTestUrl(), TestClusterUsername(), TestClusterPassword(), x509CertPath, x509KeyPath, caCertPath, forcePerBucketAuth, perBucketCredentialsConfig, tlsSkipVerify, TestUseXattrs(), CachedClusterConnections)
require.NoError(t, err)
defer cluster.Close()
require.NotNil(t, cluster)
Expand Down
43 changes: 39 additions & 4 deletions base/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,16 +453,51 @@ func AsSubdocStore(ds DataStore) (sgbucket.SubdocStore, bool) {
return subdocStore, ok && ds.IsSupported(sgbucket.BucketStoreFeatureSubdocOperations)
}

// WaitUntilDataStoreExists will try to perform an operation in the given DataStore until it can succeed.
// WaitUntilDataStoreReady will try to perform a basic operation in the given DataStore until it can succeed.
// It's not necessarily the case that a datastore that exists is ready to be used.
//
// There's no WaitForReady operation in GoCB for collections, only Buckets, so attempting to use Exists in this way this seems like our best option to check for availability.
func WaitUntilDataStoreExists(ctx context.Context, ds DataStore) error {
// There's no WaitForReady operation in GoCB for collections, only Buckets, so attempting to use Exists in this way this seems like our best option to check for usability.
func WaitUntilDataStoreReady(ctx context.Context, ds DataStore) error {
return WaitForNoError(ctx, func() error {
_, err := ds.Exists("WaitUntilDataStoreExists")
// don't care whether the doc actually exists or not, just that we could perform the operation successfully
_, err := ds.Exists("WaitUntilDataStoreReady")
return err
})
}

// GetDataStoreWithRetry will attempt to get a named DataStore from the given bucket, retrying until it can succeed, if failFast is false.
func GetDataStoreWithRetry(ctx context.Context, bucket Bucket, scName ScopeAndCollectionName, failFast bool) (DataStore, error) {
if failFast {
return bucket.NamedDataStore(scName)
}

err, dataStore := RetryLoop(
ctx,
fmt.Sprintf("waiting for %s.%s.%s to exist", MD(bucket.GetName()), MD(scName.ScopeName()), MD(scName.CollectionName())),
func() (bool, error, interface{}) {
dataStore, err := bucket.NamedDataStore(scName)
return err != nil, err, dataStore
},
CreateMaxDoublingSleeperFunc(30, 10, 1000))
ds, ok := dataStore.(DataStore)
if !ok && err == nil {
AssertfCtx(ctx, "datastore %s.%s.%s was not a DataStore type, got %T", bucket.GetName(), scName.ScopeName(), scName.CollectionName(), dataStore)
}
return ds, err
}

// GetAndWaitUntilDataStoreReady will attempt to get a named DataStore from the given bucket, and ensure it's ready for use.
func GetAndWaitUntilDataStoreReady(ctx context.Context, bucket Bucket, scName ScopeAndCollectionName, failFast bool) (DataStore, error) {
dataStore, err := GetDataStoreWithRetry(ctx, bucket, scName, failFast)
if err != nil {
return nil, err
}
if err := WaitUntilDataStoreReady(ctx, dataStore); err != nil {
return nil, err
}
return dataStore, nil
}

// RequireNoBucketTTL ensures there is no MaxTTL set on the bucket (SG #3314)
func RequireNoBucketTTL(ctx context.Context, b Bucket) error {
cbs, ok := AsCouchbaseBucketStore(b)
Expand Down
29 changes: 24 additions & 5 deletions base/cluster_n1ql.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ var _ N1QLStore = &ClusterOnlyN1QLStore{}
// ClusterOnlyN1QLStore instances. Anticipates future refactoring of N1QLStore to differentiate between
// collection-scoped and non-collection-scoped operations.
type ClusterOnlyN1QLStore struct {
cluster *gocb.Cluster
bucketName string // User to build keyspace for query when not otherwise set
scopeName string // Used to build keyspace for query when not otherwise set
collectionName string // Used to build keyspace for query when not otherwise set
supportsCollections bool
cluster *gocb.Cluster
bucketName string // User to build keyspace for query when not otherwise set
scopeName string // Used to build keyspace for query when not otherwise set
collectionName string // Used to build keyspace for query when not otherwise set
supportsCollections bool
supportsIfNotExistsInDDL bool // 7.1.0+ MB-38737
}

func NewClusterOnlyN1QLStore(cluster *gocb.Cluster, bucketName, scopeName, collectionName string) (*ClusterOnlyN1QLStore, error) {
Expand All @@ -51,11 +52,25 @@ func NewClusterOnlyN1QLStore(cluster *gocb.Cluster, bucketName, scopeName, colle
return nil, err
}
clusterOnlyn1qlStore.supportsCollections = isMinimumVersion(uint64(major), uint64(minor), 7, 0)
clusterOnlyn1qlStore.supportsIfNotExistsInDDL = isMinimumVersion(uint64(major), uint64(minor), 7, 1)

return clusterOnlyn1qlStore, nil

}

func (cl *ClusterOnlyN1QLStore) IsSupported(feature sgbucket.BucketStoreFeature) bool {
switch feature {
case sgbucket.BucketStoreFeatureN1ql:
return true
case sgbucket.BucketStoreFeatureCollections:
return cl.supportsCollections
case sgbucket.BucketStoreFeatureN1qlIfNotExistsDDL:
return cl.supportsIfNotExistsInDDL
default:
return false
}
}

func (cl *ClusterOnlyN1QLStore) GetName() string {
return cl.bucketName
}
Expand All @@ -72,6 +87,10 @@ func (cl *ClusterOnlyN1QLStore) CreateIndex(ctx context.Context, indexName strin
return CreateIndex(ctx, cl, indexName, expression, filterExpression, options)
}

func (cl *ClusterOnlyN1QLStore) CreateIndexIfNotExists(ctx context.Context, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error {
return CreateIndexIfNotExists(ctx, cl, indexName, expression, filterExpression, options)
}

func (cl *ClusterOnlyN1QLStore) CreatePrimaryIndex(ctx context.Context, indexName string, options *N1qlIndexOptions) error {
return CreatePrimaryIndex(ctx, cl, indexName, options)
}
Expand Down
2 changes: 2 additions & 0 deletions base/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ func (b *GocbV2Bucket) IsSupported(feature sgbucket.BucketStoreFeature) bool {
return false
}
return len(agent.N1qlEps()) > 0
case sgbucket.BucketStoreFeatureN1qlIfNotExistsDDL:
return isMinimumVersion(b.clusterCompatMajorVersion, b.clusterCompatMinorVersion, 7, 1)
// added in Couchbase Server 6.6
case sgbucket.BucketStoreFeatureCreateDeletedWithXattr:
status, err := b.bucket.Internal().CapabilityStatus(gocb.CapabilityCreateAsDeleted)
Expand Down
4 changes: 4 additions & 0 deletions base/collection_n1ql.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ func (c *Collection) CreateIndex(ctx context.Context, indexName string, expressi
return CreateIndex(ctx, c, indexName, expression, filterExpression, options)
}

func (c *Collection) CreateIndexIfNotExists(ctx context.Context, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error {
return CreateIndexIfNotExists(ctx, c, indexName, expression, filterExpression, options)
}

func (c *Collection) CreatePrimaryIndex(ctx context.Context, indexName string, options *N1qlIndexOptions) error {
return CreatePrimaryIndex(ctx, c, indexName, options)
}
Expand Down
Loading

0 comments on commit c1d99bd

Please sign in to comment.