Skip to content

Commit

Permalink
cloud: add WithClientName option
Browse files Browse the repository at this point in the history
The WithClientName option allows clients of the cloud package to
specific a client name. This name is used by the cloud input/output
network metrics to attribute network usage to a specific sub component.

The initial consumers of the API are the backup job processor and the
cdc cloud storage sink. This makes it possible to distinguish backup
bytes from other cloud clients like CDC.

Fixes: #132862
Release note: None
  • Loading branch information
jeffswenson committed Oct 18, 2024
1 parent 22b0a9b commit 08545a9
Show file tree
Hide file tree
Showing 15 changed files with 97 additions and 48 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func runBackupProcessor(
progCh: progCh,
settings: &flowCtx.Cfg.Settings.SV,
}
storage, err := flowCtx.Cfg.ExternalStorage(ctx, dest)
storage, err := flowCtx.Cfg.ExternalStorage(ctx, dest, cloud.WithClientName("backup"))
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ func makeCloudStorageSink(

// We make the external storage with a nil IOAccountingInterceptor since we
// record usage metrics via s.metrics.
if s.es, err = makeExternalStorageFromURI(ctx, u.String(), user, cloud.WithIOAccountingInterceptor(nil)); err != nil {
s.es, err = makeExternalStorageFromURI(ctx, u.String(), user, cloud.WithIOAccountingInterceptor(nil), cloud.WithClientName("cdc"))
if err != nil {
return nil, err
}
if mb != nil && s.es != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/changefeedccl/sink_cloudstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ func TestCloudStorageSink(t *testing.T) {
clientFactory := blobs.TestBlobServiceClient(settings.ExternalIODir)
externalStorageFromURI := func(ctx context.Context, uri string, user username.SQLUsername, opts ...cloud.ExternalStorageOption) (cloud.ExternalStorage,
error) {
var options cloud.ExternalStorageOptions
for _, opt := range opts {
opt(&options)
}
require.Equal(t, options.ClientName, "cdc")

return cloud.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{}, settings,
clientFactory,
user,
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ go_test(
name = "cloud_test",
srcs = [
"cloud_io_test.go",
"options_test.go",
"uris_test.go",
],
args = ["-test.timeout=295s"],
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloud/amazon/aws_kms.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func MakeAWSKMS(ctx context.Context, uri string, env cloud.KMSEnv) (cloud.KMS, e
// situation is.
region = "default-region"
}
client, err := cloud.MakeHTTPClient(env.ClusterSettings(), cloud.NilMetrics, "aws", "KMS")
client, err := cloud.MakeHTTPClient(env.ClusterSettings(), cloud.NilMetrics, "aws", "KMS", "")
if err != nil {
return nil, err
}
Expand Down
55 changes: 27 additions & 28 deletions pkg/cloud/amazon/s3_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,13 @@ var NightlyEnvVarKMSParams = map[string]string{
}

type s3Storage struct {
bucket *string
conf *cloudpb.ExternalStorage_S3
ioConf base.ExternalIODirConfig
settings *cluster.Settings
prefix string
metrics *cloud.Metrics
bucket *string
conf *cloudpb.ExternalStorage_S3
ioConf base.ExternalIODirConfig
settings *cluster.Settings
prefix string
metrics *cloud.Metrics
storageOptions cloud.ExternalStorageOptions

opts s3ClientConfig
cached *s3Client
Expand Down Expand Up @@ -469,7 +470,7 @@ func MakeS3Storage(
// other callers from making clients in the meantime, not just to avoid making
// duplicate clients in a race but also because making clients concurrently
// can fail if the AWS metadata server hits its rate limit.
client, _, err := newClient(ctx, args.MetricsRecorder, s.opts, s.settings)
client, _, err := s.newClient(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -499,11 +500,9 @@ var awsVerboseLogging = aws.LogLevel(aws.LogDebugWithRequestRetries | aws.LogDeb
// config's region is empty, used the passed bucket to determine a region and
// configures the client with it as well as returning it (so the caller can
// remember it for future calls).
func newClient(
ctx context.Context, metrics *cloud.Metrics, conf s3ClientConfig, settings *cluster.Settings,
) (s3Client, string, error) {
func (s *s3Storage) newClient(ctx context.Context) (s3Client, string, error) {
// Open a span if client creation will do IO/RPCs to find creds/bucket region.
if conf.region == "" || conf.auth == cloud.AuthParamImplicit {
if s.opts.region == "" || s.opts.auth == cloud.AuthParamImplicit {
var sp *tracing.Span
ctx, sp = tracing.ChildSpan(ctx, "s3.newClient")
defer sp.Finish()
Expand All @@ -512,22 +511,22 @@ func newClient(
opts := session.Options{}

{
httpClient, err := cloud.MakeHTTPClient(settings, metrics, "aws", conf.bucket)
httpClient, err := cloud.MakeHTTPClient(s.settings, s.metrics, "aws", s.opts.bucket, s.storageOptions.ClientName)
if err != nil {
return s3Client{}, "", err
}
opts.Config.HTTPClient = httpClient
}

if conf.endpoint != "" {
opts.Config.Endpoint = aws.String(conf.endpoint)
if s.opts.endpoint != "" {
opts.Config.Endpoint = aws.String(s.opts.endpoint)
opts.Config.S3ForcePathStyle = aws.Bool(true)

if conf.region == "" {
conf.region = "default-region"
if s.opts.region == "" {
s.opts.region = "default-region"
}

client, err := cloud.MakeHTTPClient(settings, metrics, "aws", conf.bucket)
client, err := cloud.MakeHTTPClient(s.settings, s.metrics, "aws", s.opts.bucket, s.storageOptions.ClientName)
if err != nil {
return s3Client{}, "", err
}
Expand All @@ -540,7 +539,7 @@ func newClient(
opts.Config.CredentialsChainVerboseErrors = aws.Bool(true)

opts.Config.Logger = newLogAdapter(ctx)
if conf.verbose {
if s.opts.verbose {
opts.Config.LogLevel = awsVerboseLogging
}

Expand All @@ -554,13 +553,13 @@ func newClient(
var sess *session.Session
var err error

switch conf.auth {
switch s.opts.auth {
case "", cloud.AuthParamSpecified:
sess, err = session.NewSessionWithOptions(opts)
if err != nil {
return s3Client{}, "", errors.Wrap(err, "new aws session")
}
sess.Config.Credentials = credentials.NewStaticCredentials(conf.accessKey, conf.secret, conf.tempToken)
sess.Config.Credentials = credentials.NewStaticCredentials(s.opts.accessKey, s.opts.secret, s.opts.tempToken)
case cloud.AuthParamImplicit:
opts.SharedConfigState = session.SharedConfigEnable
sess, err = session.NewSessionWithOptions(opts)
Expand All @@ -569,8 +568,8 @@ func newClient(
}
}

if conf.assumeRoleProvider.roleARN != "" {
for _, delegateProvider := range conf.delegateRoleProviders {
if s.opts.assumeRoleProvider.roleARN != "" {
for _, delegateProvider := range s.opts.delegateRoleProviders {
intermediateCreds := stscreds.NewCredentials(sess, delegateProvider.roleARN, withExternalID(delegateProvider.externalID))
opts.Config.Credentials = intermediateCreds

Expand All @@ -580,18 +579,18 @@ func newClient(
}
}

creds := stscreds.NewCredentials(sess, conf.assumeRoleProvider.roleARN, withExternalID(conf.assumeRoleProvider.externalID))
creds := stscreds.NewCredentials(sess, s.opts.assumeRoleProvider.roleARN, withExternalID(s.opts.assumeRoleProvider.externalID))
opts.Config.Credentials = creds
sess, err = session.NewSessionWithOptions(opts)
if err != nil {
return s3Client{}, "", errors.Wrap(err, "session with assume role credentials")
}
}

region := conf.region
region := s.opts.region
if region == "" {
if err := cloud.DelayedRetry(ctx, "s3manager.GetBucketRegion", s3ErrDelay, func() error {
region, err = s3manager.GetBucketRegion(ctx, sess, conf.bucket, "us-east-1")
region, err = s3manager.GetBucketRegion(ctx, sess, s.opts.bucket, "us-east-1")
return err
}); err != nil {
return s3Client{}, "", errors.Wrap(err, "could not find s3 bucket's region")
Expand All @@ -601,7 +600,7 @@ func newClient(

c := s3.New(sess)
u := s3manager.NewUploader(sess, func(uploader *s3manager.Uploader) {
uploader.PartSize = cloud.WriteChunkSize.Get(&settings.SV)
uploader.PartSize = cloud.WriteChunkSize.Get(&s.settings.SV)
})
return s3Client{client: c, uploader: u}, region, nil
}
Expand All @@ -610,7 +609,7 @@ func (s *s3Storage) getClient(ctx context.Context) (*s3.S3, error) {
if s.cached != nil {
return s.cached.client, nil
}
client, region, err := newClient(ctx, s.metrics, s.opts, s.settings)
client, region, err := s.newClient(ctx)
if err != nil {
return nil, err
}
Expand All @@ -624,7 +623,7 @@ func (s *s3Storage) getUploader(ctx context.Context) (*s3manager.Uploader, error
if s.cached != nil {
return s.cached.uploader, nil
}
client, region, err := newClient(ctx, s.metrics, s.opts, s.settings)
client, region, err := s.newClient(ctx)
if err != nil {
return nil, err
}
Expand Down
12 changes: 8 additions & 4 deletions pkg/cloud/amazon/s3_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,11 +593,15 @@ func TestNewClientErrorsOnBucketRegion(t *testing.T) {

testSettings := cluster.MakeTestingClusterSettings()
ctx := context.Background()
cfg := s3ClientConfig{
bucket: "bucket-does-not-exist-v1i3m",
auth: cloud.AuthParamImplicit,
s3 := s3Storage{
opts: s3ClientConfig{
bucket: "bucket-does-not-exist-v1i3m",
auth: cloud.AuthParamImplicit,
},
metrics: cloud.NilMetrics,
settings: testSettings,
}
_, _, err = newClient(ctx, cloud.NilMetrics, cfg, testSettings)
_, _, err = s3.newClient(ctx)
require.Regexp(t, "could not find s3 bucket's region", err)
}

Expand Down
8 changes: 2 additions & 6 deletions pkg/cloud/azure/azure_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ func makeAzureStorage(
return nil, errors.Wrap(err, "azure: account name is not valid")
}

t, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "azure", dest.AzureConfig.Container)
options := args.ExternalStorageOptions()
t, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "azure", dest.AzureConfig.Container, options.ClientName)
if err != nil {
return nil, errors.Wrap(err, "azure: unable to create transport")
}
Expand Down Expand Up @@ -255,11 +256,6 @@ func makeAzureStorage(
"implicit credentials disallowed for azure due to --external-io-disable-implicit-credentials flag")
}

options := cloud.ExternalStorageOptions{}
for _, o := range args.Options {
o(&options)
}

defaultCredentialsOptions := &DefaultAzureCredentialWithFileOptions{}
if knobs := options.AzureStorageTestingKnobs; knobs != nil {
defaultCredentialsOptions.testingKnobs = knobs.(*TestingKnobs)
Expand Down
8 changes: 4 additions & 4 deletions pkg/cloud/cloud_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ var httpMetrics = settings.RegisterBoolSetting(
// MakeHTTPClient makes an http client configured with the common settings used
// for interacting with cloud storage (timeouts, retries, CA certs, etc).
func MakeHTTPClient(
settings *cluster.Settings, metrics *Metrics, cloud, bucket string,
settings *cluster.Settings, metrics *Metrics, cloud, bucket, client string,
) (*http.Client, error) {
t, err := MakeTransport(settings, metrics, cloud, bucket)
t, err := MakeTransport(settings, metrics, cloud, bucket, client)
if err != nil {
return nil, err
}
Expand All @@ -99,7 +99,7 @@ func MakeHTTPClientForTransport(t http.RoundTripper) (*http.Client, error) {
// used for interacting with cloud storage (timeouts, retries, CA certs, etc).
// Prefer MakeHTTPClient where possible.
func MakeTransport(
settings *cluster.Settings, metrics *Metrics, cloud, bucket string,
settings *cluster.Settings, metrics *Metrics, cloud, bucket, client string,
) (*http.Transport, error) {
var tlsConf *tls.Config
if pem := httpCustomCA.Get(&settings.SV); pem != "" {
Expand All @@ -118,7 +118,7 @@ func MakeTransport(
// Add our custom CA.
t.TLSClientConfig = tlsConf
if metrics != nil {
t.DialContext = metrics.NetMetrics.Wrap(t.DialContext, cloud, bucket)
t.DialContext = metrics.NetMetrics.Wrap(t.DialContext, cloud, bucket, client)
}
return t, nil
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/cloud/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,22 @@ type ExternalStorageContext struct {
MetricsRecorder *Metrics
}

// ExternalStorageOptions rolls up the Options into a struct.
func (e *EarlyBootExternalStorageContext) ExternalStorageOptions() ExternalStorageOptions {
var options ExternalStorageOptions
for _, option := range e.Options {
option(&options)
}
return options
}

// ExternalStorageOptions holds dependencies and values that can be
// overridden by callers of an ExternalStorageFactory via a passed
// ExternalStorageOption.
type ExternalStorageOptions struct {
ioAccountingInterceptor ReadWriterInterceptor
AzureStorageTestingKnobs base.ModuleTestingKnobs
ClientName string
}

// ExternalStorageConstructor is a function registered to create instances
Expand Down
3 changes: 2 additions & 1 deletion pkg/cloud/gcp/gcs_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ func makeGCSStorage(
opts = append(opts, assumeOpt)
}

baseTransport, err := cloud.MakeTransport(args.Settings, args.MetricsRecorder, "gcs", conf.Bucket)
clientName := args.ExternalStorageOptions().ClientName
baseTransport, err := cloud.MakeTransport(args.Settings, args.MetricsRecorder, "gcs", conf.Bucket, clientName)
if err != nil {
return nil, errors.Wrap(err, "failed to create http transport")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cloud/httpsink/http_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ func MakeHTTPStorage(
return nil, errors.Errorf("HTTP storage requested but prefix path not provided")
}

client, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "http", base)
clientName := args.ExternalStorageOptions().ClientName
client, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "http", base, clientName)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloud/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func MakeMetrics(cidrLookup *cidr.Lookup) *Metrics {
ConnsOpened: metric.NewCounter(connsOpened),
ConnsReused: metric.NewCounter(connsReused),
TLSHandhakes: metric.NewCounter(tlsHandhakes),
NetMetrics: cidrLookup.MakeNetMetrics(cloudWriteBytes, cloudReadBytes, "cloud", "bucket"),
NetMetrics: cidrLookup.MakeNetMetrics(cloudWriteBytes, cloudReadBytes, "cloud", "bucket", "client"),
}
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/cloud/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,10 @@ func WithAzureStorageTestingKnobs(knobs base.ModuleTestingKnobs) ExternalStorage
opts.AzureStorageTestingKnobs = knobs
}
}

// WithClientName sets the "client" label on network metrics.
func WithClientName(name string) ExternalStorageOption {
return func(opts *ExternalStorageOptions) {
opts.ClientName = name
}
}
23 changes: 23 additions & 0 deletions pkg/cloud/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package cloud

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestClientName(t *testing.T) {
options := func(options ...ExternalStorageOption) ExternalStorageOptions {
context := EarlyBootExternalStorageContext{
Options: options,
}
return context.ExternalStorageOptions()
}
require.Empty(t, options().ClientName)
require.Equal(t, options(WithClientName("this-is-the-name")).ClientName, "this-is-the-name")
}

0 comments on commit 08545a9

Please sign in to comment.