diff --git a/base/bucket.go b/base/bucket.go index b37431c446..ee242d2be1 100644 --- a/base/bucket.go +++ b/base/bucket.go @@ -177,8 +177,8 @@ func (p *GoCBConnStringParams) FillDefaults() { } } -// GetGoCBConnString builds a gocb connection string based on BucketSpec.Server. -func (spec *BucketSpec) GetGoCBConnString(params *GoCBConnStringParams) (string, error) { +// GetGoCBConnString builds a gocb connection string based on BucketSpec.Server. params defines the defaults that will be set for dcp_buffer_size, kv_buffer_size, kv_pool_size if they are non defined as nonzero. forceKvPoolSize will override the vlaue in BucketSpec.Server and the params. Both arguments are optional. +func (spec *BucketSpec) GetGoCBConnString(params *GoCBConnStringParams, forceKvPoolSize *int) (string, error) { if params == nil { params = &GoCBConnStringParams{} } @@ -196,7 +196,10 @@ func (spec *BucketSpec) GetGoCBConnString(params *GoCBConnStringParams) (string, // Add kv_pool_size as used in both GoCB versions poolSizeFromConnStr := asValues.Get("kv_pool_size") - if poolSizeFromConnStr == "" { + if forceKvPoolSize != nil { + asValues.Set("kv_pool_size", strconv.Itoa(*forceKvPoolSize)) + spec.KvPoolSize = *forceKvPoolSize + } else if poolSizeFromConnStr == "" { asValues.Set("kv_pool_size", strconv.Itoa(params.KVPoolSize)) spec.KvPoolSize = params.KVPoolSize } else { diff --git a/base/bucket_test.go b/base/bucket_test.go index 31bf5aa384..b643b892ee 100644 --- a/base/bucket_test.go +++ b/base/bucket_test.go @@ -69,7 +69,7 @@ func TestGetGoCBConnString(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - actualConnStr, err := test.bucketSpec.GetGoCBConnString(nil) + actualConnStr, err := test.bucketSpec.GetGoCBConnString(nil, nil) assert.NoError(t, err, "Unexpected error creating connection string for bucket spec") assert.Equal(t, test.expectedConnStr, actualConnStr) }) diff --git a/base/collection.go b/base/collection.go index 56a4a91f5c..0487ef9451 100644 --- a/base/collection.go +++ b/base/collection.go @@ -32,7 +32,7 @@ import ( // GetGoCBv2Bucket opens a connection to the Couchbase cluster and returns a *GocbV2Bucket for the specified BucketSpec. func GetGoCBv2Bucket(ctx context.Context, spec BucketSpec) (*GocbV2Bucket, error) { - connString, err := spec.GetGoCBConnString(nil) + connString, err := spec.GetGoCBConnString(nil, nil) if err != nil { WarnfCtx(ctx, "Unable to parse server value: %s error: %v", SD(spec.Server), err) return nil, err diff --git a/base/dcp_client.go b/base/dcp_client.go index 4a3f112312..c603719c68 100644 --- a/base/dcp_client.go +++ b/base/dcp_client.go @@ -316,7 +316,9 @@ func (dc *DCPClient) initAgent(spec BucketSpec) error { KVBufferSize: spec.KvBufferSize, DCPBufferSize: spec.DcpBuffer, } - connStr, err := spec.GetGoCBConnString(defaultValues) + + // Force poolsize to 1, multiple clients results in DCP naming collision + connStr, err := spec.GetGoCBConnString(defaultValues, IntPtr(GoCBPoolSizeDCP)) if err != nil { return err } @@ -337,8 +339,6 @@ func (dc *DCPClient) initAgent(spec BucketSpec) error { return err } - // Force poolsize to 1, multiple clients results in DCP naming collision - agentConfig.KVConfig.PoolSize = 1 agentConfig.BucketName = spec.BucketName agentConfig.DCPConfig.AgentPriority = dc.agentPriority agentConfig.SecurityConfig.Auth = auth diff --git a/base/dcp_sharded.go b/base/dcp_sharded.go index 1c841b6a33..9c83b719c2 100644 --- a/base/dcp_sharded.go +++ b/base/dcp_sharded.go @@ -312,7 +312,7 @@ func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG DCPBufferSize: spec.DcpBuffer, } - serverURL, err := spec.GetGoCBConnString(defaultValues) + serverURL, err := spec.GetGoCBConnString(defaultValues, IntPtr(GoCBPoolSizeDCP)) if err != nil { return nil, err } diff --git a/base/dcp_test.go b/base/dcp_test.go index bff7fbd825..9e6da62352 100644 --- a/base/dcp_test.go +++ b/base/dcp_test.go @@ -542,3 +542,19 @@ func legacyFeedParams(spec BucketSpec) (string, error) { } return string(paramBytes), nil } + +func TestCBGTKvPoolSize(t *testing.T) { + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + spec := bucket.BucketSpec + spec.Server += "&kv_pool_size=8" + + cfg, err := NewCbgtCfgMem() + require.NoError(t, err) + cbgtContext, err := initCBGTManager(ctx, bucket, spec, cfg, t.Name(), "fakeDb") + assert.NoError(t, err) + defer cbgtContext.Stop() + require.Contains(t, cbgtContext.Manager.Server(), "kv_pool_size=1") +} diff --git a/base/main_test_cluster.go b/base/main_test_cluster.go index e9f8c2e991..ad9a3121fb 100644 --- a/base/main_test_cluster.go +++ b/base/main_test_cluster.go @@ -65,7 +65,7 @@ func initV2Cluster(ctx context.Context, server string) *gocb.Cluster { BucketOpTimeout: &testClusterTimeout, } - connStr, err := spec.GetGoCBConnString(nil) + connStr, err := spec.GetGoCBConnString(nil, nil) if err != nil { FatalfCtx(ctx, "error getting connection string: %v", err) } diff --git a/rest/server_context.go b/rest/server_context.go index 1191975891..83c0c22832 100644 --- a/rest/server_context.go +++ b/rest/server_context.go @@ -523,7 +523,7 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config if spec.DcpBuffer != 0 { params.DCPBufferSize = spec.DcpBuffer } - connStr, err := spec.GetGoCBConnString(params) + connStr, err := spec.GetGoCBConnString(params, nil) if err != nil { return nil, err }