From 08545a9e54d377b48d70d320ef60fe41e584aaca Mon Sep 17 00:00:00 2001
From: Jeff Swenson
Date: Thu, 17 Oct 2024 15:33:54 -0400
Subject: [PATCH] cloud: add WithClientName option

The WithClientName option allows clients of the cloud package to specify
a client name. This name is used by the cloud input/output network
metrics to attribute network usage to a specific subcomponent. The
initial consumers of the API are the backup job processor and the CDC
cloud storage sink. This makes it possible to distinguish backup bytes
from those of other cloud clients like CDC.

Fixes: #132862

Release note: None
---
 pkg/ccl/backupccl/backup_processor.go        |  2 +-
 pkg/ccl/changefeedccl/sink_cloudstorage.go   |  3 +-
 .../changefeedccl/sink_cloudstorage_test.go  |  6 ++
 pkg/cloud/BUILD.bazel                        |  1 +
 pkg/cloud/amazon/aws_kms.go                  |  2 +-
 pkg/cloud/amazon/s3_storage.go               | 55 +++++++++----------
 pkg/cloud/amazon/s3_storage_test.go          | 12 ++--
 pkg/cloud/azure/azure_storage.go             |  8 +--
 pkg/cloud/cloud_io.go                        |  8 +--
 pkg/cloud/external_storage.go                | 10 ++++
 pkg/cloud/gcp/gcs_storage.go                 |  3 +-
 pkg/cloud/httpsink/http_storage.go           |  3 +-
 pkg/cloud/metrics.go                         |  2 +-
 pkg/cloud/options.go                         |  7 +++
 pkg/cloud/options_test.go                    | 23 ++++++++
 15 files changed, 97 insertions(+), 48 deletions(-)
 create mode 100644 pkg/cloud/options_test.go

diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go
index edc94025c23a..7d1290aa6929 100644
--- a/pkg/ccl/backupccl/backup_processor.go
+++ b/pkg/ccl/backupccl/backup_processor.go
@@ -406,7 +406,7 @@ func runBackupProcessor(
 			progCh:   progCh,
 			settings: &flowCtx.Cfg.Settings.SV,
 		}
-		storage, err := flowCtx.Cfg.ExternalStorage(ctx, dest)
+		storage, err := flowCtx.Cfg.ExternalStorage(ctx, dest, cloud.WithClientName("backup"))
 		if err != nil {
 			return err
 		}
diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go
index 79794e433608..b0fd796a671a 100644
--- a/pkg/ccl/changefeedccl/sink_cloudstorage.go
+++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go
@@ -487,7 +487,8 @@ func makeCloudStorageSink(
 	// We make the external storage with a nil IOAccountingInterceptor since we
 	// record usage metrics via s.metrics.
- if s.es, err = makeExternalStorageFromURI(ctx, u.String(), user, cloud.WithIOAccountingInterceptor(nil)); err != nil { + s.es, err = makeExternalStorageFromURI(ctx, u.String(), user, cloud.WithIOAccountingInterceptor(nil), cloud.WithClientName("cdc")) + if err != nil { return nil, err } if mb != nil && s.es != nil { diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index 4d00edb9d8a8..c479e163d5c4 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -172,6 +172,12 @@ func TestCloudStorageSink(t *testing.T) { clientFactory := blobs.TestBlobServiceClient(settings.ExternalIODir) externalStorageFromURI := func(ctx context.Context, uri string, user username.SQLUsername, opts ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) { + var options cloud.ExternalStorageOptions + for _, opt := range opts { + opt(&options) + } + require.Equal(t, options.ClientName, "cdc") + return cloud.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{}, settings, clientFactory, user, diff --git a/pkg/cloud/BUILD.bazel b/pkg/cloud/BUILD.bazel index 7d184bc4a312..7515e8ad44df 100644 --- a/pkg/cloud/BUILD.bazel +++ b/pkg/cloud/BUILD.bazel @@ -41,6 +41,7 @@ go_test( name = "cloud_test", srcs = [ "cloud_io_test.go", + "options_test.go", "uris_test.go", ], args = ["-test.timeout=295s"], diff --git a/pkg/cloud/amazon/aws_kms.go b/pkg/cloud/amazon/aws_kms.go index 9ec0b33d5e97..15248e0076c3 100644 --- a/pkg/cloud/amazon/aws_kms.go +++ b/pkg/cloud/amazon/aws_kms.go @@ -140,7 +140,7 @@ func MakeAWSKMS(ctx context.Context, uri string, env cloud.KMSEnv) (cloud.KMS, e // situation is. region = "default-region" } - client, err := cloud.MakeHTTPClient(env.ClusterSettings(), cloud.NilMetrics, "aws", "KMS") + client, err := cloud.MakeHTTPClient(env.ClusterSettings(), cloud.NilMetrics, "aws", "KMS", "") if err != nil { return nil, err } diff --git a/pkg/cloud/amazon/s3_storage.go b/pkg/cloud/amazon/s3_storage.go index bd8007fd78dc..1fac07a22972 100644 --- a/pkg/cloud/amazon/s3_storage.go +++ b/pkg/cloud/amazon/s3_storage.go @@ -98,12 +98,13 @@ var NightlyEnvVarKMSParams = map[string]string{ } type s3Storage struct { - bucket *string - conf *cloudpb.ExternalStorage_S3 - ioConf base.ExternalIODirConfig - settings *cluster.Settings - prefix string - metrics *cloud.Metrics + bucket *string + conf *cloudpb.ExternalStorage_S3 + ioConf base.ExternalIODirConfig + settings *cluster.Settings + prefix string + metrics *cloud.Metrics + storageOptions cloud.ExternalStorageOptions opts s3ClientConfig cached *s3Client @@ -469,7 +470,7 @@ func MakeS3Storage( // other callers from making clients in the meantime, not just to avoid making // duplicate clients in a race but also because making clients concurrently // can fail if the AWS metadata server hits its rate limit. - client, _, err := newClient(ctx, args.MetricsRecorder, s.opts, s.settings) + client, _, err := s.newClient(ctx) if err != nil { return nil, err } @@ -499,11 +500,9 @@ var awsVerboseLogging = aws.LogLevel(aws.LogDebugWithRequestRetries | aws.LogDeb // config's region is empty, used the passed bucket to determine a region and // configures the client with it as well as returning it (so the caller can // remember it for future calls). 
-func newClient( - ctx context.Context, metrics *cloud.Metrics, conf s3ClientConfig, settings *cluster.Settings, -) (s3Client, string, error) { +func (s *s3Storage) newClient(ctx context.Context) (s3Client, string, error) { // Open a span if client creation will do IO/RPCs to find creds/bucket region. - if conf.region == "" || conf.auth == cloud.AuthParamImplicit { + if s.opts.region == "" || s.opts.auth == cloud.AuthParamImplicit { var sp *tracing.Span ctx, sp = tracing.ChildSpan(ctx, "s3.newClient") defer sp.Finish() @@ -512,22 +511,22 @@ func newClient( opts := session.Options{} { - httpClient, err := cloud.MakeHTTPClient(settings, metrics, "aws", conf.bucket) + httpClient, err := cloud.MakeHTTPClient(s.settings, s.metrics, "aws", s.opts.bucket, s.storageOptions.ClientName) if err != nil { return s3Client{}, "", err } opts.Config.HTTPClient = httpClient } - if conf.endpoint != "" { - opts.Config.Endpoint = aws.String(conf.endpoint) + if s.opts.endpoint != "" { + opts.Config.Endpoint = aws.String(s.opts.endpoint) opts.Config.S3ForcePathStyle = aws.Bool(true) - if conf.region == "" { - conf.region = "default-region" + if s.opts.region == "" { + s.opts.region = "default-region" } - client, err := cloud.MakeHTTPClient(settings, metrics, "aws", conf.bucket) + client, err := cloud.MakeHTTPClient(s.settings, s.metrics, "aws", s.opts.bucket, s.storageOptions.ClientName) if err != nil { return s3Client{}, "", err } @@ -540,7 +539,7 @@ func newClient( opts.Config.CredentialsChainVerboseErrors = aws.Bool(true) opts.Config.Logger = newLogAdapter(ctx) - if conf.verbose { + if s.opts.verbose { opts.Config.LogLevel = awsVerboseLogging } @@ -554,13 +553,13 @@ func newClient( var sess *session.Session var err error - switch conf.auth { + switch s.opts.auth { case "", cloud.AuthParamSpecified: sess, err = session.NewSessionWithOptions(opts) if err != nil { return s3Client{}, "", errors.Wrap(err, "new aws session") } - sess.Config.Credentials = credentials.NewStaticCredentials(conf.accessKey, conf.secret, conf.tempToken) + sess.Config.Credentials = credentials.NewStaticCredentials(s.opts.accessKey, s.opts.secret, s.opts.tempToken) case cloud.AuthParamImplicit: opts.SharedConfigState = session.SharedConfigEnable sess, err = session.NewSessionWithOptions(opts) @@ -569,8 +568,8 @@ func newClient( } } - if conf.assumeRoleProvider.roleARN != "" { - for _, delegateProvider := range conf.delegateRoleProviders { + if s.opts.assumeRoleProvider.roleARN != "" { + for _, delegateProvider := range s.opts.delegateRoleProviders { intermediateCreds := stscreds.NewCredentials(sess, delegateProvider.roleARN, withExternalID(delegateProvider.externalID)) opts.Config.Credentials = intermediateCreds @@ -580,7 +579,7 @@ func newClient( } } - creds := stscreds.NewCredentials(sess, conf.assumeRoleProvider.roleARN, withExternalID(conf.assumeRoleProvider.externalID)) + creds := stscreds.NewCredentials(sess, s.opts.assumeRoleProvider.roleARN, withExternalID(s.opts.assumeRoleProvider.externalID)) opts.Config.Credentials = creds sess, err = session.NewSessionWithOptions(opts) if err != nil { @@ -588,10 +587,10 @@ func newClient( } } - region := conf.region + region := s.opts.region if region == "" { if err := cloud.DelayedRetry(ctx, "s3manager.GetBucketRegion", s3ErrDelay, func() error { - region, err = s3manager.GetBucketRegion(ctx, sess, conf.bucket, "us-east-1") + region, err = s3manager.GetBucketRegion(ctx, sess, s.opts.bucket, "us-east-1") return err }); err != nil { return s3Client{}, "", errors.Wrap(err, "could not find s3 
bucket's region") @@ -601,7 +600,7 @@ func newClient( c := s3.New(sess) u := s3manager.NewUploader(sess, func(uploader *s3manager.Uploader) { - uploader.PartSize = cloud.WriteChunkSize.Get(&settings.SV) + uploader.PartSize = cloud.WriteChunkSize.Get(&s.settings.SV) }) return s3Client{client: c, uploader: u}, region, nil } @@ -610,7 +609,7 @@ func (s *s3Storage) getClient(ctx context.Context) (*s3.S3, error) { if s.cached != nil { return s.cached.client, nil } - client, region, err := newClient(ctx, s.metrics, s.opts, s.settings) + client, region, err := s.newClient(ctx) if err != nil { return nil, err } @@ -624,7 +623,7 @@ func (s *s3Storage) getUploader(ctx context.Context) (*s3manager.Uploader, error if s.cached != nil { return s.cached.uploader, nil } - client, region, err := newClient(ctx, s.metrics, s.opts, s.settings) + client, region, err := s.newClient(ctx) if err != nil { return nil, err } diff --git a/pkg/cloud/amazon/s3_storage_test.go b/pkg/cloud/amazon/s3_storage_test.go index 110e74793b5d..d387298f4f71 100644 --- a/pkg/cloud/amazon/s3_storage_test.go +++ b/pkg/cloud/amazon/s3_storage_test.go @@ -593,11 +593,15 @@ func TestNewClientErrorsOnBucketRegion(t *testing.T) { testSettings := cluster.MakeTestingClusterSettings() ctx := context.Background() - cfg := s3ClientConfig{ - bucket: "bucket-does-not-exist-v1i3m", - auth: cloud.AuthParamImplicit, + s3 := s3Storage{ + opts: s3ClientConfig{ + bucket: "bucket-does-not-exist-v1i3m", + auth: cloud.AuthParamImplicit, + }, + metrics: cloud.NilMetrics, + settings: testSettings, } - _, _, err = newClient(ctx, cloud.NilMetrics, cfg, testSettings) + _, _, err = s3.newClient(ctx) require.Regexp(t, "could not find s3 bucket's region", err) } diff --git a/pkg/cloud/azure/azure_storage.go b/pkg/cloud/azure/azure_storage.go index e38c3b5b0cdf..6e919a421d9e 100644 --- a/pkg/cloud/azure/azure_storage.go +++ b/pkg/cloud/azure/azure_storage.go @@ -222,7 +222,8 @@ func makeAzureStorage( return nil, errors.Wrap(err, "azure: account name is not valid") } - t, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "azure", dest.AzureConfig.Container) + options := args.ExternalStorageOptions() + t, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "azure", dest.AzureConfig.Container, options.ClientName) if err != nil { return nil, errors.Wrap(err, "azure: unable to create transport") } @@ -255,11 +256,6 @@ func makeAzureStorage( "implicit credentials disallowed for azure due to --external-io-disable-implicit-credentials flag") } - options := cloud.ExternalStorageOptions{} - for _, o := range args.Options { - o(&options) - } - defaultCredentialsOptions := &DefaultAzureCredentialWithFileOptions{} if knobs := options.AzureStorageTestingKnobs; knobs != nil { defaultCredentialsOptions.testingKnobs = knobs.(*TestingKnobs) diff --git a/pkg/cloud/cloud_io.go b/pkg/cloud/cloud_io.go index 6a6891edf2fa..e4cefcd5a2d5 100644 --- a/pkg/cloud/cloud_io.go +++ b/pkg/cloud/cloud_io.go @@ -77,9 +77,9 @@ var httpMetrics = settings.RegisterBoolSetting( // MakeHTTPClient makes an http client configured with the common settings used // for interacting with cloud storage (timeouts, retries, CA certs, etc). 
func MakeHTTPClient( - settings *cluster.Settings, metrics *Metrics, cloud, bucket string, + settings *cluster.Settings, metrics *Metrics, cloud, bucket, client string, ) (*http.Client, error) { - t, err := MakeTransport(settings, metrics, cloud, bucket) + t, err := MakeTransport(settings, metrics, cloud, bucket, client) if err != nil { return nil, err } @@ -99,7 +99,7 @@ func MakeHTTPClientForTransport(t http.RoundTripper) (*http.Client, error) { // used for interacting with cloud storage (timeouts, retries, CA certs, etc). // Prefer MakeHTTPClient where possible. func MakeTransport( - settings *cluster.Settings, metrics *Metrics, cloud, bucket string, + settings *cluster.Settings, metrics *Metrics, cloud, bucket, client string, ) (*http.Transport, error) { var tlsConf *tls.Config if pem := httpCustomCA.Get(&settings.SV); pem != "" { @@ -118,7 +118,7 @@ func MakeTransport( // Add our custom CA. t.TLSClientConfig = tlsConf if metrics != nil { - t.DialContext = metrics.NetMetrics.Wrap(t.DialContext, cloud, bucket) + t.DialContext = metrics.NetMetrics.Wrap(t.DialContext, cloud, bucket, client) } return t, nil } diff --git a/pkg/cloud/external_storage.go b/pkg/cloud/external_storage.go index e3a74476df64..7dd51e298b9a 100644 --- a/pkg/cloud/external_storage.go +++ b/pkg/cloud/external_storage.go @@ -165,12 +165,22 @@ type ExternalStorageContext struct { MetricsRecorder *Metrics } +// ExternalStorageOptions rolls up the Options into a struct. +func (e *EarlyBootExternalStorageContext) ExternalStorageOptions() ExternalStorageOptions { + var options ExternalStorageOptions + for _, option := range e.Options { + option(&options) + } + return options +} + // ExternalStorageOptions holds dependencies and values that can be // overridden by callers of an ExternalStorageFactory via a passed // ExternalStorageOption. 
type ExternalStorageOptions struct { ioAccountingInterceptor ReadWriterInterceptor AzureStorageTestingKnobs base.ModuleTestingKnobs + ClientName string } // ExternalStorageConstructor is a function registered to create instances diff --git a/pkg/cloud/gcp/gcs_storage.go b/pkg/cloud/gcp/gcs_storage.go index 587c90de4cfc..65cc74d52ea6 100644 --- a/pkg/cloud/gcp/gcs_storage.go +++ b/pkg/cloud/gcp/gcs_storage.go @@ -185,7 +185,8 @@ func makeGCSStorage( opts = append(opts, assumeOpt) } - baseTransport, err := cloud.MakeTransport(args.Settings, args.MetricsRecorder, "gcs", conf.Bucket) + clientName := args.ExternalStorageOptions().ClientName + baseTransport, err := cloud.MakeTransport(args.Settings, args.MetricsRecorder, "gcs", conf.Bucket, clientName) if err != nil { return nil, errors.Wrap(err, "failed to create http transport") } diff --git a/pkg/cloud/httpsink/http_storage.go b/pkg/cloud/httpsink/http_storage.go index 935333795889..29ba913469f0 100644 --- a/pkg/cloud/httpsink/http_storage.go +++ b/pkg/cloud/httpsink/http_storage.go @@ -68,7 +68,8 @@ func MakeHTTPStorage( return nil, errors.Errorf("HTTP storage requested but prefix path not provided") } - client, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "http", base) + clientName := args.ExternalStorageOptions().ClientName + client, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "http", base, clientName) if err != nil { return nil, err } diff --git a/pkg/cloud/metrics.go b/pkg/cloud/metrics.go index 3237dbb75252..0ac1d9abde88 100644 --- a/pkg/cloud/metrics.go +++ b/pkg/cloud/metrics.go @@ -133,7 +133,7 @@ func MakeMetrics(cidrLookup *cidr.Lookup) *Metrics { ConnsOpened: metric.NewCounter(connsOpened), ConnsReused: metric.NewCounter(connsReused), TLSHandhakes: metric.NewCounter(tlsHandhakes), - NetMetrics: cidrLookup.MakeNetMetrics(cloudWriteBytes, cloudReadBytes, "cloud", "bucket"), + NetMetrics: cidrLookup.MakeNetMetrics(cloudWriteBytes, cloudReadBytes, "cloud", "bucket", "client"), } } diff --git a/pkg/cloud/options.go b/pkg/cloud/options.go index f9234c02d294..e4bf68c3dfb0 100644 --- a/pkg/cloud/options.go +++ b/pkg/cloud/options.go @@ -23,3 +23,10 @@ func WithAzureStorageTestingKnobs(knobs base.ModuleTestingKnobs) ExternalStorage opts.AzureStorageTestingKnobs = knobs } } + +// WithClientName sets the "client" label on network metrics. +func WithClientName(name string) ExternalStorageOption { + return func(opts *ExternalStorageOptions) { + opts.ClientName = name + } +} diff --git a/pkg/cloud/options_test.go b/pkg/cloud/options_test.go new file mode 100644 index 000000000000..b54b9348196f --- /dev/null +++ b/pkg/cloud/options_test.go @@ -0,0 +1,23 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package cloud + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestClientName(t *testing.T) { + options := func(options ...ExternalStorageOption) ExternalStorageOptions { + context := EarlyBootExternalStorageContext{ + Options: options, + } + return context.ExternalStorageOptions() + } + require.Empty(t, options().ClientName) + require.Equal(t, options(WithClientName("this-is-the-name")).ClientName, "this-is-the-name") +}
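
For illustration only, a minimal sketch (not part of the patch) of how another storage provider or helper could consume the new option, following the pattern this change applies in gcs_storage.go and http_storage.go. The makeTaggedHTTPClient helper, the examplecloud package, and the "examplecloud" label are hypothetical; cloud.WithClientName, cloud.ExternalStorageOptions, and the updated cloud.MakeHTTPClient signature are the ones introduced above.

package examplecloud

import (
	"net/http"

	"github.com/cockroachdb/cockroach/pkg/cloud"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// makeTaggedHTTPClient folds the variadic ExternalStorageOption values into an
// ExternalStorageOptions struct (the same pattern used by
// EarlyBootExternalStorageContext.ExternalStorageOptions) and forwards the
// resulting ClientName to MakeHTTPClient, where it becomes the "client" label
// on the cloud read/write network metrics.
func makeTaggedHTTPClient(
	settings *cluster.Settings,
	metrics *cloud.Metrics,
	bucket string,
	opts ...cloud.ExternalStorageOption,
) (*http.Client, error) {
	var options cloud.ExternalStorageOptions
	for _, opt := range opts {
		opt(&options)
	}
	// ClientName is "" when the caller did not pass cloud.WithClientName.
	return cloud.MakeHTTPClient(settings, metrics, "examplecloud", bucket, options.ClientName)
}

A caller would tag its traffic the same way the backup processor does above, e.g. makeTaggedHTTPClient(st, metricsRecorder, "my-bucket", cloud.WithClientName("backup")).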