Skip to content

Commit

Permalink
CBG-3777 make sure cbgt always sets kv_pool_size=1, despite whatever …
Browse files Browse the repository at this point in the history
…is on the connection string
  • Loading branch information
torcolvin committed Feb 13, 2024
1 parent d0d5060 commit 8d7c451
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 11 deletions.
9 changes: 6 additions & 3 deletions base/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ func (p *GoCBConnStringParams) FillDefaults() {
}
}

// GetGoCBConnString builds a gocb connection string based on BucketSpec.Server.
func (spec *BucketSpec) GetGoCBConnString(params *GoCBConnStringParams) (string, error) {
// GetGoCBConnString builds a gocb connection string based on BucketSpec.Server. params defines the defaults that will be set for dcp_buffer_size, kv_buffer_size, kv_pool_size if they are non defined as nonzero. forceKvPoolSize will override the vlaue in BucketSpec.Server and the params. Both arguments are optional.
func (spec *BucketSpec) GetGoCBConnString(params *GoCBConnStringParams, forceKvPoolSize *int) (string, error) {
if params == nil {
params = &GoCBConnStringParams{}
}
Expand All @@ -196,7 +196,10 @@ func (spec *BucketSpec) GetGoCBConnString(params *GoCBConnStringParams) (string,

// Add kv_pool_size as used in both GoCB versions
poolSizeFromConnStr := asValues.Get("kv_pool_size")
if poolSizeFromConnStr == "" {
if forceKvPoolSize != nil {
asValues.Set("kv_pool_size", strconv.Itoa(*forceKvPoolSize))
spec.KvPoolSize = *forceKvPoolSize
} else if poolSizeFromConnStr == "" {
asValues.Set("kv_pool_size", strconv.Itoa(params.KVPoolSize))
spec.KvPoolSize = params.KVPoolSize
} else {
Expand Down
2 changes: 1 addition & 1 deletion base/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestGetGoCBConnString(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
actualConnStr, err := test.bucketSpec.GetGoCBConnString(nil)
actualConnStr, err := test.bucketSpec.GetGoCBConnString(nil, nil)
assert.NoError(t, err, "Unexpected error creating connection string for bucket spec")
assert.Equal(t, test.expectedConnStr, actualConnStr)
})
Expand Down
2 changes: 1 addition & 1 deletion base/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
// GetGoCBv2Bucket opens a connection to the Couchbase cluster and returns a *GocbV2Bucket for the specified BucketSpec.
func GetGoCBv2Bucket(ctx context.Context, spec BucketSpec) (*GocbV2Bucket, error) {

connString, err := spec.GetGoCBConnString(nil)
connString, err := spec.GetGoCBConnString(nil, nil)
if err != nil {
WarnfCtx(ctx, "Unable to parse server value: %s error: %v", SD(spec.Server), err)
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions base/dcp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,9 @@ func (dc *DCPClient) initAgent(spec BucketSpec) error {
KVBufferSize: spec.KvBufferSize,
DCPBufferSize: spec.DcpBuffer,
}
connStr, err := spec.GetGoCBConnString(defaultValues)

// Force poolsize to 1, multiple clients results in DCP naming collision
connStr, err := spec.GetGoCBConnString(defaultValues, IntPtr(GoCBPoolSizeDCP))
if err != nil {
return err
}
Expand All @@ -337,8 +339,6 @@ func (dc *DCPClient) initAgent(spec BucketSpec) error {
return err
}

// Force poolsize to 1, multiple clients results in DCP naming collision
agentConfig.KVConfig.PoolSize = 1
agentConfig.BucketName = spec.BucketName
agentConfig.DCPConfig.AgentPriority = dc.agentPriority
agentConfig.SecurityConfig.Auth = auth
Expand Down
2 changes: 1 addition & 1 deletion base/dcp_sharded.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG
DCPBufferSize: spec.DcpBuffer,
}

serverURL, err := spec.GetGoCBConnString(defaultValues)
serverURL, err := spec.GetGoCBConnString(defaultValues, IntPtr(GoCBPoolSizeDCP))
if err != nil {
return nil, err
}
Expand Down
16 changes: 16 additions & 0 deletions base/dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,3 +542,19 @@ func legacyFeedParams(spec BucketSpec) (string, error) {
}
return string(paramBytes), nil
}

func TestCBGTKvPoolSize(t *testing.T) {
ctx := TestCtx(t)
bucket := GetTestBucket(t)
defer bucket.Close(ctx)

spec := bucket.BucketSpec
spec.Server += "&kv_pool_size=8"

cfg, err := NewCbgtCfgMem()
require.NoError(t, err)
cbgtContext, err := initCBGTManager(ctx, bucket, spec, cfg, t.Name(), "fakeDb")
assert.NoError(t, err)
defer cbgtContext.Stop()
require.Contains(t, cbgtContext.Manager.Server(), "kv_pool_size=1")
}
2 changes: 1 addition & 1 deletion base/main_test_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func initV2Cluster(ctx context.Context, server string) *gocb.Cluster {
BucketOpTimeout: &testClusterTimeout,
}

connStr, err := spec.GetGoCBConnString(nil)
connStr, err := spec.GetGoCBConnString(nil, nil)
if err != nil {
FatalfCtx(ctx, "error getting connection string: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion rest/server_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config
if spec.DcpBuffer != 0 {
params.DCPBufferSize = spec.DcpBuffer
}
connStr, err := spec.GetGoCBConnString(params)
connStr, err := spec.GetGoCBConnString(params, nil)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 8d7c451

Please sign in to comment.