Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more flint metrics #255

Merged
merged 4 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.flint.core;

import org.opensearch.OpenSearchException;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
Expand All @@ -26,6 +27,7 @@
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.flint.core.metrics.MetricsUtil;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -52,11 +54,62 @@ public interface IRestHighLevelClient extends Closeable {

IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException;

Boolean isIndexExists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException;
Boolean doesIndexExist(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException;

SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException;

SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException;

DocWriteResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException;


/**
* Records the success of an OpenSearch operation by incrementing the corresponding metric counter.
* This method constructs the metric name by appending ".200.count" to the provided metric name prefix.
* The metric name is then used to increment the counter, indicating a successful operation.
*
* @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for success
*/
static void recordOperationSuccess(String metricNamePrefix) {
String successMetricName = metricNamePrefix + ".2xx.count";
MetricsUtil.incrementCounter(successMetricName);
}

/**
* Records the failure of an OpenSearch operation by incrementing the corresponding metric counter.
* If the exception is an OpenSearchException with a specific status code (e.g., 403),
* it increments a metric specifically for that status code.
* Otherwise, it increments a general failure metric counter based on the status code category (e.g., 4xx, 5xx).
*
* @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for failure
* @param e the exception encountered during the operation, used to determine the type of failure
*/
static void recordOperationFailure(String metricNamePrefix, Exception e) {
OpenSearchException openSearchException = extractOpenSearchException(e);
int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500;

if (statusCode == 403) {
String forbiddenErrorMetricName = metricNamePrefix + ".403.count";
MetricsUtil.incrementCounter(forbiddenErrorMetricName);
}

String failureMetricName = metricNamePrefix + "." + (statusCode / 100) + "xx.count";
MetricsUtil.incrementCounter(failureMetricName);
}

/**
* Extracts an OpenSearchException from the given Throwable.
* Checks if the Throwable is an instance of OpenSearchException or caused by one.
*
* @param ex the exception to be checked
* @return the extracted OpenSearchException, or null if not found
*/
private static OpenSearchException extractOpenSearchException(Throwable ex) {
if (ex instanceof OpenSearchException) {
return (OpenSearchException) ex;
} else if (ex.getCause() instanceof OpenSearchException) {
return (OpenSearchException) ex.getCause();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.OpenSearchException;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.flint.core.metrics.MetricsUtil;

import java.io.IOException;

Expand Down Expand Up @@ -91,7 +89,7 @@ public IndexResponse index(IndexRequest indexRequest, RequestOptions options) th
}

@Override
public Boolean isIndexExists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
public Boolean doesIndexExist(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().exists(getIndexRequest, options));
}

Expand Down Expand Up @@ -122,64 +120,14 @@ public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options
private <T> T execute(String metricNamePrefix, IOCallable<T> operation) throws IOException {
try {
T result = operation.call();
recordOperationSuccess(metricNamePrefix);
IRestHighLevelClient.recordOperationSuccess(metricNamePrefix);
return result;
} catch (Exception e) {
recordOperationFailure(metricNamePrefix, e);
IRestHighLevelClient.recordOperationFailure(metricNamePrefix, e);
throw e;
}
}

/**
* Records the success of an OpenSearch operation by incrementing the corresponding metric counter.
* This method constructs the metric name by appending ".200.count" to the provided metric name prefix.
* The metric name is then used to increment the counter, indicating a successful operation.
*
* @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for success
*/
private void recordOperationSuccess(String metricNamePrefix) {
String successMetricName = metricNamePrefix + ".2xx.count";
MetricsUtil.incrementCounter(successMetricName);
}

/**
* Records the failure of an OpenSearch operation by incrementing the corresponding metric counter.
* If the exception is an OpenSearchException with a specific status code (e.g., 403),
* it increments a metric specifically for that status code.
* Otherwise, it increments a general failure metric counter based on the status code category (e.g., 4xx, 5xx).
*
* @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for failure
* @param e the exception encountered during the operation, used to determine the type of failure
*/
private void recordOperationFailure(String metricNamePrefix, Exception e) {
OpenSearchException openSearchException = extractOpenSearchException(e);
int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500;

if (statusCode == 403) {
String forbiddenErrorMetricName = metricNamePrefix + ".403.count";
MetricsUtil.incrementCounter(forbiddenErrorMetricName);
}

String failureMetricName = metricNamePrefix + "." + (statusCode / 100) + "xx.count";
MetricsUtil.incrementCounter(failureMetricName);
}

/**
* Extracts an OpenSearchException from the given Throwable.
* Checks if the Throwable is an instance of OpenSearchException or caused by one.
*
* @param ex the exception to be checked
* @return the extracted OpenSearchException, or null if not found
*/
private OpenSearchException extractOpenSearchException(Throwable ex) {
if (ex instanceof OpenSearchException) {
return (OpenSearchException) ex;
} else if (ex.getCause() instanceof OpenSearchException) {
return (OpenSearchException) ex.getCause();
}
return null;
}

/**
* Functional interface for operations that can throw IOException.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
/**
* This class defines custom metric constants used for monitoring flint operations.
*/
public class MetricConstants {
public final class MetricConstants {

/**
* The prefix for all read-related metrics in OpenSearch.
Expand Down Expand Up @@ -47,6 +47,26 @@ public class MetricConstants {
*/
public static final String REPL_PROCESSING_TIME_METRIC = "session.processingTime";

/**
* Prefix for metrics related to the request metadata read operations.
*/
public static final String REQUEST_METADATA_READ_METRIC_PREFIX = "request.metadata.read";

/**
* Prefix for metrics related to the request metadata write operations.
*/
public static final String REQUEST_METADATA_WRITE_METRIC_PREFIX = "request.metadata.write";

/**
* Metric name for counting failed heartbeat operations on request metadata.
*/
public static final String REQUEST_METADATA_HEARTBEAT_FAILED_METRIC = "request.metadata.heartbeat.failed.count";
noCharger marked this conversation as resolved.
Show resolved Hide resolved

/**
* Prefix for metrics related to the result metadata write operations.
*/
public static final String RESULT_METADATA_WRITE_METRIC_PREFIX = "result.metadata.write";

/**
* Metric name for counting the number of statements currently running.
*/
Expand All @@ -65,5 +85,29 @@ public class MetricConstants {
/**
* Metric name for tracking the processing time of statements.
*/
public static final String STATEMENT_PROCESSING_TIME_METRIC = "STATEMENT.processingTime";
public static final String STATEMENT_PROCESSING_TIME_METRIC = "statement.processingTime";

/**
* Metric for tracking the count of currently running streaming jobs.
*/
public static final String STREAMING_RUNNING_METRIC = "streaming.running.count";

/**
* Metric for tracking the count of streaming jobs that have failed.
*/
public static final String STREAMING_FAILED_METRIC = "streaming.failed.count";

/**
* Metric for tracking the count of streaming jobs that have completed successfully.
*/
public static final String STREAMING_SUCCESS_METRIC = "streaming.success.count";

/**
* Metric for tracking the count of failed heartbeat signals in streaming jobs.
*/
public static final String STREAMING_HEARTBEAT_FAILED_METRIC = "streaming.heartbeat.failed.count";

private MetricConstants() {
// Private constructor to prevent instantiation
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@
package org.opensearch.flint.core.metrics;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.FlintMetricSource;
import org.apache.spark.metrics.source.Source;
import scala.collection.Seq;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

/**
Expand All @@ -21,8 +24,8 @@ public final class MetricsUtil {

private static final Logger LOG = Logger.getLogger(MetricsUtil.class.getName());

// Private constructor to prevent instantiation
private MetricsUtil() {
// Private constructor to prevent instantiation
}

/**
Expand Down Expand Up @@ -60,10 +63,7 @@ public static void decrementCounter(String metricName) {
*/
public static Timer.Context getTimerContext(String metricName) {
Timer timer = getOrCreateTimer(metricName);
if (timer != null) {
return timer.time();
}
return null;
return timer != null ? timer.time() : null;
}

/**
Expand All @@ -74,42 +74,47 @@ public static Timer.Context getTimerContext(String metricName) {
* @return The elapsed time in nanoseconds since the timer was started, or {@code null} if the context was {@code null}.
*/
public static Long stopTimer(Timer.Context context) {
if (context != null) {
return context.stop();
return context != null ? context.stop() : null;
}

/**
* Registers a gauge metric with the provided name and value.
* The gauge will reflect the current value of the AtomicInteger provided.
*
* @param metricName The name of the gauge metric to register.
* @param value The AtomicInteger whose current value should be reflected by the gauge.
*/
public static void registerGauge(String metricName, final AtomicInteger value) {
MetricRegistry metricRegistry = getMetricRegistry();
if (metricRegistry == null) {
LOG.warning("MetricRegistry not available, cannot register gauge: " + metricName);
return;
}
return null;
metricRegistry.register(metricName, (Gauge<Integer>) value::get);
}

// Retrieves or creates a new counter for the given metric name
private static Counter getOrCreateCounter(String metricName) {
SparkEnv sparkEnv = SparkEnv.get();
if (sparkEnv == null) {
LOG.warning("Spark environment not available, cannot instrument metric: " + metricName);
return null;
}

FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(sparkEnv);
Counter counter = flintMetricSource.metricRegistry().getCounters().get(metricName);
if (counter == null) {
counter = flintMetricSource.metricRegistry().counter(metricName);
}
return counter;
MetricRegistry metricRegistry = getMetricRegistry();
return metricRegistry != null ? metricRegistry.counter(metricName) : null;
}

// Retrieves or creates a new Timer for the given metric name
private static Timer getOrCreateTimer(String metricName) {
MetricRegistry metricRegistry = getMetricRegistry();
return metricRegistry != null ? metricRegistry.timer(metricName) : null;
}

// Retrieves the MetricRegistry from the current Spark environment.
private static MetricRegistry getMetricRegistry() {
SparkEnv sparkEnv = SparkEnv.get();
if (sparkEnv == null) {
LOG.warning("Spark environment not available, cannot instrument metric: " + metricName);
LOG.warning("Spark environment not available, cannot access MetricRegistry.");
return null;
}

FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(sparkEnv);
Timer timer = flintMetricSource.metricRegistry().getTimers().get(metricName);
if (timer == null) {
timer = flintMetricSource.metricRegistry().timer(metricName);
}
return timer;
return flintMetricSource.metricRegistry();
}

// Gets or initializes the FlintMetricSource
Expand Down
Loading
Loading