From 19418332bcb52c4164d0bf687d44701c605d9e2a Mon Sep 17 00:00:00 2001 From: gheorghestrimtu Date: Thu, 28 Nov 2024 15:17:21 +0200 Subject: [PATCH 01/12] Beholder Log Batch Processor More Settings --- pkg/beholder/client.go | 6 +++++- pkg/beholder/config.go | 5 ++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index b4f64f650..b7394f424 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -114,7 +114,11 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro if cfg.LogBatchProcessor { loggerProcessor = sdklog.NewBatchProcessor( sharedLogExporter, - sdklog.WithExportTimeout(cfg.LogExportTimeout), // Default is 30s + sdklog.WithExportTimeout(cfg.LogExportTimeout), // Default is 30s + sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize), // Default is 512, must be <= maxQueueSize + sdklog.WithExportInterval(cfg.LogExportInterval), // Default is 1s + sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize), // Default is 2048 + ) } else { loggerProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go index b99d20176..3c31a3dad 100644 --- a/pkg/beholder/config.go +++ b/pkg/beholder/config.go @@ -29,7 +29,10 @@ type Config struct { MetricReaderInterval time.Duration MetricRetryConfig *RetryConfig // OTel Log - LogExportTimeout time.Duration + LogExportTimeout time.Duration + LogExportInterval time.Duration + LogExportMaxBatchSize int + LogMaxQueueSize int // Batch processing is enabled by default // Disable it only for testing LogBatchProcessor bool From 376358ebfdd6a386b978c855d6daaece626e9b5f Mon Sep 17 00:00:00 2001 From: gheorghestrimtu Date: Thu, 28 Nov 2024 15:20:59 +0200 Subject: [PATCH 02/12] add settings for message emitter --- pkg/beholder/client.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index b7394f424..cfb83cd1e 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -158,7 +158,10 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro if cfg.EmitterBatchProcessor { messageLogProcessor = sdklog.NewBatchProcessor( sharedLogExporter, - sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s + sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s + sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize), // Default is 512, must be <= maxQueueSize + sdklog.WithExportInterval(cfg.LogExportInterval), // Default is 1s + sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize), // Default is 2048 ) } else { messageLogProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) From e97bb2594c296bb09d933fb116c044e8b2ec1501 Mon Sep 17 00:00:00 2001 From: gheorghestrimtu Date: Thu, 28 Nov 2024 15:36:34 +0200 Subject: [PATCH 03/12] remove settings --- pkg/beholder/client.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index cfb83cd1e..b7394f424 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -158,10 +158,7 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro if cfg.EmitterBatchProcessor { messageLogProcessor = sdklog.NewBatchProcessor( sharedLogExporter, - sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s - sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize), // Default is 512, must be <= maxQueueSize - sdklog.WithExportInterval(cfg.LogExportInterval), // Default is 1s - sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize), // Default is 2048 + sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s ) } else { messageLogProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) From 2e79ea203944cc1fe2f825612bf15a7039063941 Mon Sep 17 00:00:00 2001 From: gheorghestrimtu Date: Thu, 28 Nov 2024 15:37:08 +0200 Subject: [PATCH 04/12] add settings --- pkg/beholder/client.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index b7394f424..cfb83cd1e 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -158,7 +158,10 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro if cfg.EmitterBatchProcessor { messageLogProcessor = sdklog.NewBatchProcessor( sharedLogExporter, - sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s + sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s + sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize), // Default is 512, must be <= maxQueueSize + sdklog.WithExportInterval(cfg.LogExportInterval), // Default is 1s + sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize), // Default is 2048 ) } else { messageLogProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) From 9fb5e70f2a46ec708dbd0fe63f5f04341117d6ee Mon Sep 17 00:00:00 2001 From: gheorghestrimtu Date: Thu, 28 Nov 2024 16:00:42 +0200 Subject: [PATCH 05/12] more settings --- pkg/beholder/client.go | 8 ++++---- pkg/beholder/config.go | 5 ++++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index cfb83cd1e..a327ec1d5 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -158,10 +158,10 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro if cfg.EmitterBatchProcessor { messageLogProcessor = sdklog.NewBatchProcessor( sharedLogExporter, - sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s - sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize), // Default is 512, must be <= maxQueueSize - sdklog.WithExportInterval(cfg.LogExportInterval), // Default is 1s - sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize), // Default is 2048 + sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s + sdklog.WithExportMaxBatchSize(cfg.EmitterExportMaxBatchSize), // Default is 512, must be <= maxQueueSize + sdklog.WithExportInterval(cfg.EmitterExportInterval), // Default is 1s + sdklog.WithMaxQueueSize(cfg.EmitterMaxQueueSize), // Default is 2048 ) } else { messageLogProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go index 3c31a3dad..8c0decf20 100644 --- a/pkg/beholder/config.go +++ b/pkg/beholder/config.go @@ -16,7 +16,10 @@ type Config struct { // OTel Resource ResourceAttributes []otelattr.KeyValue // Message Emitter - EmitterExportTimeout time.Duration + EmitterExportTimeout time.Duration + EmitterExportInterval time.Duration + EmitterExportMaxBatchSize int + EmitterMaxQueueSize int // Batch processing is enabled by default // Disable it only for testing EmitterBatchProcessor bool From 39e2ca6d70cc0d9a90376e10f8b129d29f72beb5 Mon Sep 17 00:00:00 2001 From: gheorghestrimtu Date: Fri, 29 Nov 2024 10:54:10 +0200 Subject: [PATCH 06/12] add too httpclient and test --- pkg/beholder/client.go | 1 - pkg/beholder/config.go | 14 ++++++++++---- pkg/beholder/config_test.go | 14 ++++++++++---- pkg/beholder/httpclient.go | 10 ++++++++-- 4 files changed, 28 insertions(+), 11 deletions(-) diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index a327ec1d5..a9751e2c4 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -118,7 +118,6 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize), // Default is 512, must be <= maxQueueSize sdklog.WithExportInterval(cfg.LogExportInterval), // Default is 1s sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize), // Default is 2048 - ) } else { loggerProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go index 8c0decf20..5bdd85b8d 100644 --- a/pkg/beholder/config.go +++ b/pkg/beholder/config.go @@ -85,8 +85,11 @@ func DefaultConfig() Config { // Resource ResourceAttributes: defaultOtelAttributes, // Message Emitter - EmitterExportTimeout: 1 * time.Second, - EmitterBatchProcessor: true, + EmitterExportTimeout: 1 * time.Second, + EmitterExportMaxBatchSize: 512, + EmitterExportInterval: 1 * time.Second, + EmitterMaxQueueSize: 2048, + EmitterBatchProcessor: true, // OTel message log exporter retry config LogRetryConfig: defaultRetryConfig.Copy(), // Trace @@ -99,8 +102,11 @@ func DefaultConfig() Config { // OTel metric exporter retry config MetricRetryConfig: defaultRetryConfig.Copy(), // Log - LogExportTimeout: 1 * time.Second, - LogBatchProcessor: true, + LogExportTimeout: 1 * time.Second, + LogExportMaxBatchSize: 512, + LogExportInterval: 1 * time.Second, + LogMaxQueueSize: 2048, + LogBatchProcessor: true, } } diff --git a/pkg/beholder/config_test.go b/pkg/beholder/config_test.go index c1d2a42c0..9d07fa4b5 100644 --- a/pkg/beholder/config_test.go +++ b/pkg/beholder/config_test.go @@ -25,8 +25,11 @@ func ExampleConfig() { otelattr.String("sender", "beholderclient"), }, // Message Emitter - EmitterExportTimeout: 1 * time.Second, - EmitterBatchProcessor: true, + EmitterExportTimeout: 1 * time.Second, + EmitterExportMaxBatchSize: 512, + EmitterExportInterval: 1 * time.Second, + EmitterMaxQueueSize: 2048, + EmitterBatchProcessor: true, // OTel message log exporter retry config LogRetryConfig: nil, // Trace @@ -39,8 +42,11 @@ func ExampleConfig() { // OTel metric exporter retry config MetricRetryConfig: nil, // Log - LogExportTimeout: 1 * time.Second, - LogBatchProcessor: true, + LogExportTimeout: 1 * time.Second, + LogExportMaxBatchSize: 512, + LogExportInterval: 1 * time.Second, + LogMaxQueueSize: 2048, + LogBatchProcessor: true, } fmt.Printf("%+v\n", config) config.LogRetryConfig = &beholder.RetryConfig{ diff --git a/pkg/beholder/httpclient.go b/pkg/beholder/httpclient.go index ee70e8243..826eeabba 100644 --- a/pkg/beholder/httpclient.go +++ b/pkg/beholder/httpclient.go @@ -79,7 +79,10 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro if cfg.LogBatchProcessor { loggerProcessor = sdklog.NewBatchProcessor( sharedLogExporter, - sdklog.WithExportTimeout(cfg.LogExportTimeout), // Default is 30s + sdklog.WithExportTimeout(cfg.LogExportTimeout), // Default is 30s + sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize), // Default is 512, must be <= maxQueueSize + sdklog.WithExportInterval(cfg.LogExportInterval), // Default is 1s + sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize), // Default is 2048 ) } else { loggerProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) @@ -119,7 +122,10 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro if cfg.EmitterBatchProcessor { messageLogProcessor = sdklog.NewBatchProcessor( sharedLogExporter, - sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s + sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s + sdklog.WithExportMaxBatchSize(cfg.EmitterExportMaxBatchSize), // Default is 512, must be <= maxQueueSize + sdklog.WithExportInterval(cfg.EmitterExportInterval), // Default is 1s + sdklog.WithMaxQueueSize(cfg.EmitterMaxQueueSize), // Default is 2048 ) } else { messageLogProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) From 64c8fb6e60c09302f2b00bded7520b471bd48f9f Mon Sep 17 00:00:00 2001 From: gheorghestrimtu Date: Fri, 29 Nov 2024 11:01:11 +0200 Subject: [PATCH 07/12] fix ExampleConfig test --- pkg/beholder/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/beholder/config_test.go b/pkg/beholder/config_test.go index 9d07fa4b5..d36fdd2de 100644 --- a/pkg/beholder/config_test.go +++ b/pkg/beholder/config_test.go @@ -56,6 +56,6 @@ func ExampleConfig() { } fmt.Printf("%+v\n", *config.LogRetryConfig) // Output: - // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: MetricReaderInterval:1s MetricRetryConfig: LogExportTimeout:1s LogBatchProcessor:true LogRetryConfig: AuthPublicKeyHex: AuthHeaders:map[]} + // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: MetricReaderInterval:1s MetricRetryConfig: LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig: AuthPublicKeyHex: AuthHeaders:map[]} // {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s} } From 9d0f27e48febe29909fa822e611eec852a9674cb Mon Sep 17 00:00:00 2001 From: gheorghestrimtu Date: Mon, 2 Dec 2024 16:21:23 +0200 Subject: [PATCH 08/12] add new lines for spacing in Config --- pkg/beholder/config.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go index 5bdd85b8d..b90e51dba 100644 --- a/pkg/beholder/config.go +++ b/pkg/beholder/config.go @@ -23,14 +23,17 @@ type Config struct { // Batch processing is enabled by default // Disable it only for testing EmitterBatchProcessor bool + // OTel Trace TraceSampleRatio float64 TraceBatchTimeout time.Duration TraceSpanExporter sdktrace.SpanExporter // optional additional exporter TraceRetryConfig *RetryConfig + // OTel Metric MetricReaderInterval time.Duration MetricRetryConfig *RetryConfig + // OTel Log LogExportTimeout time.Duration LogExportInterval time.Duration From 4d865c3d4718f259c9f2f8c5fb35b34432a03fd7 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Wed, 4 Dec 2024 09:20:33 -0500 Subject: [PATCH 09/12] Add all beholder config options to loop/EnvConfig; set beholder config options from loop EnvConfig --- pkg/loop/config.go | 70 +++++++++++----- pkg/loop/config_test.go | 175 ++++++++++++++++++++++++++++++++-------- pkg/loop/server.go | 21 ++--- 3 files changed, 203 insertions(+), 63 deletions(-) diff --git a/pkg/loop/config.go b/pkg/loop/config.go index 6e18e60ae..ea68cfa4a 100644 --- a/pkg/loop/config.go +++ b/pkg/loop/config.go @@ -21,16 +21,19 @@ const ( envTracingAttribute = "CL_TRACING_ATTRIBUTE_" envTracingTLSCertPath = "CL_TRACING_TLS_CERT_PATH" - envTelemetryEnabled = "CL_TELEMETRY_ENABLED" - envTelemetryEndpoint = "CL_TELEMETRY_ENDPOINT" - envTelemetryInsecureConn = "CL_TELEMETRY_INSECURE_CONNECTION" - envTelemetryCACertFile = "CL_TELEMETRY_CA_CERT_FILE" - envTelemetryAttribute = "CL_TELEMETRY_ATTRIBUTE_" - envTelemetryTraceSampleRatio = "CL_TELEMETRY_TRACE_SAMPLE_RATIO" - envTelemetryAuthHeader = "CL_TELEMETRY_AUTH_HEADER" - envTelemetryAuthPubKeyHex = "CL_TELEMETRY_AUTH_PUB_KEY_HEX" - envTelemetryEmitterBatchProcessor = "CL_TELEMETRY_EMITTER_BATCH_PROCESSOR" - envTelemetryEmitterExportTimeout = "CL_TELEMETRY_EMITTER_EXPORT_TIMEOUT" + envTelemetryEnabled = "CL_TELEMETRY_ENABLED" + envTelemetryEndpoint = "CL_TELEMETRY_ENDPOINT" + envTelemetryInsecureConn = "CL_TELEMETRY_INSECURE_CONNECTION" + envTelemetryCACertFile = "CL_TELEMETRY_CA_CERT_FILE" + envTelemetryAttribute = "CL_TELEMETRY_ATTRIBUTE_" + envTelemetryTraceSampleRatio = "CL_TELEMETRY_TRACE_SAMPLE_RATIO" + envTelemetryAuthHeader = "CL_TELEMETRY_AUTH_HEADER" + envTelemetryAuthPubKeyHex = "CL_TELEMETRY_AUTH_PUB_KEY_HEX" + envTelemetryEmitterBatchProcessor = "CL_TELEMETRY_EMITTER_BATCH_PROCESSOR" + envTelemetryEmitterExportTimeout = "CL_TELEMETRY_EMITTER_EXPORT_TIMEOUT" + envTelemetryEmitterExportInterval = "CL_TELEMETRY_EMITTER_EXPORT_INTERVAL" + envTelemetryEmitterExportMaxBatchSize = "CL_TELEMETRY_EMITTER_EXPORT_MAX_BATCH_SIZE" + envTelemetryEmitterMaxQueueSize = "CL_TELEMETRY_EMITTER_MAX_QUEUE_SIZE" ) // EnvConfig is the configuration between the application and the LOOP executable. The values @@ -46,16 +49,19 @@ type EnvConfig struct { TracingTLSCertPath string TracingAttributes map[string]string - TelemetryEnabled bool - TelemetryEndpoint string - TelemetryInsecureConnection bool - TelemetryCACertFile string - TelemetryAttributes OtelAttributes - TelemetryTraceSampleRatio float64 - TelemetryAuthHeaders map[string]string - TelemetryAuthPubKeyHex string - TelemetryEmitterBatchProcessor bool - TelemetryEmitterExportTimeout time.Duration + TelemetryEnabled bool + TelemetryEndpoint string + TelemetryInsecureConnection bool + TelemetryCACertFile string + TelemetryAttributes OtelAttributes + TelemetryTraceSampleRatio float64 + TelemetryAuthHeaders map[string]string + TelemetryAuthPubKeyHex string + TelemetryEmitterBatchProcessor bool + TelemetryEmitterExportTimeout time.Duration + TelemetryEmitterExportInterval time.Duration + TelemetryEmitterExportMaxBatchSize int + TelemetryEmitterMaxQueueSize int } // AsCmdEnv returns a slice of environment variable key/value pairs for an exec.Cmd. @@ -93,7 +99,9 @@ func (e *EnvConfig) AsCmdEnv() (env []string) { add(envTelemetryAuthPubKeyHex, e.TelemetryAuthPubKeyHex) add(envTelemetryEmitterBatchProcessor, strconv.FormatBool(e.TelemetryEmitterBatchProcessor)) add(envTelemetryEmitterExportTimeout, e.TelemetryEmitterExportTimeout.String()) - + add(envTelemetryEmitterExportInterval, e.TelemetryEmitterExportInterval.String()) + add(envTelemetryEmitterExportMaxBatchSize, strconv.Itoa(e.TelemetryEmitterExportMaxBatchSize)) + add(envTelemetryEmitterMaxQueueSize, strconv.Itoa(e.TelemetryEmitterMaxQueueSize)) return } @@ -142,6 +150,26 @@ func (e *EnvConfig) parse() error { e.TelemetryTraceSampleRatio = getFloat64OrZero(envTelemetryTraceSampleRatio) e.TelemetryAuthHeaders = getMap(envTelemetryAuthHeader) e.TelemetryAuthPubKeyHex = os.Getenv(envTelemetryAuthPubKeyHex) + e.TelemetryEmitterBatchProcessor, err = getBool(envTelemetryEmitterBatchProcessor) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterBatchProcessor, err) + } + e.TelemetryEmitterExportTimeout, err = time.ParseDuration(os.Getenv(envTelemetryEmitterExportTimeout)) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportTimeout, err) + } + e.TelemetryEmitterExportInterval, err = time.ParseDuration(os.Getenv(envTelemetryEmitterExportInterval)) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportInterval, err) + } + e.TelemetryEmitterExportMaxBatchSize, err = strconv.Atoi(os.Getenv(envTelemetryEmitterExportMaxBatchSize)) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportMaxBatchSize, err) + } + e.TelemetryEmitterMaxQueueSize, err = strconv.Atoi(os.Getenv(envTelemetryEmitterMaxQueueSize)) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterMaxQueueSize, err) + } } return nil } diff --git a/pkg/loop/config_test.go b/pkg/loop/config_test.go index b2eb18745..3574a9390 100644 --- a/pkg/loop/config_test.go +++ b/pkg/loop/config_test.go @@ -18,34 +18,74 @@ import ( func TestEnvConfig_parse(t *testing.T) { cases := []struct { - name string - envVars map[string]string - expectError bool - expectedDatabaseURL string - expectedPrometheusPort int - expectedTracingEnabled bool - expectedTracingCollectorTarget string - expectedTracingSamplingRatio float64 - expectedTracingTLSCertPath string + name string + envVars map[string]string + expectError bool + expectedDatabaseURL string + expectedPrometheusPort int + expectedTracingEnabled bool + expectedTracingCollectorTarget string + expectedTracingSamplingRatio float64 + expectedTracingTLSCertPath string + expectedTelemetryEnabled bool + expectedTelemetryEndpoint string + expectedTelemetryInsecureConn bool + expectedTelemetryCACertFile string + expectedTelemetryAttributes OtelAttributes + expectedTelemetryTraceSampleRatio float64 + expectedTelemetryAuthHeaders map[string]string + expectedTelemetryAuthPubKeyHex string + expectedTelemetryEmitterBatchProcessor bool + expectedTelemetryEmitterExportTimeout time.Duration + expectedTelemetryEmitterExportInterval time.Duration + expectedTelemetryEmitterExportMaxBatchSize int + expectedTelemetryEmitterMaxQueueSize int }{ { name: "All variables set correctly", envVars: map[string]string{ - envDatabaseURL: "postgres://user:password@localhost:5432/db", - envPromPort: "8080", - envTracingEnabled: "true", - envTracingCollectorTarget: "some:target", - envTracingSamplingRatio: "1.0", - envTracingTLSCertPath: "internal/test/fixtures/client.pem", - envTracingAttribute + "XYZ": "value", + envDatabaseURL: "postgres://user:password@localhost:5432/db", + envPromPort: "8080", + envTracingEnabled: "true", + envTracingCollectorTarget: "some:target", + envTracingSamplingRatio: "1.0", + envTracingTLSCertPath: "internal/test/fixtures/client.pem", + envTracingAttribute + "XYZ": "value", + envTelemetryEnabled: "true", + envTelemetryEndpoint: "example.com/beholder", + envTelemetryInsecureConn: "true", + envTelemetryCACertFile: "foo/bar", + envTelemetryAttribute + "foo": "bar", + envTelemetryAttribute + "baz": "42", + envTelemetryTraceSampleRatio: "0.42", + envTelemetryAuthHeader + "header-key": "header-value", + envTelemetryAuthPubKeyHex: "pub-key-hex", + envTelemetryEmitterBatchProcessor: "true", + envTelemetryEmitterExportTimeout: "1s", + envTelemetryEmitterExportInterval: "2s", + envTelemetryEmitterExportMaxBatchSize: "100", + envTelemetryEmitterMaxQueueSize: "1000", }, - expectError: false, - expectedDatabaseURL: "postgres://user:password@localhost:5432/db", - expectedPrometheusPort: 8080, - expectedTracingEnabled: true, - expectedTracingCollectorTarget: "some:target", - expectedTracingSamplingRatio: 1.0, - expectedTracingTLSCertPath: "internal/test/fixtures/client.pem", + expectError: false, + expectedDatabaseURL: "postgres://user:password@localhost:5432/db", + expectedPrometheusPort: 8080, + expectedTracingEnabled: true, + expectedTracingCollectorTarget: "some:target", + expectedTracingSamplingRatio: 1.0, + expectedTracingTLSCertPath: "internal/test/fixtures/client.pem", + expectedTelemetryEnabled: true, + expectedTelemetryEndpoint: "example.com/beholder", + expectedTelemetryInsecureConn: true, + expectedTelemetryCACertFile: "foo/bar", + expectedTelemetryAttributes: OtelAttributes{"foo": "bar", "baz": "42"}, + expectedTelemetryTraceSampleRatio: 0.42, + expectedTelemetryAuthHeaders: map[string]string{"header-key": "header-value"}, + expectedTelemetryAuthPubKeyHex: "pub-key-hex", + expectedTelemetryEmitterBatchProcessor: true, + expectedTelemetryEmitterExportTimeout: 1 * time.Second, + expectedTelemetryEmitterExportInterval: 2 * time.Second, + expectedTelemetryEmitterExportMaxBatchSize: 100, + expectedTelemetryEmitterMaxQueueSize: 1000, }, { name: "CL_DATABASE_URL parse error", @@ -106,12 +146,75 @@ func TestEnvConfig_parse(t *testing.T) { if config.TracingTLSCertPath != tc.expectedTracingTLSCertPath { t.Errorf("Expected tracingTLSCertPath %s, got %s", tc.expectedTracingTLSCertPath, config.TracingTLSCertPath) } + if config.TelemetryEnabled != tc.expectedTelemetryEnabled { + t.Errorf("Expected telemetryEnabled %v, got %v", tc.expectedTelemetryEnabled, config.TelemetryEnabled) + } + if config.TelemetryEndpoint != tc.expectedTelemetryEndpoint { + t.Errorf("Expected telemetryEndpoint %s, got %s", tc.expectedTelemetryEndpoint, config.TelemetryEndpoint) + } + if config.TelemetryInsecureConnection != tc.expectedTelemetryInsecureConn { + t.Errorf("Expected telemetryInsecureConn %v, got %v", tc.expectedTelemetryInsecureConn, config.TelemetryInsecureConnection) + } + if config.TelemetryCACertFile != tc.expectedTelemetryCACertFile { + t.Errorf("Expected telemetryCACertFile %s, got %s", tc.expectedTelemetryCACertFile, config.TelemetryCACertFile) + } + if !equalOtelAttributes(config.TelemetryAttributes, tc.expectedTelemetryAttributes) { + t.Errorf("Expected telemetryAttributes %v, got %v", tc.expectedTelemetryAttributes, config.TelemetryAttributes) + } + if config.TelemetryTraceSampleRatio != tc.expectedTelemetryTraceSampleRatio { + t.Errorf("Expected telemetryTraceSampleRatio %f, got %f", tc.expectedTelemetryTraceSampleRatio, config.TelemetryTraceSampleRatio) + } + if !equalStringMaps(config.TelemetryAuthHeaders, tc.expectedTelemetryAuthHeaders) { + t.Errorf("Expected telemetryAuthHeaders %v, got %v", tc.expectedTelemetryAuthHeaders, config.TelemetryAuthHeaders) + } + if config.TelemetryAuthPubKeyHex != tc.expectedTelemetryAuthPubKeyHex { + t.Errorf("Expected telemetryAuthPubKeyHex %s, got %s", tc.expectedTelemetryAuthPubKeyHex, config.TelemetryAuthPubKeyHex) + } + if config.TelemetryEmitterBatchProcessor != tc.expectedTelemetryEmitterBatchProcessor { + t.Errorf("Expected telemetryEmitterBatchProcessor %v, got %v", tc.expectedTelemetryEmitterBatchProcessor, config.TelemetryEmitterBatchProcessor) + } + if config.TelemetryEmitterExportTimeout != tc.expectedTelemetryEmitterExportTimeout { + t.Errorf("Expected telemetryEmitterExportTimeout %v, got %v", tc.expectedTelemetryEmitterExportTimeout, config.TelemetryEmitterExportTimeout) + } + if config.TelemetryEmitterExportInterval != tc.expectedTelemetryEmitterExportInterval { + t.Errorf("Expected telemetryEmitterExportInterval %v, got %v", tc.expectedTelemetryEmitterExportInterval, config.TelemetryEmitterExportInterval) + } + if config.TelemetryEmitterExportMaxBatchSize != tc.expectedTelemetryEmitterExportMaxBatchSize { + t.Errorf("Expected telemetryEmitterExportMaxBatchSize %d, got %d", tc.expectedTelemetryEmitterExportMaxBatchSize, config.TelemetryEmitterExportMaxBatchSize) + } + if config.TelemetryEmitterMaxQueueSize != tc.expectedTelemetryEmitterMaxQueueSize { + t.Errorf("Expected telemetryEmitterMaxQueueSize %d, got %d", tc.expectedTelemetryEmitterMaxQueueSize, config.TelemetryEmitterMaxQueueSize) + } } } }) } } +func equalOtelAttributes(a, b OtelAttributes) bool { + if len(a) != len(b) { + return false + } + for k, v := range a { + if b[k] != v { + return false + } + } + return true +} + +func equalStringMaps(a, b map[string]string) bool { + if len(a) != len(b) { + return false + } + for k, v := range a { + if b[k] != v { + return false + } + } + return true +} + func TestEnvConfig_AsCmdEnv(t *testing.T) { envCfg := EnvConfig{ DatabaseURL: &url.URL{Scheme: "postgres", Host: "localhost:5432", User: url.UserPassword("user", "password"), Path: "/db"}, @@ -123,16 +226,19 @@ func TestEnvConfig_AsCmdEnv(t *testing.T) { TracingTLSCertPath: "some/path", TracingAttributes: map[string]string{"key": "value"}, - TelemetryEnabled: true, - TelemetryEndpoint: "example.com/beholder", - TelemetryInsecureConnection: true, - TelemetryCACertFile: "foo/bar", - TelemetryAttributes: OtelAttributes{"foo": "bar", "baz": "42"}, - TelemetryTraceSampleRatio: 0.42, - TelemetryAuthHeaders: map[string]string{"header-key": "header-value"}, - TelemetryAuthPubKeyHex: "pub-key-hex", - TelemetryEmitterBatchProcessor: true, - TelemetryEmitterExportTimeout: 1 * time.Second, + TelemetryEnabled: true, + TelemetryEndpoint: "example.com/beholder", + TelemetryInsecureConnection: true, + TelemetryCACertFile: "foo/bar", + TelemetryAttributes: OtelAttributes{"foo": "bar", "baz": "42"}, + TelemetryTraceSampleRatio: 0.42, + TelemetryAuthHeaders: map[string]string{"header-key": "header-value"}, + TelemetryAuthPubKeyHex: "pub-key-hex", + TelemetryEmitterBatchProcessor: true, + TelemetryEmitterExportTimeout: 1 * time.Second, + TelemetryEmitterExportInterval: 2 * time.Second, + TelemetryEmitterExportMaxBatchSize: 100, + TelemetryEmitterMaxQueueSize: 1000, } got := map[string]string{} for _, kv := range envCfg.AsCmdEnv() { @@ -161,6 +267,9 @@ func TestEnvConfig_AsCmdEnv(t *testing.T) { assert.Equal(t, "pub-key-hex", got[envTelemetryAuthPubKeyHex]) assert.Equal(t, "true", got[envTelemetryEmitterBatchProcessor]) assert.Equal(t, "1s", got[envTelemetryEmitterExportTimeout]) + assert.Equal(t, "2s", got[envTelemetryEmitterExportInterval]) + assert.Equal(t, "100", got[envTelemetryEmitterExportMaxBatchSize]) + assert.Equal(t, "1000", got[envTelemetryEmitterMaxQueueSize]) } func TestGetMap(t *testing.T) { diff --git a/pkg/loop/server.go b/pkg/loop/server.go index c866be20b..6cf4f0c5b 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -92,15 +92,18 @@ func (s *Server) start() error { } beholderCfg := beholder.Config{ - InsecureConnection: envCfg.TelemetryInsecureConnection, - CACertFile: envCfg.TelemetryCACertFile, - OtelExporterGRPCEndpoint: envCfg.TelemetryEndpoint, - ResourceAttributes: append(attributes, envCfg.TelemetryAttributes.AsStringAttributes()...), - TraceSampleRatio: envCfg.TelemetryTraceSampleRatio, - AuthHeaders: envCfg.TelemetryAuthHeaders, - AuthPublicKeyHex: envCfg.TelemetryAuthPubKeyHex, - EmitterBatchProcessor: envCfg.TelemetryEmitterBatchProcessor, - EmitterExportTimeout: envCfg.TelemetryEmitterExportTimeout, + InsecureConnection: envCfg.TelemetryInsecureConnection, + CACertFile: envCfg.TelemetryCACertFile, + OtelExporterGRPCEndpoint: envCfg.TelemetryEndpoint, + ResourceAttributes: append(attributes, envCfg.TelemetryAttributes.AsStringAttributes()...), + TraceSampleRatio: envCfg.TelemetryTraceSampleRatio, + AuthHeaders: envCfg.TelemetryAuthHeaders, + AuthPublicKeyHex: envCfg.TelemetryAuthPubKeyHex, + EmitterBatchProcessor: envCfg.TelemetryEmitterBatchProcessor, + EmitterExportTimeout: envCfg.TelemetryEmitterExportTimeout, + EmitterExportInterval: envCfg.TelemetryEmitterExportInterval, + EmitterExportMaxBatchSize: envCfg.TelemetryEmitterExportMaxBatchSize, + EmitterMaxQueueSize: envCfg.TelemetryEmitterMaxQueueSize, } if tracingConfig.Enabled { From e59dc78d19a1acff5427cff4e62eff3cd802c0aa Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Wed, 4 Dec 2024 10:10:11 -0500 Subject: [PATCH 10/12] Set EmitterExportTimeout, LogExportTimeout to 30sec which is OTel default --- pkg/beholder/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go index b90e51dba..4398798f5 100644 --- a/pkg/beholder/config.go +++ b/pkg/beholder/config.go @@ -88,7 +88,7 @@ func DefaultConfig() Config { // Resource ResourceAttributes: defaultOtelAttributes, // Message Emitter - EmitterExportTimeout: 1 * time.Second, + EmitterExportTimeout: 30 * time.Second, EmitterExportMaxBatchSize: 512, EmitterExportInterval: 1 * time.Second, EmitterMaxQueueSize: 2048, @@ -105,7 +105,7 @@ func DefaultConfig() Config { // OTel metric exporter retry config MetricRetryConfig: defaultRetryConfig.Copy(), // Log - LogExportTimeout: 1 * time.Second, + LogExportTimeout: 30 * time.Second, LogExportMaxBatchSize: 512, LogExportInterval: 1 * time.Second, LogMaxQueueSize: 2048, From fedf543112d214c9c1aa459684fe54800b6b6464 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Wed, 4 Dec 2024 10:32:04 -0500 Subject: [PATCH 11/12] Update comment for EmitterBatchProcessor config option --- pkg/beholder/config.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go index 4398798f5..103f899cc 100644 --- a/pkg/beholder/config.go +++ b/pkg/beholder/config.go @@ -20,9 +20,7 @@ type Config struct { EmitterExportInterval time.Duration EmitterExportMaxBatchSize int EmitterMaxQueueSize int - // Batch processing is enabled by default - // Disable it only for testing - EmitterBatchProcessor bool + EmitterBatchProcessor bool // Enabled by default. Disable only for testing. // OTel Trace TraceSampleRatio float64 @@ -39,9 +37,7 @@ type Config struct { LogExportInterval time.Duration LogExportMaxBatchSize int LogMaxQueueSize int - // Batch processing is enabled by default - // Disable it only for testing - LogBatchProcessor bool + LogBatchProcessor bool // Enabled by default. Disable only for testing. // Retry config for shared log exporter, used by Emitter and Logger LogRetryConfig *RetryConfig From a5e960501254e9fb1f96023eb743825c56ceaedc Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Thu, 19 Dec 2024 12:07:32 -0500 Subject: [PATCH 12/12] Dont set batch processor options with invalid values --- pkg/beholder/client.go | 29 ++++++++++++++++++----------- pkg/beholder/httpclient.go | 29 +++++++++++++++++++---------- 2 files changed, 37 insertions(+), 21 deletions(-) diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index d86ce456c..69e15c284 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -115,14 +115,17 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro if cfg.LogExportTimeout > 0 { batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.LogExportTimeout)) // Default is 30s } - batchProcessorOpts = append(batchProcessorOpts, - sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize), // Default is 512, must be <= maxQueueSize - sdklog.WithExportInterval(cfg.LogExportInterval), // Default is 1s - sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize), // Default is 2048 - ) + if cfg.LogExportMaxBatchSize > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize)) // Default is 512, must be <= maxQueueSize + } + if cfg.LogExportInterval > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportInterval(cfg.LogExportInterval)) // Default is 1s + } + if cfg.LogMaxQueueSize > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize)) // Default is 2048 + } loggerProcessor = sdklog.NewBatchProcessor( sharedLogExporter, - batchProcessorOpts..., ) } else { @@ -165,11 +168,15 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro if cfg.EmitterExportTimeout > 0 { batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.EmitterExportTimeout)) // Default is 30s } - batchProcessorOpts = append(batchProcessorOpts, - sdklog.WithExportMaxBatchSize(cfg.EmitterExportMaxBatchSize), // Default is 512, must be <= maxQueueSize - sdklog.WithExportInterval(cfg.EmitterExportInterval), // Default is 1s - sdklog.WithMaxQueueSize(cfg.EmitterMaxQueueSize), // Default is 2048 - ) + if cfg.EmitterExportMaxBatchSize > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportMaxBatchSize(cfg.EmitterExportMaxBatchSize)) // Default is 512, must be <= maxQueueSize + } + if cfg.EmitterExportInterval > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportInterval(cfg.EmitterExportInterval)) // Default is 1s + } + if cfg.EmitterMaxQueueSize > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithMaxQueueSize(cfg.EmitterMaxQueueSize)) // Default is 2048 + } messageLogProcessor = sdklog.NewBatchProcessor( sharedLogExporter, batchProcessorOpts..., diff --git a/pkg/beholder/httpclient.go b/pkg/beholder/httpclient.go index 371f4e893..7427af5d8 100644 --- a/pkg/beholder/httpclient.go +++ b/pkg/beholder/httpclient.go @@ -80,11 +80,16 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro if cfg.LogExportTimeout > 0 { batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.LogExportTimeout)) // Default is 30s } - batchProcessorOpts = append(batchProcessorOpts, - sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize), // Default is 512, must be <= maxQueueSize - sdklog.WithExportInterval(cfg.LogExportInterval), // Default is 1s - sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize), // Default is 2048 - ) + if cfg.LogExportMaxBatchSize > 0 { + + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize)) // Default is 512, must be <= maxQueueSize + } + if cfg.LogExportInterval > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportInterval(cfg.LogExportInterval)) // Default is 1s + } + if cfg.LogMaxQueueSize > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize)) // Default is 2048 + } loggerProcessor = sdklog.NewBatchProcessor( sharedLogExporter, batchProcessorOpts..., @@ -129,11 +134,15 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro if cfg.EmitterExportTimeout > 0 { batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.EmitterExportTimeout)) // Default is 30s } - batchProcessorOpts = append(batchProcessorOpts, - sdklog.WithExportMaxBatchSize(cfg.EmitterExportMaxBatchSize), // Default is 512, must be <= maxQueueSize - sdklog.WithExportInterval(cfg.EmitterExportInterval), // Default is 1s - sdklog.WithMaxQueueSize(cfg.EmitterMaxQueueSize), // Default is 2048 - ) + if cfg.EmitterExportMaxBatchSize > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportMaxBatchSize(cfg.EmitterExportMaxBatchSize)) // Default is 512, must be <= maxQueueSize + } + if cfg.EmitterExportInterval > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportInterval(cfg.EmitterExportInterval)) // Default is 1s + } + if cfg.EmitterMaxQueueSize > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithMaxQueueSize(cfg.EmitterMaxQueueSize)) // Default is 2048 + } messageLogProcessor = sdklog.NewBatchProcessor( sharedLogExporter, batchProcessorOpts...,