Skip to content

Commit

Permalink
Add opensearch related metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 committed Oct 25, 2024
1 parent 7bc0927 commit e015e40
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ static void recordOperationSuccess(String metricNamePrefix) {
MetricsUtil.incrementCounter(successMetricName);
}

static void recordLatency(String metricNamePrefix, long latencyMilliseconds) {
MetricsUtil.addHistoricGauge(metricNamePrefix + ".processingTime", latencyMilliseconds);
}

/**
* Records the failure of an OpenSearch operation by incrementing the corresponding metric counter.
* If the exception is an OpenSearchException with a specific status code (e.g., 403),
Expand All @@ -107,6 +111,8 @@ static void recordOperationFailure(String metricNamePrefix, Exception e) {
if (statusCode == 403) {
String forbiddenErrorMetricName = metricNamePrefix + ".403.count";
MetricsUtil.incrementCounter(forbiddenErrorMetricName);
} else if (statusCode == 429) {
MetricsUtil.incrementCounter(metricNamePrefix + ".429.count");
}

String failureMetricName = metricNamePrefix + "." + (statusCode / 100) + "xx.count";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.flint.core;

import java.util.Arrays;
import java.util.function.Consumer;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
Expand Down Expand Up @@ -40,7 +42,10 @@
import org.opensearch.flint.core.storage.BulkRequestRateLimiter;
import org.opensearch.flint.core.storage.OpenSearchBulkRetryWrapper;

import static org.opensearch.flint.core.metrics.MetricConstants.OS_BULK_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_CREATE_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_READ_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_SEARCH_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_WRITE_OP_METRIC_PREFIX;

/**
Expand All @@ -67,112 +72,126 @@ public RestHighLevelClientWrapper(RestHighLevelClient client, BulkRequestRateLim

@Override
public BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> {
return execute(() -> {
try {
rateLimiter.acquirePermit();
return bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options);
} catch (InterruptedException e) {
throw new RuntimeException("rateLimiter.acquirePermit was interrupted.", e);
}
});
}, OS_WRITE_OP_METRIC_PREFIX, OS_BULK_OP_METRIC_PREFIX);
}

@Override
public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.clearScroll(clearScrollRequest, options));
return execute(() -> client.clearScroll(clearScrollRequest, options),
OS_READ_OP_METRIC_PREFIX);
}

@Override
public CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().create(createIndexRequest, options));
return execute(() -> client.indices().create(createIndexRequest, options),
OS_WRITE_OP_METRIC_PREFIX, OS_CREATE_OP_METRIC_PREFIX);
}

@Override
public void updateIndexMapping(PutMappingRequest putMappingRequest, RequestOptions options) throws IOException {
execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().putMapping(putMappingRequest, options));
execute(() -> client.indices().putMapping(putMappingRequest, options),
OS_WRITE_OP_METRIC_PREFIX);
}

@Override
public void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException {
execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().delete(deleteIndexRequest, options));
execute(() -> client.indices().delete(deleteIndexRequest, options),
OS_WRITE_OP_METRIC_PREFIX);
}

@Override
public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.delete(deleteRequest, options));
return execute(() -> client.delete(deleteRequest, options), OS_WRITE_OP_METRIC_PREFIX);
}

@Override
public GetResponse get(GetRequest getRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.get(getRequest, options));
return execute(() -> client.get(getRequest, options), OS_READ_OP_METRIC_PREFIX);
}

@Override
public GetIndexResponse getIndex(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().get(getIndexRequest, options));
return execute(() -> client.indices().get(getIndexRequest, options),
OS_READ_OP_METRIC_PREFIX);
}

@Override
public IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.index(indexRequest, options));
return execute(() -> client.index(indexRequest, options), OS_WRITE_OP_METRIC_PREFIX);
}

@Override
public Boolean doesIndexExist(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().exists(getIndexRequest, options));
return execute(() -> client.indices().exists(getIndexRequest, options),
OS_READ_OP_METRIC_PREFIX);
}

@Override
public SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.search(searchRequest, options));
return execute(() -> client.search(searchRequest, options), OS_READ_OP_METRIC_PREFIX, OS_SEARCH_OP_METRIC_PREFIX);
}

@Override
public SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.scroll(searchScrollRequest, options));
return execute(() -> client.scroll(searchScrollRequest, options), OS_READ_OP_METRIC_PREFIX);
}

@Override
public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.update(updateRequest, options));
return execute(() -> client.update(updateRequest, options), OS_WRITE_OP_METRIC_PREFIX);
}

@Override
public IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX,
() -> {
OpenSearchClient openSearchClient =
new OpenSearchClient(new RestClientTransport(client.getLowLevelClient(),
new JacksonJsonpMapper()));
return openSearchClient.indices().stats(request);
});
}
@Override
public IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException {
return execute(() -> {
OpenSearchClient openSearchClient =
new OpenSearchClient(new RestClientTransport(client.getLowLevelClient(),
new JacksonJsonpMapper()));
return openSearchClient.indices().stats(request);
}, OS_READ_OP_METRIC_PREFIX
);
}

@Override
public CreatePitResponse createPit(CreatePitRequest request) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> openSearchClient().createPit(request));
return execute(() -> openSearchClient().createPit(request), OS_WRITE_OP_METRIC_PREFIX);
}

/**
* Executes a given operation, tracks metrics, and handles exceptions.
*
* @param metricNamePrefix the prefix for the metric name
* @param operation the operation to execute
* @param <T> the return type of the operation
* @param operation the operation to execute
* @param metricNamePrefixes array of prefixes for the metric name
* @return the result of the operation
* @throws IOException if an I/O exception occurs
*/
private <T> T execute(String metricNamePrefix, IOCallable<T> operation) throws IOException {
private <T> T execute(IOCallable<T> operation, String... metricNamePrefixes) throws IOException {
long startTime = System.currentTimeMillis();
try {
T result = operation.call();
IRestHighLevelClient.recordOperationSuccess(metricNamePrefix);
eachPrefix(IRestHighLevelClient::recordOperationSuccess, metricNamePrefixes);
return result;
} catch (Exception e) {
IRestHighLevelClient.recordOperationFailure(metricNamePrefix, e);
eachPrefix(prefix -> IRestHighLevelClient.recordOperationFailure(prefix, e), metricNamePrefixes);
throw e;
} finally {
long latency = System.currentTimeMillis() - startTime;
eachPrefix(prefix -> IRestHighLevelClient.recordLatency(prefix, latency), metricNamePrefixes);
}
}

private static void eachPrefix(Consumer<String> fn, String... prefixes) {
Arrays.stream(prefixes).forEach(fn);
}

/**
* Functional interface for operations that can throw IOException.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,24 @@ public final class MetricConstants {
*/
public static final String OS_WRITE_OP_METRIC_PREFIX = "opensearch.write";

/**
* Prefixes for OpenSearch API specific metrics
*/
public static final String OS_CREATE_OP_METRIC_PREFIX = "opensearch.create";
public static final String OS_SEARCH_OP_METRIC_PREFIX = "opensearch.search";
public static final String OS_BULK_OP_METRIC_PREFIX = "opensearch.bulk";

/**
* Metric name for request size of opensearch bulk request
*/
public static final String OPENSEARCH_BULK_SIZE_METRIC = "opensearch.bulk.size.count";

/**
* Metric name for opensearch bulk request retry count
*/
public static final String OPENSEARCH_BULK_RETRY_COUNT_METRIC = "opensearch.bulk.retry.count";
public static final String OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC = "opensearch.bulk.allRetryFailed.count";

/**
* Metric name for counting the errors encountered with Amazon S3 operations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import java.util.function.Supplier;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.FlintMetricSource;
import org.apache.spark.metrics.source.FlintIndexMetricSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import dev.failsafe.function.CheckedPredicate;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import org.opensearch.action.DocWriteRequest;
Expand All @@ -15,6 +16,8 @@
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.flint.core.http.FlintRetryOptions;
import org.opensearch.flint.core.metrics.MetricConstants;
import org.opensearch.flint.core.metrics.MetricsUtil;
import org.opensearch.rest.RestStatus;

public class OpenSearchBulkRetryWrapper {
Expand All @@ -37,20 +40,27 @@ public OpenSearchBulkRetryWrapper(FlintRetryOptions retryOptions) {
*/
public BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest bulkRequest,
RequestOptions options) {
final AtomicInteger retryCount = new AtomicInteger(0);
try {
final AtomicReference<BulkRequest> nextRequest = new AtomicReference<>(bulkRequest);
return Failsafe
BulkResponse res = Failsafe
.with(retryPolicy)
.get(() -> {
BulkResponse response = client.bulk(nextRequest.get(), options);
if (retryPolicy.getConfig().allowsRetries() && bulkItemRetryableResultPredicate.test(
response)) {
nextRequest.set(getRetryableRequest(nextRequest.get(), response));
retryCount.incrementAndGet();
}
return response;
});
MetricsUtil.addHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, bulkRequest.estimatedSizeInBytes());
MetricsUtil.addHistoricGauge(MetricConstants.OPENSEARCH_BULK_RETRY_COUNT_METRIC, retryCount.get());
return res;
} catch (FailsafeException ex) {
LOG.severe("Request failed permanently. Re-throwing original exception.");
MetricsUtil.addHistoricGauge(MetricConstants.OPENSEARCH_BULK_RETRY_COUNT_METRIC, retryCount.get() - 1);
MetricsUtil.addHistoricGauge(MetricConstants.OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC, 1);

// unwrap original exception and throw
throw new RuntimeException(ex.getCause());
Expand Down

0 comments on commit e015e40

Please sign in to comment.