From 830fc1c8b68fca2350d61ebc144405374b5040d4 Mon Sep 17 00:00:00 2001 From: Tomoyuki MORITA Date: Fri, 25 Oct 2024 17:22:21 -0700 Subject: [PATCH] Add opensearch related metrics (#818) * Add opensearch related metrics Signed-off-by: Tomoyuki Morita * Add unit tests Signed-off-by: Tomoyuki Morita --------- Signed-off-by: Tomoyuki Morita --- .../flint/core/IRestHighLevelClient.java | 6 + .../core/RestHighLevelClientWrapper.java | 79 ++++---- .../flint/core/metrics/HistoricGauge.java | 6 + .../flint/core/metrics/MetricConstants.java | 18 ++ .../storage/OpenSearchBulkRetryWrapper.java | 17 +- .../flint/core/metrics/MetricsTestUtil.java | 79 ++++++++ .../OpenSearchBulkRetryWrapperTest.java | 170 +++++++++++------- 7 files changed, 277 insertions(+), 98 deletions(-) create mode 100644 flint-core/src/test/java/org/opensearch/flint/core/metrics/MetricsTestUtil.java diff --git a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java index 04ef216c4..9facd89ef 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java @@ -87,6 +87,10 @@ static void recordOperationSuccess(String metricNamePrefix) { MetricsUtil.incrementCounter(successMetricName); } + static void recordLatency(String metricNamePrefix, long latencyMilliseconds) { + MetricsUtil.addHistoricGauge(metricNamePrefix + ".processingTime", latencyMilliseconds); + } + /** * Records the failure of an OpenSearch operation by incrementing the corresponding metric counter. * If the exception is an OpenSearchException with a specific status code (e.g., 403), @@ -107,6 +111,8 @@ static void recordOperationFailure(String metricNamePrefix, Exception e) { if (statusCode == 403) { String forbiddenErrorMetricName = metricNamePrefix + ".403.count"; MetricsUtil.incrementCounter(forbiddenErrorMetricName); + } else if (statusCode == 429) { + MetricsUtil.incrementCounter(metricNamePrefix + ".429.count"); } String failureMetricName = metricNamePrefix + "." + (statusCode / 100) + "xx.count"; diff --git a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java index 1b83f032a..31f012256 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java @@ -5,6 +5,8 @@ package org.opensearch.flint.core; +import java.util.Arrays; +import java.util.function.Consumer; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; @@ -40,7 +42,10 @@ import org.opensearch.flint.core.storage.BulkRequestRateLimiter; import org.opensearch.flint.core.storage.OpenSearchBulkRetryWrapper; +import static org.opensearch.flint.core.metrics.MetricConstants.OS_BULK_OP_METRIC_PREFIX; +import static org.opensearch.flint.core.metrics.MetricConstants.OS_CREATE_OP_METRIC_PREFIX; import static org.opensearch.flint.core.metrics.MetricConstants.OS_READ_OP_METRIC_PREFIX; +import static org.opensearch.flint.core.metrics.MetricConstants.OS_SEARCH_OP_METRIC_PREFIX; import static org.opensearch.flint.core.metrics.MetricConstants.OS_WRITE_OP_METRIC_PREFIX; /** @@ -67,112 +72,126 @@ public RestHighLevelClientWrapper(RestHighLevelClient client, BulkRequestRateLim @Override public BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException { - return execute(OS_WRITE_OP_METRIC_PREFIX, () -> { + return execute(() -> { try { rateLimiter.acquirePermit(); return bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); } catch (InterruptedException e) { throw new RuntimeException("rateLimiter.acquirePermit was interrupted.", e); } - }); + }, OS_WRITE_OP_METRIC_PREFIX, OS_BULK_OP_METRIC_PREFIX); } @Override public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) throws IOException { - return execute(OS_READ_OP_METRIC_PREFIX, () -> client.clearScroll(clearScrollRequest, options)); + return execute(() -> client.clearScroll(clearScrollRequest, options), + OS_READ_OP_METRIC_PREFIX); } @Override public CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException { - return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().create(createIndexRequest, options)); + return execute(() -> client.indices().create(createIndexRequest, options), + OS_WRITE_OP_METRIC_PREFIX, OS_CREATE_OP_METRIC_PREFIX); } @Override public void updateIndexMapping(PutMappingRequest putMappingRequest, RequestOptions options) throws IOException { - execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().putMapping(putMappingRequest, options)); + execute(() -> client.indices().putMapping(putMappingRequest, options), + OS_WRITE_OP_METRIC_PREFIX); } @Override public void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException { - execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().delete(deleteIndexRequest, options)); + execute(() -> client.indices().delete(deleteIndexRequest, options), + OS_WRITE_OP_METRIC_PREFIX); } @Override public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException { - return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.delete(deleteRequest, options)); + return execute(() -> client.delete(deleteRequest, options), OS_WRITE_OP_METRIC_PREFIX); } @Override public GetResponse get(GetRequest getRequest, RequestOptions options) throws IOException { - return execute(OS_READ_OP_METRIC_PREFIX, () -> client.get(getRequest, options)); + return execute(() -> client.get(getRequest, options), OS_READ_OP_METRIC_PREFIX); } @Override public GetIndexResponse getIndex(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException { - return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().get(getIndexRequest, options)); + return execute(() -> client.indices().get(getIndexRequest, options), + OS_READ_OP_METRIC_PREFIX); } @Override public IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException { - return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.index(indexRequest, options)); + return execute(() -> client.index(indexRequest, options), OS_WRITE_OP_METRIC_PREFIX); } @Override public Boolean doesIndexExist(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException { - return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().exists(getIndexRequest, options)); + return execute(() -> client.indices().exists(getIndexRequest, options), + OS_READ_OP_METRIC_PREFIX); } @Override public SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException { - return execute(OS_READ_OP_METRIC_PREFIX, () -> client.search(searchRequest, options)); + return execute(() -> client.search(searchRequest, options), OS_READ_OP_METRIC_PREFIX, OS_SEARCH_OP_METRIC_PREFIX); } @Override public SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException { - return execute(OS_READ_OP_METRIC_PREFIX, () -> client.scroll(searchScrollRequest, options)); + return execute(() -> client.scroll(searchScrollRequest, options), OS_READ_OP_METRIC_PREFIX); } @Override public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException { - return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.update(updateRequest, options)); + return execute(() -> client.update(updateRequest, options), OS_WRITE_OP_METRIC_PREFIX); } - @Override - public IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException { - return execute(OS_READ_OP_METRIC_PREFIX, - () -> { - OpenSearchClient openSearchClient = - new OpenSearchClient(new RestClientTransport(client.getLowLevelClient(), - new JacksonJsonpMapper())); - return openSearchClient.indices().stats(request); - }); - } + @Override + public IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException { + return execute(() -> { + OpenSearchClient openSearchClient = + new OpenSearchClient(new RestClientTransport(client.getLowLevelClient(), + new JacksonJsonpMapper())); + return openSearchClient.indices().stats(request); + }, OS_READ_OP_METRIC_PREFIX + ); + } @Override public CreatePitResponse createPit(CreatePitRequest request) throws IOException { - return execute(OS_WRITE_OP_METRIC_PREFIX, () -> openSearchClient().createPit(request)); + return execute(() -> openSearchClient().createPit(request), OS_WRITE_OP_METRIC_PREFIX); } /** * Executes a given operation, tracks metrics, and handles exceptions. * - * @param metricNamePrefix the prefix for the metric name - * @param operation the operation to execute * @param the return type of the operation + * @param operation the operation to execute + * @param metricNamePrefixes array of prefixes for the metric name * @return the result of the operation * @throws IOException if an I/O exception occurs */ - private T execute(String metricNamePrefix, IOCallable operation) throws IOException { + private T execute(IOCallable operation, String... metricNamePrefixes) throws IOException { + long startTime = System.currentTimeMillis(); try { T result = operation.call(); - IRestHighLevelClient.recordOperationSuccess(metricNamePrefix); + eachPrefix(IRestHighLevelClient::recordOperationSuccess, metricNamePrefixes); return result; } catch (Exception e) { - IRestHighLevelClient.recordOperationFailure(metricNamePrefix, e); + eachPrefix(prefix -> IRestHighLevelClient.recordOperationFailure(prefix, e), metricNamePrefixes); throw e; + } finally { + long latency = System.currentTimeMillis() - startTime; + eachPrefix(prefix -> IRestHighLevelClient.recordLatency(prefix, latency), metricNamePrefixes); } } + private static void eachPrefix(Consumer fn, String... prefixes) { + Arrays.stream(prefixes).forEach(fn); + } + /** * Functional interface for operations that can throw IOException. * diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/HistoricGauge.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/HistoricGauge.java index 181bf8575..8e5288110 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/HistoricGauge.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/HistoricGauge.java @@ -6,6 +6,7 @@ package org.opensearch.flint.core.metrics; import com.codahale.metrics.Gauge; +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; @@ -60,4 +61,9 @@ public List pollDataPoints() { } return result; } + + @VisibleForTesting + public List getDataPoints() { + return dataPoints; + } } diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index 48adbc3d1..35297de6a 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -22,6 +22,24 @@ public final class MetricConstants { */ public static final String OS_WRITE_OP_METRIC_PREFIX = "opensearch.write"; + /** + * Prefixes for OpenSearch API specific metrics + */ + public static final String OS_CREATE_OP_METRIC_PREFIX = "opensearch.create"; + public static final String OS_SEARCH_OP_METRIC_PREFIX = "opensearch.search"; + public static final String OS_BULK_OP_METRIC_PREFIX = "opensearch.bulk"; + + /** + * Metric name for request size of opensearch bulk request + */ + public static final String OPENSEARCH_BULK_SIZE_METRIC = "opensearch.bulk.size.count"; + + /** + * Metric name for opensearch bulk request retry count + */ + public static final String OPENSEARCH_BULK_RETRY_COUNT_METRIC = "opensearch.bulk.retry.count"; + public static final String OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC = "opensearch.bulk.allRetryFailed.count"; + /** * Metric name for counting the errors encountered with Amazon S3 operations. */ diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java index 279c9b642..6559ea903 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java @@ -6,6 +6,7 @@ import dev.failsafe.function.CheckedPredicate; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; import org.opensearch.action.DocWriteRequest; @@ -15,6 +16,8 @@ import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; import org.opensearch.flint.core.http.FlintRetryOptions; +import org.opensearch.flint.core.metrics.MetricConstants; +import org.opensearch.flint.core.metrics.MetricsUtil; import org.opensearch.rest.RestStatus; public class OpenSearchBulkRetryWrapper { @@ -37,11 +40,19 @@ public OpenSearchBulkRetryWrapper(FlintRetryOptions retryOptions) { */ public BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest bulkRequest, RequestOptions options) { + final AtomicInteger requestCount = new AtomicInteger(0); try { final AtomicReference nextRequest = new AtomicReference<>(bulkRequest); - return Failsafe + BulkResponse res = Failsafe .with(retryPolicy) + .onFailure((event) -> { + if (event.isRetry()) { + MetricsUtil.addHistoricGauge( + MetricConstants.OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC, 1); + } + }) .get(() -> { + requestCount.incrementAndGet(); BulkResponse response = client.bulk(nextRequest.get(), options); if (retryPolicy.getConfig().allowsRetries() && bulkItemRetryableResultPredicate.test( response)) { @@ -49,11 +60,15 @@ public BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest } return response; }); + return res; } catch (FailsafeException ex) { LOG.severe("Request failed permanently. Re-throwing original exception."); // unwrap original exception and throw throw new RuntimeException(ex.getCause()); + } finally { + MetricsUtil.addHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, bulkRequest.estimatedSizeInBytes()); + MetricsUtil.addHistoricGauge(MetricConstants.OPENSEARCH_BULK_RETRY_COUNT_METRIC, requestCount.get() - 1); } } diff --git a/flint-core/src/test/java/org/opensearch/flint/core/metrics/MetricsTestUtil.java b/flint-core/src/test/java/org/opensearch/flint/core/metrics/MetricsTestUtil.java new file mode 100644 index 000000000..05febb92b --- /dev/null +++ b/flint-core/src/test/java/org/opensearch/flint/core/metrics/MetricsTestUtil.java @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metrics; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +import com.codahale.metrics.MetricRegistry; +import java.util.List; +import lombok.AllArgsConstructor; +import org.apache.spark.SparkEnv; +import org.apache.spark.metrics.source.Source; +import org.mockito.MockedStatic; +import org.opensearch.flint.core.metrics.HistoricGauge.DataPoint; + +/** + * Utility class for verifying metrics + */ +public class MetricsTestUtil { + @AllArgsConstructor + public static class MetricsVerifier { + + final MetricRegistry metricRegistry; + + public void assertMetricExist(String metricName) { + assertNotNull(metricRegistry.getMetrics().get(metricName)); + } + + public void assertMetricClass(String metricName, Class clazz) { + assertMetricExist(metricName); + assertEquals(clazz, metricRegistry.getMetrics().get(metricName).getClass()); + } + + public void assertHistoricGauge(String metricName, long... values) { + HistoricGauge historicGauge = getHistoricGauge(metricName); + List dataPoints = historicGauge.getDataPoints(); + for (int i = 0; i < values.length; i++) { + assertEquals(values[i], dataPoints.get(i).getValue().longValue()); + } + } + + private HistoricGauge getHistoricGauge(String metricName) { + assertMetricClass(metricName, HistoricGauge.class); + return (HistoricGauge) metricRegistry.getMetrics().get(metricName); + } + + public void assertMetricNotExist(String metricName) { + assertNull(metricRegistry.getMetrics().get(metricName)); + } + } + + @FunctionalInterface + public interface ThrowableConsumer { + void accept(T t) throws Exception; + } + + public static void withMetricEnv(ThrowableConsumer test) throws Exception { + try (MockedStatic sparkEnvMock = mockStatic(SparkEnv.class)) { + SparkEnv sparkEnv = mock(SparkEnv.class, RETURNS_DEEP_STUBS); + sparkEnvMock.when(SparkEnv::get).thenReturn(sparkEnv); + + Source metricSource = mock(Source.class); + MetricRegistry metricRegistry = new MetricRegistry(); + when(metricSource.metricRegistry()).thenReturn(metricRegistry); + when(sparkEnv.metricsSystem().getSourcesByName(any()).head()).thenReturn(metricSource); + + test.accept(new MetricsVerifier(metricRegistry)); + } + } +} diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapperTest.java b/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapperTest.java index fa57da842..6b08ca881 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapperTest.java +++ b/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapperTest.java @@ -24,11 +24,14 @@ import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; import org.opensearch.flint.core.http.FlintRetryOptions; +import org.opensearch.flint.core.metrics.MetricConstants; +import org.opensearch.flint.core.metrics.MetricsTestUtil; import org.opensearch.rest.RestStatus; @ExtendWith(MockitoExtension.class) class OpenSearchBulkRetryWrapperTest { + private static final long ESTIMATED_SIZE_IN_BYTES = 1000L; @Mock BulkRequest bulkRequest; @Mock @@ -45,12 +48,7 @@ class OpenSearchBulkRetryWrapperTest { DocWriteResponse docWriteResponse; @Mock IndexRequest indexRequest0, indexRequest1; - @Mock IndexRequest docWriteRequest2; -// BulkItemRequest[] bulkItemRequests = new BulkItemRequest[] { -// new BulkItemRequest(0, docWriteRequest0), -// new BulkItemRequest(1, docWriteRequest1), -// new BulkItemRequest(2, docWriteRequest2), -// }; + BulkItemResponse successItem = new BulkItemResponse(0, OpType.CREATE, docWriteResponse); BulkItemResponse failureItem = new BulkItemResponse(0, OpType.CREATE, new Failure("index", "id", null, @@ -65,87 +63,125 @@ class OpenSearchBulkRetryWrapperTest { @Test public void withRetryWhenCallSucceed() throws Exception { - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( - retryOptionsWithRetry); - when(client.bulk(bulkRequest, options)).thenReturn(successResponse); - when(successResponse.hasFailures()).thenReturn(false); - - BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); - - assertEquals(response, successResponse); - verify(client).bulk(bulkRequest, options); + MetricsTestUtil.withMetricEnv(verifier -> { + OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( + retryOptionsWithRetry); + when(client.bulk(bulkRequest, options)).thenReturn(successResponse); + when(successResponse.hasFailures()).thenReturn(false); + when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); + + BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); + + assertEquals(response, successResponse); + verify(client).bulk(bulkRequest, options); + + verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, ESTIMATED_SIZE_IN_BYTES); + verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_RETRY_COUNT_METRIC, 0); + verifier.assertMetricNotExist(MetricConstants.OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC); + }); } @Test public void withRetryWhenCallConflict() throws Exception { - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( - retryOptionsWithRetry); - when(client.bulk(any(), eq(options))) - .thenReturn(conflictResponse); - mockConflictResponse(); - when(conflictResponse.hasFailures()).thenReturn(true); - - BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); - - assertEquals(response, conflictResponse); - verify(client).bulk(bulkRequest, options); + MetricsTestUtil.withMetricEnv(verifier -> { + OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( + retryOptionsWithRetry); + when(client.bulk(any(), eq(options))) + .thenReturn(conflictResponse); + mockConflictResponse(); + when(conflictResponse.hasFailures()).thenReturn(true); + when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); + + BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); + + assertEquals(response, conflictResponse); + verify(client).bulk(bulkRequest, options); + + verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, ESTIMATED_SIZE_IN_BYTES); + verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_RETRY_COUNT_METRIC, 0); + verifier.assertMetricNotExist(MetricConstants.OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC); + }); } @Test public void withRetryWhenCallFailOnce() throws Exception { - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( - retryOptionsWithRetry); - when(client.bulk(any(), eq(options))) - .thenReturn(failureResponse) - .thenReturn(successResponse); - mockFailureResponse(); - when(successResponse.hasFailures()).thenReturn(false); - when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); - - BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); - - assertEquals(response, successResponse); - verify(client, times(2)).bulk(any(), eq(options)); + MetricsTestUtil.withMetricEnv(verifier -> { + OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( + retryOptionsWithRetry); + when(client.bulk(any(), eq(options))) + .thenReturn(failureResponse) + .thenReturn(successResponse); + mockFailureResponse(); + when(successResponse.hasFailures()).thenReturn(false); + when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); + when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); + + BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); + + assertEquals(response, successResponse); + verify(client, times(2)).bulk(any(), eq(options)); + verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, ESTIMATED_SIZE_IN_BYTES); + verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_RETRY_COUNT_METRIC, 1); + verifier.assertMetricNotExist(MetricConstants.OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC); + }); } @Test public void withRetryWhenAllCallFail() throws Exception { - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( - retryOptionsWithRetry); - when(client.bulk(any(), eq(options))) - .thenReturn(failureResponse); - mockFailureResponse(); - - BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); - - assertEquals(response, failureResponse); - verify(client, times(3)).bulk(any(), eq(options)); + MetricsTestUtil.withMetricEnv(verifier -> { + OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( + retryOptionsWithRetry); + when(client.bulk(any(), eq(options))) + .thenReturn(failureResponse); + when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); + mockFailureResponse(); + + BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); + + assertEquals(response, failureResponse); + verify(client, times(3)).bulk(any(), eq(options)); + verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, ESTIMATED_SIZE_IN_BYTES); + verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_RETRY_COUNT_METRIC, 2); + verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC, 1); + }); } @Test public void withRetryWhenCallThrowsShouldNotRetry() throws Exception { - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( - retryOptionsWithRetry); - when(client.bulk(bulkRequest, options)).thenThrow(new RuntimeException("test")); - - assertThrows(RuntimeException.class, - () -> bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options)); - - verify(client).bulk(bulkRequest, options); + MetricsTestUtil.withMetricEnv(verifier -> { + OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( + retryOptionsWithRetry); + when(client.bulk(bulkRequest, options)).thenThrow(new RuntimeException("test")); + when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); + + assertThrows(RuntimeException.class, + () -> bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options)); + + verify(client).bulk(bulkRequest, options); + verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, ESTIMATED_SIZE_IN_BYTES); + verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_RETRY_COUNT_METRIC, 0); + verifier.assertMetricNotExist(MetricConstants.OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC); + }); } @Test public void withoutRetryWhenCallFail() throws Exception { - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( - retryOptionsWithoutRetry); - when(client.bulk(bulkRequest, options)) - .thenReturn(failureResponse); - mockFailureResponse(); - - BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); - - assertEquals(response, failureResponse); - verify(client).bulk(bulkRequest, options); + MetricsTestUtil.withMetricEnv(verifier -> { + OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( + retryOptionsWithoutRetry); + when(client.bulk(bulkRequest, options)) + .thenReturn(failureResponse); + when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); + mockFailureResponse(); + + BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); + + assertEquals(response, failureResponse); + verify(client).bulk(bulkRequest, options); + verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, ESTIMATED_SIZE_IN_BYTES); + verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_RETRY_COUNT_METRIC, 0); + verifier.assertMetricNotExist(MetricConstants.OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC); + }); } private void mockFailureResponse() {