Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INFOPLAT-1539 Beholder Log Batch Processor More Settings #957

Merged
merged 14 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
if cfg.LogExportTimeout > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.LogExportTimeout)) // Default is 30s
}
if cfg.LogExportMaxBatchSize > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize)) // Default is 512, must be <= maxQueueSize
}
if cfg.LogExportInterval > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportInterval(cfg.LogExportInterval)) // Default is 1s
}
if cfg.LogMaxQueueSize > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize)) // Default is 2048
}
loggerProcessor = sdklog.NewBatchProcessor(
sharedLogExporter,
batchProcessorOpts...,
Expand Down Expand Up @@ -159,6 +168,15 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
if cfg.EmitterExportTimeout > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.EmitterExportTimeout)) // Default is 30s
}
if cfg.EmitterExportMaxBatchSize > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportMaxBatchSize(cfg.EmitterExportMaxBatchSize)) // Default is 512, must be <= maxQueueSize
}
if cfg.EmitterExportInterval > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportInterval(cfg.EmitterExportInterval)) // Default is 1s
}
if cfg.EmitterMaxQueueSize > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithMaxQueueSize(cfg.EmitterMaxQueueSize)) // Default is 2048
}
messageLogProcessor = sdklog.NewBatchProcessor(
sharedLogExporter,
batchProcessorOpts...,
Expand Down Expand Up @@ -361,7 +379,6 @@ func newMeterProvider(config Config, resource *sdkresource.Resource, creds crede
if err != nil {
return nil, err
}

mp := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(
sdkmetric.NewPeriodicReader(
Expand Down
35 changes: 23 additions & 12 deletions pkg/beholder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,29 @@ type Config struct {
// OTel Resource
ResourceAttributes []otelattr.KeyValue
// Message Emitter
EmitterExportTimeout time.Duration
// Batch processing is enabled by default
// Disable it only for testing
EmitterBatchProcessor bool
EmitterExportTimeout time.Duration
EmitterExportInterval time.Duration
EmitterExportMaxBatchSize int
EmitterMaxQueueSize int
EmitterBatchProcessor bool // Enabled by default. Disable only for testing.

// OTel Trace
TraceSampleRatio float64
TraceBatchTimeout time.Duration
TraceSpanExporter sdktrace.SpanExporter // optional additional exporter
TraceRetryConfig *RetryConfig

// OTel Metric
MetricReaderInterval time.Duration
MetricRetryConfig *RetryConfig
MetricViews []sdkmetric.View

// OTel Log
LogExportTimeout time.Duration
// Batch processing is enabled by default
// Disable it only for testing
LogBatchProcessor bool
LogExportTimeout time.Duration
LogExportInterval time.Duration
LogExportMaxBatchSize int
LogMaxQueueSize int
LogBatchProcessor bool // Enabled by default. Disable only for testing.
// Retry config for shared log exporter, used by Emitter and Logger
LogRetryConfig *RetryConfig

Expand Down Expand Up @@ -81,8 +86,11 @@ func DefaultConfig() Config {
// Resource
ResourceAttributes: defaultOtelAttributes,
// Message Emitter
EmitterExportTimeout: 1 * time.Second,
EmitterBatchProcessor: true,
EmitterExportTimeout: 30 * time.Second,
EmitterExportMaxBatchSize: 512,
EmitterExportInterval: 1 * time.Second,
EmitterMaxQueueSize: 2048,
EmitterBatchProcessor: true,
// OTel message log exporter retry config
LogRetryConfig: defaultRetryConfig.Copy(),
// Trace
Expand All @@ -95,8 +103,11 @@ func DefaultConfig() Config {
// OTel metric exporter retry config
MetricRetryConfig: defaultRetryConfig.Copy(),
// Log
LogExportTimeout: 1 * time.Second,
LogBatchProcessor: true,
LogExportTimeout: 30 * time.Second,
LogExportMaxBatchSize: 512,
LogExportInterval: 1 * time.Second,
LogMaxQueueSize: 2048,
LogBatchProcessor: true,
}
}

Expand Down
16 changes: 11 additions & 5 deletions pkg/beholder/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ func ExampleConfig() {
otelattr.String("sender", "beholderclient"),
},
// Message Emitter
EmitterExportTimeout: 1 * time.Second,
EmitterBatchProcessor: true,
EmitterExportTimeout: 1 * time.Second,
EmitterExportMaxBatchSize: 512,
EmitterExportInterval: 1 * time.Second,
EmitterMaxQueueSize: 2048,
EmitterBatchProcessor: true,
// OTel message log exporter retry config
LogRetryConfig: nil,
// Trace
Expand All @@ -39,8 +42,11 @@ func ExampleConfig() {
// OTel metric exporter retry config
MetricRetryConfig: nil,
// Log
LogExportTimeout: 1 * time.Second,
LogBatchProcessor: true,
LogExportTimeout: 1 * time.Second,
LogExportMaxBatchSize: 512,
LogExportInterval: 1 * time.Second,
LogMaxQueueSize: 2048,
LogBatchProcessor: true,
}
fmt.Printf("%+v\n", config)
config.LogRetryConfig = &beholder.RetryConfig{
Expand All @@ -50,6 +56,6 @@ func ExampleConfig() {
}
fmt.Printf("%+v\n", *config.LogRetryConfig)
// Output:
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] LogExportTimeout:1s LogBatchProcessor:true LogRetryConfig:<nil> AuthPublicKeyHex: AuthHeaders:map[]}
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> AuthPublicKeyHex: AuthHeaders:map[]}
// {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s}
}
21 changes: 20 additions & 1 deletion pkg/beholder/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro
if cfg.LogExportTimeout > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.LogExportTimeout)) // Default is 30s
}
if cfg.LogExportMaxBatchSize > 0 {

batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize)) // Default is 512, must be <= maxQueueSize
}
if cfg.LogExportInterval > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportInterval(cfg.LogExportInterval)) // Default is 1s
}
if cfg.LogMaxQueueSize > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize)) // Default is 2048
}
loggerProcessor = sdklog.NewBatchProcessor(
sharedLogExporter,
batchProcessorOpts...,
Expand Down Expand Up @@ -124,9 +134,18 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro
if cfg.EmitterExportTimeout > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.EmitterExportTimeout)) // Default is 30s
}
if cfg.EmitterExportMaxBatchSize > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportMaxBatchSize(cfg.EmitterExportMaxBatchSize)) // Default is 512, must be <= maxQueueSize
}
if cfg.EmitterExportInterval > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportInterval(cfg.EmitterExportInterval)) // Default is 1s
}
if cfg.EmitterMaxQueueSize > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithMaxQueueSize(cfg.EmitterMaxQueueSize)) // Default is 2048
}
messageLogProcessor = sdklog.NewBatchProcessor(
sharedLogExporter,
batchProcessorOpts..., // Default is 30s
batchProcessorOpts...,
)
} else {
messageLogProcessor = sdklog.NewSimpleProcessor(sharedLogExporter)
Expand Down
62 changes: 41 additions & 21 deletions pkg/loop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@ const (
envTracingAttribute = "CL_TRACING_ATTRIBUTE_"
envTracingTLSCertPath = "CL_TRACING_TLS_CERT_PATH"

envTelemetryEnabled = "CL_TELEMETRY_ENABLED"
envTelemetryEndpoint = "CL_TELEMETRY_ENDPOINT"
envTelemetryInsecureConn = "CL_TELEMETRY_INSECURE_CONNECTION"
envTelemetryCACertFile = "CL_TELEMETRY_CA_CERT_FILE"
envTelemetryAttribute = "CL_TELEMETRY_ATTRIBUTE_"
envTelemetryTraceSampleRatio = "CL_TELEMETRY_TRACE_SAMPLE_RATIO"
envTelemetryAuthHeader = "CL_TELEMETRY_AUTH_HEADER"
envTelemetryAuthPubKeyHex = "CL_TELEMETRY_AUTH_PUB_KEY_HEX"
envTelemetryEmitterBatchProcessor = "CL_TELEMETRY_EMITTER_BATCH_PROCESSOR"
envTelemetryEmitterExportTimeout = "CL_TELEMETRY_EMITTER_EXPORT_TIMEOUT"
envTelemetryEnabled = "CL_TELEMETRY_ENABLED"
envTelemetryEndpoint = "CL_TELEMETRY_ENDPOINT"
envTelemetryInsecureConn = "CL_TELEMETRY_INSECURE_CONNECTION"
envTelemetryCACertFile = "CL_TELEMETRY_CA_CERT_FILE"
envTelemetryAttribute = "CL_TELEMETRY_ATTRIBUTE_"
envTelemetryTraceSampleRatio = "CL_TELEMETRY_TRACE_SAMPLE_RATIO"
envTelemetryAuthHeader = "CL_TELEMETRY_AUTH_HEADER"
envTelemetryAuthPubKeyHex = "CL_TELEMETRY_AUTH_PUB_KEY_HEX"
envTelemetryEmitterBatchProcessor = "CL_TELEMETRY_EMITTER_BATCH_PROCESSOR"
envTelemetryEmitterExportTimeout = "CL_TELEMETRY_EMITTER_EXPORT_TIMEOUT"
envTelemetryEmitterExportInterval = "CL_TELEMETRY_EMITTER_EXPORT_INTERVAL"
envTelemetryEmitterExportMaxBatchSize = "CL_TELEMETRY_EMITTER_EXPORT_MAX_BATCH_SIZE"
envTelemetryEmitterMaxQueueSize = "CL_TELEMETRY_EMITTER_MAX_QUEUE_SIZE"
)

// EnvConfig is the configuration between the application and the LOOP executable. The values
Expand All @@ -46,16 +49,19 @@ type EnvConfig struct {
TracingTLSCertPath string
TracingAttributes map[string]string

TelemetryEnabled bool
TelemetryEndpoint string
TelemetryInsecureConnection bool
TelemetryCACertFile string
TelemetryAttributes OtelAttributes
TelemetryTraceSampleRatio float64
TelemetryAuthHeaders map[string]string
TelemetryAuthPubKeyHex string
TelemetryEmitterBatchProcessor bool
TelemetryEmitterExportTimeout time.Duration
TelemetryEnabled bool
TelemetryEndpoint string
TelemetryInsecureConnection bool
TelemetryCACertFile string
TelemetryAttributes OtelAttributes
TelemetryTraceSampleRatio float64
TelemetryAuthHeaders map[string]string
TelemetryAuthPubKeyHex string
TelemetryEmitterBatchProcessor bool
TelemetryEmitterExportTimeout time.Duration
TelemetryEmitterExportInterval time.Duration
TelemetryEmitterExportMaxBatchSize int
TelemetryEmitterMaxQueueSize int
}

// AsCmdEnv returns a slice of environment variable key/value pairs for an exec.Cmd.
Expand Down Expand Up @@ -93,7 +99,9 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {
add(envTelemetryAuthPubKeyHex, e.TelemetryAuthPubKeyHex)
add(envTelemetryEmitterBatchProcessor, strconv.FormatBool(e.TelemetryEmitterBatchProcessor))
add(envTelemetryEmitterExportTimeout, e.TelemetryEmitterExportTimeout.String())

add(envTelemetryEmitterExportInterval, e.TelemetryEmitterExportInterval.String())
add(envTelemetryEmitterExportMaxBatchSize, strconv.Itoa(e.TelemetryEmitterExportMaxBatchSize))
add(envTelemetryEmitterMaxQueueSize, strconv.Itoa(e.TelemetryEmitterMaxQueueSize))
return
}

Expand Down Expand Up @@ -150,6 +158,18 @@ func (e *EnvConfig) parse() error {
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportTimeout, err)
}
e.TelemetryEmitterExportInterval, err = time.ParseDuration(os.Getenv(envTelemetryEmitterExportInterval))
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportInterval, err)
}
e.TelemetryEmitterExportMaxBatchSize, err = strconv.Atoi(os.Getenv(envTelemetryEmitterExportMaxBatchSize))
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportMaxBatchSize, err)
}
e.TelemetryEmitterMaxQueueSize, err = strconv.Atoi(os.Getenv(envTelemetryEmitterMaxQueueSize))
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterMaxQueueSize, err)
}
}
return nil
}
Expand Down
Loading
Loading