Skip to content

Commit

Permalink
Merge branch 'main' into refactor-zipkinreceiver
Browse files Browse the repository at this point in the history
  • Loading branch information
odubajDT authored Oct 18, 2024
2 parents d86bd67 + d5c641a commit 19be99b
Show file tree
Hide file tree
Showing 32 changed files with 640 additions and 204 deletions.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_deprecate-retry-maxrequests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate retry::max_requests in favor of retry::max_retries

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32344]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: retry::max_retries will be exactly retry::max_requests - 1

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Preserve attribute names and metric names on prefix conflict in OTel mapping mode

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35651]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: e.g. if there are attributes "a" and "a.b", they should be sent to Elasticsearch as is, instead of "a.value" and "a.b", in OTel mapping mode

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
27 changes: 27 additions & 0 deletions .chloggen/mysqlreceiver_older_replica_metrics_support.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: mysqlreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add replica metric support for versions of MySQL earlier than 8.0.22.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35217]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
27 changes: 27 additions & 0 deletions .chloggen/opamp-extension-reportshealth.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampextension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implement `ReportsHealth` capability in OpAMP extension

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35433]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
3 changes: 2 additions & 1 deletion exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ The behaviour of this bulk indexing can be configured with the following setting
- `interval` (default=30s): Write buffer flush time limit.
- `retry`: Elasticsearch bulk request retry settings
- `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff.
- `max_requests` (default=3): Number of HTTP request retries.
- `max_requests` (DEPRECATED, use retry::max_retries instead): Number of HTTP request retries including the initial attempt. If used, `retry::max_retries` will be set to `max_requests - 1`.
- `max_retries` (default=2): Number of HTTP request retries. To disable retries, set `retry::enabled` to `false` instead of setting `max_retries` to `0`.
- `initial_interval` (default=100ms): Initial waiting time if a HTTP request failed.
- `max_interval` (default=1m): Max waiting time if a HTTP request failed.
- `retry_on_status` (default=[429]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it defaults to `[429]`.
Expand Down
43 changes: 19 additions & 24 deletions exporter/elasticsearchexporter/bulkindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,34 @@ type bulkIndexerSession interface {
Flush(context.Context) error
}

const defaultMaxRetries = 2

func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (bulkIndexer, error) {
if config.Batcher.Enabled != nil {
return newSyncBulkIndexer(logger, client, config), nil
}
return newAsyncBulkIndexer(logger, client, config)
}

func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) *syncBulkIndexer {
var maxDocRetry int
func bulkIndexerConfig(client *elasticsearch.Client, config *Config) docappender.BulkIndexerConfig {
var maxDocRetries int
if config.Retry.Enabled {
// max_requests includes initial attempt
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344
maxDocRetry = config.Retry.MaxRequests - 1
maxDocRetries = defaultMaxRetries
if config.Retry.MaxRetries != 0 {
maxDocRetries = config.Retry.MaxRetries
}
}
return docappender.BulkIndexerConfig{
Client: client,
MaxDocumentRetries: maxDocRetries,
Pipeline: config.Pipeline,
RetryOnDocumentStatus: config.Retry.RetryOnStatus,
}
}

func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) *syncBulkIndexer {
return &syncBulkIndexer{
config: docappender.BulkIndexerConfig{
Client: client,
MaxDocumentRetries: maxDocRetry,
Pipeline: config.Pipeline,
RetryOnDocumentStatus: config.Retry.RetryOnStatus,
},
config: bulkIndexerConfig(client, config),
flushTimeout: config.Timeout,
retryConfig: config.Retry,
logger: logger,
Expand Down Expand Up @@ -165,13 +172,6 @@ func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, confi
flushBytes = 5e+6
}

var maxDocRetry int
if config.Retry.Enabled {
// max_requests includes initial attempt
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344
maxDocRetry = config.Retry.MaxRequests - 1
}

pool := &asyncBulkIndexer{
wg: sync.WaitGroup{},
items: make(chan docappender.BulkIndexerItem, config.NumWorkers),
Expand All @@ -180,12 +180,7 @@ func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, confi
pool.wg.Add(numWorkers)

for i := 0; i < numWorkers; i++ {
bi, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
Client: client,
MaxDocumentRetries: maxDocRetry,
Pipeline: config.Pipeline,
RetryOnDocumentStatus: config.Retry.RetryOnStatus,
})
bi, err := docappender.NewBulkIndexer(bulkIndexerConfig(client, config))
if err != nil {
return nil, err
}
Expand Down
24 changes: 22 additions & 2 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,13 @@ type RetrySettings struct {
// Enabled allows users to disable retry without having to comment out all settings.
Enabled bool `mapstructure:"enabled"`

// MaxRequests configures how often an HTTP request is retried before it is assumed to be failed.
// MaxRequests configures how often an HTTP request is attempted before it is assumed to be failed.
// Deprecated: use MaxRetries instead.
MaxRequests int `mapstructure:"max_requests"`

// MaxRetries configures how many times an HTTP request is retried.
MaxRetries int `mapstructure:"max_retries"`

// InitialInterval configures the initial waiting time if a request failed.
InitialInterval time.Duration `mapstructure:"initial_interval"`

Expand Down Expand Up @@ -273,6 +277,17 @@ func (cfg *Config) Validate() error {
// TODO support confighttp.ClientConfig.Compression
return errors.New("compression is not currently configurable")
}

if cfg.Retry.MaxRequests != 0 && cfg.Retry.MaxRetries != 0 {
return errors.New("must not specify both retry::max_requests and retry::max_retries")
}
if cfg.Retry.MaxRequests < 0 {
return errors.New("retry::max_requests should be non-negative")
}
if cfg.Retry.MaxRetries < 0 {
return errors.New("retry::max_retries should be non-negative")
}

return nil
}

Expand Down Expand Up @@ -355,11 +370,16 @@ func (cfg *Config) MappingMode() MappingMode {
return mappingModes[cfg.Mapping.Mode]
}

func logConfigDeprecationWarnings(cfg *Config, logger *zap.Logger) {
func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) {
if cfg.Mapping.Dedup != nil {
logger.Warn("dedup is deprecated, and is always enabled")
}
if cfg.Mapping.Dedot && cfg.MappingMode() != MappingECS || !cfg.Mapping.Dedot && cfg.MappingMode() == MappingECS {
logger.Warn("dedot has been deprecated: in the future, dedotting will always be performed in ECS mode only")
}
if cfg.Retry.MaxRequests != 0 {
cfg.Retry.MaxRetries = cfg.Retry.MaxRequests - 1
// Do not set cfg.Retry.Enabled = false if cfg.Retry.MaxRequest = 1 to avoid breaking change on behavior
logger.Warn("retry::max_requests has been deprecated, and will be removed in a future version. Use retry::max_retries instead.")
}
}
14 changes: 11 additions & 3 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestConfig(t *testing.T) {
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 5,
MaxRetries: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestConfig(t *testing.T) {
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 5,
MaxRetries: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestConfig(t *testing.T) {
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 5,
MaxRetries: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
Expand Down Expand Up @@ -391,6 +391,14 @@ func TestConfig_Validate(t *testing.T) {
}),
err: `compression is not currently configurable`,
},
"both max_retries and max_requests specified": {
config: withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{"http://test:9200"}
cfg.Retry.MaxRetries = 1
cfg.Retry.MaxRequests = 1
}),
err: `must not specify both retry::max_requests and retry::max_retries`,
},
}

for name, tt := range tests {
Expand Down
17 changes: 6 additions & 11 deletions exporter/elasticsearchexporter/esclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,6 @@ func newElasticsearchClient(
headers := make(http.Header)
headers.Set("User-Agent", userAgent)

// maxRetries configures the maximum number of event publishing attempts,
// including the first send and additional retries.

maxRetries := config.Retry.MaxRequests - 1
retryDisabled := !config.Retry.Enabled || maxRetries <= 0

if retryDisabled {
maxRetries = 0
}

// endpoints converts Config.Endpoints, Config.CloudID,
// and Config.ClientConfig.Endpoint to a list of addresses.
endpoints, err := config.endpoints()
Expand All @@ -113,6 +103,11 @@ func newElasticsearchClient(
logResponseBody: config.LogResponseBody,
}

maxRetries := defaultMaxRetries
if config.Retry.MaxRetries != 0 {
maxRetries = config.Retry.MaxRetries
}

return elasticsearch.NewClient(elasticsearch.Config{
Transport: httpClient.Transport,

Expand All @@ -125,7 +120,7 @@ func newElasticsearchClient(

// configure retry behavior
RetryOnStatus: config.Retry.RetryOnStatus,
DisableRetry: retryDisabled,
DisableRetry: !config.Retry.Enabled,
EnableRetryOnTimeout: config.Retry.Enabled,
//RetryOnError: retryOnError, // should be used from esclient version 8 onwards
MaxRetries: maxRetries,
Expand Down
Loading

0 comments on commit 19be99b

Please sign in to comment.