Skip to content

Commit

Permalink
Add flint opensearch metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Jan 26, 2024
1 parent d289984 commit 23b50b0
Show file tree
Hide file tree
Showing 18 changed files with 475 additions and 88 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ lazy val flintCore = (project in file("flint-core"))
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
"org.mockito" % "mockito-core" % "2.23.0" % "test",
"org.mockito" % "mockito-core" % "4.6.1" % "test",
"org.mockito" % "mockito-inline" % "4.6.1" % "test",
"org.mockito" % "mockito-junit-jupiter" % "3.12.4" % "test",
"org.junit.jupiter" % "junit-jupiter-api" % "5.9.0" % "test",
"org.junit.jupiter" % "junit-jupiter-engine" % "5.9.0" % "test",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core;

import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.RequestOptions;

import java.io.IOException;

/**
* Interface for wrapping the OpenSearch High Level REST Client with additional functionality,
* such as metrics tracking.
*/
public interface IRestHighLevelClient {

BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException;

ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) throws IOException;

CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException;

void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException;

DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException;

GetResponse get(GetRequest getRequest, RequestOptions options) throws IOException;

GetIndexResponse getIndex(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException;

IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException;

Boolean isIndexExists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException;

SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException;

SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException;

DocWriteResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core;

import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.OpenSearchException;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.flint.core.metrics.MetricsUtil;

import java.io.Closeable;
import java.io.IOException;

import static org.opensearch.flint.core.metrics.MetricConstants.*;

/**
* A wrapper class for RestHighLevelClient to facilitate OpenSearch operations
* with integrated metrics tracking.
*/
public class RestHighLevelClientWrapper implements IRestHighLevelClient, Closeable {
private final RestHighLevelClient client;

/**
* Constructs a new RestHighLevelClientWrapper.
*
* @param client the RestHighLevelClient instance to wrap
*/
public RestHighLevelClientWrapper(RestHighLevelClient client) {
this.client = client;
}

@Override
public BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.bulk(bulkRequest, options));
}

@Override
public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.clearScroll(clearScrollRequest, options));
}

@Override
public CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().create(createIndexRequest, options));
}

@Override
public void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException {
execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().delete(deleteIndexRequest, options));
}

@Override
public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.delete(deleteRequest, options));
}

@Override
public GetResponse get(GetRequest getRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.get(getRequest, options));
}

@Override
public GetIndexResponse getIndex(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().get(getIndexRequest, options));
}

@Override
public IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.index(indexRequest, options));
}

@Override
public Boolean isIndexExists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().exists(getIndexRequest, options));
}

@Override
public SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.search(searchRequest, options));
}

@Override
public SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.scroll(searchScrollRequest, options));
}

@Override
public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.update(updateRequest, options));
}
/**
* Executes a given operation, tracks metrics, and handles exceptions.
*
* @param metricNamePrefix the prefix for the metric name
* @param operation the operation to execute
* @param <T> the return type of the operation
* @return the result of the operation
* @throws IOException if an I/O exception occurs
*/
private <T> T execute(String metricNamePrefix, IOCallable<T> operation) throws IOException {
try {
T result = operation.call();
MetricsUtil.publishOpenSearchMetric(metricNamePrefix, 200);
return result;
} catch (Exception e) {
OpenSearchException openSearchException = extractOpenSearchException(e);
int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500;
MetricsUtil.publishOpenSearchMetric(metricNamePrefix, statusCode);
throw e;
}
}

/**
* Extracts an OpenSearchException from the given Throwable.
* Checks if the Throwable is an instance of OpenSearchException or caused by one.
*
* @param ex the exception to be checked
* @return the extracted OpenSearchException, or null if not found
*/
private OpenSearchException extractOpenSearchException(Throwable ex) {
if (ex instanceof OpenSearchException) {
return (OpenSearchException) ex;
} else if (ex.getCause() instanceof OpenSearchException) {
return (OpenSearchException) ex.getCause();
}
return null;
}

/**
* Functional interface for operations that can throw IOException.
*
* @param <T> the return type of the operation
*/
@FunctionalInterface
private interface IOCallable<T> {
T call() throws IOException;
}

@Override
public void close() throws IOException {
client.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metrics;

/**
* This class defines custom metric constants used for monitoring flint operations.
*/
public class MetricConstants {

/**
* The prefix for all read-related metrics in OpenSearch.
* This constant is used as a part of metric names to categorize and identify metrics related to read operations.
*/
public static final String OS_READ_OP_METRIC_PREFIX = "opensearch.read";

/**
* The prefix for all write-related metrics in OpenSearch.
* Similar to OS_READ_METRIC_PREFIX, this constant is used for categorizing and identifying metrics that pertain to write operations.
*/
public static final String OS_WRITE_OP_METRIC_PREFIX = "opensearch.write";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metrics;

import com.codahale.metrics.Counter;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.FlintMetricSource;
import org.apache.spark.metrics.source.Source;
import org.opensearch.OpenSearchException;
import scala.collection.Seq;

import java.util.logging.Logger;

/**
* Utility class for managing metrics in the OpenSearch Flint context.
*/
public final class MetricsUtil {

private static final Logger LOG = Logger.getLogger(MetricsUtil.class.getName());

// Private constructor to prevent instantiation
private MetricsUtil() {
}

/**
* Publish an OpenSearch metric based on the status code.
*
* @param metricNamePrefix the prefix for the metric name
* @param statusCode the HTTP status code
*/
public static void publishOpenSearchMetric(String metricNamePrefix, int statusCode) {
String metricName = constructMetricName(metricNamePrefix, statusCode);
Counter counter = getOrCreateCounter(metricName);
if (counter != null) {
counter.inc();
}
}

// Constructs the metric name based on the provided prefix and status code
private static String constructMetricName(String metricNamePrefix, int statusCode) {
String metricSuffix = getMetricSuffixForStatusCode(statusCode);
return metricNamePrefix + "." + metricSuffix;
}

// Determines the metric suffix based on the HTTP status code
private static String getMetricSuffixForStatusCode(int statusCode) {
if (statusCode == 403) {
return "403.count";
} else if (statusCode >= 500) {
return "5xx.count";
} else if (statusCode >= 400) {
return "4xx.count";
} else if (statusCode >= 200) {
return "2xx.count";
}
return "unknown.count"; // default for unhandled status codes
}

// Retrieves or creates a new counter for the given metric name
private static Counter getOrCreateCounter(String metricName) {
SparkEnv sparkEnv = SparkEnv.get();
if (sparkEnv == null) {
LOG.warning("Spark environment not available, cannot instrument metric: " + metricName);
return null;
}

FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(sparkEnv);
Counter counter = flintMetricSource.metricRegistry().getCounters().get(metricName);
if (counter == null) {
counter = flintMetricSource.metricRegistry().counter(metricName);
}
return counter;
}

// Gets or initializes the FlintMetricSource
private static FlintMetricSource getOrInitFlintMetricSource(SparkEnv sparkEnv) {
Seq<Source> metricSourceSeq = sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME());

if (metricSourceSeq == null || metricSourceSeq.isEmpty()) {
FlintMetricSource metricSource = new FlintMetricSource();
sparkEnv.metricsSystem().registerSource(metricSource);
return metricSource;
}
return (FlintMetricSource) metricSourceSeq.head();
}
}
Loading

0 comments on commit 23b50b0

Please sign in to comment.