Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INFOPLAT-1539 Beholder Log Batch Processor More Settings #957

Merged
merged 14 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
if cfg.LogBatchProcessor {
loggerProcessor = sdklog.NewBatchProcessor(
sharedLogExporter,
sdklog.WithExportTimeout(cfg.LogExportTimeout), // Default is 30s
sdklog.WithExportTimeout(cfg.LogExportTimeout), // Default is 30s
sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize), // Default is 512, must be <= maxQueueSize
sdklog.WithExportInterval(cfg.LogExportInterval), // Default is 1s
sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize), // Default is 2048
)
} else {
loggerProcessor = sdklog.NewSimpleProcessor(sharedLogExporter)
Expand Down Expand Up @@ -154,7 +157,10 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
if cfg.EmitterBatchProcessor {
messageLogProcessor = sdklog.NewBatchProcessor(
sharedLogExporter,
sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s
sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s
sdklog.WithExportMaxBatchSize(cfg.EmitterExportMaxBatchSize), // Default is 512, must be <= maxQueueSize
sdklog.WithExportInterval(cfg.EmitterExportInterval), // Default is 1s
sdklog.WithMaxQueueSize(cfg.EmitterMaxQueueSize), // Default is 2048
)
} else {
messageLogProcessor = sdklog.NewSimpleProcessor(sharedLogExporter)
Expand Down
27 changes: 21 additions & 6 deletions pkg/beholder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,29 @@ type Config struct {
// OTel Resource
ResourceAttributes []otelattr.KeyValue
// Message Emitter
EmitterExportTimeout time.Duration
EmitterExportTimeout time.Duration
EmitterExportInterval time.Duration
EmitterExportMaxBatchSize int
EmitterMaxQueueSize int
// Batch processing is enabled by default
// Disable it only for testing
EmitterBatchProcessor bool
pkcll marked this conversation as resolved.
Show resolved Hide resolved

// OTel Trace
TraceSampleRatio float64
TraceBatchTimeout time.Duration
TraceSpanExporter sdktrace.SpanExporter // optional additional exporter
TraceRetryConfig *RetryConfig

// OTel Metric
MetricReaderInterval time.Duration
MetricRetryConfig *RetryConfig

// OTel Log
LogExportTimeout time.Duration
LogExportTimeout time.Duration
LogExportInterval time.Duration
LogExportMaxBatchSize int
LogMaxQueueSize int
// Batch processing is enabled by default
// Disable it only for testing
LogBatchProcessor bool
Expand Down Expand Up @@ -79,8 +88,11 @@ func DefaultConfig() Config {
// Resource
ResourceAttributes: defaultOtelAttributes,
// Message Emitter
EmitterExportTimeout: 1 * time.Second,
EmitterBatchProcessor: true,
EmitterExportTimeout: 1 * time.Second,
EmitterExportMaxBatchSize: 512,
EmitterExportInterval: 1 * time.Second,
EmitterMaxQueueSize: 2048,
EmitterBatchProcessor: true,
// OTel message log exporter retry config
LogRetryConfig: defaultRetryConfig.Copy(),
// Trace
Expand All @@ -93,8 +105,11 @@ func DefaultConfig() Config {
// OTel metric exporter retry config
MetricRetryConfig: defaultRetryConfig.Copy(),
// Log
LogExportTimeout: 1 * time.Second,
LogBatchProcessor: true,
LogExportTimeout: 1 * time.Second,
LogExportMaxBatchSize: 512,
LogExportInterval: 1 * time.Second,
LogMaxQueueSize: 2048,
LogBatchProcessor: true,
}
}

Expand Down
16 changes: 11 additions & 5 deletions pkg/beholder/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ func ExampleConfig() {
otelattr.String("sender", "beholderclient"),
},
// Message Emitter
EmitterExportTimeout: 1 * time.Second,
EmitterBatchProcessor: true,
EmitterExportTimeout: 1 * time.Second,
EmitterExportMaxBatchSize: 512,
EmitterExportInterval: 1 * time.Second,
EmitterMaxQueueSize: 2048,
EmitterBatchProcessor: true,
// OTel message log exporter retry config
LogRetryConfig: nil,
// Trace
Expand All @@ -39,8 +42,11 @@ func ExampleConfig() {
// OTel metric exporter retry config
MetricRetryConfig: nil,
// Log
LogExportTimeout: 1 * time.Second,
LogBatchProcessor: true,
LogExportTimeout: 1 * time.Second,
LogExportMaxBatchSize: 512,
LogExportInterval: 1 * time.Second,
LogMaxQueueSize: 2048,
LogBatchProcessor: true,
}
fmt.Printf("%+v\n", config)
config.LogRetryConfig = &beholder.RetryConfig{
Expand All @@ -50,6 +56,6 @@ func ExampleConfig() {
}
fmt.Printf("%+v\n", *config.LogRetryConfig)
// Output:
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> LogExportTimeout:1s LogBatchProcessor:true LogRetryConfig:<nil> AuthPublicKeyHex: AuthHeaders:map[]}
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> AuthPublicKeyHex: AuthHeaders:map[]}
// {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s}
}
10 changes: 8 additions & 2 deletions pkg/beholder/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro
if cfg.LogBatchProcessor {
loggerProcessor = sdklog.NewBatchProcessor(
sharedLogExporter,
sdklog.WithExportTimeout(cfg.LogExportTimeout), // Default is 30s
sdklog.WithExportTimeout(cfg.LogExportTimeout), // Default is 30s
sdklog.WithExportMaxBatchSize(cfg.LogExportMaxBatchSize), // Default is 512, must be <= maxQueueSize
sdklog.WithExportInterval(cfg.LogExportInterval), // Default is 1s
sdklog.WithMaxQueueSize(cfg.LogMaxQueueSize), // Default is 2048
)
} else {
loggerProcessor = sdklog.NewSimpleProcessor(sharedLogExporter)
Expand Down Expand Up @@ -119,7 +122,10 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro
if cfg.EmitterBatchProcessor {
messageLogProcessor = sdklog.NewBatchProcessor(
sharedLogExporter,
sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s
sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s
sdklog.WithExportMaxBatchSize(cfg.EmitterExportMaxBatchSize), // Default is 512, must be <= maxQueueSize
sdklog.WithExportInterval(cfg.EmitterExportInterval), // Default is 1s
sdklog.WithMaxQueueSize(cfg.EmitterMaxQueueSize), // Default is 2048
)
} else {
messageLogProcessor = sdklog.NewSimpleProcessor(sharedLogExporter)
Expand Down
70 changes: 49 additions & 21 deletions pkg/loop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@ const (
envTracingAttribute = "CL_TRACING_ATTRIBUTE_"
envTracingTLSCertPath = "CL_TRACING_TLS_CERT_PATH"

envTelemetryEnabled = "CL_TELEMETRY_ENABLED"
envTelemetryEndpoint = "CL_TELEMETRY_ENDPOINT"
envTelemetryInsecureConn = "CL_TELEMETRY_INSECURE_CONNECTION"
envTelemetryCACertFile = "CL_TELEMETRY_CA_CERT_FILE"
envTelemetryAttribute = "CL_TELEMETRY_ATTRIBUTE_"
envTelemetryTraceSampleRatio = "CL_TELEMETRY_TRACE_SAMPLE_RATIO"
envTelemetryAuthHeader = "CL_TELEMETRY_AUTH_HEADER"
envTelemetryAuthPubKeyHex = "CL_TELEMETRY_AUTH_PUB_KEY_HEX"
envTelemetryEmitterBatchProcessor = "CL_TELEMETRY_EMITTER_BATCH_PROCESSOR"
envTelemetryEmitterExportTimeout = "CL_TELEMETRY_EMITTER_EXPORT_TIMEOUT"
envTelemetryEnabled = "CL_TELEMETRY_ENABLED"
envTelemetryEndpoint = "CL_TELEMETRY_ENDPOINT"
envTelemetryInsecureConn = "CL_TELEMETRY_INSECURE_CONNECTION"
envTelemetryCACertFile = "CL_TELEMETRY_CA_CERT_FILE"
envTelemetryAttribute = "CL_TELEMETRY_ATTRIBUTE_"
envTelemetryTraceSampleRatio = "CL_TELEMETRY_TRACE_SAMPLE_RATIO"
envTelemetryAuthHeader = "CL_TELEMETRY_AUTH_HEADER"
envTelemetryAuthPubKeyHex = "CL_TELEMETRY_AUTH_PUB_KEY_HEX"
envTelemetryEmitterBatchProcessor = "CL_TELEMETRY_EMITTER_BATCH_PROCESSOR"
envTelemetryEmitterExportTimeout = "CL_TELEMETRY_EMITTER_EXPORT_TIMEOUT"
envTelemetryEmitterExportInterval = "CL_TELEMETRY_EMITTER_EXPORT_INTERVAL"
envTelemetryEmitterExportMaxBatchSize = "CL_TELEMETRY_EMITTER_EXPORT_MAX_BATCH_SIZE"
envTelemetryEmitterMaxQueueSize = "CL_TELEMETRY_EMITTER_MAX_QUEUE_SIZE"
)

// EnvConfig is the configuration between the application and the LOOP executable. The values
Expand All @@ -46,16 +49,19 @@ type EnvConfig struct {
TracingTLSCertPath string
TracingAttributes map[string]string

TelemetryEnabled bool
TelemetryEndpoint string
TelemetryInsecureConnection bool
TelemetryCACertFile string
TelemetryAttributes OtelAttributes
TelemetryTraceSampleRatio float64
TelemetryAuthHeaders map[string]string
TelemetryAuthPubKeyHex string
TelemetryEmitterBatchProcessor bool
TelemetryEmitterExportTimeout time.Duration
TelemetryEnabled bool
TelemetryEndpoint string
TelemetryInsecureConnection bool
TelemetryCACertFile string
TelemetryAttributes OtelAttributes
TelemetryTraceSampleRatio float64
TelemetryAuthHeaders map[string]string
TelemetryAuthPubKeyHex string
TelemetryEmitterBatchProcessor bool
TelemetryEmitterExportTimeout time.Duration
TelemetryEmitterExportInterval time.Duration
TelemetryEmitterExportMaxBatchSize int
TelemetryEmitterMaxQueueSize int
}

// AsCmdEnv returns a slice of environment variable key/value pairs for an exec.Cmd.
Expand Down Expand Up @@ -93,7 +99,9 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {
add(envTelemetryAuthPubKeyHex, e.TelemetryAuthPubKeyHex)
add(envTelemetryEmitterBatchProcessor, strconv.FormatBool(e.TelemetryEmitterBatchProcessor))
add(envTelemetryEmitterExportTimeout, e.TelemetryEmitterExportTimeout.String())

add(envTelemetryEmitterExportInterval, e.TelemetryEmitterExportInterval.String())
add(envTelemetryEmitterExportMaxBatchSize, strconv.Itoa(e.TelemetryEmitterExportMaxBatchSize))
add(envTelemetryEmitterMaxQueueSize, strconv.Itoa(e.TelemetryEmitterMaxQueueSize))
return
}

Expand Down Expand Up @@ -142,6 +150,26 @@ func (e *EnvConfig) parse() error {
e.TelemetryTraceSampleRatio = getFloat64OrZero(envTelemetryTraceSampleRatio)
e.TelemetryAuthHeaders = getMap(envTelemetryAuthHeader)
e.TelemetryAuthPubKeyHex = os.Getenv(envTelemetryAuthPubKeyHex)
e.TelemetryEmitterBatchProcessor, err = getBool(envTelemetryEmitterBatchProcessor)
pkcll marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterBatchProcessor, err)
}
e.TelemetryEmitterExportTimeout, err = time.ParseDuration(os.Getenv(envTelemetryEmitterExportTimeout))
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportTimeout, err)
}
e.TelemetryEmitterExportInterval, err = time.ParseDuration(os.Getenv(envTelemetryEmitterExportInterval))
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportInterval, err)
}
e.TelemetryEmitterExportMaxBatchSize, err = strconv.Atoi(os.Getenv(envTelemetryEmitterExportMaxBatchSize))
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportMaxBatchSize, err)
}
e.TelemetryEmitterMaxQueueSize, err = strconv.Atoi(os.Getenv(envTelemetryEmitterMaxQueueSize))
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterMaxQueueSize, err)
}
}
return nil
}
Expand Down
Loading
Loading