Skip to content

Commit

Permalink
Add opensearch related metrics (opensearch-project#818)
Browse files Browse the repository at this point in the history
* Add opensearch related metrics

Signed-off-by: Tomoyuki Morita <[email protected]>

* Add unit tests

Signed-off-by: Tomoyuki Morita <[email protected]>

---------

Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 authored and 14yapkc1 committed Dec 11, 2024
1 parent 5814497 commit 830fc1c
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ static void recordOperationSuccess(String metricNamePrefix) {
MetricsUtil.incrementCounter(successMetricName);
}

static void recordLatency(String metricNamePrefix, long latencyMilliseconds) {
MetricsUtil.addHistoricGauge(metricNamePrefix + ".processingTime", latencyMilliseconds);
}

/**
* Records the failure of an OpenSearch operation by incrementing the corresponding metric counter.
* If the exception is an OpenSearchException with a specific status code (e.g., 403),
Expand All @@ -107,6 +111,8 @@ static void recordOperationFailure(String metricNamePrefix, Exception e) {
if (statusCode == 403) {
String forbiddenErrorMetricName = metricNamePrefix + ".403.count";
MetricsUtil.incrementCounter(forbiddenErrorMetricName);
} else if (statusCode == 429) {
MetricsUtil.incrementCounter(metricNamePrefix + ".429.count");
}

String failureMetricName = metricNamePrefix + "." + (statusCode / 100) + "xx.count";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.flint.core;

import java.util.Arrays;
import java.util.function.Consumer;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
Expand Down Expand Up @@ -40,7 +42,10 @@
import org.opensearch.flint.core.storage.BulkRequestRateLimiter;
import org.opensearch.flint.core.storage.OpenSearchBulkRetryWrapper;

import static org.opensearch.flint.core.metrics.MetricConstants.OS_BULK_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_CREATE_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_READ_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_SEARCH_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_WRITE_OP_METRIC_PREFIX;

/**
Expand All @@ -67,112 +72,126 @@ public RestHighLevelClientWrapper(RestHighLevelClient client, BulkRequestRateLim

@Override
public BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> {
return execute(() -> {
try {
rateLimiter.acquirePermit();
return bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options);
} catch (InterruptedException e) {
throw new RuntimeException("rateLimiter.acquirePermit was interrupted.", e);
}
});
}, OS_WRITE_OP_METRIC_PREFIX, OS_BULK_OP_METRIC_PREFIX);
}

@Override
public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.clearScroll(clearScrollRequest, options));
return execute(() -> client.clearScroll(clearScrollRequest, options),
OS_READ_OP_METRIC_PREFIX);
}

@Override
public CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().create(createIndexRequest, options));
return execute(() -> client.indices().create(createIndexRequest, options),
OS_WRITE_OP_METRIC_PREFIX, OS_CREATE_OP_METRIC_PREFIX);
}

@Override
public void updateIndexMapping(PutMappingRequest putMappingRequest, RequestOptions options) throws IOException {
execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().putMapping(putMappingRequest, options));
execute(() -> client.indices().putMapping(putMappingRequest, options),
OS_WRITE_OP_METRIC_PREFIX);
}

@Override
public void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException {
execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().delete(deleteIndexRequest, options));
execute(() -> client.indices().delete(deleteIndexRequest, options),
OS_WRITE_OP_METRIC_PREFIX);
}

@Override
public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.delete(deleteRequest, options));
return execute(() -> client.delete(deleteRequest, options), OS_WRITE_OP_METRIC_PREFIX);
}

@Override
public GetResponse get(GetRequest getRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.get(getRequest, options));
return execute(() -> client.get(getRequest, options), OS_READ_OP_METRIC_PREFIX);
}

@Override
public GetIndexResponse getIndex(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().get(getIndexRequest, options));
return execute(() -> client.indices().get(getIndexRequest, options),
OS_READ_OP_METRIC_PREFIX);
}

@Override
public IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.index(indexRequest, options));
return execute(() -> client.index(indexRequest, options), OS_WRITE_OP_METRIC_PREFIX);
}

@Override
public Boolean doesIndexExist(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().exists(getIndexRequest, options));
return execute(() -> client.indices().exists(getIndexRequest, options),
OS_READ_OP_METRIC_PREFIX);
}

@Override
public SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.search(searchRequest, options));
return execute(() -> client.search(searchRequest, options), OS_READ_OP_METRIC_PREFIX, OS_SEARCH_OP_METRIC_PREFIX);
}

@Override
public SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.scroll(searchScrollRequest, options));
return execute(() -> client.scroll(searchScrollRequest, options), OS_READ_OP_METRIC_PREFIX);
}

@Override
public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.update(updateRequest, options));
return execute(() -> client.update(updateRequest, options), OS_WRITE_OP_METRIC_PREFIX);
}

@Override
public IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX,
() -> {
OpenSearchClient openSearchClient =
new OpenSearchClient(new RestClientTransport(client.getLowLevelClient(),
new JacksonJsonpMapper()));
return openSearchClient.indices().stats(request);
});
}
@Override
public IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException {
return execute(() -> {
OpenSearchClient openSearchClient =
new OpenSearchClient(new RestClientTransport(client.getLowLevelClient(),
new JacksonJsonpMapper()));
return openSearchClient.indices().stats(request);
}, OS_READ_OP_METRIC_PREFIX
);
}

@Override
public CreatePitResponse createPit(CreatePitRequest request) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> openSearchClient().createPit(request));
return execute(() -> openSearchClient().createPit(request), OS_WRITE_OP_METRIC_PREFIX);
}

/**
* Executes a given operation, tracks metrics, and handles exceptions.
*
* @param metricNamePrefix the prefix for the metric name
* @param operation the operation to execute
* @param <T> the return type of the operation
* @param operation the operation to execute
* @param metricNamePrefixes array of prefixes for the metric name
* @return the result of the operation
* @throws IOException if an I/O exception occurs
*/
private <T> T execute(String metricNamePrefix, IOCallable<T> operation) throws IOException {
private <T> T execute(IOCallable<T> operation, String... metricNamePrefixes) throws IOException {
long startTime = System.currentTimeMillis();
try {
T result = operation.call();
IRestHighLevelClient.recordOperationSuccess(metricNamePrefix);
eachPrefix(IRestHighLevelClient::recordOperationSuccess, metricNamePrefixes);
return result;
} catch (Exception e) {
IRestHighLevelClient.recordOperationFailure(metricNamePrefix, e);
eachPrefix(prefix -> IRestHighLevelClient.recordOperationFailure(prefix, e), metricNamePrefixes);
throw e;
} finally {
long latency = System.currentTimeMillis() - startTime;
eachPrefix(prefix -> IRestHighLevelClient.recordLatency(prefix, latency), metricNamePrefixes);
}
}

private static void eachPrefix(Consumer<String> fn, String... prefixes) {
Arrays.stream(prefixes).forEach(fn);
}

/**
* Functional interface for operations that can throw IOException.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.flint.core.metrics;

import com.codahale.metrics.Gauge;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
Expand Down Expand Up @@ -60,4 +61,9 @@ public List<DataPoint> pollDataPoints() {
}
return result;
}

@VisibleForTesting
public List<DataPoint> getDataPoints() {
return dataPoints;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,24 @@ public final class MetricConstants {
*/
public static final String OS_WRITE_OP_METRIC_PREFIX = "opensearch.write";

/**
* Prefixes for OpenSearch API specific metrics
*/
public static final String OS_CREATE_OP_METRIC_PREFIX = "opensearch.create";
public static final String OS_SEARCH_OP_METRIC_PREFIX = "opensearch.search";
public static final String OS_BULK_OP_METRIC_PREFIX = "opensearch.bulk";

/**
* Metric name for request size of opensearch bulk request
*/
public static final String OPENSEARCH_BULK_SIZE_METRIC = "opensearch.bulk.size.count";

/**
* Metric name for opensearch bulk request retry count
*/
public static final String OPENSEARCH_BULK_RETRY_COUNT_METRIC = "opensearch.bulk.retry.count";
public static final String OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC = "opensearch.bulk.allRetryFailed.count";

/**
* Metric name for counting the errors encountered with Amazon S3 operations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import dev.failsafe.function.CheckedPredicate;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import org.opensearch.action.DocWriteRequest;
Expand All @@ -15,6 +16,8 @@
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.flint.core.http.FlintRetryOptions;
import org.opensearch.flint.core.metrics.MetricConstants;
import org.opensearch.flint.core.metrics.MetricsUtil;
import org.opensearch.rest.RestStatus;

public class OpenSearchBulkRetryWrapper {
Expand All @@ -37,23 +40,35 @@ public OpenSearchBulkRetryWrapper(FlintRetryOptions retryOptions) {
*/
public BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest bulkRequest,
RequestOptions options) {
final AtomicInteger requestCount = new AtomicInteger(0);
try {
final AtomicReference<BulkRequest> nextRequest = new AtomicReference<>(bulkRequest);
return Failsafe
BulkResponse res = Failsafe
.with(retryPolicy)
.onFailure((event) -> {
if (event.isRetry()) {
MetricsUtil.addHistoricGauge(
MetricConstants.OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC, 1);
}
})
.get(() -> {
requestCount.incrementAndGet();
BulkResponse response = client.bulk(nextRequest.get(), options);
if (retryPolicy.getConfig().allowsRetries() && bulkItemRetryableResultPredicate.test(
response)) {
nextRequest.set(getRetryableRequest(nextRequest.get(), response));
}
return response;
});
return res;
} catch (FailsafeException ex) {
LOG.severe("Request failed permanently. Re-throwing original exception.");

// unwrap original exception and throw
throw new RuntimeException(ex.getCause());
} finally {
MetricsUtil.addHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, bulkRequest.estimatedSizeInBytes());
MetricsUtil.addHistoricGauge(MetricConstants.OPENSEARCH_BULK_RETRY_COUNT_METRIC, requestCount.get() - 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metrics;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

import com.codahale.metrics.MetricRegistry;
import java.util.List;
import lombok.AllArgsConstructor;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.Source;
import org.mockito.MockedStatic;
import org.opensearch.flint.core.metrics.HistoricGauge.DataPoint;

/**
* Utility class for verifying metrics
*/
public class MetricsTestUtil {
@AllArgsConstructor
public static class MetricsVerifier {

final MetricRegistry metricRegistry;

public void assertMetricExist(String metricName) {
assertNotNull(metricRegistry.getMetrics().get(metricName));
}

public void assertMetricClass(String metricName, Class<?> clazz) {
assertMetricExist(metricName);
assertEquals(clazz, metricRegistry.getMetrics().get(metricName).getClass());
}

public void assertHistoricGauge(String metricName, long... values) {
HistoricGauge historicGauge = getHistoricGauge(metricName);
List<DataPoint> dataPoints = historicGauge.getDataPoints();
for (int i = 0; i < values.length; i++) {
assertEquals(values[i], dataPoints.get(i).getValue().longValue());
}
}

private HistoricGauge getHistoricGauge(String metricName) {
assertMetricClass(metricName, HistoricGauge.class);
return (HistoricGauge) metricRegistry.getMetrics().get(metricName);
}

public void assertMetricNotExist(String metricName) {
assertNull(metricRegistry.getMetrics().get(metricName));
}
}

@FunctionalInterface
public interface ThrowableConsumer<T> {
void accept(T t) throws Exception;
}

public static void withMetricEnv(ThrowableConsumer<MetricsVerifier> test) throws Exception {
try (MockedStatic<SparkEnv> sparkEnvMock = mockStatic(SparkEnv.class)) {
SparkEnv sparkEnv = mock(SparkEnv.class, RETURNS_DEEP_STUBS);
sparkEnvMock.when(SparkEnv::get).thenReturn(sparkEnv);

Source metricSource = mock(Source.class);
MetricRegistry metricRegistry = new MetricRegistry();
when(metricSource.metricRegistry()).thenReturn(metricRegistry);
when(sparkEnv.metricsSystem().getSourcesByName(any()).head()).thenReturn(metricSource);

test.accept(new MetricsVerifier(metricRegistry));
}
}
}
Loading

0 comments on commit 830fc1c

Please sign in to comment.