From 1bef0569742164d24c2b813a464073d143948b75 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Mon, 30 Oct 2023 16:43:06 -0700 Subject: [PATCH] CBG-3577 Uptake cbgt fix for MB-59249 (#6562) * CBG-3577 Uptake cbgt fix for MB-59249 Includes new unit test that reproduced the issue being fixed. * PR review fixes --- base/dcp_test.go | 5 +- db/import_listener.go | 6 ++ go.mod | 4 +- go.sum | 8 +-- rest/importtest/import_partition_test.go | 79 ++++++++++++++++++++++++ 5 files changed, 93 insertions(+), 9 deletions(-) create mode 100644 rest/importtest/import_partition_test.go diff --git a/base/dcp_test.go b/base/dcp_test.go index 32b51b134c..35794efa56 100644 --- a/base/dcp_test.go +++ b/base/dcp_test.go @@ -427,7 +427,7 @@ func TestConcurrentCBGTIndexCreation(t *testing.T) { spec := bucket.BucketSpec testDBName := "testDB" - // Use an bucket-backed cfg + // Use a bucket-backed cfg cfg, err := NewCfgSG(ctx, dataStore, "") require.NoError(t, err) @@ -459,8 +459,8 @@ func TestConcurrentCBGTIndexCreation(t *testing.T) { log.Printf("Starting manager for %s", managerUUID) startErr := context.StartManager(ctx, testDBName, configGroup, bucket, spec, "", nil, DefaultImportPartitions) assert.NoError(t, startErr) - managerWg.Done() + // ensure all goroutines start the manager before we start closing them select { case <-terminatorChan: @@ -473,7 +473,6 @@ func TestConcurrentCBGTIndexCreation(t *testing.T) { } managerWg.Wait() close(terminator) - } // legacyFeedParams format with credentials included diff --git a/db/import_listener.go b/db/import_listener.go index 1af3884c41..5701245040 100644 --- a/db/import_listener.go +++ b/db/import_listener.go @@ -244,3 +244,9 @@ func (il *importListener) Stop() { close(il.terminator) } } + +func (db *DatabaseContext) PartitionCount() int { + il := db.ImportListener + _, pindexes := il.cbgtContext.Manager.CurrentMaps() + return len(pindexes) +} diff --git a/go.mod b/go.mod index 17cbf459ed..3ec72ae0b3 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( dario.cat/mergo v1.0.0 github.com/bhoriuchi/graphql-go-tools v1.0.3 github.com/coreos/go-oidc v2.2.1+incompatible - github.com/couchbase/cbgt v1.3.7 + github.com/couchbase/cbgt v1.3.8 github.com/couchbase/clog v0.1.0 github.com/couchbase/go-blip v0.0.0-20231017145500-e4a51837754e github.com/couchbase/gocb/v2 v2.6.4 @@ -43,7 +43,7 @@ require ( github.com/aws/aws-sdk-go v1.44.299 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/couchbase/blance v0.1.3 // indirect + github.com/couchbase/blance v0.1.5 // indirect github.com/couchbase/cbauth v0.1.10 // indirect github.com/couchbase/go-couchbase v0.1.1 // indirect github.com/couchbase/tools-common/cloud v1.0.0 // indirect diff --git a/go.sum b/go.sum index 2b06924e93..9591b04d80 100644 --- a/go.sum +++ b/go.sum @@ -18,12 +18,12 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/coreos/go-oidc v2.2.1+incompatible h1:mh48q/BqXqgjVHpy2ZY7WnWAbenxRjsz9N1i1YxjHAk= github.com/coreos/go-oidc v2.2.1+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc= -github.com/couchbase/blance v0.1.3 h1:CJCirD3+N02Z0w/ybZTqqSJa9XMbsCZO9jHxCEAPQqE= -github.com/couchbase/blance v0.1.3/go.mod h1:2Sa/nsJSieN/r3T9LsrUYWeQ015qDsuHybhz4F4JcHU= +github.com/couchbase/blance v0.1.5 h1:kNSAwhb8FXSJpicJ8R8Kk7+0V1+MyTcY1MOHIDbU79w= +github.com/couchbase/blance v0.1.5/go.mod h1:2Sa/nsJSieN/r3T9LsrUYWeQ015qDsuHybhz4F4JcHU= github.com/couchbase/cbauth v0.1.10 h1:ixJGG9mAgL1HnWKt2eKmJ8gJa0rkaUURtDPDak7Dcts= github.com/couchbase/cbauth v0.1.10/go.mod h1:YHG+5rUI6GoLLlIViT9IfD0nwqEebBcl66TQ4pDFDw4= -github.com/couchbase/cbgt v1.3.7 h1:ZDPMS1SlNsTdl8yRSbQ6B751q43TpMk3lBkIynWqCPY= -github.com/couchbase/cbgt v1.3.7/go.mod h1:xntLl+vFnPmI49TAaolG4a1O6oOZ5YFKdqtoTMvQLWw= +github.com/couchbase/cbgt v1.3.8 h1:DmIHNfbuGkmNi85lcWFxeDReoame9Xz2c7LjvbMuhZs= +github.com/couchbase/cbgt v1.3.8/go.mod h1:kkK2gk/2LTuw6A+a4W22m2Pcta7JHu1PiXS8sJ3ow9g= github.com/couchbase/clog v0.1.0 h1:4Kh/YHkhRjMCbdQuvRVsm39XZh4FtL1d8fAwJsHrEPY= github.com/couchbase/clog v0.1.0/go.mod h1:7tzUpEOsE+fgU81yfcjy5N1H6XtbVC8SgOz/3mCjmd4= github.com/couchbase/go-blip v0.0.0-20231017145500-e4a51837754e h1:WWe9B68e4z1+05ASEYIp4fHAYSAUr5neJx966YOYYRk= diff --git a/rest/importtest/import_partition_test.go b/rest/importtest/import_partition_test.go new file mode 100644 index 0000000000..a1e50aaa14 --- /dev/null +++ b/rest/importtest/import_partition_test.go @@ -0,0 +1,79 @@ +// Copyright 2023-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package importtest + +import ( + "log" + "sync" + "testing" + + "github.com/couchbase/sync_gateway/base" + "github.com/couchbase/sync_gateway/rest" +) + +func TestImportPartitionsOnConcurrentStart(t *testing.T) { + if base.UnitTestUrlIsWalrus() { + t.Skip("This test only works against Couchbase Server") + } + + // Start multiple rest testers concurrently + numNodes := 4 + numImportPartitions := uint16(16) + expectedPartitions := 4 + restTesters := make([]*rest.RestTester, numNodes) + tb := base.GetTestBucket(t) + ctx := base.TestCtx(t) + defer tb.Close(ctx) + var wg sync.WaitGroup + for i := 0; i < numNodes; i++ { + wg.Add(1) + go func(i int) { + noCloseTB := tb.NoCloseClone() + rt := rest.NewRestTester(t, &rest.RestTesterConfig{ + CustomTestBucket: noCloseTB, + DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{ + AutoImport: true, + ImportPartitions: base.Uint16Ptr(numImportPartitions), + }}, + }) + restTesters[i] = rt + wg.Done() + }(i) + } + wg.Wait() + + defer func() { + for _, rt := range restTesters { + if rt != nil { + rt.Close() + } + } + }() + + rest.WaitAndAssertCondition(t, func() bool { + totalPartitions := uint16(0) + balancedPartitions := true + currentPartitions := make([]int, len(restTesters)) + for i, rt := range restTesters { + rtPartitions := rt.GetDatabase().PartitionCount() + currentPartitions[i] = rtPartitions + totalPartitions = totalPartitions + uint16(rtPartitions) + if rtPartitions != expectedPartitions { + balancedPartitions = false + } + } + if totalPartitions == numImportPartitions && balancedPartitions == true { + log.Printf("Partitions are balanced. Current total: %d, distribution: %v", totalPartitions, currentPartitions) + return true + } else { + log.Printf("Waiting for balanced partitions. Current total: %d, distribution: %v", totalPartitions, currentPartitions) + return false + } + }) +}