diff --git a/build.sbt b/build.sbt index 469d57223..48e4bca5b 100644 --- a/build.sbt +++ b/build.sbt @@ -67,7 +67,8 @@ lazy val flintCore = (project in file("flint-core")) "org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test", "org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test", "com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test", - "org.mockito" % "mockito-core" % "2.23.0" % "test", + "org.mockito" % "mockito-core" % "4.6.1" % "test", + "org.mockito" % "mockito-inline" % "4.6.1" % "test", "org.mockito" % "mockito-junit-jupiter" % "3.12.4" % "test", "org.junit.jupiter" % "junit-jupiter-api" % "5.9.0" % "test", "org.junit.jupiter" % "junit-jupiter-engine" % "5.9.0" % "test", diff --git a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java new file mode 100644 index 000000000..ca829f31c --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core; + +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.get.GetResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.ClearScrollRequest; +import org.opensearch.action.search.ClearScrollResponse; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.SearchScrollRequest; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.client.indices.CreateIndexRequest; 
+import org.opensearch.client.indices.CreateIndexResponse; +import org.opensearch.client.indices.GetIndexRequest; +import org.opensearch.client.indices.GetIndexResponse; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.client.RequestOptions; + +import java.io.IOException; + +/** + * Interface for wrapping the OpenSearch High Level REST Client with additional functionality, + * such as metrics tracking. + */ +public interface IRestHighLevelClient { + + BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException; + + ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) throws IOException; + + CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException; + + void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException; + + DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException; + + GetResponse get(GetRequest getRequest, RequestOptions options) throws IOException; + + GetIndexResponse getIndex(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException; + + IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException; + + Boolean isIndexExists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException; + + SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException; + + SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException; + + DocWriteResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException; +} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java new file mode 100644 index 000000000..820bd159f --- /dev/null +++ 
b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java @@ -0,0 +1,165 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core; + +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.get.GetResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.ClearScrollRequest; +import org.opensearch.action.search.ClearScrollResponse; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.SearchScrollRequest; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.OpenSearchException; +import org.opensearch.action.update.UpdateResponse; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.indices.CreateIndexRequest; +import org.opensearch.client.indices.CreateIndexResponse; +import org.opensearch.client.indices.GetIndexRequest; +import org.opensearch.client.indices.GetIndexResponse; +import org.opensearch.flint.core.metrics.MetricsUtil; + +import java.io.Closeable; +import java.io.IOException; + +import static org.opensearch.flint.core.metrics.MetricConstants.*; + +/** + * A wrapper class for RestHighLevelClient to facilitate OpenSearch operations + * with integrated metrics tracking. + */ +public class RestHighLevelClientWrapper implements IRestHighLevelClient, Closeable { + private final RestHighLevelClient client; + + /** + * Constructs a new RestHighLevelClientWrapper. 
+ * + * @param client the RestHighLevelClient instance to wrap + */ + public RestHighLevelClientWrapper(RestHighLevelClient client) { + this.client = client; + } + + @Override + public BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException { + return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.bulk(bulkRequest, options)); + } + + @Override + public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) throws IOException { + return execute(OS_READ_OP_METRIC_PREFIX, () -> client.clearScroll(clearScrollRequest, options)); + } + + @Override + public CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException { + return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().create(createIndexRequest, options)); + } + + @Override + public void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException { + execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().delete(deleteIndexRequest, options)); + } + + @Override + public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException { + return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.delete(deleteRequest, options)); + } + + @Override + public GetResponse get(GetRequest getRequest, RequestOptions options) throws IOException { + return execute(OS_READ_OP_METRIC_PREFIX, () -> client.get(getRequest, options)); + } + + @Override + public GetIndexResponse getIndex(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException { + return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().get(getIndexRequest, options)); + } + + @Override + public IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException { + return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.index(indexRequest, options)); + } + + @Override + public Boolean isIndexExists(GetIndexRequest getIndexRequest, 
RequestOptions options) throws IOException { + return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().exists(getIndexRequest, options)); + } + + @Override + public SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException { + return execute(OS_READ_OP_METRIC_PREFIX, () -> client.search(searchRequest, options)); + } + + @Override + public SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException { + return execute(OS_READ_OP_METRIC_PREFIX, () -> client.scroll(searchScrollRequest, options)); + } + + @Override + public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException { + return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.update(updateRequest, options)); + } + /** + * Runs the given operation and publishes a counter metric for its outcome: status 200 when it returns normally, + * otherwise the status of the OpenSearchException found on the thrown exception or its direct cause (500 if there is none). + * The caught exception is rethrown to the caller unchanged. + * + * @param metricNamePrefix the prefix for the metric name + * @param operation the operation to execute + * @param <T> the return type of the operation + * @return the result of the operation + * @throws IOException if an I/O exception occurs + */ + private <T> T execute(String metricNamePrefix, IOCallable<T> operation) throws IOException { + try { + T result = operation.call(); + MetricsUtil.publishOpenSearchMetric(metricNamePrefix, 200); + return result; + } catch (Exception e) { + OpenSearchException openSearchException = extractOpenSearchException(e); + int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500; + MetricsUtil.publishOpenSearchMetric(metricNamePrefix, statusCode); + throw e; + } + } + + /** + * Extracts an OpenSearchException from the given Throwable. + * Checks if the Throwable is an instance of OpenSearchException or directly caused by one (only the immediate cause is inspected). 
+ * + * @param ex the exception to be checked + * @return the extracted OpenSearchException, or null if not found + */ + private OpenSearchException extractOpenSearchException(Throwable ex) { + if (ex instanceof OpenSearchException) { + return (OpenSearchException) ex; + } else if (ex.getCause() instanceof OpenSearchException) { + return (OpenSearchException) ex.getCause(); + } + return null; + } + + /** + * Functional interface for operations that can throw IOException. + * + * @param <T> the return type of the operation + */ + @FunctionalInterface + private interface IOCallable<T> { + T call() throws IOException; + } + + @Override + public void close() throws IOException { + client.close(); + } +} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java new file mode 100644 index 000000000..d34a3705d --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metrics; + +/** + * This class defines custom metric constants used for monitoring flint operations. + */ +public class MetricConstants { + + /** + * The prefix for all read-related metrics in OpenSearch. + * This constant is used as a part of metric names to categorize and identify metrics related to read operations. + */ + public static final String OS_READ_OP_METRIC_PREFIX = "opensearch.read"; + + /** + * The prefix for all write-related metrics in OpenSearch. + * Similar to OS_READ_OP_METRIC_PREFIX, this constant is used for categorizing and identifying metrics that pertain to write operations. 
+ */ + public static final String OS_WRITE_OP_METRIC_PREFIX = "opensearch.write"; +} \ No newline at end of file diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java new file mode 100644 index 000000000..2c63ae3ec --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metrics; + +import com.codahale.metrics.Counter; +import org.apache.spark.SparkEnv; +import org.apache.spark.metrics.source.FlintMetricSource; +import org.apache.spark.metrics.source.Source; +import org.opensearch.OpenSearchException; +import scala.collection.Seq; + +import java.util.logging.Logger; + +/** + * Utility class for managing metrics in the OpenSearch Flint context. + */ +public final class MetricsUtil { + + private static final Logger LOG = Logger.getLogger(MetricsUtil.class.getName()); + + // Private constructor to prevent instantiation + private MetricsUtil() { + } + + /** + * Publish an OpenSearch metric based on the status code. + * + * @param metricNamePrefix the prefix for the metric name + * @param statusCode the HTTP status code + */ + public static void publishOpenSearchMetric(String metricNamePrefix, int statusCode) { + String metricName = constructMetricName(metricNamePrefix, statusCode); + Counter counter = getOrCreateCounter(metricName); + if (counter != null) { + counter.inc(); + } + } + + // Constructs the metric name based on the provided prefix and status code + private static String constructMetricName(String metricNamePrefix, int statusCode) { + String metricSuffix = getMetricSuffixForStatusCode(statusCode); + return metricNamePrefix + "." 
+ metricSuffix; + } + + // Determines the metric suffix based on the HTTP status code. + // 403 gets its own bucket; the remaining codes are bucketed by class. Note that 3xx codes also fall into the + // ">= 200" branch and are therefore counted as 2xx, while codes below 200 are reported as unknown. + private static String getMetricSuffixForStatusCode(int statusCode) { + if (statusCode == 403) { + return "403.count"; + } else if (statusCode >= 500) { + return "5xx.count"; + } else if (statusCode >= 400) { + return "4xx.count"; + } else if (statusCode >= 200) { + return "2xx.count"; + } + return "unknown.count"; // default for unhandled status codes + } + + // Retrieves or creates the counter for the given metric name. + // Returns null (after logging a warning) when no SparkEnv is available. + private static Counter getOrCreateCounter(String metricName) { + SparkEnv sparkEnv = SparkEnv.get(); + if (sparkEnv == null) { + LOG.warning("Spark environment not available, cannot instrument metric: " + metricName); + return null; + } + + // MetricRegistry#counter already returns the registered counter or creates and registers a new one, + // so no separate lookup through getCounters() is needed. + return getOrInitFlintMetricSource(sparkEnv).metricRegistry().counter(metricName); + } + + // Gets the FlintMetricSource registered with Spark's metrics system, registering a new one if none is found. 
+ private static FlintMetricSource getOrInitFlintMetricSource(SparkEnv sparkEnv) { + Seq<Source> metricSourceSeq = sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME()); + + if (metricSourceSeq == null || metricSourceSeq.isEmpty()) { + FlintMetricSource metricSource = new FlintMetricSource(); + sparkEnv.metricsSystem().registerSource(metricSource); + return metricSource; + } + return (FlintMetricSource) metricSourceSeq.head(); + } +} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java index 4c4cd1694..3d1876448 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java +++ 
b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java @@ -48,7 +48,6 @@ import java.util.stream.LongStream; import java.util.stream.Stream; import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -366,36 +365,42 @@ private void stageMetricDatum(final boolean metricConfigured, } } - private MetricInfo getMetricInfo(DimensionedName dimensionedName) { - final String jobId = System.getenv().getOrDefault("SERVERLESS_EMR_JOB_ID", UNKNOWN); - final String applicationId = System.getenv().getOrDefault("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", UNKNOWN); - final String domainId = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN); - final Dimension jobDimension = new Dimension().withName(DIMENSION_JOB_ID).withValue(jobId); - final Dimension applicationDimension = new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(applicationId); - final Dimension domainIdDimension = new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(domainId); - Dimension instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(UNKNOWN); + public MetricInfo getMetricInfo(DimensionedName dimensionedName) { String metricName = dimensionedName.getName(); String[] parts = metricName.split("\\."); + + Set dimensions = new HashSet<>(); if (doesNameConsistsOfMetricNameSpace(parts)) { metricName = Stream.of(parts).skip(2).collect(Collectors.joining(".")); - //For executors only id is added to the metric name, that's why the numeric check. - //If it is not numeric then the instance is driver. 
- if (StringUtils.isNumeric(parts[1])) { - instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue("executor" + parts[1]); - } - else { - instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(parts[1]); - } + addInstanceRoleDimension(dimensions, parts); } - Set dimensions = new HashSet<>(); - dimensions.add(jobDimension); - dimensions.add(applicationDimension); - dimensions.add(instanceRoleDimension); - dimensions.add(domainIdDimension); + addDefaultDimensionsForSparkJobMetrics(dimensions); dimensions.addAll(dimensionedName.getDimensions()); return new MetricInfo(metricName, dimensions); } + // These dimensions are for all metrics: the job id and application id are read from the + // SERVERLESS_EMR_JOB_ID and SERVERLESS_EMR_VIRTUAL_CLUSTER_ID environment variables, falling back to UNKNOWN. + private static void addDefaultDimensionsForSparkJobMetrics(Set<Dimension> dimensions) { + final String jobId = System.getenv().getOrDefault("SERVERLESS_EMR_JOB_ID", UNKNOWN); + final String applicationId = System.getenv().getOrDefault("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", UNKNOWN); + dimensions.add(new Dimension().withName(DIMENSION_JOB_ID).withValue(jobId)); + dimensions.add(new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(applicationId)); + } + + // Adds the instance-role dimension derived from the metric-name segments: a numeric parts[1] is treated as an + // executor id ("executor" + id), anything else is used as the role name as-is. + private static void addInstanceRoleDimension(Set<Dimension> dimensions, String[] parts) { + Dimension instanceRoleDimension; + if (StringUtils.isNumeric(parts[1])) { + String roleName = "executor"; + // Exclude executor id for flint repl metrics. The length check keeps names with fewer than four + // segments from throwing ArrayIndexOutOfBoundsException; such names keep the executor id. 
+ if (parts.length <= 3 || !"repl".equals(parts[3])) { + roleName += parts[1]; + } + instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(roleName); + } else { + instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(parts[1]); + } + dimensions.add(instanceRoleDimension); + } // This tries to replicate the logic here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L137 // Since we don't have access to Spark Configuration here: we are relying on the presence of executorId as part of the metricName. 
private boolean doesNameConsistsOfMetricNameSpace(String[] metricNameParts) { diff --git a/flint-core/src/main/scala/apache/spark/metrics/source/FlintMetricSource.scala b/flint-core/src/main/scala/apache/spark/metrics/source/FlintMetricSource.scala new file mode 100644 index 000000000..d5f241572 --- /dev/null +++ b/flint-core/src/main/scala/apache/spark/metrics/source/FlintMetricSource.scala @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.spark.metrics.source + +import com.codahale.metrics.MetricRegistry + +class FlintMetricSource() extends Source { + + // Implementing the Source trait + override val sourceName: String = FlintMetricSource.FLINT_METRIC_SOURCE_NAME + override val metricRegistry: MetricRegistry = new MetricRegistry +} + +object FlintMetricSource { + val FLINT_METRIC_SOURCE_NAME = "Flint" // Default source name +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index 6cdf5187d..8741291fd 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -7,7 +7,6 @@ import java.util.List; -import org.opensearch.client.RestHighLevelClient; import org.opensearch.flint.core.metadata.FlintMetadata; import org.opensearch.flint.core.metadata.log.OptimisticTransaction; import org.opensearch.flint.core.storage.FlintReader; @@ -96,8 +95,8 @@ OptimisticTransaction startTransaction(String indexName, String dataSourc */ FlintWriter createWriter(String indexName); /** - * Create {@link RestHighLevelClient}. - * @return {@link RestHighLevelClient} + * Create {@link RestHighLevelClientWrapper}. 
+ * @return {@link RestHighLevelClientWrapper} */ - public RestHighLevelClient createClient(); + public RestHighLevelClientWrapper createClient(); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 4e549df2b..334112e06 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -49,6 +49,7 @@ import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; import org.opensearch.flint.core.metadata.log.OptimisticTransaction; +import org.opensearch.flint.core.RestHighLevelClientWrapper; import org.opensearch.index.query.AbstractQueryBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.QueryBuilder; @@ -96,8 +97,8 @@ public OptimisticTransaction startTransaction(String indexName, String da LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName); String metaLogIndexName = dataSourceName.isEmpty() ? 
META_LOG_NAME_PREFIX : META_LOG_NAME_PREFIX + "_" + dataSourceName; - try (RestHighLevelClient client = createClient()) { - if (client.indices().exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) { + try (RestHighLevelClientWrapper client = createClient()) { + if (client.isIndexExists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) { LOG.info("Found metadata log index " + metaLogIndexName); } else { if (forceInit) { @@ -130,13 +131,13 @@ public void createIndex(String indexName, FlintMetadata metadata) { protected void createIndex(String indexName, String mapping, Option settings) { LOG.info("Creating Flint index " + indexName); String osIndexName = sanitizeIndexName(indexName); - try (RestHighLevelClient client = createClient()) { + try (RestHighLevelClientWrapper client = createClient()) { CreateIndexRequest request = new CreateIndexRequest(osIndexName); request.mapping(mapping, XContentType.JSON); if (settings.isDefined()) { request.settings(settings.get(), XContentType.JSON); } - client.indices().create(request, RequestOptions.DEFAULT); + client.createIndex(request, RequestOptions.DEFAULT); } catch (Exception e) { throw new IllegalStateException("Failed to create Flint index " + osIndexName, e); } @@ -146,8 +147,8 @@ protected void createIndex(String indexName, String mapping, Option sett public boolean exists(String indexName) { LOG.info("Checking if Flint index exists " + indexName); String osIndexName = sanitizeIndexName(indexName); - try (RestHighLevelClient client = createClient()) { - return client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT); + try (RestHighLevelClientWrapper client = createClient()) { + return client.isIndexExists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT); } catch (IOException e) { throw new IllegalStateException("Failed to check if Flint index exists " + osIndexName, e); } @@ -157,9 +158,9 @@ public boolean exists(String indexName) { public List 
getAllIndexMetadata(String indexNamePattern) { LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern); String osIndexNamePattern = sanitizeIndexName(indexNamePattern); - try (RestHighLevelClient client = createClient()) { + try (RestHighLevelClientWrapper client = createClient()) { GetIndexRequest request = new GetIndexRequest(osIndexNamePattern); - GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); + GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT); return Arrays.stream(response.getIndices()) .map(index -> FlintMetadata.apply( @@ -175,9 +176,9 @@ public List getAllIndexMetadata(String indexNamePattern) { public FlintMetadata getIndexMetadata(String indexName) { LOG.info("Fetching Flint index metadata for " + indexName); String osIndexName = sanitizeIndexName(indexName); - try (RestHighLevelClient client = createClient()) { + try (RestHighLevelClientWrapper client = createClient()) { GetIndexRequest request = new GetIndexRequest(osIndexName); - GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); + GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT); MappingMetadata mapping = response.getMappings().get(osIndexName); Settings settings = response.getSettings().get(osIndexName); @@ -191,10 +192,9 @@ public FlintMetadata getIndexMetadata(String indexName) { public void deleteIndex(String indexName) { LOG.info("Deleting Flint index " + indexName); String osIndexName = sanitizeIndexName(indexName); - try (RestHighLevelClient client = createClient()) { + try (RestHighLevelClientWrapper client = createClient()) { DeleteIndexRequest request = new DeleteIndexRequest(osIndexName); - - client.indices().delete(request, RequestOptions.DEFAULT); + client.deleteIndex(request, RequestOptions.DEFAULT); } catch (Exception e) { throw new IllegalStateException("Failed to delete Flint index " + osIndexName, e); } @@ -233,7 +233,7 @@ public FlintWriter 
createWriter(String indexName) { } @Override - public RestHighLevelClient createClient() { + public RestHighLevelClientWrapper createClient() { RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme())); @@ -283,7 +283,7 @@ public RestHighLevelClient createClient() { final RequestConfigurator callback = new RequestConfigurator(options); restClientBuilder.setRequestConfigCallback(callback); - return new RestHighLevelClient(restClientBuilder); + return new RestHighLevelClientWrapper(new RestHighLevelClient(restClientBuilder)); } /* diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index ab38a5f60..bd301f42c 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -21,12 +21,12 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.update.UpdateRequest; import org.opensearch.client.RequestOptions; -import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.xcontent.XContentType; import org.opensearch.flint.core.FlintClient; import org.opensearch.flint.core.metadata.log.FlintMetadataLog; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; +import org.opensearch.flint.core.RestHighLevelClientWrapper; /** * Flint metadata log in OpenSearch store. 
For now use single doc instead of maintaining history @@ -77,7 +77,7 @@ public FlintMetadataLogEntry add(FlintMetadataLogEntry logEntry) { @Override public Optional getLatest() { LOG.info("Fetching latest log entry with id " + latestId); - try (RestHighLevelClient client = flintClient.createClient()) { + try (RestHighLevelClientWrapper client = flintClient.createClient()) { GetResponse response = client.get(new GetRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT); @@ -102,7 +102,7 @@ public Optional getLatest() { @Override public void purge() { LOG.info("Purging log entry with id " + latestId); - try (RestHighLevelClient client = flintClient.createClient()) { + try (RestHighLevelClientWrapper client = flintClient.createClient()) { DeleteResponse response = client.delete( new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT); @@ -150,8 +150,8 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) { private FlintMetadataLogEntry writeLogEntry( FlintMetadataLogEntry logEntry, - CheckedFunction write) { - try (RestHighLevelClient client = flintClient.createClient()) { + CheckedFunction write) { + try (RestHighLevelClientWrapper client = flintClient.createClient()) { // Write (create or update) the doc DocWriteResponse response = write.apply(client); @@ -174,8 +174,8 @@ private FlintMetadataLogEntry writeLogEntry( private boolean exists() { LOG.info("Checking if Flint index exists " + metaLogIndexName); - try (RestHighLevelClient client = flintClient.createClient()) { - return client.indices().exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT); + try (RestHighLevelClientWrapper client = flintClient.createClient()) { + return client.isIndexExists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT); } catch (IOException e) { throw new IllegalStateException("Failed to check if Flint index exists " + metaLogIndexName, e); } diff --git 
a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java index 472431bf1..2184c0727 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java @@ -7,7 +7,7 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; -import org.opensearch.client.RestHighLevelClient; +import org.opensearch.flint.core.RestHighLevelClientWrapper; import org.opensearch.search.SearchHit; import java.io.IOException; @@ -24,14 +24,14 @@ public abstract class OpenSearchReader implements FlintReader { /** Search request source builder. */ private final SearchRequest searchRequest; - protected final RestHighLevelClient client; + protected final RestHighLevelClientWrapper client; /** * iterator of one-shot search result. */ private Iterator iterator = null; - public OpenSearchReader(RestHighLevelClient client, SearchRequest searchRequest) { + public OpenSearchReader(RestHighLevelClientWrapper client, SearchRequest searchRequest) { this.client = client; this.searchRequest = searchRequest; } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java index d71014c20..f05b0fcd7 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java @@ -11,10 +11,10 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchScrollRequest; import org.opensearch.client.RequestOptions; -import org.opensearch.client.RestHighLevelClient; import org.opensearch.common.Strings; import org.opensearch.common.unit.TimeValue; import 
org.opensearch.flint.core.FlintOptions; +import org.opensearch.flint.core.RestHighLevelClientWrapper; import org.opensearch.search.builder.SearchSourceBuilder; import java.io.IOException; @@ -35,7 +35,7 @@ public class OpenSearchScrollReader extends OpenSearchReader { private String scrollId = null; - public OpenSearchScrollReader(RestHighLevelClient client, String indexName, SearchSourceBuilder searchSourceBuilder, FlintOptions options) { + public OpenSearchScrollReader(RestHighLevelClientWrapper client, String indexName, SearchSourceBuilder searchSourceBuilder, FlintOptions options) { super(client, new SearchRequest().indices(indexName).source(searchSourceBuilder.size(options.getScrollSize()))); this.options = options; this.scrollDuration = TimeValue.timeValueMinutes(options.getScrollDuration()); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java index 4a6424512..6f66ca317 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java @@ -3,10 +3,10 @@ import org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateRequest; import org.opensearch.client.RequestOptions; -import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.xcontent.XContentType; import org.opensearch.flint.core.FlintClient; +import org.opensearch.flint.core.RestHighLevelClientWrapper; import java.io.IOException; import java.util.logging.Level; @@ -30,7 +30,7 @@ public void upsert(String id, String doc) { // credentials may expire. // also, failure to close the client causes the job to be stuck in the running state as the client resource // is not released. 
- try (RestHighLevelClient client = flintClient.createClient()) { + try (RestHighLevelClientWrapper client = flintClient.createClient()) { assertIndexExist(client, indexName); UpdateRequest updateRequest = @@ -47,7 +47,7 @@ public void upsert(String id, String doc) { } public void update(String id, String doc) { - try (RestHighLevelClient client = flintClient.createClient()) { + try (RestHighLevelClientWrapper client = flintClient.createClient()) { assertIndexExist(client, indexName); UpdateRequest updateRequest = @@ -63,7 +63,7 @@ public void update(String id, String doc) { } public void updateIf(String id, String doc, long seqNo, long primaryTerm) { - try (RestHighLevelClient client = flintClient.createClient()) { + try (RestHighLevelClientWrapper client = flintClient.createClient()) { assertIndexExist(client, indexName); UpdateRequest updateRequest = @@ -80,9 +80,9 @@ public void updateIf(String id, String doc, long seqNo, long primaryTerm) { } } - private void assertIndexExist(RestHighLevelClient client, String indexName) throws IOException { + private void assertIndexExist(RestHighLevelClientWrapper client, String indexName) throws IOException { LOG.info("Checking if index exists " + indexName); - if (!client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) { + if (!client.isIndexExists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) { String errorMsg = "Index not found " + indexName; LOG.log(Level.SEVERE, errorMsg); throw new IllegalStateException(errorMsg); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java index 1e55084b2..8db9add63 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java @@ -9,18 +9,12 @@ import org.opensearch.action.bulk.BulkItemResponse; import 
org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; -import org.opensearch.action.support.WriteRequest; import org.opensearch.client.RequestOptions; -import org.opensearch.client.RestHighLevelClient; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.flint.core.RestHighLevelClientWrapper; import org.opensearch.rest.RestStatus; -import java.io.BufferedWriter; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.StringWriter; -import java.io.Writer; import java.util.Arrays; /** @@ -35,9 +29,9 @@ public class OpenSearchWriter extends FlintWriter { private StringBuilder sb; - private RestHighLevelClient client; + private RestHighLevelClientWrapper client; - public OpenSearchWriter(RestHighLevelClient client, String indexName, String refreshPolicy) { + public OpenSearchWriter(RestHighLevelClientWrapper client, String indexName, String refreshPolicy) { this.client = client; this.indexName = indexName; this.sb = new StringBuilder(); diff --git a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java b/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java index 2a875db2d..aea2a2ecf 100644 --- a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java +++ b/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java @@ -51,7 +51,6 @@ import static org.mockito.Mockito.when; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_APPLICATION_ID; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_COUNT; -import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_DOMAIN_ID; import static 
org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_GAUGE; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_INSTANCE_ROLE; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_JOB_ID; @@ -111,7 +110,7 @@ public void shouldReportWithoutGlobalDimensionsWhenGlobalDimensionsNotConfigured final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); - assertThat(dimensions).hasSize(5); + assertThat(dimensions).hasSize(3); assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_COUNT)); assertDefaultDimensionsWithUnknownValue(dimensions); } @@ -501,7 +500,6 @@ public void shouldParseDimensionedNamePrefixedWithMetricNameSpaceDriverMetric() assertThat(dimensions).contains(new Dimension().withName("key1").withValue("value1")); assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2")); assertThat(dimensions).contains(new Dimension().withName(DIMENSION_JOB_ID).withValue(UNKNOWN)); - assertThat(dimensions).contains(new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(UNKNOWN)); assertThat(dimensions).contains(new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(UNKNOWN)); assertThat(dimensions).contains(new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue("driver")); assertThat(metricInfo.getMetricName()).isEqualTo("LiveListenerBus.listenerProcessingTime.org.apache.spark.HeartbeatReceiver"); @@ -520,10 +518,9 @@ public void shouldParseDimensionedNamePrefixedWithMetricNameSpaceExecutorMetric( assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2")); assertThat(dimensions).contains(new Dimension().withName("key1").withValue("value1")); assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2")); + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue( 
"executor1")); assertThat(dimensions).contains(new Dimension().withName(DIMENSION_JOB_ID).withValue(UNKNOWN)); - assertThat(dimensions).contains(new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(UNKNOWN)); assertThat(dimensions).contains(new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(UNKNOWN)); - assertThat(dimensions).contains(new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue( "executor1")); assertThat(metricInfo.getMetricName()).isEqualTo("NettyBlockTransfer.shuffle-client.usedDirectMemory"); } @@ -531,8 +528,6 @@ public void shouldParseDimensionedNamePrefixedWithMetricNameSpaceExecutorMetric( private void assertDefaultDimensionsWithUnknownValue(List dimensions) { assertThat(dimensions).contains(new Dimension().withName(DIMENSION_JOB_ID).withValue(UNKNOWN)); - assertThat(dimensions).contains(new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(UNKNOWN)); - assertThat(dimensions).contains(new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(UNKNOWN)); assertThat(dimensions).contains(new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(UNKNOWN)); } @@ -611,5 +606,4 @@ private static void setFinalStaticField(final Class clazz, final String fieldNam modifiers.setInt(field, field.getModifiers() & ~Modifier.FINAL); field.set(null, value); } - } diff --git a/flint-core/src/test/java/org/opensearch/flint/core/metrics/MetricsUtilTest.java b/flint-core/src/test/java/org/opensearch/flint/core/metrics/MetricsUtilTest.java new file mode 100644 index 000000000..0561e0ace --- /dev/null +++ b/flint-core/src/test/java/org/opensearch/flint/core/metrics/MetricsUtilTest.java @@ -0,0 +1,42 @@ +package org.opensearch.flint.core.metrics; + +import org.apache.spark.SparkEnv; +import org.apache.spark.metrics.source.FlintMetricSource; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import static org.mockito.ArgumentMatchers.any; +import static 
org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class MetricsUtilTest { + + @Test + public void incOpenSearchMetric() { + try (MockedStatic sparkEnvMock = mockStatic(SparkEnv.class)) { + // Mock SparkEnv + SparkEnv sparkEnv = mock(SparkEnv.class, RETURNS_DEEP_STUBS); + sparkEnvMock.when(SparkEnv::get).thenReturn(sparkEnv); + + // Mock FlintMetricSource + FlintMetricSource flintMetricSource = Mockito.spy(new FlintMetricSource()); + when(sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME()).head()) + .thenReturn(flintMetricSource); + + // Test the method + MetricsUtil.publishOpenSearchMetric("testPrefix", 200); + + // Verify interactions + verify(sparkEnv.metricsSystem(), times(0)).registerSource(any()); + verify(flintMetricSource, times(2)).metricRegistry(); + Assertions.assertNotNull( + flintMetricSource.metricRegistry().getCounters().get("testPrefix.2xx.count")); + } + } +} \ No newline at end of file diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala index 99085185c..6c3fd957d 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala @@ -9,7 +9,7 @@ import java.net.ConnectException import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture} import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future, TimeoutException} -import scala.concurrent.duration.{Duration, MINUTES, _} +import scala.concurrent.duration._ import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal @@ -49,7 +49,7 @@ object FlintREPL extends Logging with FlintJobExecutor { val 
INITIAL_DELAY_MILLIS = 3000L val EARLY_TERMIANTION_CHECK_FREQUENCY = 60000L - def update(flintCommand: FlintCommand, updater: OpenSearchUpdater): Unit = { + def updateSessionIndex(flintCommand: FlintCommand, updater: OpenSearchUpdater): Unit = { updater.update(flintCommand.statementId, FlintCommand.serialize(flintCommand)) } @@ -363,9 +363,7 @@ object FlintREPL extends Logging with FlintJobExecutor { } else { FlintInstance.serializeWithoutJobId(flintJob, currentTime) } - flintSessionIndexUpdater.upsert(sessionId, serializedFlintInstance) - logInfo( s"""Updated job: {"jobid": ${flintJob.jobId}, "sessionId": ${flintJob.sessionId}} from $sessionIndex""") } @@ -568,7 +566,7 @@ object FlintREPL extends Logging with FlintJobExecutor { // we have set failed state in exception handling flintCommand.complete() } - update(flintCommand, flintSessionIndexUpdater) + updateSessionIndex(flintCommand, flintSessionIndexUpdater) } catch { // e.g., maybe due to authentication service connection issue // or invalid catalog (e.g., we are operating on data not defined in provided data source) @@ -576,7 +574,7 @@ object FlintREPL extends Logging with FlintJobExecutor { val error = s"""Fail to write result of ${flintCommand}, cause: ${e.getMessage}""" logError(error, e) flintCommand.fail() - update(flintCommand, flintSessionIndexUpdater) + updateSessionIndex(flintCommand, flintSessionIndexUpdater) } } @@ -771,7 +769,7 @@ object FlintREPL extends Logging with FlintJobExecutor { logDebug(s"command: $flintCommand") flintCommand.running() logDebug(s"command running: $flintCommand") - update(flintCommand, flintSessionIndexUpdater) + updateSessionIndex(flintCommand, flintSessionIndexUpdater) flintCommand } @@ -829,7 +827,6 @@ object FlintREPL extends Logging with FlintJobExecutor { shutdownHookManager.addShutdownHook(() => { logInfo("Shutting down REPL") - val getResponse = osClient.getDoc(sessionIndex, sessionId) if (!getResponse.isExists()) { return @@ -858,7 +855,6 @@ object FlintREPL 
extends Logging with FlintJobExecutor { sessionId: String): Unit = { val flintInstance = FlintInstance.deserializeFromMap(source) flintInstance.state = "dead" - flintSessionIndexUpdater.updateIf( sessionId, FlintInstance.serializeWithoutJobId( diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala index ceacc7bcd..e2e44bddd 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala @@ -41,7 +41,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { using(flintClient.createClient()) { client => val request = new GetIndexRequest(osIndexName) try { - val response = client.indices.get(request, RequestOptions.DEFAULT) + val response = client.getIndex(request, RequestOptions.DEFAULT) response.getMappings.get(osIndexName).source.string } catch { case e: Exception => @@ -71,7 +71,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { request.mapping(mapping, XContentType.JSON) try { - client.indices.create(request, RequestOptions.DEFAULT) + client.createIndex(request, RequestOptions.DEFAULT) logInfo(s"create $osIndexName successfully") } catch { case e: Exception => @@ -145,7 +145,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { using(flintClient.createClient()) { client => try { val request = new GetIndexRequest(indexName) - client.indices().exists(request, RequestOptions.DEFAULT) + client.isIndexExists(request, RequestOptions.DEFAULT) } catch { case e: Exception => throw new IllegalStateException(s"Failed to check if index $indexName exists", e)