[Backport 0.5-nexus] Add opensearch related metrics #844

Merged · 1 commit · Oct 30, 2024
@@ -87,6 +87,10 @@ static void recordOperationSuccess(String metricNamePrefix) {
MetricsUtil.incrementCounter(successMetricName);
}

static void recordLatency(String metricNamePrefix, long latencyMilliseconds) {
MetricsUtil.addHistoricGauge(metricNamePrefix + ".processingTime", latencyMilliseconds);
}

/**
* Records the failure of an OpenSearch operation by incrementing the corresponding metric counter.
* If the exception is an OpenSearchException with a specific status code (e.g., 403),
@@ -107,6 +111,8 @@ static void recordOperationFailure(String metricNamePrefix, Exception e) {
if (statusCode == 403) {
String forbiddenErrorMetricName = metricNamePrefix + ".403.count";
MetricsUtil.incrementCounter(forbiddenErrorMetricName);
} else if (statusCode == 429) {
MetricsUtil.incrementCounter(metricNamePrefix + ".429.count");
}

String failureMetricName = metricNamePrefix + "." + (statusCode / 100) + "xx.count";
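Taken together with recordOperationSuccess and recordOperationFailure above, the new helpers give each metric prefix a latency gauge plus per-status counters. A minimal, self-contained sketch (illustrative only, not part of this PR) of the names they compose for a throttled write; the 429 counter and the .processingTime gauge are the additions in this file:

public class MetricNameSketch {
  public static void main(String[] args) {
    String prefix = "opensearch.write";
    int statusCode = 429;
    // latency gauge written by the new recordLatency helper
    System.out.println(prefix + ".processingTime");
    // per-status counter added for throttling, alongside the existing 403 case
    System.out.println(prefix + "." + statusCode + ".count");            // opensearch.write.429.count
    // existing per-class counter
    System.out.println(prefix + "." + (statusCode / 100) + "xx.count");  // opensearch.write.4xx.count
  }
}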
@@ -5,6 +5,8 @@

package org.opensearch.flint.core;

import java.util.Arrays;
import java.util.function.Consumer;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
@@ -40,7 +42,10 @@
import org.opensearch.flint.core.storage.BulkRequestRateLimiter;
import org.opensearch.flint.core.storage.OpenSearchBulkRetryWrapper;

import static org.opensearch.flint.core.metrics.MetricConstants.OS_BULK_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_CREATE_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_READ_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_SEARCH_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_WRITE_OP_METRIC_PREFIX;

/**
@@ -67,112 +72,126 @@ public RestHighLevelClientWrapper(RestHighLevelClient client, BulkRequestRateLim

@Override
public BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> {
return execute(() -> {
try {
rateLimiter.acquirePermit();
return bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options);
} catch (InterruptedException e) {
throw new RuntimeException("rateLimiter.acquirePermit was interrupted.", e);
}
});
}, OS_WRITE_OP_METRIC_PREFIX, OS_BULK_OP_METRIC_PREFIX);
}

@Override
public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.clearScroll(clearScrollRequest, options));
return execute(() -> client.clearScroll(clearScrollRequest, options),
OS_READ_OP_METRIC_PREFIX);
}

@Override
public CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().create(createIndexRequest, options));
return execute(() -> client.indices().create(createIndexRequest, options),
OS_WRITE_OP_METRIC_PREFIX, OS_CREATE_OP_METRIC_PREFIX);
}

@Override
public void updateIndexMapping(PutMappingRequest putMappingRequest, RequestOptions options) throws IOException {
execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().putMapping(putMappingRequest, options));
execute(() -> client.indices().putMapping(putMappingRequest, options),
OS_WRITE_OP_METRIC_PREFIX);
}

@Override
public void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException {
execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().delete(deleteIndexRequest, options));
execute(() -> client.indices().delete(deleteIndexRequest, options),
OS_WRITE_OP_METRIC_PREFIX);
}

@Override
public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.delete(deleteRequest, options));
return execute(() -> client.delete(deleteRequest, options), OS_WRITE_OP_METRIC_PREFIX);
}

@Override
public GetResponse get(GetRequest getRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.get(getRequest, options));
return execute(() -> client.get(getRequest, options), OS_READ_OP_METRIC_PREFIX);
}

@Override
public GetIndexResponse getIndex(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().get(getIndexRequest, options));
return execute(() -> client.indices().get(getIndexRequest, options),
OS_READ_OP_METRIC_PREFIX);
}

@Override
public IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.index(indexRequest, options));
return execute(() -> client.index(indexRequest, options), OS_WRITE_OP_METRIC_PREFIX);
}

@Override
public Boolean doesIndexExist(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().exists(getIndexRequest, options));
return execute(() -> client.indices().exists(getIndexRequest, options),
OS_READ_OP_METRIC_PREFIX);
}

@Override
public SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.search(searchRequest, options));
return execute(() -> client.search(searchRequest, options), OS_READ_OP_METRIC_PREFIX, OS_SEARCH_OP_METRIC_PREFIX);
}

@Override
public SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.scroll(searchScrollRequest, options));
return execute(() -> client.scroll(searchScrollRequest, options), OS_READ_OP_METRIC_PREFIX);
}

@Override
public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.update(updateRequest, options));
return execute(() -> client.update(updateRequest, options), OS_WRITE_OP_METRIC_PREFIX);
}

@Override
public IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX,
() -> {
OpenSearchClient openSearchClient =
new OpenSearchClient(new RestClientTransport(client.getLowLevelClient(),
new JacksonJsonpMapper()));
return openSearchClient.indices().stats(request);
});
}
@Override
public IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException {
return execute(() -> {
OpenSearchClient openSearchClient =
new OpenSearchClient(new RestClientTransport(client.getLowLevelClient(),
new JacksonJsonpMapper()));
return openSearchClient.indices().stats(request);
}, OS_READ_OP_METRIC_PREFIX
);
}

@Override
public CreatePitResponse createPit(CreatePitRequest request) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> openSearchClient().createPit(request));
return execute(() -> openSearchClient().createPit(request), OS_WRITE_OP_METRIC_PREFIX);
}

/**
* Executes a given operation, tracks metrics, and handles exceptions.
*
* @param metricNamePrefix the prefix for the metric name
* @param operation the operation to execute
* @param <T> the return type of the operation
* @param operation the operation to execute
* @param metricNamePrefixes array of prefixes for the metric name
* @return the result of the operation
* @throws IOException if an I/O exception occurs
*/
private <T> T execute(String metricNamePrefix, IOCallable<T> operation) throws IOException {
private <T> T execute(IOCallable<T> operation, String... metricNamePrefixes) throws IOException {
long startTime = System.currentTimeMillis();
try {
T result = operation.call();
IRestHighLevelClient.recordOperationSuccess(metricNamePrefix);
eachPrefix(IRestHighLevelClient::recordOperationSuccess, metricNamePrefixes);
return result;
} catch (Exception e) {
IRestHighLevelClient.recordOperationFailure(metricNamePrefix, e);
eachPrefix(prefix -> IRestHighLevelClient.recordOperationFailure(prefix, e), metricNamePrefixes);
throw e;
} finally {
long latency = System.currentTimeMillis() - startTime;
eachPrefix(prefix -> IRestHighLevelClient.recordLatency(prefix, latency), metricNamePrefixes);
}
}

private static void eachPrefix(Consumer<String> fn, String... prefixes) {
Arrays.stream(prefixes).forEach(fn);
}
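The signature change above moves the prefix to a trailing varargs parameter so a single call site can report under several metric families at once (for example, bulk() now records to both opensearch.write and opensearch.bulk). A standalone sketch of that fan-out pattern, with eachPrefix reproduced outside the project's classes:

import java.util.Arrays;
import java.util.function.Consumer;

public class MultiPrefixSketch {
  // mirrors the private eachPrefix helper: apply one action to every prefix
  static void eachPrefix(Consumer<String> fn, String... prefixes) {
    Arrays.stream(prefixes).forEach(fn);
  }

  public static void main(String[] args) {
    long latencyMillis = 42L;
    // each prefix gets its own latency gauge (and, on the real path, success/failure counters)
    eachPrefix(prefix -> System.out.println(prefix + ".processingTime = " + latencyMillis),
        "opensearch.write", "opensearch.bulk");
  }
}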

/**
* Functional interface for operations that can throw IOException.
*
@@ -6,6 +6,7 @@
package org.opensearch.flint.core.metrics;

import com.codahale.metrics.Gauge;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
@@ -60,4 +61,9 @@ public List<DataPoint> pollDataPoints() {
}
return result;
}

@VisibleForTesting
public List<DataPoint> getDataPoints() {
return dataPoints;
}
}
@@ -22,6 +22,24 @@ public final class MetricConstants {
*/
public static final String OS_WRITE_OP_METRIC_PREFIX = "opensearch.write";

/**
* Prefixes for OpenSearch API specific metrics
*/
public static final String OS_CREATE_OP_METRIC_PREFIX = "opensearch.create";
public static final String OS_SEARCH_OP_METRIC_PREFIX = "opensearch.search";
public static final String OS_BULK_OP_METRIC_PREFIX = "opensearch.bulk";

/**
* Metric name for request size of opensearch bulk request
*/
public static final String OPENSEARCH_BULK_SIZE_METRIC = "opensearch.bulk.size.count";

/**
* Metric name for opensearch bulk request retry count
*/
public static final String OPENSEARCH_BULK_RETRY_COUNT_METRIC = "opensearch.bulk.retry.count";
public static final String OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC = "opensearch.bulk.allRetryFailed.count";

/**
* Metric name for counting the errors encountered with Amazon S3 operations.
*/
@@ -6,6 +6,7 @@
import dev.failsafe.function.CheckedPredicate;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import org.opensearch.action.DocWriteRequest;
@@ -15,6 +16,8 @@
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.flint.core.http.FlintRetryOptions;
import org.opensearch.flint.core.metrics.MetricConstants;
import org.opensearch.flint.core.metrics.MetricsUtil;
import org.opensearch.rest.RestStatus;

public class OpenSearchBulkRetryWrapper {
@@ -37,23 +40,35 @@ public OpenSearchBulkRetryWrapper(FlintRetryOptions retryOptions) {
*/
public BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest bulkRequest,
RequestOptions options) {
final AtomicInteger requestCount = new AtomicInteger(0);
try {
final AtomicReference<BulkRequest> nextRequest = new AtomicReference<>(bulkRequest);
return Failsafe
BulkResponse res = Failsafe
.with(retryPolicy)
.onFailure((event) -> {
if (event.isRetry()) {
MetricsUtil.addHistoricGauge(
MetricConstants.OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC, 1);
}
})
.get(() -> {
requestCount.incrementAndGet();
BulkResponse response = client.bulk(nextRequest.get(), options);
if (retryPolicy.getConfig().allowsRetries() && bulkItemRetryableResultPredicate.test(
response)) {
nextRequest.set(getRetryableRequest(nextRequest.get(), response));
}
return response;
});
return res;
} catch (FailsafeException ex) {
LOG.severe("Request failed permanently. Re-throwing original exception.");

// unwrap original exception and throw
throw new RuntimeException(ex.getCause());
} finally {
MetricsUtil.addHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, bulkRequest.estimatedSizeInBytes());
MetricsUtil.addHistoricGauge(MetricConstants.OPENSEARCH_BULK_RETRY_COUNT_METRIC, requestCount.get() - 1);
}
}
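For intuition, a standalone sketch (illustrative, not the wrapper itself) of the bookkeeping added above: every bulk attempt bumps requestCount, attempts minus one is what lands in opensearch.bulk.retry.count, the request's estimated size is recorded regardless of outcome, and opensearch.bulk.allRetryFailed.count is bumped only when the onFailure callback fires for a retried execution.

import java.util.concurrent.atomic.AtomicInteger;

public class RetryCountSketch {
  public static void main(String[] args) {
    AtomicInteger requestCount = new AtomicInteger(0);
    int attempts = 3; // one initial bulk call plus two retries
    for (int i = 0; i < attempts; i++) {
      requestCount.incrementAndGet();
    }
    // matches the finally block above: retries = attempts - 1
    System.out.println("opensearch.bulk.retry.count = " + (requestCount.get() - 1)); // 2
  }
}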

@@ -0,0 +1,79 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metrics;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

import com.codahale.metrics.MetricRegistry;
import java.util.List;
import lombok.AllArgsConstructor;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.Source;
import org.mockito.MockedStatic;
import org.opensearch.flint.core.metrics.HistoricGauge.DataPoint;

/**
* Utility class for verifying metrics
*/
public class MetricsTestUtil {
@AllArgsConstructor
public static class MetricsVerifier {

final MetricRegistry metricRegistry;

public void assertMetricExist(String metricName) {
assertNotNull(metricRegistry.getMetrics().get(metricName));
}

public void assertMetricClass(String metricName, Class<?> clazz) {
assertMetricExist(metricName);
assertEquals(clazz, metricRegistry.getMetrics().get(metricName).getClass());
}

public void assertHistoricGauge(String metricName, long... values) {
HistoricGauge historicGauge = getHistoricGauge(metricName);
List<DataPoint> dataPoints = historicGauge.getDataPoints();
for (int i = 0; i < values.length; i++) {
assertEquals(values[i], dataPoints.get(i).getValue().longValue());
}
}

private HistoricGauge getHistoricGauge(String metricName) {
assertMetricClass(metricName, HistoricGauge.class);
return (HistoricGauge) metricRegistry.getMetrics().get(metricName);
}

public void assertMetricNotExist(String metricName) {
assertNull(metricRegistry.getMetrics().get(metricName));
}
}

@FunctionalInterface
public interface ThrowableConsumer<T> {
void accept(T t) throws Exception;
}

public static void withMetricEnv(ThrowableConsumer<MetricsVerifier> test) throws Exception {
try (MockedStatic<SparkEnv> sparkEnvMock = mockStatic(SparkEnv.class)) {
SparkEnv sparkEnv = mock(SparkEnv.class, RETURNS_DEEP_STUBS);
sparkEnvMock.when(SparkEnv::get).thenReturn(sparkEnv);

Source metricSource = mock(Source.class);
MetricRegistry metricRegistry = new MetricRegistry();
when(metricSource.metricRegistry()).thenReturn(metricRegistry);
when(sparkEnv.metricsSystem().getSourcesByName(any()).head()).thenReturn(metricSource);

test.accept(new MetricsVerifier(metricRegistry));
}
}
}
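A hedged usage sketch of the new test utility (the metric name here is illustrative): withMetricEnv mocks SparkEnv so that metrics written through MetricsUtil land in an in-memory MetricRegistry, which the verifier can then inspect via the HistoricGauge data points exposed above.

package org.opensearch.flint.core.metrics;

import org.junit.Test;

public class ExampleMetricsTest {

  @Test
  public void recordsLatencyAsHistoricGauge() throws Exception {
    MetricsTestUtil.withMetricEnv(verifier -> {
      // record a single latency data point, then assert it was captured
      MetricsUtil.addHistoricGauge("opensearch.write.processingTime", 123L);
      verifier.assertHistoricGauge("opensearch.write.processingTime", 123L);
    });
  }
}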