Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into CBG-3445
Browse files Browse the repository at this point in the history
  • Loading branch information
torcolvin committed Oct 5, 2023
2 parents d83ceb6 + ce51e97 commit 1436b03
Show file tree
Hide file tree
Showing 43 changed files with 581 additions and 1,012 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,5 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: karancode/yamllint-github-action@master
with:
yamllint_file_or_dir: 'docs/api'
18 changes: 9 additions & 9 deletions base/bucket_gocb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,15 +894,15 @@ func TestXattrWriteUpdateXattr(t *testing.T) {

// Dummy write update function that increments 'counter' in the doc and 'seq' in the xattr
writeUpdateFunc := func(doc []byte, xattr []byte, userXattr []byte, cas uint64) (
updatedDoc []byte, updatedXattr []byte, isDelete bool, updatedExpiry *uint32, err error) {
updatedDoc []byte, updatedXattr []byte, isDelete bool, updatedExpiry *uint32, updatedSpec []sgbucket.MacroExpansionSpec, err error) {

var docMap map[string]interface{}
var xattrMap map[string]interface{}
// Marshal the doc
if len(doc) > 0 {
err = JSONUnmarshal(doc, &docMap)
if err != nil {
return nil, nil, false, nil, pkgerrors.Wrapf(err, "Unable to unmarshal incoming doc")
return nil, nil, false, nil, nil, pkgerrors.Wrapf(err, "Unable to unmarshal incoming doc")
}
} else {
// No incoming doc, treat as insert.
Expand All @@ -913,7 +913,7 @@ func TestXattrWriteUpdateXattr(t *testing.T) {
if len(xattr) > 0 {
err = JSONUnmarshal(xattr, &xattrMap)
if err != nil {
return nil, nil, false, nil, pkgerrors.Wrapf(err, "Unable to unmarshal incoming xattr")
return nil, nil, false, nil, nil, pkgerrors.Wrapf(err, "Unable to unmarshal incoming xattr")
}
} else {
// No incoming xattr, treat as insert.
Expand All @@ -938,7 +938,7 @@ func TestXattrWriteUpdateXattr(t *testing.T) {

updatedDoc, _ = JSONMarshal(docMap)
updatedXattr, _ = JSONMarshal(xattrMap)
return updatedDoc, updatedXattr, false, nil, nil
return updatedDoc, updatedXattr, false, nil, updatedSpec, nil
}

// Insert
Expand Down Expand Up @@ -984,15 +984,15 @@ func TestWriteUpdateWithXattrUserXattr(t *testing.T) {
xattrKey := SyncXattrName
userXattrKey := "UserXattr"

writeUpdateFunc := func(doc []byte, xattr []byte, userXattr []byte, cas uint64) (updatedDoc []byte, updatedXattr []byte, isDelete bool, updatedExpiry *uint32, err error) {
writeUpdateFunc := func(doc []byte, xattr []byte, userXattr []byte, cas uint64) (updatedDoc []byte, updatedXattr []byte, isDelete bool, updatedExpiry *uint32, updatedSpec []sgbucket.MacroExpansionSpec, err error) {

var docMap map[string]interface{}
var xattrMap map[string]interface{}

if len(doc) > 0 {
err = JSONUnmarshal(xattr, &docMap)
if err != nil {
return nil, nil, false, nil, err
return nil, nil, false, nil, nil, err
}
} else {
docMap = make(map[string]interface{})
Expand All @@ -1001,7 +1001,7 @@ func TestWriteUpdateWithXattrUserXattr(t *testing.T) {
if len(xattr) > 0 {
err = JSONUnmarshal(xattr, &xattrMap)
if err != nil {
return nil, nil, false, nil, err
return nil, nil, false, nil, nil, err
}
} else {
xattrMap = make(map[string]interface{})
Expand All @@ -1011,7 +1011,7 @@ func TestWriteUpdateWithXattrUserXattr(t *testing.T) {
if len(userXattr) > 0 {
err = JSONUnmarshal(userXattr, &userXattrMap)
if err != nil {
return nil, nil, false, nil, err
return nil, nil, false, nil, nil, err
}
} else {
userXattrMap = nil
Expand All @@ -1022,7 +1022,7 @@ func TestWriteUpdateWithXattrUserXattr(t *testing.T) {
updatedDoc, _ = JSONMarshal(docMap)
updatedXattr, _ = JSONMarshal(xattrMap)

return updatedDoc, updatedXattr, false, nil, nil
return updatedDoc, updatedXattr, false, nil, updatedSpec, nil
}

_, err := dataStore.WriteUpdateWithXattr(ctx, key, xattrKey, userXattrKey, 0, nil, nil, writeUpdateFunc)
Expand Down
5 changes: 4 additions & 1 deletion base/collection_xattr_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func WriteUpdateWithXattr(ctx context.Context, store *Collection, k string, xatt
}

// Invoke callback to get updated value
updatedValue, updatedXattrValue, isDelete, callbackExpiry, err := callback(value, xattrValue, userXattrValue, cas)
updatedValue, updatedXattrValue, isDelete, callbackExpiry, updatedSpec, err := callback(value, xattrValue, userXattrValue, cas)

// If it's an ErrCasFailureShouldRetry, then retry by going back through the for loop
if err == ErrCasFailureShouldRetry {
Expand All @@ -215,6 +215,9 @@ func WriteUpdateWithXattr(ctx context.Context, store *Collection, k string, xatt
if callbackExpiry != nil {
exp = *callbackExpiry
}
if updatedSpec != nil {
opts.MacroExpansion = append(opts.MacroExpansion, updatedSpec...)
}

// Attempt to write the updated document to the bucket. Mark body for deletion if previous body was non-empty
deleteBody := value != nil
Expand Down
222 changes: 1 addition & 221 deletions base/dcp_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,13 @@ package base
import (
"bytes"
"context"
"crypto/tls"
"errors"
"expvar"
"fmt"
"strconv"
"sync"
"time"

"github.com/couchbase/go-couchbase"
"github.com/couchbase/go-couchbase/cbdatasource"
memcached "github.com/couchbase/gomemcached/client"
sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbaselabs/gocbconnstr"
"github.com/google/uuid"
pkgerrors "github.com/pkg/errors"
)
Expand All @@ -46,24 +40,6 @@ const kBackfillPersistInterval = 10 * time.Second
const DCPCachingFeedID = "SG"
const DCPImportFeedID = "SGI"

type SimpleFeed struct {
eventFeed chan sgbucket.FeedEvent
terminator chan bool
}

func (s *SimpleFeed) Events() <-chan sgbucket.FeedEvent {
return s.eventFeed
}

func (s *SimpleFeed) WriteEvents() chan<- sgbucket.FeedEvent {
return s.eventFeed
}

func (s *SimpleFeed) Close() error {
close(s.terminator)
return nil
}

type DCPCommon struct {
dbStatsExpvars *expvar.Map
m sync.Mutex
Expand Down Expand Up @@ -228,7 +204,7 @@ func (c *DCPCommon) loadCheckpoint(vbNo uint16) (vbMetadata []byte, snapshotStar
}
}

var snapshotMetadata cbdatasource.VBucketMetaData
var snapshotMetadata vBucketMetaData
unmarshalErr := JSONUnmarshal(rawValue, &snapshotMetadata)
if unmarshalErr != nil {
return []byte{}, 0, 0, err
Expand Down Expand Up @@ -596,199 +572,3 @@ func GenerateDcpStreamName(feedID string) (string, error) {
), nil

}

// getExternalAlternateAddress returns a external alternate address for a given dest
func getExternalAlternateAddress(loggingCtx context.Context, alternateAddressMap map[string]string, dest string) (string, error) {
if len(alternateAddressMap) == 0 {
// early exit if we know we've got nothing to find
return dest, nil
}

destHost, destPort, err := SplitHostPort(dest)
if err != nil {
return "", err
}

// Map the given destination to an external alternate address hostname if available
if extHostname, foundAltAddress := alternateAddressMap[destHost]; foundAltAddress {
host, port, _ := SplitHostPort(extHostname)
if port == "" {
port = destPort
}
if host == "" {
host = extHostname
}

InfofCtx(loggingCtx, KeyDCP, "Using alternate address %s => %s", MD(dest), MD(host+":"+port))
dest = host + ":" + port
}

return dest, nil
}

type clusterNetworkType string

const (
// clusterNetworkAuto applies a heuristic to determine which network to use (based on bootstrap hosts)
clusterNetworkAuto clusterNetworkType = "auto"
// clusterNetworkDefault uses the default (internal) network
clusterNetworkDefault clusterNetworkType = "default"
// clusterNetworkExternal will use the external network
clusterNetworkExternal clusterNetworkType = "external"
)

// getNetworkTypeFromConnSpec returns the configured network type, or clusterNetworkAuto if nothing is defined.
func getNetworkTypeFromConnSpec(ctx context.Context, spec gocbconnstr.ConnSpec) clusterNetworkType {
networkType := clusterNetworkAuto
if networkOpt, ok := spec.Options["network"]; ok && len(networkOpt) > 0 {
if len(networkOpt) > 1 {
WarnfCtx(ctx, "multiple 'network' options found in connection string - using first one: %q", networkOpt[0])
}
networkType = clusterNetworkType(networkOpt[0])
}
return networkType
}

// alternateAddressShims returns the 3 functions that wrap around ConnectBucket/Connect/ConnectTLS to provide alternate address support.
func alternateAddressShims(loggingCtx context.Context, bucketSpecTLS bool, connSpecAddresses []gocbconnstr.Address, networkType clusterNetworkType) (
connectBucketShim func(serverURL, poolName, bucketName string, auth couchbase.AuthHandler) (cbdatasource.Bucket, error),
connectShim func(protocol, dest string) (*memcached.Client, error),
connectTLSShim func(protocol, dest string, tlsConfig *tls.Config) (*memcached.Client, error),
) {

// A map of dest URL (which may be an internal-only address) to external alternate address.
var externalAlternateAddresses map[string]string

// Copy of cbdatasource's default ConnectBucket function, which maps internal addresses to alternate addresses
connectBucketShim = func(serverURL, poolName, bucketName string, auth couchbase.AuthHandler) (cbdatasource.Bucket, error) {
TracefCtx(loggingCtx, KeyDCP, "ConnectBucket callback: %s %s %s", MD(serverURL), poolName, MD(bucketName))

var (
err error
client couchbase.Client
)

if auth != nil {
client, err = couchbase.ConnectWithAuth(serverURL, auth)
} else {
client, err = couchbase.Connect(serverURL)
}
if err != nil {
return nil, err
}

// Fetch any alternate external addresses/ports and store them in the externalAlternateAddresses map
poolServices, err := client.GetPoolServices(poolName)
if err != nil {
return nil, err
}

connSpecAddressesHostMap := make(map[string]struct{}, len(connSpecAddresses))
for _, connSpecAddress := range connSpecAddresses {
connSpecAddressesHostMap[connSpecAddress.Host] = struct{}{}
}

// Recreate the map to forget about previous clustermap information.
externalAlternateAddresses = make(map[string]string, len(poolServices.NodesExt))
for _, node := range poolServices.NodesExt {

// apply heuristic if auto to select between "default" and "external"
if networkType == clusterNetworkAuto {
if _, ok := connSpecAddressesHostMap[node.Hostname]; ok {
DebugfCtx(loggingCtx, KeyDCP, "Matched host %s in connection string - using default/internal networking.", MD(node.Hostname))
// Found default hostname in connSpec - abort all alternate address behaviour.
// The client MUST use the default/internal network.
externalAlternateAddresses = nil
break
}
// select external network now heuristic failed
networkType = clusterNetworkExternal
}

DebugfCtx(loggingCtx, KeyDCP, "Finding alternate addresses for network %s", networkType)

// only try to map alternate addresses if an alternate hostname is present
if alt, ok := node.AlternateNames[string(networkType)]; ok && alt.Hostname != "" {
var port string
if bucketSpecTLS {
extPort, ok := alt.Ports["kvSSL"]
if !ok {
TracefCtx(loggingCtx, KeyDCP, "kvSSL port was not exposed for %s alternate address. Skipping remapping of this node.", networkType)
continue
}

// found exposed kvSSL port, use when connecting
port = ":" + strconv.Itoa(extPort)
DebugfCtx(loggingCtx, KeyDCP, "Storing alternate address for kvSSL: %s => %s", MD(node.Hostname), MD(alt.Hostname+port))
} else {
extPort, ok := alt.Ports["kv"]
if !ok {
TracefCtx(loggingCtx, KeyDCP, "kv port was not exposed for %s alternate address. Skipping remapping of this node.", networkType)
continue
}

// found exposed kv port, use when connecting
port = ":" + strconv.Itoa(extPort)
DebugfCtx(loggingCtx, KeyDCP, "Storing alternate address for kv: %s => %s", MD(node.Hostname), MD(alt.Hostname+port))
}

externalAlternateAddresses[node.Hostname] = alt.Hostname + port
}
}

var bucket *couchbase.Bucket
if auth != nil {
bucket, err = couchbase.ConnectWithAuthAndGetBucket(serverURL, poolName, bucketName, auth)
} else {
bucket, err = couchbase.GetBucket(serverURL, poolName, bucketName)
}
if err != nil {
return nil, err
}

if bucket == nil {
return nil, fmt.Errorf("unknown bucket,"+
" serverURL: %s, bucketName: %s", serverURL, bucketName)
}

return bucket, nil
}

// Copy of cbdatasource's default Connect function, which swaps the given destination, with alternate addresses we found in ConnectBucket.
connectShim = func(protocol, dest string) (client *memcached.Client, err error) {
TracefCtx(loggingCtx, KeyDCP, "Connect mutationCallback: %s %s", protocol, MD(dest))

dest, err = getExternalAlternateAddress(loggingCtx, externalAlternateAddresses, dest)
if err != nil {
return nil, err
}

return memcached.Connect(protocol, dest)
}

// Copy of cbdatasource's default ConnectTLS function, which swaps the given destination, with alternate addresses we found in ConnectBucket.
connectTLSShim = func(protocol, dest string, tlsConfig *tls.Config) (client *memcached.Client, err error) {
TracefCtx(loggingCtx, KeyDCP, "ConnectTLS mutationCallback: %s %s", protocol, MD(dest))

newDest, err := getExternalAlternateAddress(loggingCtx, externalAlternateAddresses, dest)
if err != nil {
return nil, err
}
if newDest == dest {
// skip unnecessary tls reconfiguration if no alternate was found
return memcached.ConnectTLS(protocol, dest, tlsConfig)
}

// extract the new host and insert into the tlsConfig
host, _, err := SplitHostPort(dest)
if err != nil {
return nil, err
}
tlsConfigCopy := tlsConfig.Clone()
tlsConfigCopy.ServerName = host

return memcached.ConnectTLS(protocol, dest, tlsConfigCopy)
}

return connectBucketShim, connectShim, connectTLSShim
}
Loading

0 comments on commit 1436b03

Please sign in to comment.