From 8d2934d47d2226f05ad42020ae605a7e90562233 Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Thu, 24 Oct 2024 18:00:39 -0700 Subject: [PATCH] Add opensearch related metrics Signed-off-by: Tomoyuki Morita --- .../flint/core/IRestHighLevelClient.java | 6 ++ .../core/RestHighLevelClientWrapper.java | 79 ++++++++++++------- .../flint/core/metrics/MetricConstants.java | 18 +++++ .../storage/OpenSearchBulkRetryWrapper.java | 12 ++- 4 files changed, 84 insertions(+), 31 deletions(-) diff --git a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java index 04ef216c4..9facd89ef 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java @@ -87,6 +87,10 @@ static void recordOperationSuccess(String metricNamePrefix) { MetricsUtil.incrementCounter(successMetricName); } + static void recordLatency(String metricNamePrefix, long latencyMilliseconds) { + MetricsUtil.addHistoricGauge(metricNamePrefix + ".processingTime", latencyMilliseconds); + } + /** * Records the failure of an OpenSearch operation by incrementing the corresponding metric counter. * If the exception is an OpenSearchException with a specific status code (e.g., 403), @@ -107,6 +111,8 @@ static void recordOperationFailure(String metricNamePrefix, Exception e) { if (statusCode == 403) { String forbiddenErrorMetricName = metricNamePrefix + ".403.count"; MetricsUtil.incrementCounter(forbiddenErrorMetricName); + } else if (statusCode == 429) { + MetricsUtil.incrementCounter(metricNamePrefix + ".429.count"); } String failureMetricName = metricNamePrefix + "." + (statusCode / 100) + "xx.count"; diff --git a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java index 1b83f032a..31f012256 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java @@ -5,6 +5,8 @@ package org.opensearch.flint.core; +import java.util.Arrays; +import java.util.function.Consumer; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; @@ -40,7 +42,10 @@ import org.opensearch.flint.core.storage.BulkRequestRateLimiter; import org.opensearch.flint.core.storage.OpenSearchBulkRetryWrapper; +import static org.opensearch.flint.core.metrics.MetricConstants.OS_BULK_OP_METRIC_PREFIX; +import static org.opensearch.flint.core.metrics.MetricConstants.OS_CREATE_OP_METRIC_PREFIX; import static org.opensearch.flint.core.metrics.MetricConstants.OS_READ_OP_METRIC_PREFIX; +import static org.opensearch.flint.core.metrics.MetricConstants.OS_SEARCH_OP_METRIC_PREFIX; import static org.opensearch.flint.core.metrics.MetricConstants.OS_WRITE_OP_METRIC_PREFIX; /** @@ -67,112 +72,126 @@ public RestHighLevelClientWrapper(RestHighLevelClient client, BulkRequestRateLim @Override public BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException { - return execute(OS_WRITE_OP_METRIC_PREFIX, () -> { + return execute(() -> { try { rateLimiter.acquirePermit(); return bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); } catch (InterruptedException e) { throw new RuntimeException("rateLimiter.acquirePermit was interrupted.", e); } - }); + }, OS_WRITE_OP_METRIC_PREFIX, OS_BULK_OP_METRIC_PREFIX); } @Override public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) throws IOException { - return execute(OS_READ_OP_METRIC_PREFIX, () -> client.clearScroll(clearScrollRequest, options)); + return execute(() -> client.clearScroll(clearScrollRequest, options), + OS_READ_OP_METRIC_PREFIX); } @Override public CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException { - return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().create(createIndexRequest, options)); + return execute(() -> client.indices().create(createIndexRequest, options), + OS_WRITE_OP_METRIC_PREFIX, OS_CREATE_OP_METRIC_PREFIX); } @Override public void updateIndexMapping(PutMappingRequest putMappingRequest, RequestOptions options) throws IOException { - execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().putMapping(putMappingRequest, options)); + execute(() -> client.indices().putMapping(putMappingRequest, options), + OS_WRITE_OP_METRIC_PREFIX); } @Override public void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException { - execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().delete(deleteIndexRequest, options)); + execute(() -> client.indices().delete(deleteIndexRequest, options), + OS_WRITE_OP_METRIC_PREFIX); } @Override public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException { - return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.delete(deleteRequest, options)); + return execute(() -> client.delete(deleteRequest, options), OS_WRITE_OP_METRIC_PREFIX); } @Override public GetResponse get(GetRequest getRequest, RequestOptions options) throws IOException { - return execute(OS_READ_OP_METRIC_PREFIX, () -> client.get(getRequest, options)); + return execute(() -> client.get(getRequest, options), OS_READ_OP_METRIC_PREFIX); } @Override public GetIndexResponse getIndex(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException { - return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().get(getIndexRequest, options)); + return execute(() -> client.indices().get(getIndexRequest, options), + OS_READ_OP_METRIC_PREFIX); } @Override public IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException { - return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.index(indexRequest, options)); + return execute(() -> client.index(indexRequest, options), OS_WRITE_OP_METRIC_PREFIX); } @Override public Boolean doesIndexExist(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException { - return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().exists(getIndexRequest, options)); + return execute(() -> client.indices().exists(getIndexRequest, options), + OS_READ_OP_METRIC_PREFIX); } @Override public SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException { - return execute(OS_READ_OP_METRIC_PREFIX, () -> client.search(searchRequest, options)); + return execute(() -> client.search(searchRequest, options), OS_READ_OP_METRIC_PREFIX, OS_SEARCH_OP_METRIC_PREFIX); } @Override public SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException { - return execute(OS_READ_OP_METRIC_PREFIX, () -> client.scroll(searchScrollRequest, options)); + return execute(() -> client.scroll(searchScrollRequest, options), OS_READ_OP_METRIC_PREFIX); } @Override public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException { - return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.update(updateRequest, options)); + return execute(() -> client.update(updateRequest, options), OS_WRITE_OP_METRIC_PREFIX); } - @Override - public IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException { - return execute(OS_READ_OP_METRIC_PREFIX, - () -> { - OpenSearchClient openSearchClient = - new OpenSearchClient(new RestClientTransport(client.getLowLevelClient(), - new JacksonJsonpMapper())); - return openSearchClient.indices().stats(request); - }); - } + @Override + public IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException { + return execute(() -> { + OpenSearchClient openSearchClient = + new OpenSearchClient(new RestClientTransport(client.getLowLevelClient(), + new JacksonJsonpMapper())); + return openSearchClient.indices().stats(request); + }, OS_READ_OP_METRIC_PREFIX + ); + } @Override public CreatePitResponse createPit(CreatePitRequest request) throws IOException { - return execute(OS_WRITE_OP_METRIC_PREFIX, () -> openSearchClient().createPit(request)); + return execute(() -> openSearchClient().createPit(request), OS_WRITE_OP_METRIC_PREFIX); } /** * Executes a given operation, tracks metrics, and handles exceptions. * - * @param metricNamePrefix the prefix for the metric name - * @param operation the operation to execute * @param the return type of the operation + * @param operation the operation to execute + * @param metricNamePrefixes array of prefixes for the metric name * @return the result of the operation * @throws IOException if an I/O exception occurs */ - private T execute(String metricNamePrefix, IOCallable operation) throws IOException { + private T execute(IOCallable operation, String... metricNamePrefixes) throws IOException { + long startTime = System.currentTimeMillis(); try { T result = operation.call(); - IRestHighLevelClient.recordOperationSuccess(metricNamePrefix); + eachPrefix(IRestHighLevelClient::recordOperationSuccess, metricNamePrefixes); return result; } catch (Exception e) { - IRestHighLevelClient.recordOperationFailure(metricNamePrefix, e); + eachPrefix(prefix -> IRestHighLevelClient.recordOperationFailure(prefix, e), metricNamePrefixes); throw e; + } finally { + long latency = System.currentTimeMillis() - startTime; + eachPrefix(prefix -> IRestHighLevelClient.recordLatency(prefix, latency), metricNamePrefixes); } } + private static void eachPrefix(Consumer fn, String... prefixes) { + Arrays.stream(prefixes).forEach(fn); + } + /** * Functional interface for operations that can throw IOException. * diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index 427fab9fe..ecd8c009b 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -22,6 +22,24 @@ public final class MetricConstants { */ public static final String OS_WRITE_OP_METRIC_PREFIX = "opensearch.write"; + /** + * Prefixes for OpenSearch API specific metrics + */ + public static final String OS_CREATE_OP_METRIC_PREFIX = "opensearch.create"; + public static final String OS_SEARCH_OP_METRIC_PREFIX = "opensearch.search"; + public static final String OS_BULK_OP_METRIC_PREFIX = "opensearch.bulk"; + + /** + * Metric name for request size of opensearch bulk request + */ + public static final String OPENSEARCH_BULK_SIZE_METRIC = "opensearch.bulk.size.count"; + + /** + * Metric name for opensearch bulk request retry count + */ + public static final String OPENSEARCH_BULK_RETRY_COUNT_METRIC = "opensearch.bulk.retry.count"; + public static final String OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC = "opensearch.bulk.allRetryFailed.count"; + /** * Metric name for counting the errors encountered with Amazon S3 operations. */ diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java index 279c9b642..0545a0b47 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java @@ -6,6 +6,7 @@ import dev.failsafe.function.CheckedPredicate; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; import org.opensearch.action.DocWriteRequest; @@ -15,6 +16,8 @@ import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; import org.opensearch.flint.core.http.FlintRetryOptions; +import org.opensearch.flint.core.metrics.MetricConstants; +import org.opensearch.flint.core.metrics.MetricsUtil; import org.opensearch.rest.RestStatus; public class OpenSearchBulkRetryWrapper { @@ -37,20 +40,27 @@ public OpenSearchBulkRetryWrapper(FlintRetryOptions retryOptions) { */ public BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest bulkRequest, RequestOptions options) { + final AtomicInteger retryCount = new AtomicInteger(0); try { final AtomicReference nextRequest = new AtomicReference<>(bulkRequest); - return Failsafe + BulkResponse res = Failsafe .with(retryPolicy) .get(() -> { BulkResponse response = client.bulk(nextRequest.get(), options); if (retryPolicy.getConfig().allowsRetries() && bulkItemRetryableResultPredicate.test( response)) { nextRequest.set(getRetryableRequest(nextRequest.get(), response)); + retryCount.incrementAndGet(); } return response; }); + MetricsUtil.addHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, bulkRequest.estimatedSizeInBytes()); + MetricsUtil.addHistoricGauge(MetricConstants.OPENSEARCH_BULK_RETRY_COUNT_METRIC, retryCount.get()); + return res; } catch (FailsafeException ex) { LOG.severe("Request failed permanently. Re-throwing original exception."); + MetricsUtil.addHistoricGauge(MetricConstants.OPENSEARCH_BULK_RETRY_COUNT_METRIC, retryCount.get() - 1); + MetricsUtil.addHistoricGauge(MetricConstants.OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC, 1); // unwrap original exception and throw throw new RuntimeException(ex.getCause());