-
Notifications
You must be signed in to change notification settings - Fork 33
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Louis Chu <[email protected]>
- Loading branch information
Showing
18 changed files
with
476 additions
and
90 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
61 changes: 61 additions & 0 deletions
61
flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core; | ||
|
||
import org.opensearch.action.bulk.BulkRequest; | ||
import org.opensearch.action.bulk.BulkResponse; | ||
import org.opensearch.action.delete.DeleteRequest; | ||
import org.opensearch.action.delete.DeleteResponse; | ||
import org.opensearch.action.get.GetRequest; | ||
import org.opensearch.action.get.GetResponse; | ||
import org.opensearch.action.index.IndexRequest; | ||
import org.opensearch.action.index.IndexResponse; | ||
import org.opensearch.action.search.ClearScrollRequest; | ||
import org.opensearch.action.search.ClearScrollResponse; | ||
import org.opensearch.action.search.SearchRequest; | ||
import org.opensearch.action.search.SearchResponse; | ||
import org.opensearch.action.search.SearchScrollRequest; | ||
import org.opensearch.action.update.UpdateRequest; | ||
import org.opensearch.action.DocWriteResponse; | ||
import org.opensearch.client.indices.CreateIndexRequest; | ||
import org.opensearch.client.indices.CreateIndexResponse; | ||
import org.opensearch.client.indices.GetIndexRequest; | ||
import org.opensearch.client.indices.GetIndexResponse; | ||
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; | ||
import org.opensearch.client.RequestOptions; | ||
|
||
import java.io.IOException; | ||
|
||
/** | ||
* Interface for wrapping the OpenSearch High Level REST Client with additional functionality, | ||
* such as metrics tracking. | ||
*/ | ||
public interface IRestHighLevelClient { | ||
|
||
BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException; | ||
|
||
ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) throws IOException; | ||
|
||
CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException; | ||
|
||
void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException; | ||
|
||
DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException; | ||
|
||
GetResponse get(GetRequest getRequest, RequestOptions options) throws IOException; | ||
|
||
GetIndexResponse getIndex(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException; | ||
|
||
IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException; | ||
|
||
Boolean isIndexExists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException; | ||
|
||
SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException; | ||
|
||
SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException; | ||
|
||
DocWriteResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException; | ||
} |
165 changes: 165 additions & 0 deletions
165
flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core; | ||
|
||
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; | ||
import org.opensearch.action.bulk.BulkRequest; | ||
import org.opensearch.action.bulk.BulkResponse; | ||
import org.opensearch.action.delete.DeleteRequest; | ||
import org.opensearch.action.delete.DeleteResponse; | ||
import org.opensearch.action.get.GetRequest; | ||
import org.opensearch.action.get.GetResponse; | ||
import org.opensearch.action.index.IndexRequest; | ||
import org.opensearch.action.index.IndexResponse; | ||
import org.opensearch.action.search.ClearScrollRequest; | ||
import org.opensearch.action.search.ClearScrollResponse; | ||
import org.opensearch.action.search.SearchRequest; | ||
import org.opensearch.action.search.SearchResponse; | ||
import org.opensearch.action.search.SearchScrollRequest; | ||
import org.opensearch.action.update.UpdateRequest; | ||
import org.opensearch.OpenSearchException; | ||
import org.opensearch.action.update.UpdateResponse; | ||
import org.opensearch.client.RequestOptions; | ||
import org.opensearch.client.RestHighLevelClient; | ||
import org.opensearch.client.indices.CreateIndexRequest; | ||
import org.opensearch.client.indices.CreateIndexResponse; | ||
import org.opensearch.client.indices.GetIndexRequest; | ||
import org.opensearch.client.indices.GetIndexResponse; | ||
import org.opensearch.flint.core.metrics.MetricsUtil; | ||
|
||
import java.io.Closeable; | ||
import java.io.IOException; | ||
|
||
import static org.opensearch.flint.core.metrics.MetricConstants.*; | ||
|
||
/** | ||
* A wrapper class for RestHighLevelClient to facilitate OpenSearch operations | ||
* with integrated metrics tracking. | ||
*/ | ||
public class RestHighLevelClientWrapper implements IRestHighLevelClient, Closeable { | ||
private final RestHighLevelClient client; | ||
|
||
/** | ||
* Constructs a new RestHighLevelClientWrapper. | ||
* | ||
* @param client the RestHighLevelClient instance to wrap | ||
*/ | ||
public RestHighLevelClientWrapper(RestHighLevelClient client) { | ||
this.client = client; | ||
} | ||
|
||
@Override | ||
public BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException { | ||
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.bulk(bulkRequest, options)); | ||
} | ||
|
||
@Override | ||
public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) throws IOException { | ||
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.clearScroll(clearScrollRequest, options)); | ||
} | ||
|
||
@Override | ||
public CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException { | ||
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().create(createIndexRequest, options)); | ||
} | ||
|
||
@Override | ||
public void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException { | ||
execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().delete(deleteIndexRequest, options)); | ||
} | ||
|
||
@Override | ||
public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException { | ||
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.delete(deleteRequest, options)); | ||
} | ||
|
||
@Override | ||
public GetResponse get(GetRequest getRequest, RequestOptions options) throws IOException { | ||
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.get(getRequest, options)); | ||
} | ||
|
||
@Override | ||
public GetIndexResponse getIndex(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException { | ||
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().get(getIndexRequest, options)); | ||
} | ||
|
||
@Override | ||
public IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException { | ||
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.index(indexRequest, options)); | ||
} | ||
|
||
@Override | ||
public Boolean isIndexExists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException { | ||
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().exists(getIndexRequest, options)); | ||
} | ||
|
||
@Override | ||
public SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException { | ||
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.search(searchRequest, options)); | ||
} | ||
|
||
@Override | ||
public SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException { | ||
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.scroll(searchScrollRequest, options)); | ||
} | ||
|
||
@Override | ||
public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException { | ||
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.update(updateRequest, options)); | ||
} | ||
/** | ||
* Executes a given operation, tracks metrics, and handles exceptions. | ||
* | ||
* @param metricNamePrefix the prefix for the metric name | ||
* @param operation the operation to execute | ||
* @param <T> the return type of the operation | ||
* @return the result of the operation | ||
* @throws IOException if an I/O exception occurs | ||
*/ | ||
private <T> T execute(String metricNamePrefix, IOCallable<T> operation) throws IOException { | ||
try { | ||
T result = operation.call(); | ||
MetricsUtil.publishOpenSearchMetric(metricNamePrefix, 200); | ||
return result; | ||
} catch (Exception e) { | ||
OpenSearchException openSearchException = extractOpenSearchException(e); | ||
int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500; | ||
MetricsUtil.publishOpenSearchMetric(metricNamePrefix, statusCode); | ||
throw e; | ||
} | ||
} | ||
|
||
/** | ||
* Extracts an OpenSearchException from the given Throwable. | ||
* Checks if the Throwable is an instance of OpenSearchException or caused by one. | ||
* | ||
* @param ex the exception to be checked | ||
* @return the extracted OpenSearchException, or null if not found | ||
*/ | ||
private OpenSearchException extractOpenSearchException(Throwable ex) { | ||
if (ex instanceof OpenSearchException) { | ||
return (OpenSearchException) ex; | ||
} else if (ex.getCause() instanceof OpenSearchException) { | ||
return (OpenSearchException) ex.getCause(); | ||
} | ||
return null; | ||
} | ||
|
||
/** | ||
* Functional interface for operations that can throw IOException. | ||
* | ||
* @param <T> the return type of the operation | ||
*/ | ||
@FunctionalInterface | ||
private interface IOCallable<T> { | ||
T call() throws IOException; | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
client.close(); | ||
} | ||
} |
24 changes: 24 additions & 0 deletions
24
flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core.metrics; | ||
|
||
/** | ||
* This class defines custom metric constants used for monitoring flint operations. | ||
*/ | ||
public class MetricConstants { | ||
|
||
/** | ||
* The prefix for all read-related metrics in OpenSearch. | ||
* This constant is used as a part of metric names to categorize and identify metrics related to read operations. | ||
*/ | ||
public static final String OS_READ_OP_METRIC_PREFIX = "opensearch.read"; | ||
|
||
/** | ||
* The prefix for all write-related metrics in OpenSearch. | ||
* Similar to OS_READ_METRIC_PREFIX, this constant is used for categorizing and identifying metrics that pertain to write operations. | ||
*/ | ||
public static final String OS_WRITE_OP_METRIC_PREFIX = "opensearch.write"; | ||
} |
89 changes: 89 additions & 0 deletions
89
flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core.metrics; | ||
|
||
import com.codahale.metrics.Counter; | ||
import org.apache.spark.SparkEnv; | ||
import org.apache.spark.metrics.source.FlintMetricSource; | ||
import org.apache.spark.metrics.source.Source; | ||
import org.opensearch.OpenSearchException; | ||
import scala.collection.Seq; | ||
|
||
import java.util.logging.Logger; | ||
|
||
/** | ||
* Utility class for managing metrics in the OpenSearch Flint context. | ||
*/ | ||
public final class MetricsUtil { | ||
|
||
private static final Logger LOG = Logger.getLogger(MetricsUtil.class.getName()); | ||
|
||
// Private constructor to prevent instantiation | ||
private MetricsUtil() { | ||
} | ||
|
||
/** | ||
* Publish an OpenSearch metric based on the status code. | ||
* | ||
* @param metricNamePrefix the prefix for the metric name | ||
* @param statusCode the HTTP status code | ||
*/ | ||
public static void publishOpenSearchMetric(String metricNamePrefix, int statusCode) { | ||
String metricName = constructMetricName(metricNamePrefix, statusCode); | ||
Counter counter = getOrCreateCounter(metricName); | ||
if (counter != null) { | ||
counter.inc(); | ||
} | ||
} | ||
|
||
// Constructs the metric name based on the provided prefix and status code | ||
private static String constructMetricName(String metricNamePrefix, int statusCode) { | ||
String metricSuffix = getMetricSuffixForStatusCode(statusCode); | ||
return metricNamePrefix + "." + metricSuffix; | ||
} | ||
|
||
// Determines the metric suffix based on the HTTP status code | ||
private static String getMetricSuffixForStatusCode(int statusCode) { | ||
if (statusCode == 403) { | ||
return "403.count"; | ||
} else if (statusCode >= 500) { | ||
return "5xx.count"; | ||
} else if (statusCode >= 400) { | ||
return "4xx.count"; | ||
} else if (statusCode >= 200) { | ||
return "2xx.count"; | ||
} | ||
return "unknown.count"; // default for unhandled status codes | ||
} | ||
|
||
// Retrieves or creates a new counter for the given metric name | ||
private static Counter getOrCreateCounter(String metricName) { | ||
SparkEnv sparkEnv = SparkEnv.get(); | ||
if (sparkEnv == null) { | ||
LOG.warning("Spark environment not available, cannot instrument metric: " + metricName); | ||
return null; | ||
} | ||
|
||
FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(sparkEnv); | ||
Counter counter = flintMetricSource.metricRegistry().getCounters().get(metricName); | ||
if (counter == null) { | ||
counter = flintMetricSource.metricRegistry().counter(metricName); | ||
} | ||
return counter; | ||
} | ||
|
||
// Gets or initializes the FlintMetricSource | ||
private static FlintMetricSource getOrInitFlintMetricSource(SparkEnv sparkEnv) { | ||
Seq<Source> metricSourceSeq = sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME()); | ||
|
||
if (metricSourceSeq == null || metricSourceSeq.isEmpty()) { | ||
FlintMetricSource metricSource = new FlintMetricSource(); | ||
sparkEnv.metricsSystem().registerSource(metricSource); | ||
return metricSource; | ||
} | ||
return (FlintMetricSource) metricSourceSeq.head(); | ||
} | ||
} |
Oops, something went wrong.