Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OpenSearch metrics #229

Merged
merged 4 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ lazy val flintCore = (project in file("flint-core"))
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
"org.mockito" % "mockito-core" % "2.23.0" % "test",
"org.mockito" % "mockito-core" % "4.6.1" % "test",
"org.mockito" % "mockito-inline" % "4.6.1" % "test",
"org.mockito" % "mockito-junit-jupiter" % "3.12.4" % "test",
"org.junit.jupiter" % "junit-jupiter-api" % "5.9.0" % "test",
"org.junit.jupiter" % "junit-jupiter-engine" % "5.9.0" % "test",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core;

import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.RequestOptions;

import java.io.Closeable;
import java.io.IOException;

/**
* Interface for wrapping the OpenSearch High Level REST Client with additional functionality,
* such as metrics tracking.
*/
public interface IRestHighLevelClient extends Closeable {

BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException;

ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) throws IOException;

CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException;

void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException;

DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException;

GetResponse get(GetRequest getRequest, RequestOptions options) throws IOException;

GetIndexResponse getIndex(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException;

IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException;

Boolean isIndexExists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException;

SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException;

SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException;

DocWriteResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core;

import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.OpenSearchException;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.flint.core.metrics.MetricsUtil;

import java.io.IOException;

import static org.opensearch.flint.core.metrics.MetricConstants.*;

/**
* A wrapper class for RestHighLevelClient to facilitate OpenSearch operations
* with integrated metrics tracking.
*/
public class RestHighLevelClientWrapper implements IRestHighLevelClient {
private final RestHighLevelClient client;

/**
* Constructs a new RestHighLevelClientWrapper.
*
* @param client the RestHighLevelClient instance to wrap
*/
public RestHighLevelClientWrapper(RestHighLevelClient client) {
this.client = client;
}

@Override
public BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.bulk(bulkRequest, options));
noCharger marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.clearScroll(clearScrollRequest, options));
}

@Override
public CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().create(createIndexRequest, options));
}

@Override
public void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException {
execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().delete(deleteIndexRequest, options));
}

@Override
public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.delete(deleteRequest, options));
}

@Override
public GetResponse get(GetRequest getRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.get(getRequest, options));
}

@Override
public GetIndexResponse getIndex(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().get(getIndexRequest, options));
}

@Override
public IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.index(indexRequest, options));
}

@Override
public Boolean isIndexExists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().exists(getIndexRequest, options));
}

@Override
public SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.search(searchRequest, options));
}

@Override
public SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.scroll(searchScrollRequest, options));
}

@Override
public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.update(updateRequest, options));
}

/**
* Executes a given operation, tracks metrics, and handles exceptions.
*
* @param metricNamePrefix the prefix for the metric name
* @param operation the operation to execute
* @param <T> the return type of the operation
* @return the result of the operation
* @throws IOException if an I/O exception occurs
*/
private <T> T execute(String metricNamePrefix, IOCallable<T> operation) throws IOException {
try {
T result = operation.call();
recordOperationSuccess(metricNamePrefix);
return result;
} catch (Exception e) {
recordOperationFailure(metricNamePrefix, e);
throw e;
}
}

/**
* Records the success of an OpenSearch operation by incrementing the corresponding metric counter.
* This method constructs the metric name by appending ".200.count" to the provided metric name prefix.
* The metric name is then used to increment the counter, indicating a successful operation.
*
* @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for success
*/
private void recordOperationSuccess(String metricNamePrefix) {
String successMetricName = metricNamePrefix + ".200.count";
noCharger marked this conversation as resolved.
Show resolved Hide resolved
MetricsUtil.incrementCounter(successMetricName);
}

/**
* Records the failure of an OpenSearch operation by incrementing the corresponding metric counter.
* If the exception is an OpenSearchException with a specific status code (e.g., 403),
* it increments a metric specifically for that status code.
* Otherwise, it increments a general failure metric counter based on the status code category (e.g., 4xx, 5xx).
*
* @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for failure
* @param e the exception encountered during the operation, used to determine the type of failure
*/
private void recordOperationFailure(String metricNamePrefix, Exception e) {
OpenSearchException openSearchException = extractOpenSearchException(e);
int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500;

if (statusCode == 403) {
noCharger marked this conversation as resolved.
Show resolved Hide resolved
String forbiddenErrorMetricName = metricNamePrefix + ".403.count";
MetricsUtil.incrementCounter(forbiddenErrorMetricName);
}

String failureMetricName = metricNamePrefix + "." + (statusCode / 100) + "xx.count";
MetricsUtil.incrementCounter(failureMetricName);
}

/**
* Extracts an OpenSearchException from the given Throwable.
* Checks if the Throwable is an instance of OpenSearchException or caused by one.
*
* @param ex the exception to be checked
* @return the extracted OpenSearchException, or null if not found
*/
private OpenSearchException extractOpenSearchException(Throwable ex) {
if (ex instanceof OpenSearchException) {
return (OpenSearchException) ex;
} else if (ex.getCause() instanceof OpenSearchException) {
noCharger marked this conversation as resolved.
Show resolved Hide resolved
return (OpenSearchException) ex.getCause();
}
return null;
}

/**
* Functional interface for operations that can throw IOException.
*
* @param <T> the return type of the operation
*/
@FunctionalInterface
private interface IOCallable<T> {
T call() throws IOException;
}

@Override
public void close() throws IOException {
client.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metrics;

/**
* This class defines custom metric constants used for monitoring flint operations.
*/
public class MetricConstants {

/**
* The prefix for all read-related metrics in OpenSearch.
* This constant is used as a part of metric names to categorize and identify metrics related to read operations.
*/
public static final String OS_READ_OP_METRIC_PREFIX = "opensearch.read";

/**
* The prefix for all write-related metrics in OpenSearch.
* Similar to OS_READ_METRIC_PREFIX, this constant is used for categorizing and identifying metrics that pertain to write operations.
*/
public static final String OS_WRITE_OP_METRIC_PREFIX = "opensearch.write";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metrics;

import com.codahale.metrics.Counter;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.FlintMetricSource;
import org.apache.spark.metrics.source.Source;
import scala.collection.Seq;

import java.util.logging.Logger;

/**
* Utility class for managing metrics in the OpenSearch Flint context.
*/
public final class MetricsUtil {

private static final Logger LOG = Logger.getLogger(MetricsUtil.class.getName());

// Private constructor to prevent instantiation
private MetricsUtil() {
}

public static void incrementCounter(String metricName) {
noCharger marked this conversation as resolved.
Show resolved Hide resolved
Counter counter = getOrCreateCounter(metricName);
if (counter != null) {
counter.inc();
}
}

// Retrieves or creates a new counter for the given metric name
private static Counter getOrCreateCounter(String metricName) {
SparkEnv sparkEnv = SparkEnv.get();
if (sparkEnv == null) {
LOG.warning("Spark environment not available, cannot instrument metric: " + metricName);
return null;
}

FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(sparkEnv);
Counter counter = flintMetricSource.metricRegistry().getCounters().get(metricName);
if (counter == null) {
counter = flintMetricSource.metricRegistry().counter(metricName);
}
return counter;
}

// Gets or initializes the FlintMetricSource
private static FlintMetricSource getOrInitFlintMetricSource(SparkEnv sparkEnv) {
Seq<Source> metricSourceSeq = sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME());

if (metricSourceSeq == null || metricSourceSeq.isEmpty()) {
FlintMetricSource metricSource = new FlintMetricSource();
sparkEnv.metricsSystem().registerSource(metricSource);
noCharger marked this conversation as resolved.
Show resolved Hide resolved
return metricSource;
}
return (FlintMetricSource) metricSourceSeq.head();
}
}
Loading
Loading