diff --git a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java index ce35c34e8..23205fe99 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java @@ -5,6 +5,7 @@ package org.opensearch.flint.core; +import org.opensearch.OpenSearchException; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.delete.DeleteRequest; @@ -26,6 +27,7 @@ import org.opensearch.client.indices.GetIndexResponse; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.client.RequestOptions; +import org.opensearch.flint.core.metrics.MetricsUtil; import java.io.Closeable; import java.io.IOException; @@ -52,11 +54,62 @@ public interface IRestHighLevelClient extends Closeable { IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException; - Boolean isIndexExists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException; + Boolean doesIndexExist(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException; SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException; SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException; DocWriteResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException; + + + /** + * Records the success of an OpenSearch operation by incrementing the corresponding metric counter. + * This method constructs the metric name by appending ".2xx.count" to the provided metric name prefix. + * The metric name is then used to increment the counter, indicating a successful operation. 
+ * + * @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for success + */ + static void recordOperationSuccess(String metricNamePrefix) { + String successMetricName = metricNamePrefix + ".2xx.count"; + MetricsUtil.incrementCounter(successMetricName); + } + + /** + * Records the failure of an OpenSearch operation by incrementing the corresponding metric counter. + * If the exception is, or is directly caused by, an OpenSearchException with status code 403, + * it additionally increments a metric specifically for that status code. + * In every case, it increments a general failure metric counter based on the status code category (e.g., 4xx, 5xx); exceptions without an OpenSearchException are counted as 5xx. + * + * @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for failure + * @param e the exception encountered during the operation, used to determine the type of failure + */ + static void recordOperationFailure(String metricNamePrefix, Exception e) { + OpenSearchException openSearchException = extractOpenSearchException(e); + int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500; + + if (statusCode == 403) { + String forbiddenErrorMetricName = metricNamePrefix + ".403.count"; + MetricsUtil.incrementCounter(forbiddenErrorMetricName); + } + + String failureMetricName = metricNamePrefix + "." + (statusCode / 100) + "xx.count"; + MetricsUtil.incrementCounter(failureMetricName); + } + + /** + * Extracts an OpenSearchException from the given Throwable. + * Checks if the Throwable is an instance of OpenSearchException or caused by one. 
+ * + * @param ex the exception to be checked + * @return the extracted OpenSearchException, or null if not found + */ + private static OpenSearchException extractOpenSearchException(Throwable ex) { + if (ex instanceof OpenSearchException) { + return (OpenSearchException) ex; + } else if (ex.getCause() instanceof OpenSearchException) { + return (OpenSearchException) ex.getCause(); + } + return null; + } } diff --git a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java index 3556c7e24..b2e8a4a0e 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java @@ -91,7 +91,7 @@ public IndexResponse index(IndexRequest indexRequest, RequestOptions options) th } @Override - public Boolean isIndexExists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException { + public Boolean doesIndexExist(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException { return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().exists(getIndexRequest, options)); } @@ -122,64 +122,14 @@ public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options private T execute(String metricNamePrefix, IOCallable operation) throws IOException { try { T result = operation.call(); - recordOperationSuccess(metricNamePrefix); + IRestHighLevelClient.recordOperationSuccess(metricNamePrefix); return result; } catch (Exception e) { - recordOperationFailure(metricNamePrefix, e); + IRestHighLevelClient.recordOperationFailure(metricNamePrefix, e); throw e; } } - /** - * Records the success of an OpenSearch operation by incrementing the corresponding metric counter. - * This method constructs the metric name by appending ".200.count" to the provided metric name prefix. 
- * The metric name is then used to increment the counter, indicating a successful operation. - * - * @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for success - */ - private void recordOperationSuccess(String metricNamePrefix) { - String successMetricName = metricNamePrefix + ".2xx.count"; - MetricsUtil.incrementCounter(successMetricName); - } - - /** - * Records the failure of an OpenSearch operation by incrementing the corresponding metric counter. - * If the exception is an OpenSearchException with a specific status code (e.g., 403), - * it increments a metric specifically for that status code. - * Otherwise, it increments a general failure metric counter based on the status code category (e.g., 4xx, 5xx). - * - * @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for failure - * @param e the exception encountered during the operation, used to determine the type of failure - */ - private void recordOperationFailure(String metricNamePrefix, Exception e) { - OpenSearchException openSearchException = extractOpenSearchException(e); - int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500; - - if (statusCode == 403) { - String forbiddenErrorMetricName = metricNamePrefix + ".403.count"; - MetricsUtil.incrementCounter(forbiddenErrorMetricName); - } - - String failureMetricName = metricNamePrefix + "." + (statusCode / 100) + "xx.count"; - MetricsUtil.incrementCounter(failureMetricName); - } - - /** - * Extracts an OpenSearchException from the given Throwable. - * Checks if the Throwable is an instance of OpenSearchException or caused by one. 
- * - * @param ex the exception to be checked - * @return the extracted OpenSearchException, or null if not found - */ - private OpenSearchException extractOpenSearchException(Throwable ex) { - if (ex instanceof OpenSearchException) { - return (OpenSearchException) ex; - } else if (ex.getCause() instanceof OpenSearchException) { - return (OpenSearchException) ex.getCause(); - } - return null; - } - /** * Functional interface for operations that can throw IOException. * diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index 544a18c28..34fb57d4c 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -46,6 +46,10 @@ public final class MetricConstants { * Metric name for tracking the processing time of sessions. */ public static final String REPL_PROCESSING_TIME_METRIC = "session.processingTime"; + public static final String REQUEST_METADATA_READ_METRIC_PREFIX = "request.metadata.read"; + public static final String REQUEST_METADATA_WRITE_METRIC_PREFIX = "request.metadata.write"; + public static final String REQUEST_METADATA_HEARTBEAT_FAILED_METRIC = "request.metadata.heartbeat.failed.count"; + public static final String RESULT_METADATA_WRITE_METRIC_PREFIX = "result.metadata.write"; /** * Metric name for counting the number of statements currently running. 
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 29ebad206..45aedbaa6 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -99,7 +99,7 @@ public OptimisticTransaction startTransaction(String indexName, String da String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX : META_LOG_NAME_PREFIX + "_" + dataSourceName; try (IRestHighLevelClient client = createClient()) { - if (client.isIndexExists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) { + if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) { LOG.info("Found metadata log index " + metaLogIndexName); } else { if (forceInit) { @@ -149,7 +149,7 @@ public boolean exists(String indexName) { LOG.info("Checking if Flint index exists " + indexName); String osIndexName = sanitizeIndexName(indexName); try (IRestHighLevelClient client = createClient()) { - return client.isIndexExists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT); + return client.doesIndexExist(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT); } catch (IOException e) { throw new IllegalStateException("Failed to check if Flint index exists " + osIndexName, e); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index 9c1502b29..7195ae177 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -175,7 +175,7 @@ private FlintMetadataLogEntry writeLogEntry( private boolean exists() { 
LOG.info("Checking if Flint index exists " + metaLogIndexName); try (IRestHighLevelClient client = flintClient.createClient()) { - return client.isIndexExists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT); + return client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT); } catch (IOException e) { throw new IllegalStateException("Failed to check if Flint index exists " + metaLogIndexName, e); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java index ffe771b15..5fd05404e 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java @@ -12,80 +12,80 @@ import java.util.logging.Level; import java.util.logging.Logger; +import static org.opensearch.flint.core.metrics.MetricConstants.REQUEST_METADATA_READ_METRIC_PREFIX; +import static org.opensearch.flint.core.metrics.MetricConstants.REQUEST_METADATA_WRITE_METRIC_PREFIX; + public class OpenSearchUpdater { private static final Logger LOG = Logger.getLogger(OpenSearchUpdater.class.getName()); private final String indexName; - private final FlintClient flintClient; - public OpenSearchUpdater(String indexName, FlintClient flintClient) { this.indexName = indexName; this.flintClient = flintClient; } public void upsert(String id, String doc) { - // we might need to keep the updater for a long time. Reusing the client may not work as the temporary - // credentials may expire. - // also, failure to close the client causes the job to be stuck in the running state as the client resource - // is not released. 
- try (IRestHighLevelClient client = flintClient.createClient()) { - assertIndexExist(client, indexName); - UpdateRequest - updateRequest = - new UpdateRequest(indexName, id).doc(doc, XContentType.JSON) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) - .docAsUpsert(true); - client.update(updateRequest, RequestOptions.DEFAULT); - } catch (IOException e) { - throw new RuntimeException(String.format( - "Failed to execute update request on index: %s, id: %s", - indexName, - id), e); - } + updateDocument(id, doc, true, -1, -1); } public void update(String id, String doc) { - try (IRestHighLevelClient client = flintClient.createClient()) { - assertIndexExist(client, indexName); - UpdateRequest - updateRequest = - new UpdateRequest(indexName, id).doc(doc, XContentType.JSON) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); - client.update(updateRequest, RequestOptions.DEFAULT); - } catch (IOException e) { - throw new RuntimeException(String.format( - "Failed to execute update request on index: %s, id: %s", - indexName, - id), e); - } + updateDocument(id, doc, false, -1, -1); } public void updateIf(String id, String doc, long seqNo, long primaryTerm) { + updateDocument(id, doc, false, seqNo, primaryTerm); + } + + private void updateDocument(String id, String doc, boolean upsert, long seqNo, long primaryTerm) { + // we might need to keep the updater for a long time. Reusing the client may not work as the temporary + // credentials may expire. + // also, failure to close the client causes the job to be stuck in the running state as the client resource + // is not released. 
try (IRestHighLevelClient client = flintClient.createClient()) { assertIndexExist(client, indexName); - UpdateRequest - updateRequest = - new UpdateRequest(indexName, id).doc(doc, XContentType.JSON) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) - .setIfSeqNo(seqNo) - .setIfPrimaryTerm(primaryTerm); - client.update(updateRequest, RequestOptions.DEFAULT); + UpdateRequest updateRequest = new UpdateRequest(indexName, id) + .doc(doc, XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + + if (upsert) { + updateRequest.docAsUpsert(true); + } + if (seqNo >= 0 && primaryTerm >= 0) { + updateRequest.setIfSeqNo(seqNo).setIfPrimaryTerm(primaryTerm); + } + + try { + client.update(updateRequest, RequestOptions.DEFAULT); + IRestHighLevelClient.recordOperationSuccess(REQUEST_METADATA_WRITE_METRIC_PREFIX); + } catch (Exception e) { + IRestHighLevelClient.recordOperationFailure(REQUEST_METADATA_WRITE_METRIC_PREFIX, e); + throw e; + } } catch (IOException e) { throw new RuntimeException(String.format( "Failed to execute update request on index: %s, id: %s", - indexName, - id), e); + indexName, id), e); } } private void assertIndexExist(IRestHighLevelClient client, String indexName) throws IOException { - LOG.info("Checking if index exists " + indexName); - if (!client.isIndexExists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) { - String errorMsg = "Index not found " + indexName; + LOG.info("Checking if index exists: " + indexName); + boolean exists; + try { + exists = client.doesIndexExist(new GetIndexRequest(indexName), RequestOptions.DEFAULT); + IRestHighLevelClient.recordOperationSuccess(REQUEST_METADATA_READ_METRIC_PREFIX); + } catch (Exception e) { + IRestHighLevelClient.recordOperationFailure(REQUEST_METADATA_READ_METRIC_PREFIX, e); + throw e; + } + + if (!exists) { + String errorMsg = "Index not found: " + indexName; LOG.log(Level.SEVERE, errorMsg); throw new IllegalStateException(errorMsg); } } } + diff --git 
a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala index 4aeb0db17..1814a8d8e 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala @@ -11,7 +11,7 @@ import scala.concurrent.{ExecutionContext, Future, TimeoutException} import scala.concurrent.duration.{Duration, MINUTES} import com.amazonaws.services.s3.model.AmazonS3Exception -import org.opensearch.flint.core.FlintClient +import org.opensearch.flint.core.{FlintClient, IRestHighLevelClient} import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.core.metrics.MetricConstants import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter @@ -19,7 +19,6 @@ import play.api.libs.json.{JsArray, JsBoolean, JsObject, Json, JsString, JsValue import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging -import org.apache.spark.sql.FlintJob.{checkAndCreateIndex, createIndex, currentTimeProvider, executeQuery, getFailedData, getFormattedData, isSuperset, logError, logInfo, processQueryException, writeDataFrameToOpensearch} import org.apache.spark.sql.FlintREPL.envinromentProvider import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.datasources.DataSource @@ -100,10 +99,20 @@ trait FlintJobExecutor { } private def writeData(resultData: DataFrame, resultIndex: String): Unit = { - resultData.write - .format("flint") - .mode("append") - .save(resultIndex) + try { + resultData.write + .format("flint") + .mode("append") + .save(resultIndex) + IRestHighLevelClient.recordOperationSuccess( + MetricConstants.RESULT_METADATA_WRITE_METRIC_PREFIX) + } catch { + case e: Exception => + IRestHighLevelClient.recordOperationFailure( + MetricConstants.RESULT_METADATA_WRITE_METRIC_PREFIX, + e) + throw e + } } 
/** @@ -123,7 +132,7 @@ trait FlintJobExecutor { if (osClient.doesIndexExist(resultIndex)) { writeData(resultData, resultIndex) } else { - createIndex(osClient, resultIndex, resultIndexMapping) + createResultIndex(osClient, resultIndex, resultIndexMapping) writeData(resultData, resultIndex) } } @@ -321,7 +330,7 @@ trait FlintJobExecutor { case e: IllegalStateException if e.getCause != null && e.getCause.getMessage.contains("index_not_found_exception") => - createIndex(osClient, resultIndex, resultIndexMapping) + createResultIndex(osClient, resultIndex, resultIndexMapping) case e: InterruptedException => val error = s"Interrupted by the main thread: ${e.getMessage}" Thread.currentThread().interrupt() // Preserve the interrupt status @@ -334,7 +343,7 @@ trait FlintJobExecutor { } } - def createIndex( + def createResultIndex( osClient: OSClient, resultIndex: String, mapping: String): Either[String, Unit] = { diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala index e294a388d..d30669cca 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala @@ -970,11 +970,17 @@ object FlintREPL extends Logging with FlintJobExecutor { // Preserve the interrupt status Thread.currentThread().interrupt() logError("HeartBeatUpdater task was interrupted", ie) + incrementCounter( + MetricConstants.REQUEST_METADATA_HEARTBEAT_FAILED_METRIC + ) // Record heartbeat failure metric // maybe due to invalid sequence number or primary term case e: Exception => logWarning( s"""Fail to update the last update time of the flint instance ${sessionId}""", e) + incrementCounter( + MetricConstants.REQUEST_METADATA_HEARTBEAT_FAILED_METRIC + ) // Record heartbeat failure metric } } }, diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala 
b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala index cd784e704..f5e4ec2be 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala @@ -9,6 +9,8 @@ import java.io.IOException import java.util.ArrayList import java.util.Locale +import scala.util.{Failure, Success, Try} + import org.opensearch.action.get.{GetRequest, GetResponse} import org.opensearch.action.search.{SearchRequest, SearchResponse} import org.opensearch.client.{RequestOptions, RestHighLevelClient} @@ -18,7 +20,8 @@ import org.opensearch.common.Strings import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.{NamedXContentRegistry, XContentParser, XContentType} import org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS -import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions} +import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions, IRestHighLevelClient} +import org.opensearch.flint.core.metrics.MetricConstants import org.opensearch.flint.core.storage.{FlintReader, OpenSearchQueryReader, OpenSearchScrollReader, OpenSearchUpdater} import org.opensearch.index.query.{AbstractQueryBuilder, MatchAllQueryBuilder, QueryBuilder} import org.opensearch.plugins.SearchPlugin @@ -74,8 +77,13 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { try { client.createIndex(request, RequestOptions.DEFAULT) logInfo(s"create $osIndexName successfully") + IRestHighLevelClient.recordOperationSuccess( + MetricConstants.RESULT_METADATA_WRITE_METRIC_PREFIX) } catch { case e: Exception => + IRestHighLevelClient.recordOperationFailure( + MetricConstants.RESULT_METADATA_WRITE_METRIC_PREFIX, + e) throw new IllegalStateException(s"Failed to create index $osIndexName", e); } } @@ -109,11 +117,17 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { def getDoc(osIndexName: String, 
id: String): GetResponse = { using(flintClient.createClient()) { client => - try { - val request = new GetRequest(osIndexName, id) - client.get(request, RequestOptions.DEFAULT) - } catch { - case e: Exception => + val request = new GetRequest(osIndexName, id) + val result = Try(client.get(request, RequestOptions.DEFAULT)) + result match { + case Success(response) => + IRestHighLevelClient.recordOperationSuccess( + MetricConstants.REQUEST_METADATA_READ_METRIC_PREFIX) + response + case Failure(e: Exception) => + IRestHighLevelClient.recordOperationFailure( + MetricConstants.REQUEST_METADATA_READ_METRIC_PREFIX, + e) throw new IllegalStateException( String.format( Locale.ROOT, @@ -146,7 +160,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { using(flintClient.createClient()) { client => try { val request = new GetIndexRequest(indexName) - client.isIndexExists(request, RequestOptions.DEFAULT) + client.doesIndexExist(request, RequestOptions.DEFAULT) } catch { case e: Exception => throw new IllegalStateException(s"Failed to check if index $indexName exists", e)