Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3577 Uptake cbgt fix for MB-59249 #6562

Merged
merged 2 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions base/dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func TestConcurrentCBGTIndexCreation(t *testing.T) {
spec := bucket.BucketSpec
testDBName := "testDB"

// Use an bucket-backed cfg
// Use a bucket-backed cfg
cfg, err := NewCfgSG(ctx, dataStore, "")
require.NoError(t, err)

Expand Down Expand Up @@ -459,8 +459,8 @@ func TestConcurrentCBGTIndexCreation(t *testing.T) {
log.Printf("Starting manager for %s", managerUUID)
startErr := context.StartManager(ctx, testDBName, configGroup, bucket, spec, "", nil, DefaultImportPartitions)
assert.NoError(t, startErr)

managerWg.Done()

// ensure all goroutines start the manager before we start closing them
select {
case <-terminatorChan:
Expand All @@ -473,7 +473,6 @@ func TestConcurrentCBGTIndexCreation(t *testing.T) {
}
managerWg.Wait()
close(terminator)

}

// legacyFeedParams format with credentials included
Expand Down
6 changes: 6 additions & 0 deletions db/import_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,9 @@ func (il *importListener) Stop() {
close(il.terminator)
}
}

func (db *DatabaseContext) PartitionCount() int {
il := db.ImportListener
_, pindexes := il.cbgtContext.Manager.CurrentMaps()
return len(pindexes)
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
dario.cat/mergo v1.0.0
github.com/bhoriuchi/graphql-go-tools v1.0.3
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/couchbase/cbgt v1.3.7
github.com/couchbase/cbgt v1.3.8
github.com/couchbase/clog v0.1.0
github.com/couchbase/go-blip v0.0.0-20231017145500-e4a51837754e
github.com/couchbase/gocb/v2 v2.6.4
Expand Down Expand Up @@ -43,7 +43,7 @@ require (
github.com/aws/aws-sdk-go v1.44.299 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/couchbase/blance v0.1.3 // indirect
github.com/couchbase/blance v0.1.5 // indirect
github.com/couchbase/cbauth v0.1.10 // indirect
github.com/couchbase/go-couchbase v0.1.1 // indirect
github.com/couchbase/tools-common/cloud v1.0.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/coreos/go-oidc v2.2.1+incompatible h1:mh48q/BqXqgjVHpy2ZY7WnWAbenxRjsz9N1i1YxjHAk=
github.com/coreos/go-oidc v2.2.1+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc=
github.com/couchbase/blance v0.1.3 h1:CJCirD3+N02Z0w/ybZTqqSJa9XMbsCZO9jHxCEAPQqE=
github.com/couchbase/blance v0.1.3/go.mod h1:2Sa/nsJSieN/r3T9LsrUYWeQ015qDsuHybhz4F4JcHU=
github.com/couchbase/blance v0.1.5 h1:kNSAwhb8FXSJpicJ8R8Kk7+0V1+MyTcY1MOHIDbU79w=
github.com/couchbase/blance v0.1.5/go.mod h1:2Sa/nsJSieN/r3T9LsrUYWeQ015qDsuHybhz4F4JcHU=
github.com/couchbase/cbauth v0.1.10 h1:ixJGG9mAgL1HnWKt2eKmJ8gJa0rkaUURtDPDak7Dcts=
github.com/couchbase/cbauth v0.1.10/go.mod h1:YHG+5rUI6GoLLlIViT9IfD0nwqEebBcl66TQ4pDFDw4=
github.com/couchbase/cbgt v1.3.7 h1:ZDPMS1SlNsTdl8yRSbQ6B751q43TpMk3lBkIynWqCPY=
github.com/couchbase/cbgt v1.3.7/go.mod h1:xntLl+vFnPmI49TAaolG4a1O6oOZ5YFKdqtoTMvQLWw=
github.com/couchbase/cbgt v1.3.8 h1:DmIHNfbuGkmNi85lcWFxeDReoame9Xz2c7LjvbMuhZs=
github.com/couchbase/cbgt v1.3.8/go.mod h1:kkK2gk/2LTuw6A+a4W22m2Pcta7JHu1PiXS8sJ3ow9g=
github.com/couchbase/clog v0.1.0 h1:4Kh/YHkhRjMCbdQuvRVsm39XZh4FtL1d8fAwJsHrEPY=
github.com/couchbase/clog v0.1.0/go.mod h1:7tzUpEOsE+fgU81yfcjy5N1H6XtbVC8SgOz/3mCjmd4=
github.com/couchbase/go-blip v0.0.0-20231017145500-e4a51837754e h1:WWe9B68e4z1+05ASEYIp4fHAYSAUr5neJx966YOYYRk=
Expand Down
79 changes: 79 additions & 0 deletions rest/importtest/import_partition_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2023-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.

package importtest

import (
"log"
"sync"
"testing"

"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/rest"
)

func TestImportPartitionsOnConcurrentStart(t *testing.T) {
if base.UnitTestUrlIsWalrus() {
t.Skip("This test only works against Couchbase Server")
}

// Start multiple rest testers concurrently
numNodes := 4
numImportPartitions := uint16(16)
expectedPartitions := 4
restTesters := make([]*rest.RestTester, numNodes)
tb := base.GetTestBucket(t)
ctx := base.TestCtx(t)
defer tb.Close(ctx)
var wg sync.WaitGroup
for i := 0; i < numNodes; i++ {
wg.Add(1)
go func(i int) {
noCloseTB := tb.NoCloseClone()
rt := rest.NewRestTester(t, &rest.RestTesterConfig{
CustomTestBucket: noCloseTB,
DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{
AutoImport: true,
ImportPartitions: base.Uint16Ptr(numImportPartitions),
}},
})
restTesters[i] = rt
wg.Done()
}(i)
}
wg.Wait()

defer func() {
for _, rt := range restTesters {
if rt != nil {
rt.Close()
}
}
}()

rest.WaitAndAssertCondition(t, func() bool {
totalPartitions := uint16(0)
balancedPartitions := true
currentPartitions := make([]int, len(restTesters))
for i, rt := range restTesters {
rtPartitions := rt.GetDatabase().PartitionCount()
currentPartitions[i] = rtPartitions
totalPartitions = totalPartitions + uint16(rtPartitions)
if rtPartitions != expectedPartitions {
balancedPartitions = false
}
}
if totalPartitions == numImportPartitions && balancedPartitions == true {
log.Printf("Partitions are balanced. Current total: %d, distribution: %v", totalPartitions, currentPartitions)
return true
} else {
log.Printf("Waiting for balanced partitions. Current total: %d, distribution: %v", totalPartitions, currentPartitions)
return false
}
})
}