diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index c6bac815a..69e15c284 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -115,6 +115,15 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro if cfg.LogExportTimeout > 0 { batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.LogExportTimeout)) // Default is 30s } + if cfg.LogExportMaxBatchSize > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize)) // Default is 512, must be <= maxQueueSize + } + if cfg.LogExportInterval > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportInterval(cfg.LogExportInterval)) // Default is 1s + } + if cfg.LogMaxQueueSize > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize)) // Default is 2048 + } loggerProcessor = sdklog.NewBatchProcessor( sharedLogExporter, batchProcessorOpts..., @@ -159,6 +168,15 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro if cfg.EmitterExportTimeout > 0 { batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.EmitterExportTimeout)) // Default is 30s } + if cfg.EmitterExportMaxBatchSize > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportMaxBatchSize(cfg.EmitterExportMaxBatchSize)) // Default is 512, must be <= maxQueueSize + } + if cfg.EmitterExportInterval > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportInterval(cfg.EmitterExportInterval)) // Default is 1s + } + if cfg.EmitterMaxQueueSize > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithMaxQueueSize(cfg.EmitterMaxQueueSize)) // Default is 2048 + } messageLogProcessor = sdklog.NewBatchProcessor( sharedLogExporter, batchProcessorOpts..., @@ -361,7 +379,6 @@ func newMeterProvider(config Config, resource *sdkresource.Resource, creds crede if err != nil { return nil, err } - mp := sdkmetric.NewMeterProvider( sdkmetric.WithReader( sdkmetric.NewPeriodicReader( diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go index 75255c891..c01256b1c 100644 --- a/pkg/beholder/config.go +++ b/pkg/beholder/config.go @@ -17,24 +17,29 @@ type Config struct { // OTel Resource ResourceAttributes []otelattr.KeyValue // Message Emitter - EmitterExportTimeout time.Duration - // Batch processing is enabled by default - // Disable it only for testing - EmitterBatchProcessor bool + EmitterExportTimeout time.Duration + EmitterExportInterval time.Duration + EmitterExportMaxBatchSize int + EmitterMaxQueueSize int + EmitterBatchProcessor bool // Enabled by default. Disable only for testing. + // OTel Trace TraceSampleRatio float64 TraceBatchTimeout time.Duration TraceSpanExporter sdktrace.SpanExporter // optional additional exporter TraceRetryConfig *RetryConfig + // OTel Metric MetricReaderInterval time.Duration MetricRetryConfig *RetryConfig MetricViews []sdkmetric.View + // OTel Log - LogExportTimeout time.Duration - // Batch processing is enabled by default - // Disable it only for testing - LogBatchProcessor bool + LogExportTimeout time.Duration + LogExportInterval time.Duration + LogExportMaxBatchSize int + LogMaxQueueSize int + LogBatchProcessor bool // Enabled by default. Disable only for testing. // Retry config for shared log exporter, used by Emitter and Logger LogRetryConfig *RetryConfig @@ -81,8 +86,11 @@ func DefaultConfig() Config { // Resource ResourceAttributes: defaultOtelAttributes, // Message Emitter - EmitterExportTimeout: 1 * time.Second, - EmitterBatchProcessor: true, + EmitterExportTimeout: 30 * time.Second, + EmitterExportMaxBatchSize: 512, + EmitterExportInterval: 1 * time.Second, + EmitterMaxQueueSize: 2048, + EmitterBatchProcessor: true, // OTel message log exporter retry config LogRetryConfig: defaultRetryConfig.Copy(), // Trace @@ -95,8 +103,11 @@ func DefaultConfig() Config { // OTel metric exporter retry config MetricRetryConfig: defaultRetryConfig.Copy(), // Log - LogExportTimeout: 1 * time.Second, - LogBatchProcessor: true, + LogExportTimeout: 30 * time.Second, + LogExportMaxBatchSize: 512, + LogExportInterval: 1 * time.Second, + LogMaxQueueSize: 2048, + LogBatchProcessor: true, } } diff --git a/pkg/beholder/config_test.go b/pkg/beholder/config_test.go index 538ad5d36..3b4fa86f1 100644 --- a/pkg/beholder/config_test.go +++ b/pkg/beholder/config_test.go @@ -25,8 +25,11 @@ func ExampleConfig() { otelattr.String("sender", "beholderclient"), }, // Message Emitter - EmitterExportTimeout: 1 * time.Second, - EmitterBatchProcessor: true, + EmitterExportTimeout: 1 * time.Second, + EmitterExportMaxBatchSize: 512, + EmitterExportInterval: 1 * time.Second, + EmitterMaxQueueSize: 2048, + EmitterBatchProcessor: true, // OTel message log exporter retry config LogRetryConfig: nil, // Trace @@ -39,8 +42,11 @@ func ExampleConfig() { // OTel metric exporter retry config MetricRetryConfig: nil, // Log - LogExportTimeout: 1 * time.Second, - LogBatchProcessor: true, + LogExportTimeout: 1 * time.Second, + LogExportMaxBatchSize: 512, + LogExportInterval: 1 * time.Second, + LogMaxQueueSize: 2048, + LogBatchProcessor: true, } fmt.Printf("%+v\n", config) config.LogRetryConfig = &beholder.RetryConfig{ @@ -50,6 +56,6 @@ func ExampleConfig() { } fmt.Printf("%+v\n", *config.LogRetryConfig) // Output: - // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: MetricReaderInterval:1s MetricRetryConfig: MetricViews:[] LogExportTimeout:1s LogBatchProcessor:true LogRetryConfig: AuthPublicKeyHex: AuthHeaders:map[]} + // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: MetricReaderInterval:1s MetricRetryConfig: MetricViews:[] LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig: AuthPublicKeyHex: AuthHeaders:map[]} // {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s} } diff --git a/pkg/beholder/httpclient.go b/pkg/beholder/httpclient.go index 0df44e647..7427af5d8 100644 --- a/pkg/beholder/httpclient.go +++ b/pkg/beholder/httpclient.go @@ -80,6 +80,16 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro if cfg.LogExportTimeout > 0 { batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.LogExportTimeout)) // Default is 30s } + if cfg.LogExportMaxBatchSize > 0 { + + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize)) // Default is 512, must be <= maxQueueSize + } + if cfg.LogExportInterval > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportInterval(cfg.LogExportInterval)) // Default is 1s + } + if cfg.LogMaxQueueSize > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize)) // Default is 2048 + } loggerProcessor = sdklog.NewBatchProcessor( sharedLogExporter, batchProcessorOpts..., @@ -124,9 +134,18 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro if cfg.EmitterExportTimeout > 0 { batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.EmitterExportTimeout)) // Default is 30s } + if cfg.EmitterExportMaxBatchSize > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportMaxBatchSize(cfg.EmitterExportMaxBatchSize)) // Default is 512, must be <= maxQueueSize + } + if cfg.EmitterExportInterval > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportInterval(cfg.EmitterExportInterval)) // Default is 1s + } + if cfg.EmitterMaxQueueSize > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithMaxQueueSize(cfg.EmitterMaxQueueSize)) // Default is 2048 + } messageLogProcessor = sdklog.NewBatchProcessor( sharedLogExporter, - batchProcessorOpts..., // Default is 30s + batchProcessorOpts..., ) } else { messageLogProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) diff --git a/pkg/loop/config.go b/pkg/loop/config.go index e63f72f2f..ea68cfa4a 100644 --- a/pkg/loop/config.go +++ b/pkg/loop/config.go @@ -21,16 +21,19 @@ const ( envTracingAttribute = "CL_TRACING_ATTRIBUTE_" envTracingTLSCertPath = "CL_TRACING_TLS_CERT_PATH" - envTelemetryEnabled = "CL_TELEMETRY_ENABLED" - envTelemetryEndpoint = "CL_TELEMETRY_ENDPOINT" - envTelemetryInsecureConn = "CL_TELEMETRY_INSECURE_CONNECTION" - envTelemetryCACertFile = "CL_TELEMETRY_CA_CERT_FILE" - envTelemetryAttribute = "CL_TELEMETRY_ATTRIBUTE_" - envTelemetryTraceSampleRatio = "CL_TELEMETRY_TRACE_SAMPLE_RATIO" - envTelemetryAuthHeader = "CL_TELEMETRY_AUTH_HEADER" - envTelemetryAuthPubKeyHex = "CL_TELEMETRY_AUTH_PUB_KEY_HEX" - envTelemetryEmitterBatchProcessor = "CL_TELEMETRY_EMITTER_BATCH_PROCESSOR" - envTelemetryEmitterExportTimeout = "CL_TELEMETRY_EMITTER_EXPORT_TIMEOUT" + envTelemetryEnabled = "CL_TELEMETRY_ENABLED" + envTelemetryEndpoint = "CL_TELEMETRY_ENDPOINT" + envTelemetryInsecureConn = "CL_TELEMETRY_INSECURE_CONNECTION" + envTelemetryCACertFile = "CL_TELEMETRY_CA_CERT_FILE" + envTelemetryAttribute = "CL_TELEMETRY_ATTRIBUTE_" + envTelemetryTraceSampleRatio = "CL_TELEMETRY_TRACE_SAMPLE_RATIO" + envTelemetryAuthHeader = "CL_TELEMETRY_AUTH_HEADER" + envTelemetryAuthPubKeyHex = "CL_TELEMETRY_AUTH_PUB_KEY_HEX" + envTelemetryEmitterBatchProcessor = "CL_TELEMETRY_EMITTER_BATCH_PROCESSOR" + envTelemetryEmitterExportTimeout = "CL_TELEMETRY_EMITTER_EXPORT_TIMEOUT" + envTelemetryEmitterExportInterval = "CL_TELEMETRY_EMITTER_EXPORT_INTERVAL" + envTelemetryEmitterExportMaxBatchSize = "CL_TELEMETRY_EMITTER_EXPORT_MAX_BATCH_SIZE" + envTelemetryEmitterMaxQueueSize = "CL_TELEMETRY_EMITTER_MAX_QUEUE_SIZE" ) // EnvConfig is the configuration between the application and the LOOP executable. The values @@ -46,16 +49,19 @@ type EnvConfig struct { TracingTLSCertPath string TracingAttributes map[string]string - TelemetryEnabled bool - TelemetryEndpoint string - TelemetryInsecureConnection bool - TelemetryCACertFile string - TelemetryAttributes OtelAttributes - TelemetryTraceSampleRatio float64 - TelemetryAuthHeaders map[string]string - TelemetryAuthPubKeyHex string - TelemetryEmitterBatchProcessor bool - TelemetryEmitterExportTimeout time.Duration + TelemetryEnabled bool + TelemetryEndpoint string + TelemetryInsecureConnection bool + TelemetryCACertFile string + TelemetryAttributes OtelAttributes + TelemetryTraceSampleRatio float64 + TelemetryAuthHeaders map[string]string + TelemetryAuthPubKeyHex string + TelemetryEmitterBatchProcessor bool + TelemetryEmitterExportTimeout time.Duration + TelemetryEmitterExportInterval time.Duration + TelemetryEmitterExportMaxBatchSize int + TelemetryEmitterMaxQueueSize int } // AsCmdEnv returns a slice of environment variable key/value pairs for an exec.Cmd. @@ -93,7 +99,9 @@ func (e *EnvConfig) AsCmdEnv() (env []string) { add(envTelemetryAuthPubKeyHex, e.TelemetryAuthPubKeyHex) add(envTelemetryEmitterBatchProcessor, strconv.FormatBool(e.TelemetryEmitterBatchProcessor)) add(envTelemetryEmitterExportTimeout, e.TelemetryEmitterExportTimeout.String()) - + add(envTelemetryEmitterExportInterval, e.TelemetryEmitterExportInterval.String()) + add(envTelemetryEmitterExportMaxBatchSize, strconv.Itoa(e.TelemetryEmitterExportMaxBatchSize)) + add(envTelemetryEmitterMaxQueueSize, strconv.Itoa(e.TelemetryEmitterMaxQueueSize)) return } @@ -150,6 +158,18 @@ func (e *EnvConfig) parse() error { if err != nil { return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportTimeout, err) } + e.TelemetryEmitterExportInterval, err = time.ParseDuration(os.Getenv(envTelemetryEmitterExportInterval)) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportInterval, err) + } + e.TelemetryEmitterExportMaxBatchSize, err = strconv.Atoi(os.Getenv(envTelemetryEmitterExportMaxBatchSize)) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportMaxBatchSize, err) + } + e.TelemetryEmitterMaxQueueSize, err = strconv.Atoi(os.Getenv(envTelemetryEmitterMaxQueueSize)) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterMaxQueueSize, err) + } } return nil } diff --git a/pkg/loop/config_test.go b/pkg/loop/config_test.go index 78d177aa4..f57eff666 100644 --- a/pkg/loop/config_test.go +++ b/pkg/loop/config_test.go @@ -19,25 +19,28 @@ import ( func TestEnvConfig_parse(t *testing.T) { cases := []struct { - name string - envVars map[string]string - expectError bool - expectedDatabaseURL string - expectedPrometheusPort int - expectedTracingEnabled bool - expectedTracingCollectorTarget string - expectedTracingSamplingRatio float64 - expectedTracingTLSCertPath string - expectedTelemetryEnabled bool - expectedTelemetryEndpoint string - expectedTelemetryInsecureConn bool - expectedTelemetryCACertFile string - expectedTelemetryAttributes OtelAttributes - expectedTelemetryTraceSampleRatio float64 - expectedTelemetryAuthHeaders map[string]string - expectedTelemetryAuthPubKeyHex string - expectedTelemetryEmitterBatchProcessor bool - expectedTelemetryEmitterExportTimeout time.Duration + name string + envVars map[string]string + expectError bool + expectedDatabaseURL string + expectedPrometheusPort int + expectedTracingEnabled bool + expectedTracingCollectorTarget string + expectedTracingSamplingRatio float64 + expectedTracingTLSCertPath string + expectedTelemetryEnabled bool + expectedTelemetryEndpoint string + expectedTelemetryInsecureConn bool + expectedTelemetryCACertFile string + expectedTelemetryAttributes OtelAttributes + expectedTelemetryTraceSampleRatio float64 + expectedTelemetryAuthHeaders map[string]string + expectedTelemetryAuthPubKeyHex string + expectedTelemetryEmitterBatchProcessor bool + expectedTelemetryEmitterExportTimeout time.Duration + expectedTelemetryEmitterExportInterval time.Duration + expectedTelemetryEmitterExportMaxBatchSize int + expectedTelemetryEmitterMaxQueueSize int }{ { name: "All variables set correctly", @@ -60,24 +63,30 @@ func TestEnvConfig_parse(t *testing.T) { envTelemetryAuthPubKeyHex: "pub-key-hex", envTelemetryEmitterBatchProcessor: "true", envTelemetryEmitterExportTimeout: "1s", + envTelemetryEmitterExportInterval: "2s", + envTelemetryEmitterExportMaxBatchSize: "100", + envTelemetryEmitterMaxQueueSize: "1000", }, - expectError: false, - expectedDatabaseURL: "postgres://user:password@localhost:5432/db", - expectedPrometheusPort: 8080, - expectedTracingEnabled: true, - expectedTracingCollectorTarget: "some:target", - expectedTracingSamplingRatio: 1.0, - expectedTracingTLSCertPath: "internal/test/fixtures/client.pem", - expectedTelemetryEnabled: true, - expectedTelemetryEndpoint: "example.com/beholder", - expectedTelemetryInsecureConn: true, - expectedTelemetryCACertFile: "foo/bar", - expectedTelemetryAttributes: OtelAttributes{"foo": "bar", "baz": "42"}, - expectedTelemetryTraceSampleRatio: 0.42, - expectedTelemetryAuthHeaders: map[string]string{"header-key": "header-value"}, - expectedTelemetryAuthPubKeyHex: "pub-key-hex", - expectedTelemetryEmitterBatchProcessor: true, - expectedTelemetryEmitterExportTimeout: 1 * time.Second, + expectError: false, + expectedDatabaseURL: "postgres://user:password@localhost:5432/db", + expectedPrometheusPort: 8080, + expectedTracingEnabled: true, + expectedTracingCollectorTarget: "some:target", + expectedTracingSamplingRatio: 1.0, + expectedTracingTLSCertPath: "internal/test/fixtures/client.pem", + expectedTelemetryEnabled: true, + expectedTelemetryEndpoint: "example.com/beholder", + expectedTelemetryInsecureConn: true, + expectedTelemetryCACertFile: "foo/bar", + expectedTelemetryAttributes: OtelAttributes{"foo": "bar", "baz": "42"}, + expectedTelemetryTraceSampleRatio: 0.42, + expectedTelemetryAuthHeaders: map[string]string{"header-key": "header-value"}, + expectedTelemetryAuthPubKeyHex: "pub-key-hex", + expectedTelemetryEmitterBatchProcessor: true, + expectedTelemetryEmitterExportTimeout: 1 * time.Second, + expectedTelemetryEmitterExportInterval: 2 * time.Second, + expectedTelemetryEmitterExportMaxBatchSize: 100, + expectedTelemetryEmitterMaxQueueSize: 1000, }, { name: "CL_DATABASE_URL parse error", @@ -168,12 +177,45 @@ func TestEnvConfig_parse(t *testing.T) { if config.TelemetryEmitterExportTimeout != tc.expectedTelemetryEmitterExportTimeout { t.Errorf("Expected telemetryEmitterExportTimeout %v, got %v", tc.expectedTelemetryEmitterExportTimeout, config.TelemetryEmitterExportTimeout) } + if config.TelemetryEmitterExportInterval != tc.expectedTelemetryEmitterExportInterval { + t.Errorf("Expected telemetryEmitterExportInterval %v, got %v", tc.expectedTelemetryEmitterExportInterval, config.TelemetryEmitterExportInterval) + } + if config.TelemetryEmitterExportMaxBatchSize != tc.expectedTelemetryEmitterExportMaxBatchSize { + t.Errorf("Expected telemetryEmitterExportMaxBatchSize %d, got %d", tc.expectedTelemetryEmitterExportMaxBatchSize, config.TelemetryEmitterExportMaxBatchSize) + } + if config.TelemetryEmitterMaxQueueSize != tc.expectedTelemetryEmitterMaxQueueSize { + t.Errorf("Expected telemetryEmitterMaxQueueSize %d, got %d", tc.expectedTelemetryEmitterMaxQueueSize, config.TelemetryEmitterMaxQueueSize) + } } } }) } } +func equalOtelAttributes(a, b OtelAttributes) bool { + if len(a) != len(b) { + return false + } + for k, v := range a { + if b[k] != v { + return false + } + } + return true +} + +func equalStringMaps(a, b map[string]string) bool { + if len(a) != len(b) { + return false + } + for k, v := range a { + if b[k] != v { + return false + } + } + return true +} + func TestEnvConfig_AsCmdEnv(t *testing.T) { envCfg := EnvConfig{ DatabaseURL: &url.URL{Scheme: "postgres", Host: "localhost:5432", User: url.UserPassword("user", "password"), Path: "/db"}, @@ -185,16 +227,19 @@ func TestEnvConfig_AsCmdEnv(t *testing.T) { TracingTLSCertPath: "some/path", TracingAttributes: map[string]string{"key": "value"}, - TelemetryEnabled: true, - TelemetryEndpoint: "example.com/beholder", - TelemetryInsecureConnection: true, - TelemetryCACertFile: "foo/bar", - TelemetryAttributes: OtelAttributes{"foo": "bar", "baz": "42"}, - TelemetryTraceSampleRatio: 0.42, - TelemetryAuthHeaders: map[string]string{"header-key": "header-value"}, - TelemetryAuthPubKeyHex: "pub-key-hex", - TelemetryEmitterBatchProcessor: true, - TelemetryEmitterExportTimeout: 1 * time.Second, + TelemetryEnabled: true, + TelemetryEndpoint: "example.com/beholder", + TelemetryInsecureConnection: true, + TelemetryCACertFile: "foo/bar", + TelemetryAttributes: OtelAttributes{"foo": "bar", "baz": "42"}, + TelemetryTraceSampleRatio: 0.42, + TelemetryAuthHeaders: map[string]string{"header-key": "header-value"}, + TelemetryAuthPubKeyHex: "pub-key-hex", + TelemetryEmitterBatchProcessor: true, + TelemetryEmitterExportTimeout: 1 * time.Second, + TelemetryEmitterExportInterval: 2 * time.Second, + TelemetryEmitterExportMaxBatchSize: 100, + TelemetryEmitterMaxQueueSize: 1000, } got := map[string]string{} for _, kv := range envCfg.AsCmdEnv() { @@ -223,6 +268,9 @@ func TestEnvConfig_AsCmdEnv(t *testing.T) { assert.Equal(t, "pub-key-hex", got[envTelemetryAuthPubKeyHex]) assert.Equal(t, "true", got[envTelemetryEmitterBatchProcessor]) assert.Equal(t, "1s", got[envTelemetryEmitterExportTimeout]) + assert.Equal(t, "2s", got[envTelemetryEmitterExportInterval]) + assert.Equal(t, "100", got[envTelemetryEmitterExportMaxBatchSize]) + assert.Equal(t, "1000", got[envTelemetryEmitterMaxQueueSize]) } func TestGetMap(t *testing.T) { diff --git a/pkg/loop/server.go b/pkg/loop/server.go index c866be20b..6cf4f0c5b 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -92,15 +92,18 @@ func (s *Server) start() error { } beholderCfg := beholder.Config{ - InsecureConnection: envCfg.TelemetryInsecureConnection, - CACertFile: envCfg.TelemetryCACertFile, - OtelExporterGRPCEndpoint: envCfg.TelemetryEndpoint, - ResourceAttributes: append(attributes, envCfg.TelemetryAttributes.AsStringAttributes()...), - TraceSampleRatio: envCfg.TelemetryTraceSampleRatio, - AuthHeaders: envCfg.TelemetryAuthHeaders, - AuthPublicKeyHex: envCfg.TelemetryAuthPubKeyHex, - EmitterBatchProcessor: envCfg.TelemetryEmitterBatchProcessor, - EmitterExportTimeout: envCfg.TelemetryEmitterExportTimeout, + InsecureConnection: envCfg.TelemetryInsecureConnection, + CACertFile: envCfg.TelemetryCACertFile, + OtelExporterGRPCEndpoint: envCfg.TelemetryEndpoint, + ResourceAttributes: append(attributes, envCfg.TelemetryAttributes.AsStringAttributes()...), + TraceSampleRatio: envCfg.TelemetryTraceSampleRatio, + AuthHeaders: envCfg.TelemetryAuthHeaders, + AuthPublicKeyHex: envCfg.TelemetryAuthPubKeyHex, + EmitterBatchProcessor: envCfg.TelemetryEmitterBatchProcessor, + EmitterExportTimeout: envCfg.TelemetryEmitterExportTimeout, + EmitterExportInterval: envCfg.TelemetryEmitterExportInterval, + EmitterExportMaxBatchSize: envCfg.TelemetryEmitterExportMaxBatchSize, + EmitterMaxQueueSize: envCfg.TelemetryEmitterMaxQueueSize, } if tracingConfig.Enabled {