diff --git a/.chloggen/elasticsearchexporter_deprecate-retry-maxrequests.yaml b/.chloggen/elasticsearchexporter_deprecate-retry-maxrequests.yaml new file mode 100644 index 000000000000..80bb0eac5fb8 --- /dev/null +++ b/.chloggen/elasticsearchexporter_deprecate-retry-maxrequests.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecate `retry::max_requests` in favor of `retry::max_retries` + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32344] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: If `retry::max_requests` is specified, `retry::max_retries` will be set to exactly `retry::max_requests - 1` + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.chloggen/elasticsearchexporter_otel-mode-passthrough-field-prefix-conflict.yaml b/.chloggen/elasticsearchexporter_otel-mode-passthrough-field-prefix-conflict.yaml new file mode 100644 index 000000000000..afde47be348b --- /dev/null +++ b/.chloggen/elasticsearchexporter_otel-mode-passthrough-field-prefix-conflict.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Preserve attribute names and metric names on prefix conflict in OTel mapping mode + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35651] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: e.g. in OTel mapping mode, attributes "a" and "a.b" are now sent to Elasticsearch as-is, instead of as "a.value" and "a.b" + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API.
+# Default: '[user]' +change_logs: [user] diff --git a/.chloggen/mysqlreceiver_older_replica_metrics_support.yaml b/.chloggen/mysqlreceiver_older_replica_metrics_support.yaml new file mode 100644 index 000000000000..aecdf987b857 --- /dev/null +++ b/.chloggen/mysqlreceiver_older_replica_metrics_support.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: mysqlreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add replica metric support for versions of MySQL earlier than 8.0.22. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35217] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/opamp-extension-reportshealth.yaml b/.chloggen/opamp-extension-reportshealth.yaml new file mode 100644 index 000000000000..c58f185a6628 --- /dev/null +++ b/.chloggen/opamp-extension-reportshealth.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampextension + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Implement `ReportsHealth` capability in OpAMP extension + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35433] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 5ec203f13674..b620b81158e9 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -202,7 +202,8 @@ The behaviour of this bulk indexing can be configured with the following setting - `interval` (default=30s): Write buffer flush time limit. - `retry`: Elasticsearch bulk request retry settings - `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff. - - `max_requests` (default=3): Number of HTTP request retries. + - `max_requests` (DEPRECATED, use `retry::max_retries` instead): Number of HTTP requests, including the initial attempt. If used, `retry::max_retries` will be set to `max_requests - 1`. + - `max_retries` (default=2): Number of HTTP request retries. To disable retries, set `retry::enabled` to `false` instead of setting `max_retries` to `0`. - `initial_interval` (default=100ms): Initial waiting time if a HTTP request failed. - `max_interval` (default=1m): Max waiting time if a HTTP request failed. - `retry_on_status` (default=[429]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it defaults to `[429]`. diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index 471ddc2dc7b9..62bc329a26f8 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -51,6 +51,8 @@ type bulkIndexerSession interface { Flush(context.Context) error } +const defaultMaxRetries = 2 + func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (bulkIndexer, error) { if config.Batcher.Enabled != nil { return newSyncBulkIndexer(logger, client, config), nil } return newAsyncBulkIndexer(logger, client, config) } -func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) *syncBulkIndexer { - var maxDocRetry int +func bulkIndexerConfig(client *elasticsearch.Client, config *Config) docappender.BulkIndexerConfig { + var maxDocRetries int if config.Retry.Enabled { - // max_requests includes initial attempt - // See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344 - maxDocRetry = config.Retry.MaxRequests - 1 + maxDocRetries = defaultMaxRetries + if config.Retry.MaxRetries != 0 { + maxDocRetries = config.Retry.MaxRetries + } } + return docappender.BulkIndexerConfig{ + Client: client, + MaxDocumentRetries: maxDocRetries, + Pipeline: config.Pipeline, + RetryOnDocumentStatus: config.Retry.RetryOnStatus, + } +} + +func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) *syncBulkIndexer { return &syncBulkIndexer{ - config: docappender.BulkIndexerConfig{ - Client: client, - MaxDocumentRetries: maxDocRetry, - Pipeline: config.Pipeline, - RetryOnDocumentStatus: config.Retry.RetryOnStatus, - }, + config: bulkIndexerConfig(client, config), flushTimeout: config.Timeout, retryConfig: config.Retry, logger: logger, @@ -165,13 +172,6 @@ func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, confi flushBytes = 5e+6 } - var maxDocRetry int - if config.Retry.Enabled { - // max_requests includes initial
attempt - // See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344 - maxDocRetry = config.Retry.MaxRequests - 1 - } - pool := &asyncBulkIndexer{ wg: sync.WaitGroup{}, items: make(chan docappender.BulkIndexerItem, config.NumWorkers), } pool.wg.Add(numWorkers) for i := 0; i < numWorkers; i++ { - bi, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{ - Client: client, - MaxDocumentRetries: maxDocRetry, - Pipeline: config.Pipeline, - RetryOnDocumentStatus: config.Retry.RetryOnStatus, - }) + bi, err := docappender.NewBulkIndexer(bulkIndexerConfig(client, config)) if err != nil { return nil, err } diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index 072bd725c6fe..fe794d6db430 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -169,9 +169,13 @@ type RetrySettings struct { // Enabled allows users to disable retry without having to comment out all settings. Enabled bool `mapstructure:"enabled"` - // MaxRequests configures how often an HTTP request is retried before it is assumed to be failed. + // MaxRequests configures how many times an HTTP request is attempted before it is assumed to have failed. + // Deprecated: use MaxRetries instead. MaxRequests int `mapstructure:"max_requests"` + // MaxRetries configures how many times an HTTP request is retried. + MaxRetries int `mapstructure:"max_retries"` + // InitialInterval configures the initial waiting time if a request failed. InitialInterval time.Duration `mapstructure:"initial_interval"` @@ -273,6 +277,17 @@ func (cfg *Config) Validate() error { // TODO support confighttp.ClientConfig.Compression return errors.New("compression is not currently configurable") } + + if cfg.Retry.MaxRequests != 0 && cfg.Retry.MaxRetries != 0 { + return errors.New("must not specify both retry::max_requests and retry::max_retries") + } + if cfg.Retry.MaxRequests < 0 { + return errors.New("retry::max_requests should be non-negative") + } + if cfg.Retry.MaxRetries < 0 { + return errors.New("retry::max_retries should be non-negative") + } + return nil } @@ -355,11 +370,16 @@ func (cfg *Config) MappingMode() MappingMode { return mappingModes[cfg.Mapping.Mode] } -func logConfigDeprecationWarnings(cfg *Config, logger *zap.Logger) { +func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) { if cfg.Mapping.Dedup != nil { logger.Warn("dedup is deprecated, and is always enabled") } if cfg.Mapping.Dedot && cfg.MappingMode() != MappingECS || !cfg.Mapping.Dedot && cfg.MappingMode() == MappingECS { logger.Warn("dedot has been deprecated: in the future, dedotting will always be performed in ECS mode only") } + if cfg.Retry.MaxRequests != 0 { + cfg.Retry.MaxRetries = cfg.Retry.MaxRequests - 1 + // Do not set cfg.Retry.Enabled = false when cfg.Retry.MaxRequests == 1, to avoid a behavioral breaking change + logger.Warn("retry::max_requests has been deprecated, and will be removed in a future version.
Use retry::max_retries instead.") + } } diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index a27a28ccfe6e..9934dbb7365b 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -94,7 +94,7 @@ func TestConfig(t *testing.T) { }, Retry: RetrySettings{ Enabled: true, - MaxRequests: 5, + MaxRetries: 5, InitialInterval: 100 * time.Millisecond, MaxInterval: 1 * time.Minute, RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError}, @@ -164,7 +164,7 @@ func TestConfig(t *testing.T) { }, Retry: RetrySettings{ Enabled: true, - MaxRequests: 5, + MaxRetries: 5, InitialInterval: 100 * time.Millisecond, MaxInterval: 1 * time.Minute, RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError}, @@ -234,7 +234,7 @@ func TestConfig(t *testing.T) { }, Retry: RetrySettings{ Enabled: true, - MaxRequests: 5, + MaxRetries: 5, InitialInterval: 100 * time.Millisecond, MaxInterval: 1 * time.Minute, RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError}, @@ -391,6 +391,14 @@ func TestConfig_Validate(t *testing.T) { }), err: `compression is not currently configurable`, }, + "both max_retries and max_requests specified": { + config: withDefaultConfig(func(cfg *Config) { + cfg.Endpoints = []string{"http://test:9200"} + cfg.Retry.MaxRetries = 1 + cfg.Retry.MaxRequests = 1 + }), + err: `must not specify both retry::max_requests and retry::max_retries`, + }, } for name, tt := range tests { diff --git a/exporter/elasticsearchexporter/esclient.go b/exporter/elasticsearchexporter/esclient.go index 23c2d48bb9ef..556718242bbf 100644 --- a/exporter/elasticsearchexporter/esclient.go +++ b/exporter/elasticsearchexporter/esclient.go @@ -90,16 +90,6 @@ func newElasticsearchClient( headers := make(http.Header) headers.Set("User-Agent", userAgent) - // maxRetries configures the maximum number of event publishing attempts, - // including the first send and additional retries. - - maxRetries := config.Retry.MaxRequests - 1 - retryDisabled := !config.Retry.Enabled || maxRetries <= 0 - - if retryDisabled { - maxRetries = 0 - } - // endpoints converts Config.Endpoints, Config.CloudID, // and Config.ClientConfig.Endpoint to a list of addresses. 
endpoints, err := config.endpoints() @@ -113,6 +103,11 @@ func newElasticsearchClient( logResponseBody: config.LogResponseBody, } + maxRetries := defaultMaxRetries + if config.Retry.MaxRetries != 0 { + maxRetries = config.Retry.MaxRetries + } + return elasticsearch.NewClient(elasticsearch.Config{ Transport: httpClient.Transport, @@ -125,7 +120,7 @@ func newElasticsearchClient( // configure retry behavior RetryOnStatus: config.Retry.RetryOnStatus, - DisableRetry: retryDisabled, + DisableRetry: !config.Retry.Enabled, EnableRetryOnTimeout: config.Retry.Enabled, //RetryOnError: retryOnError, // should be used from esclient version 8 onwards MaxRetries: maxRetries, diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index e2871666b138..9a9b86a5be6f 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -540,14 +540,10 @@ func TestExporterLogs(t *testing.T) { t.Run("no retry", func(t *testing.T) { configurations := map[string]func(*Config){ - "max_requests limited": func(cfg *Config) { - cfg.Retry.MaxRequests = 1 - cfg.Retry.InitialInterval = 1 * time.Millisecond - cfg.Retry.MaxInterval = 10 * time.Millisecond - }, "retry.enabled is false": func(cfg *Config) { cfg.Retry.Enabled = false - cfg.Retry.MaxRequests = 10 + cfg.Retry.RetryOnStatus = []int{429} + cfg.Retry.MaxRetries = 10 cfg.Retry.InitialInterval = 1 * time.Millisecond cfg.Retry.MaxInterval = 10 * time.Millisecond }, @@ -557,7 +553,7 @@ func TestExporterLogs(t *testing.T) { "fail http request": func(attempts *atomic.Int64) bulkHandler { return func([]itemRequest) ([]itemResponse, error) { attempts.Add(1) - return nil, &httpTestError{message: "oops"} + return nil, &httpTestError{message: "oops", status: 429} } }, "fail item": func(attempts *atomic.Int64) bulkHandler { @@ -714,6 +710,35 @@ func TestExporterLogs(t *testing.T) { assert.Equal(t, `{"some.scope.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `scope.attributes`).Raw) assert.Equal(t, `{"some.resource.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `resource.attributes`).Raw) }) + + t.Run("otel mode attribute key prefix conflict", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "otel" + }) + + mustSendLogs(t, exporter, newLogsWithAttributes(map[string]any{ + "a": "a", + "a.b": "a.b", + }, map[string]any{ + "a": "a", + "a.b": "a.b", + }, map[string]any{ + "a": "a", + "a.b": "a.b", + })) + + rec.WaitItems(1) + doc := rec.Items()[0].Document + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `attributes`).Raw) + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw) + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw) + }) } func TestExporterMetrics(t *testing.T) { @@ -1300,6 +1325,75 @@ func TestExporterMetrics(t *testing.T) { assertItemsEqual(t, expected, rec.Items(), false) }) + t.Run("otel mode metric name conflict", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "otel" + }) + + metrics := pmetric.NewMetrics() + resourceMetric := 
metrics.ResourceMetrics().AppendEmpty() + scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty() + + fooBarMetric := scopeMetric.Metrics().AppendEmpty() + fooBarMetric.SetName("foo.bar") + fooBarMetric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0) + + fooMetric := scopeMetric.Metrics().AppendEmpty() + fooMetric.SetName("foo") + fooMetric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0) + + fooBarBazMetric := scopeMetric.Metrics().AppendEmpty() + fooBarBazMetric.SetName("foo.bar.baz") + fooBarBazMetric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0) + + mustSendMetrics(t, exporter, metrics) + + rec.WaitItems(1) + expected := []itemRequest{ + { + Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.foo.bar":"gauge_long","metrics.foo":"gauge_long","metrics.foo.bar.baz":"gauge_long"}}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"foo":0,"foo.bar":0,"foo.bar.baz":0},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), + }, + } + + assertItemsEqual(t, expected, rec.Items(), false) + }) + + t.Run("otel mode attribute key prefix conflict", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "otel" + }) + + mustSendMetrics(t, exporter, newMetricsWithAttributes(map[string]any{ + "a": "a", + "a.b": "a.b", + }, map[string]any{ + "a": "a", + "a.b": "a.b", + }, map[string]any{ + "a": "a", + "a.b": "a.b", + })) + + rec.WaitItems(1) + doc := rec.Items()[0].Document + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `attributes`).Raw) + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw) + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw) + }) + t.Run("publish summary", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { @@ -1600,6 +1694,35 @@ func TestExporterTraces(t *testing.T) { assert.Equal(t, `{"some.resource.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `resource.attributes`).Raw) } }) + + t.Run("otel mode attribute key prefix conflict", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "otel" + }) + + mustSendTraces(t, exporter, newTracesWithAttributes(map[string]any{ + "a": "a", + "a.b": "a.b", + }, map[string]any{ + "a": "a", + "a.b": "a.b", + }, map[string]any{ + "a": "a", + "a.b": "a.b", + })) + + rec.WaitItems(1) + doc := rec.Items()[0].Document + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `attributes`).Raw) + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw) + assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw) + }) } // TestExporterAuth verifies that the Elasticsearch exporter supports diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 3f48ca1e2ec7..61af38d5cee6 100644 --- a/exporter/elasticsearchexporter/factory.go +++ 
b/exporter/elasticsearchexporter/factory.go @@ -63,7 +63,7 @@ func createDefaultConfig() component.Config { }, Retry: RetrySettings{ Enabled: true, - MaxRequests: 3, + MaxRetries: 0, // default is set in exporter code InitialInterval: 100 * time.Millisecond, MaxInterval: 1 * time.Minute, RetryOnStatus: []int{ @@ -110,7 +110,7 @@ func createLogsExporter( set.Logger.Warn("index option are deprecated and replaced with logs_index and traces_index.") index = cf.Index } - logConfigDeprecationWarnings(cf, set.Logger) + handleDeprecatedConfig(cf, set.Logger) exporter := newExporter(cf, set, index, cf.LogsDynamicIndex.Enabled) @@ -129,7 +129,7 @@ func createMetricsExporter( cfg component.Config, ) (exporter.Metrics, error) { cf := cfg.(*Config) - logConfigDeprecationWarnings(cf, set.Logger) + handleDeprecatedConfig(cf, set.Logger) exporter := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled) @@ -147,7 +147,7 @@ func createTracesExporter(ctx context.Context, cfg component.Config, ) (exporter.Traces, error) { cf := cfg.(*Config) - logConfigDeprecationWarnings(cf, set.Logger) + handleDeprecatedConfig(cf, set.Logger) exporter := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled) diff --git a/exporter/elasticsearchexporter/internal/objmodel/objmodel.go b/exporter/elasticsearchexporter/internal/objmodel/objmodel.go index f20f9b1d213b..33a63abc794d 100644 --- a/exporter/elasticsearchexporter/internal/objmodel/objmodel.go +++ b/exporter/elasticsearchexporter/internal/objmodel/objmodel.go @@ -209,12 +209,12 @@ func (doc *Document) sort() { // The filtering only keeps the last value for a key. // // Dedup ensure that keys are sorted. -func (doc *Document) Dedup() { +func (doc *Document) Dedup(appendValueOnConflict bool) { // 1. Always ensure the fields are sorted, Dedup support requires // Fields to be sorted. doc.sort() - // 2. rename fields if a primitive value is overwritten by an object. + // 2. if appendValueOnConflict, rename fields where a primitive value is overwritten by an object. // For example the pair (path.x=1, path.x.a="test") becomes: // (path.x.value=1, path.x.a="test"). // @@ -227,16 +227,18 @@ // field in favor of the `value` field in the document. // // This step removes potential conflicts when dedotting and serializing fields. - var renamed bool - for i := 0; i < len(doc.fields)-1; i++ { - key, nextKey := doc.fields[i].key, doc.fields[i+1].key - if len(key) < len(nextKey) && strings.HasPrefix(nextKey, key) && nextKey[len(key)] == '.' { - renamed = true - doc.fields[i].key = key + ".value" + if appendValueOnConflict { + var renamed bool + for i := 0; i < len(doc.fields)-1; i++ { + key, nextKey := doc.fields[i].key, doc.fields[i+1].key + if len(key) < len(nextKey) && strings.HasPrefix(nextKey, key) && nextKey[len(key)] == '.' { + renamed = true + doc.fields[i].key = key + ".value" + } + } + if renamed { + doc.sort() } - } - if renamed { - doc.sort() } // 3. mark duplicates as 'ignore' @@ -251,7 +253,7 @@ // 4. fix objects that might be stored in arrays for i := range doc.fields { - doc.fields[i].value.Dedup() + doc.fields[i].value.Dedup(appendValueOnConflict) } } @@ -487,13 +489,13 @@ func (v *Value) sort() { // Dedup recursively dedups keys in stored documents. // // NOTE: The value MUST be sorted.
-func (v *Value) Dedup() { +func (v *Value) Dedup(appendValueOnConflict bool) { switch v.kind { case KindObject: - v.doc.Dedup() + v.doc.Dedup(appendValueOnConflict) case KindArr: for i := range v.arr { - v.arr[i].Dedup() + v.arr[i].Dedup(appendValueOnConflict) } } } diff --git a/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go b/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go index 1961f716db05..3d0a07b820d0 100644 --- a/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go +++ b/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go @@ -86,8 +86,9 @@ func TestObjectModel_CreateMap(t *testing.T) { func TestObjectModel_Dedup(t *testing.T) { tests := map[string]struct { - build func() Document - want Document + build func() Document + appendValueOnConflict bool + want Document }{ "no duplicates": { build: func() (doc Document) { @@ -95,7 +96,8 @@ func TestObjectModel_Dedup(t *testing.T) { doc.AddInt("c", 3) return doc }, - want: Document{fields: []field{{"a", IntValue(1)}, {"c", IntValue(3)}}}, + appendValueOnConflict: true, + want: Document{fields: []field{{"a", IntValue(1)}, {"c", IntValue(3)}}}, }, "duplicate keys": { build: func() (doc Document) { @@ -104,7 +106,8 @@ func TestObjectModel_Dedup(t *testing.T) { doc.AddInt("a", 2) return doc }, - want: Document{fields: []field{{"a", ignoreValue}, {"a", IntValue(2)}, {"c", IntValue(3)}}}, + appendValueOnConflict: true, + want: Document{fields: []field{{"a", ignoreValue}, {"a", IntValue(2)}, {"c", IntValue(3)}}}, }, "duplicate after flattening from map: namespace object at end": { build: func() Document { @@ -114,7 +117,8 @@ func TestObjectModel_Dedup(t *testing.T) { am.PutEmptyMap("namespace").PutInt("a", 23) return DocumentFromAttributes(am) }, - want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(23)}, {"toplevel", StringValue("test")}}}, + appendValueOnConflict: true, + want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(23)}, {"toplevel", StringValue("test")}}}, }, "duplicate after flattening from map: namespace object at beginning": { build: func() Document { @@ -124,7 +128,8 @@ func TestObjectModel_Dedup(t *testing.T) { am.PutStr("toplevel", "test") return DocumentFromAttributes(am) }, - want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(42)}, {"toplevel", StringValue("test")}}}, + appendValueOnConflict: true, + want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(42)}, {"toplevel", StringValue("test")}}}, }, "dedup in arrays": { build: func() (doc Document) { @@ -136,6 +141,7 @@ func TestObjectModel_Dedup(t *testing.T) { doc.Add("arr", ArrValue(Value{kind: KindObject, doc: embedded})) return doc }, + appendValueOnConflict: true, want: Document{fields: []field{{"arr", ArrValue(Value{kind: KindObject, doc: Document{fields: []field{ {"a", ignoreValue}, {"a", IntValue(2)}, @@ -148,7 +154,8 @@ func TestObjectModel_Dedup(t *testing.T) { doc.AddInt("namespace.a", 2) return doc }, - want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", IntValue(1)}}}, + appendValueOnConflict: true, + want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", IntValue(1)}}}, }, "dedup removes primitive if value exists": { build: func() (doc Document) { @@ -157,14 +164,25 @@ func TestObjectModel_Dedup(t *testing.T) { doc.AddInt("namespace.value", 3) return doc }, - want: Document{fields: []field{{"namespace.a", 
IntValue(2)}, {"namespace.value", ignoreValue}, {"namespace.value", IntValue(3)}}}, + appendValueOnConflict: true, + want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", ignoreValue}, {"namespace.value", IntValue(3)}}}, + }, + "dedup without append value on conflict": { + build: func() (doc Document) { + doc.AddInt("namespace", 1) + doc.AddInt("namespace.a", 2) + doc.AddInt("namespace.value", 3) + return doc + }, + appendValueOnConflict: false, + want: Document{fields: []field{{"namespace", IntValue(1)}, {"namespace.a", IntValue(2)}, {"namespace.value", IntValue(3)}}}, }, } for name, test := range tests { t.Run(name, func(t *testing.T) { doc := test.build() - doc.Dedup() + doc.Dedup(test.appendValueOnConflict) assert.Equal(t, test.want, doc) }) } @@ -282,7 +300,7 @@ func TestDocument_Serialize_Flat(t *testing.T) { m := pcommon.NewMap() assert.NoError(t, m.FromRaw(test.attrs)) doc := DocumentFromAttributes(m) - doc.Dedup() + doc.Dedup(true) err := doc.Serialize(&buf, false, false) require.NoError(t, err) @@ -343,7 +361,7 @@ func TestDocument_Serialize_Dedot(t *testing.T) { m := pcommon.NewMap() assert.NoError(t, m.FromRaw(test.attrs)) doc := DocumentFromAttributes(m) - doc.Dedup() + doc.Dedup(true) err := doc.Serialize(&buf, true, false) require.NoError(t, err) diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index 434bb1090a93..8c71df950752 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -115,7 +115,8 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL str default: document = m.encodeLogDefaultMode(resource, record, scope) } - document.Dedup() + // For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false + document.Dedup(m.mode != MappingOTel) var buf bytes.Buffer err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel) @@ -267,7 +268,8 @@ func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.Lo } func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error) { - document.Dedup() + // For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false + document.Dedup(m.mode != MappingOTel) var buf bytes.Buffer err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel) @@ -646,7 +648,8 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL st default: document = m.encodeSpanDefaultMode(resource, span, scope) } - document.Dedup() + // For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false + document.Dedup(m.mode != MappingOTel) var buf bytes.Buffer err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel) return buf.Bytes(), err diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml index 6f614399b579..d76d300a51c1 100644 --- a/exporter/elasticsearchexporter/testdata/config.yaml +++ b/exporter/elasticsearchexporter/testdata/config.yaml @@ -17,7 +17,7 @@ elasticsearch/trace: flush: bytes: 10485760 retry: - max_requests: 5 + max_retries: 5 retry_on_status: - 429 - 500 @@ -38,7 +38,7 @@ elasticsearch/metric: flush: bytes: 10485760 retry: - max_requests: 5 + max_retries: 5 retry_on_status: - 429 - 500 @@ -61,7 +61,7 @@ elasticsearch/log: flush: bytes: 10485760 retry: - max_requests: 5 + max_retries: 5 retry_on_status: - 429 - 500 diff --git a/exporter/opensearchexporter/config_test.go 
b/exporter/opensearchexporter/config_test.go index d6840945c588..921ce1a00428 100644 --- a/exporter/opensearchexporter/config_test.go +++ b/exporter/opensearchexporter/config_test.go @@ -44,6 +44,13 @@ func TestLoadConfig(t *testing.T) { expected: sampleCfg, configValidateAssert: assert.NoError, }, + { + id: component.NewIDWithName(metadata.Type, "default"), + expected: withDefaultConfig(), + configValidateAssert: func(t assert.TestingT, err error, _ ...any) bool { + return assert.ErrorContains(t, err, "endpoint must be specified") + }, + }, { id: component.NewIDWithName(metadata.Type, "trace"), expected: &Config{ diff --git a/exporter/opensearchexporter/factory.go b/exporter/opensearchexporter/factory.go index a10073ca04ae..418987167a32 100644 --- a/exporter/opensearchexporter/factory.go +++ b/exporter/opensearchexporter/factory.go @@ -41,12 +41,10 @@ func newDefaultConfig() component.Config { func createTracesExporter(ctx context.Context, set exporter.Settings, - cfg component.Config) (exporter.Traces, error) { + cfg component.Config, +) (exporter.Traces, error) { c := cfg.(*Config) - te, e := newSSOTracesExporter(c, set) - if e != nil { - return nil, e - } + te := newSSOTracesExporter(c, set) return exporterhelper.NewTracesExporter(ctx, set, cfg, te.pushTraceData, @@ -58,12 +56,10 @@ func createTracesExporter(ctx context.Context, func createLogsExporter(ctx context.Context, set exporter.Settings, - cfg component.Config) (exporter.Logs, error) { + cfg component.Config, +) (exporter.Logs, error) { c := cfg.(*Config) - le, e := newLogExporter(c, set) - if e != nil { - return nil, e - } + le := newLogExporter(c, set) return exporterhelper.NewLogsExporter(ctx, set, cfg, le.pushLogData, diff --git a/exporter/opensearchexporter/factory_test.go b/exporter/opensearchexporter/factory_test.go index f64dd285231a..1f3ab8ccc31d 100644 --- a/exporter/opensearchexporter/factory_test.go +++ b/exporter/opensearchexporter/factory_test.go @@ -20,30 +20,6 @@ func TestCreateDefaultConfig(t *testing.T) { assert.NoError(t, componenttest.CheckConfigStruct(cfg)) } -func TestFactory_CreateMetricsExporter_Fail(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - params := exportertest.NewNopSettings() - _, err := factory.CreateMetricsExporter(context.Background(), params, cfg) - require.Error(t, err, "expected an error when creating a metrics exporter") -} - -func TestFactory_CreateTracesExporter_Fail(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - params := exportertest.NewNopSettings() - _, err := factory.CreateTracesExporter(context.Background(), params, cfg) - require.Error(t, err, "expected an error when creating a traces exporter") -} - -func TestFactory_CreateLogsExporter_Fail(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - params := exportertest.NewNopSettings() - _, err := factory.CreateLogsExporter(context.Background(), params, cfg) - require.Error(t, err, "expected an error when creating a logs exporter") -} - func TestFactory_CreateTracesExporter(t *testing.T) { factory := NewFactory() cfg := withDefaultConfig(func(cfg *Config) { diff --git a/exporter/opensearchexporter/sso_log_exporter.go b/exporter/opensearchexporter/sso_log_exporter.go index bfa34d90d1f1..fe3584f3e0f0 100644 --- a/exporter/opensearchexporter/sso_log_exporter.go +++ b/exporter/opensearchexporter/sso_log_exporter.go @@ -23,11 +23,7 @@ type logExporter struct { telemetry component.TelemetrySettings } -func newLogExporter(cfg *Config, set 
exporter.Settings) (*logExporter, error) { - if err := cfg.Validate(); err != nil { - return nil, err - } - +func newLogExporter(cfg *Config, set exporter.Settings) *logExporter { model := &encodeModel{ dedup: cfg.Dedup, dedot: cfg.Dedot, @@ -45,7 +41,7 @@ func newLogExporter(cfg *Config, set exporter.Settings) (*logExporter, error) { bulkAction: cfg.BulkAction, httpSettings: cfg.ClientConfig, model: model, - }, nil + } } func (l *logExporter) Start(ctx context.Context, host component.Host) error { diff --git a/exporter/opensearchexporter/sso_trace_exporter.go b/exporter/opensearchexporter/sso_trace_exporter.go index fdc6dab67e87..3c0f2e4b2211 100644 --- a/exporter/opensearchexporter/sso_trace_exporter.go +++ b/exporter/opensearchexporter/sso_trace_exporter.go @@ -25,11 +25,7 @@ type ssoTracesExporter struct { telemetry component.TelemetrySettings } -func newSSOTracesExporter(cfg *Config, set exporter.Settings) (*ssoTracesExporter, error) { - if err := cfg.Validate(); err != nil { - return nil, err - } - +func newSSOTracesExporter(cfg *Config, set exporter.Settings) *ssoTracesExporter { model := &encodeModel{ dataset: cfg.Dataset, namespace: cfg.Namespace, @@ -42,7 +38,7 @@ func newSSOTracesExporter(cfg *Config, set exporter.Settings) (*ssoTracesExporte bulkAction: cfg.BulkAction, model: model, httpSettings: cfg.ClientConfig, - }, nil + } } func (s *ssoTracesExporter) Start(ctx context.Context, host component.Host) error { diff --git a/exporter/opensearchexporter/testdata/config.yaml b/exporter/opensearchexporter/testdata/config.yaml index a187af23318e..2f967002ed96 100644 --- a/exporter/opensearchexporter/testdata/config.yaml +++ b/exporter/opensearchexporter/testdata/config.yaml @@ -7,6 +7,8 @@ opensearch: http: endpoint: https://opensearch.example.com:9200 +opensearch/default: + opensearch/empty_namespace: dataset: ngnix namespace: "" diff --git a/extension/opampextension/README.md b/extension/opampextension/README.md index 59d7284fc643..4d5227c2e276 100644 --- a/extension/opampextension/README.md +++ b/extension/opampextension/README.md @@ -43,6 +43,7 @@ The following settings are optional for both transports: instance UID remains constant across process restarts. - `capabilities`: Keys with boolean true/false values that enable a particular OpAMP capability. - `reports_effective_config`: Whether to enable the OpAMP ReportsEffectiveConfig capability. Default is `true`. + - `reports_health`: Whether to enable the OpAMP ReportsHealth capability. Default is `true`. - `agent_description`: Setting that modifies the agent description reported to the OpAMP server. - `non_identifying_attributes`: A map of key value pairs that will be added to the [non-identifying attributes](https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#agentdescriptionnon_identifying_attributes) reported to the OpAMP server. If an attribute collides with the default non-identifying attributes that are automatically added, the ones specified here take precedence. - `ppid`: An optional process ID to monitor. When this process is no longer running, the extension will emit a fatal error, causing the collector to exit. This is meant to be set by the Supervisor or some other parent process, and should not be configured manually. 
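For anyone wanting to try the new `reports_health` flag documented in the README hunk above, here is a minimal collector config sketch. The endpoint is a placeholder, and both capability values shown are the defaults:

```yaml
extensions:
  opamp:
    server:
      ws:
        endpoint: wss://opamp.example.com:4320/v1/opamp
    capabilities:
      reports_effective_config: true
      # Added by this PR; set to false to opt out of health reporting.
      reports_health: true
```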
diff --git a/extension/opampextension/config.go b/extension/opampextension/config.go index a06adee4fab8..e47ae1894ed0 100644 --- a/extension/opampextension/config.go +++ b/extension/opampextension/config.go @@ -54,6 +54,8 @@ type AgentDescription struct { type Capabilities struct { // ReportsEffectiveConfig enables the OpAMP ReportsEffectiveConfig Capability. (default: true) ReportsEffectiveConfig bool `mapstructure:"reports_effective_config"` + // ReportsHealth enables the OpAMP ReportsHealth Capability. (default: true) + ReportsHealth bool `mapstructure:"reports_health"` } func (caps Capabilities) toAgentCapabilities() protobufs.AgentCapabilities { @@ -63,6 +65,9 @@ func (caps Capabilities) toAgentCapabilities() protobufs.AgentCapabilities { if caps.ReportsEffectiveConfig { agentCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsEffectiveConfig } + if caps.ReportsHealth { + agentCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsHealth + } return agentCapabilities } diff --git a/extension/opampextension/config_test.go b/extension/opampextension/config_test.go index bbccff4d91aa..7f50970f3184 100644 --- a/extension/opampextension/config_test.go +++ b/extension/opampextension/config_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/open-telemetry/opamp-go/protobufs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config/configopaque" @@ -39,6 +40,7 @@ func TestUnmarshalConfig(t *testing.T) { InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZ", Capabilities: Capabilities{ ReportsEffectiveConfig: true, + ReportsHealth: true, }, PPIDPollInterval: 5 * time.Second, }, cfg) @@ -63,6 +65,7 @@ func TestUnmarshalHttpConfig(t *testing.T) { InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZ", Capabilities: Capabilities{ ReportsEffectiveConfig: true, + ReportsHealth: true, }, PPIDPollInterval: 5 * time.Second, }, cfg) @@ -286,3 +289,41 @@ func TestConfig_Validate(t *testing.T) { }) } } + +func TestCapabilities_toAgentCapabilities(t *testing.T) { + type fields struct { + ReportsEffectiveConfig bool + ReportsHealth bool + } + tests := []struct { + name string + fields fields + want protobufs.AgentCapabilities + }{ + { + name: "default capabilities", + fields: fields{ + ReportsEffectiveConfig: false, + ReportsHealth: false, + }, + want: protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus, + }, + { + name: "all supported capabilities enabled", + fields: fields{ + ReportsEffectiveConfig: true, + ReportsHealth: true, + }, + want: protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus | protobufs.AgentCapabilities_AgentCapabilities_ReportsEffectiveConfig | protobufs.AgentCapabilities_AgentCapabilities_ReportsHealth, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + caps := Capabilities{ + ReportsEffectiveConfig: tt.fields.ReportsEffectiveConfig, + ReportsHealth: tt.fields.ReportsHealth, + } + assert.Equalf(t, tt.want, caps.toAgentCapabilities(), "toAgentCapabilities()") + }) + } +} diff --git a/extension/opampextension/factory.go b/extension/opampextension/factory.go index ea4ea23a22d6..868c3bc85c65 100644 --- a/extension/opampextension/factory.go +++ b/extension/opampextension/factory.go @@ -27,6 +27,7 @@ func createDefaultConfig() component.Config { Server: &OpAMPServer{}, Capabilities: Capabilities{ ReportsEffectiveConfig: true, + ReportsHealth: true, }, PPIDPollInterval: 5 * time.Second, } diff --git a/extension/opampextension/opamp_agent.go 
b/extension/opampextension/opamp_agent.go index db1ef789e738..f984974c6c88 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -32,6 +32,8 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages" ) +var _ extensioncapabilities.PipelineWatcher = (*opampAgent)(nil) + type opampAgent struct { cfg *Config logger *zap.Logger @@ -121,6 +123,8 @@ func (o *opampAgent) Start(ctx context.Context, host component.Host) error { return err } + o.setHealth(&protobufs.ComponentHealth{Healthy: false}) + o.logger.Debug("Starting OpAMP client...") if err := o.opampClient.Start(context.Background(), settings); err != nil { @@ -141,6 +145,7 @@ func (o *opampAgent) Shutdown(ctx context.Context) error { if o.opampClient == nil { return nil } + o.logger.Debug("Stopping OpAMP client...") err := o.opampClient.Stop(ctx) // Opamp-go considers this an error, but the collector does not. @@ -178,6 +183,16 @@ func (o *opampAgent) Register(capability string, opts ...opampcustommessages.Cus return o.customCapabilityRegistry.Register(capability, opts...) } +func (o *opampAgent) Ready() error { + o.setHealth(&protobufs.ComponentHealth{Healthy: true}) + return nil +} + +func (o *opampAgent) NotReady() error { + o.setHealth(&protobufs.ComponentHealth{Healthy: false}) + return nil +} + func (o *opampAgent) updateEffectiveConfig(conf *confmap.Conf) { o.eclk.Lock() defer o.eclk.Unlock() @@ -344,3 +359,11 @@ func (o *opampAgent) onMessage(_ context.Context, msg *types.MessageData) { o.customCapabilityRegistry.ProcessMessage(msg.CustomMessage) } } + +func (o *opampAgent) setHealth(ch *protobufs.ComponentHealth) { + if o.capabilities.ReportsHealth && o.opampClient != nil { + if err := o.opampClient.SetHealth(ch); err != nil { + o.logger.Error("Could not report health to OpAMP server", zap.Error(err)) + } + } +} diff --git a/extension/opampextension/opamp_agent_test.go b/extension/opampextension/opamp_agent_test.go index e2013d1d45eb..fd72d346492c 100644 --- a/extension/opampextension/opamp_agent_test.go +++ b/extension/opampextension/opamp_agent_test.go @@ -31,6 +31,7 @@ func TestNewOpampAgent(t *testing.T) { assert.Equal(t, "test version", o.agentVersion) assert.NotEmpty(t, o.instanceID.String()) assert.True(t, o.capabilities.ReportsEffectiveConfig) + assert.True(t, o.capabilities.ReportsHealth) assert.Empty(t, o.effectiveConfig) assert.Nil(t, o.agentDescription) } diff --git a/pkg/stanza/docs/types/field.md b/pkg/stanza/docs/types/field.md index 35f54be3bc9b..b6fb313b8c6d 100644 --- a/pkg/stanza/docs/types/field.md +++ b/pkg/stanza/docs/types/field.md @@ -1,6 +1,6 @@ ## Fields -A _Field_ is a reference to a value in a log [entry](../types/field.md). +A _Field_ is a reference to a value in a log [entry](../types/entry.md). Many [operators](../operators/README.md) use fields in their configurations. For example, parsers use fields to specify which value to parse and where to write a new value. 
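Taken together, the opamp_agent.go hunks above make the extension report `Healthy: false` when it starts, `Healthy: true` once the collector signals pipeline readiness, and `false` again before shutdown. A compressed sketch of that pattern, assuming opamp-go's `client.OpAMPClient` interface; the type and field names here are illustrative, not the extension's actual ones:

```go
package opampsketch

import (
	"github.com/open-telemetry/opamp-go/client"
	"github.com/open-telemetry/opamp-go/protobufs"
)

// agent stands in for the extension's opampAgent type.
type agent struct {
	reportsHealth bool               // Capabilities.ReportsHealth from the config
	opampClient   client.OpAMPClient // nil until Start has created it
}

// setHealth pushes a health snapshot to the OpAMP server, but only when the
// ReportsHealth capability is enabled and the client exists.
func (a *agent) setHealth(ch *protobufs.ComponentHealth) {
	if a.reportsHealth && a.opampClient != nil {
		_ = a.opampClient.SetHealth(ch) // the real code logs this error instead
	}
}

// Ready satisfies extensioncapabilities.PipelineWatcher; the collector calls
// it after all pipeline components have started.
func (a *agent) Ready() error {
	a.setHealth(&protobufs.ComponentHealth{Healthy: true})
	return nil
}

// NotReady is called before pipeline components begin shutting down.
func (a *agent) NotReady() error {
	a.setHealth(&protobufs.ComponentHealth{Healthy: false})
	return nil
}
```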
diff --git a/receiver/mysqlreceiver/client.go b/receiver/mysqlreceiver/client.go index 736257e6a623..3aeabf1b777b 100644 --- a/receiver/mysqlreceiver/client.go +++ b/receiver/mysqlreceiver/client.go @@ -12,11 +12,12 @@ import ( // registers the mysql driver "github.com/go-sql-driver/mysql" + "github.com/hashicorp/go-version" ) type client interface { Connect() error - getVersion() (string, error) + getVersion() (*version.Version, error) getGlobalStats() (map[string]string, error) getInnodbStats() (map[string]string, error) getTableStats() ([]TableStats, error) @@ -110,66 +111,83 @@ type tableLockWaitEventStats struct { } type ReplicaStatusStats struct { - replicaIOState string - sourceHost string - sourceUser string - sourcePort int64 - connectRetry int64 - sourceLogFile string - readSourceLogPos int64 - relayLogFile string - relayLogPos int64 - relaySourceLogFile string - replicaIORunning string - replicaSQLRunning string - replicateDoDB string - replicateIgnoreDB string - replicateDoTable string - replicateIgnoreTable string - replicateWildDoTable string - replicateWildIgnoreTable string - lastErrno int64 - lastError string - skipCounter int64 - execSourceLogPos int64 - relayLogSpace int64 - untilCondition string - untilLogFile string - untilLogPos string - sourceSSLAllowed string - sourceSSLCAFile string - sourceSSLCAPath string - sourceSSLCert string - sourceSSLCipher string - sourceSSLKey string - secondsBehindSource sql.NullInt64 - sourceSSLVerifyServerCert string - lastIOErrno int64 - lastIOError string - lastSQLErrno int64 - lastSQLError string - replicateIgnoreServerIDs string - sourceServerID int64 - sourceUUID string - sourceInfoFile string - sqlDelay int64 - sqlRemainingDelay sql.NullInt64 - replicaSQLRunningState string - sourceRetryCount int64 - sourceBind string - lastIOErrorTimestamp string - lastSQLErrorTimestamp string - sourceSSLCrl string - sourceSSLCrlpath string - retrievedGtidSet string - executedGtidSet string - autoPosition string - replicateRewriteDB string - channelName string - sourceTLSVersion string - sourcePublicKeyPath string - getSourcePublicKey int64 - networkNamespace string + replicaIOState string + sourceHost string + sourceUser string + sourcePort int64 + connectRetry int64 + sourceLogFile string + readSourceLogPos int64 + relayLogFile string + relayLogPos int64 + relaySourceLogFile string + replicaIORunning string + replicaSQLRunning string + replicateDoDB string + replicateIgnoreDB string + replicateDoTable string + replicateIgnoreTable string + replicateWildDoTable string + replicateWildIgnoreTable string + lastErrno int64 + lastError string + skipCounter int64 + execSourceLogPos int64 + relayLogSpace int64 + untilCondition string + untilLogFile string + untilLogPos string + sourceSSLAllowed string + sourceSSLCAFile string + sourceSSLCAPath string + sourceSSLCert string + sourceSSLCipher string + sourceSSLKey string + secondsBehindSource sql.NullInt64 + sourceSSLVerifyServerCert string + lastIOErrno int64 + lastIOError string + lastSQLErrno int64 + lastSQLError string + replicateIgnoreServerIDs string + sourceServerID int64 + sourceUUID string + sourceInfoFile string + sqlDelay int64 + sqlRemainingDelay sql.NullInt64 + replicaSQLRunningState string + sourceRetryCount int64 + sourceBind string + lastIOErrorTimestamp string + lastSQLErrorTimestamp string + sourceSSLCrl string + sourceSSLCrlpath string + retrievedGtidSet string + executedGtidSet string + autoPosition string + replicateRewriteDB string + channelName string + sourceTLSVersion string + 
sourcePublicKeyPath string + getSourcePublicKey int64 + networkNamespace string + usingGtid string + gtidIoPos string + slaveDdlGroups int64 + slaveNonTransactionalGroups int64 + slaveTransactionalGroups int64 + retriedTransactions int64 + maxRelayLogSize int64 + executedLogEntries int64 + slaveReceivedHeartbeats int64 + slaveHeartbeatPeriod int64 + gtidSlavePos string + masterLastEventTime string + slaveLastEventTime string + masterSlaveTimeDiff string + parallelMode string + replicateDoDomainIDs string + replicateIgnoreDomainIDs string } var _ client = (*mySQLClient)(nil) @@ -218,15 +236,15 @@ func (c *mySQLClient) Connect() error { } // getVersion queries the db for the version. -func (c *mySQLClient) getVersion() (string, error) { +func (c *mySQLClient) getVersion() (*version.Version, error) { query := "SELECT VERSION();" - var version string - err := c.client.QueryRow(query).Scan(&version) + var versionStr string + err := c.client.QueryRow(query).Scan(&versionStr) if err != nil { - return "", err + return nil, err } - - return version, nil + version, err := version.NewVersion(versionStr) + return version, err } // getGlobalStats queries the db for global status metrics. @@ -397,16 +415,19 @@ func (c *mySQLClient) getTableLockWaitEventStats() ([]tableLockWaitEventStats, e } func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) { - version, err := c.getVersion() + mysqlVersion, err := c.getVersion() if err != nil { return nil, err } - if version < "8.0.22" { - return nil, nil + query := "SHOW REPLICA STATUS" + minMysqlVersion, _ := version.NewVersion("8.0.22") + if strings.Contains(mysqlVersion.String(), "MariaDB") { + query = "SHOW SLAVE STATUS" + } else if mysqlVersion.LessThan(minMysqlVersion) { + query = "SHOW SLAVE STATUS" } - query := "SHOW REPLICA STATUS" rows, err := c.client.Query(query) if err != nil { @@ -427,28 +448,46 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) { switch strings.ToLower(col) { case "replica_io_state": dest = append(dest, &s.replicaIOState) + case "slave_io_state": + dest = append(dest, &s.replicaIOState) case "source_host": dest = append(dest, &s.sourceHost) + case "master_host": + dest = append(dest, &s.sourceHost) case "source_user": dest = append(dest, &s.sourceUser) + case "master_user": + dest = append(dest, &s.sourceUser) case "source_port": dest = append(dest, &s.sourcePort) + case "master_port": + dest = append(dest, &s.sourcePort) case "connect_retry": dest = append(dest, &s.connectRetry) case "source_log_file": dest = append(dest, &s.sourceLogFile) + case "master_log_file": + dest = append(dest, &s.sourceLogFile) case "read_source_log_pos": dest = append(dest, &s.readSourceLogPos) + case "read_master_log_pos": + dest = append(dest, &s.readSourceLogPos) case "relay_log_file": dest = append(dest, &s.relayLogFile) case "relay_log_pos": dest = append(dest, &s.relayLogPos) case "relay_source_log_file": dest = append(dest, &s.relaySourceLogFile) + case "relay_master_log_file": + dest = append(dest, &s.relaySourceLogFile) case "replica_io_running": dest = append(dest, &s.replicaIORunning) + case "slave_io_running": + dest = append(dest, &s.replicaIORunning) case "replica_sql_running": dest = append(dest, &s.replicaSQLRunning) + case "slave_sql_running": + dest = append(dest, &s.replicaSQLRunning) case "replicate_do_db": dest = append(dest, &s.replicateDoDB) case "replicate_ignore_db": @@ -469,6 +508,8 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) { dest = 
append(dest, &s.skipCounter) case "exec_source_log_pos": dest = append(dest, &s.execSourceLogPos) + case "exec_master_log_pos": + dest = append(dest, &s.execSourceLogPos) case "relay_log_space": dest = append(dest, &s.relayLogSpace) case "until_condition": @@ -479,20 +520,36 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) { dest = append(dest, &s.untilLogPos) case "source_ssl_allowed": dest = append(dest, &s.sourceSSLAllowed) + case "master_ssl_allowed": + dest = append(dest, &s.sourceSSLAllowed) case "source_ssl_ca_file": dest = append(dest, &s.sourceSSLCAFile) + case "master_ssl_ca_file": + dest = append(dest, &s.sourceSSLCAFile) case "source_ssl_ca_path": dest = append(dest, &s.sourceSSLCAPath) + case "master_ssl_ca_path": + dest = append(dest, &s.sourceSSLCAPath) case "source_ssl_cert": dest = append(dest, &s.sourceSSLCert) + case "master_ssl_cert": + dest = append(dest, &s.sourceSSLCert) case "source_ssl_cipher": dest = append(dest, &s.sourceSSLCipher) + case "master_ssl_cipher": + dest = append(dest, &s.sourceSSLCipher) case "source_ssl_key": dest = append(dest, &s.sourceSSLKey) + case "master_ssl_key": + dest = append(dest, &s.sourceSSLKey) case "seconds_behind_source": dest = append(dest, &s.secondsBehindSource) + case "seconds_behind_master": + dest = append(dest, &s.secondsBehindSource) case "source_ssl_verify_server_cert": dest = append(dest, &s.sourceSSLVerifyServerCert) + case "master_ssl_verify_server_cert": + dest = append(dest, &s.sourceSSLVerifyServerCert) case "last_io_errno": dest = append(dest, &s.lastIOErrno) case "last_io_error": @@ -505,28 +562,44 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) { dest = append(dest, &s.replicateIgnoreServerIDs) case "source_server_id": dest = append(dest, &s.sourceServerID) + case "master_server_id": + dest = append(dest, &s.sourceServerID) case "source_uuid": dest = append(dest, &s.sourceUUID) + case "master_uuid": + dest = append(dest, &s.sourceUUID) case "source_info_file": dest = append(dest, &s.sourceInfoFile) + case "master_info_file": + dest = append(dest, &s.sourceInfoFile) case "sql_delay": dest = append(dest, &s.sqlDelay) case "sql_remaining_delay": dest = append(dest, &s.sqlRemainingDelay) case "replica_sql_running_state": dest = append(dest, &s.replicaSQLRunningState) + case "slave_sql_running_state": + dest = append(dest, &s.replicaSQLRunningState) case "source_retry_count": dest = append(dest, &s.sourceRetryCount) + case "master_retry_count": + dest = append(dest, &s.sourceRetryCount) case "source_bind": dest = append(dest, &s.sourceBind) + case "master_bind": + dest = append(dest, &s.sourceBind) case "last_io_error_timestamp": dest = append(dest, &s.lastIOErrorTimestamp) case "last_sql_error_timestamp": dest = append(dest, &s.lastSQLErrorTimestamp) case "source_ssl_crl": dest = append(dest, &s.sourceSSLCrl) + case "master_ssl_crl": + dest = append(dest, &s.sourceSSLCrl) case "source_ssl_crlpath": dest = append(dest, &s.sourceSSLCrlpath) + case "master_ssl_crlpath": + dest = append(dest, &s.sourceSSLCrlpath) case "retrieved_gtid_set": dest = append(dest, &s.retrievedGtidSet) case "executed_gtid_set": @@ -539,12 +612,52 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) { dest = append(dest, &s.channelName) case "source_tls_version": dest = append(dest, &s.sourceTLSVersion) + case "master_tls_version": + dest = append(dest, &s.sourceTLSVersion) case "source_public_key_path": dest = append(dest, &s.sourcePublicKeyPath) + case 
"master_public_key_path": + dest = append(dest, &s.sourcePublicKeyPath) case "get_source_public_key": dest = append(dest, &s.getSourcePublicKey) + case "get_master_public_key": + dest = append(dest, &s.getSourcePublicKey) case "network_namespace": dest = append(dest, &s.networkNamespace) + case "using_gtid": + dest = append(dest, &s.usingGtid) + case "gtid_io_pos": + dest = append(dest, &s.gtidIoPos) + case "slave_ddl_groups": + dest = append(dest, &s.slaveDdlGroups) + case "slave_non_transactional_groups": + dest = append(dest, &s.slaveNonTransactionalGroups) + case "slave_transactional_groups": + dest = append(dest, &s.slaveTransactionalGroups) + case "retried_transactions": + dest = append(dest, &s.retriedTransactions) + case "max_relay_log_size": + dest = append(dest, &s.maxRelayLogSize) + case "executed_log_entries": + dest = append(dest, &s.executedLogEntries) + case "slave_received_heartbeats": + dest = append(dest, &s.slaveReceivedHeartbeats) + case "slave_heartbeat_period": + dest = append(dest, &s.slaveHeartbeatPeriod) + case "gtid_slave_pos": + dest = append(dest, &s.gtidSlavePos) + case "master_last_event_time": + dest = append(dest, &s.masterLastEventTime) + case "slave_last_event_time": + dest = append(dest, &s.slaveLastEventTime) + case "master_slave_time_diff": + dest = append(dest, &s.masterSlaveTimeDiff) + case "parallel_mode": + dest = append(dest, &s.parallelMode) + case "replicate_do_domain_ids": + dest = append(dest, &s.replicateDoDomainIDs) + case "replicate_ignore_domain_ids": + dest = append(dest, &s.replicateIgnoreDomainIDs) default: return nil, fmt.Errorf("unknown column name %s for replica status", col) } diff --git a/receiver/mysqlreceiver/go.mod b/receiver/mysqlreceiver/go.mod index 44c73fe67420..9f70a56d04ab 100644 --- a/receiver/mysqlreceiver/go.mod +++ b/receiver/mysqlreceiver/go.mod @@ -48,6 +48,7 @@ require ( github.com/go-viper/mapstructure/v2 v2.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.7.0 github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect diff --git a/receiver/mysqlreceiver/go.sum b/receiver/mysqlreceiver/go.sum index cf30c4b18167..a054b1a95e2a 100644 --- a/receiver/mysqlreceiver/go.sum +++ b/receiver/mysqlreceiver/go.sum @@ -58,6 +58,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= diff --git a/receiver/mysqlreceiver/scraper_test.go b/receiver/mysqlreceiver/scraper_test.go index 01dc4dd11840..8753752e948a 100644 --- a/receiver/mysqlreceiver/scraper_test.go +++ b/receiver/mysqlreceiver/scraper_test.go @@ -12,6 +12,7 @@ import ( "strings" "testing" + "github.com/hashicorp/go-version" 
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config/confignet" @@ -158,8 +159,9 @@ func (c *mockClient) Connect() error { return nil } -func (c *mockClient) getVersion() (string, error) { - return "8.0.27", nil +func (c *mockClient) getVersion() (*version.Version, error) { + version, _ := version.NewVersion("8.0.27") + return version, nil } func (c *mockClient) getGlobalStats() (map[string]string, error) {