From 1a923be2b73d0de29b248f32581f2e5736ecaaa8 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Mon, 29 Jan 2024 02:51:02 -0800 Subject: [PATCH] Add repl metrics Signed-off-by: Louis Chu --- .../flint/core/metrics/MetricConstants.java | 8 +++++ .../flint/core/metrics/MetricsUtil.java | 12 ++++++++ .../apache/spark/sql/FlintJobExecutor.scala | 3 ++ .../org/apache/spark/sql/FlintREPL.scala | 30 +++++++++++++++++++ 4 files changed, 53 insertions(+) diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index d34a3705d..7385f484f 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -21,4 +21,12 @@ public class MetricConstants { * Similar to OS_READ_METRIC_PREFIX, this constant is used for categorizing and identifying metrics that pertain to write operations. 
*/ public static final String OS_WRITE_OP_METRIC_PREFIX = "opensearch.write"; + + public static final String S3_ERR_CNT_METRIC = "s3.error.count"; + public static final String REPL_RUNNING_METRIC = "session.running.count"; + public static final String REPL_FAILED_METRIC = "session.failed.count"; + public static final String REPL_SUCCESS_METRIC = "session.success.count"; + public static final String STATEMENT_RUNNING_METRIC = "statement.running.count"; + public static final String STATEMENT_FAILED_METRIC = "statement.failed.count"; + public static final String STATEMENT_SUCCESS_METRIC = "statement.success.count"; } \ No newline at end of file diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java index 0edce3e36..37a17115d 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java @@ -38,6 +38,18 @@ public static void incrementCounter(String metricName) { } } + public static void decrementCounter(String metricName) { + Counter counter = getOrCreateCounter(metricName); + if (counter != null) { + if (counter.getCount() > 0) { + counter.dec(); + LOG.info(metricName + ": decreased"); + } else { + LOG.info(metricName + ": already at minimum, cannot decrease"); + } + } + } + // Retrieves or creates a new counter for the given metric name private static Counter getOrCreateCounter(String metricName) { SparkEnv sparkEnv = SparkEnv.get(); diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala index a44e70401..48ef09bb3 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala @@ -13,6 +13,8 @@ import 
scala.concurrent.duration.{Duration, MINUTES} import com.amazonaws.services.s3.model.AmazonS3Exception import org.opensearch.flint.core.FlintClient import org.opensearch.flint.core.metadata.FlintMetadata +import org.opensearch.flint.core.metrics.MetricConstants +import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter import play.api.libs.json.{JsArray, JsBoolean, JsObject, Json, JsString, JsValue} import org.apache.spark.{SparkConf, SparkException} @@ -393,6 +395,7 @@ trait FlintJobExecutor { case r: ParseException => handleQueryException(r, "Syntax error", spark, dataSource, query, queryId, sessionId) case r: AmazonS3Exception => + incrementCounter(MetricConstants.S3_ERR_CNT_METRIC) handleQueryException( r, "Fail to read data from S3. Cause", diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala index 6c3fd957d..7e5d52944 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala @@ -18,6 +18,8 @@ import org.opensearch.action.get.GetResponse import org.opensearch.common.Strings import org.opensearch.flint.app.{FlintCommand, FlintInstance} import org.opensearch.flint.app.FlintInstance.formats +import org.opensearch.flint.core.metrics.MetricConstants +import org.opensearch.flint.core.metrics.MetricsUtil.{decrementCounter, incrementCounter} import org.opensearch.flint.core.storage.{FlintReader, OpenSearchUpdater} import org.apache.spark.SparkConf @@ -155,6 +157,7 @@ object FlintREPL extends Logging with FlintJobExecutor { exponentialBackoffRetry(maxRetries = 5, initialDelay = 2.seconds) { queryLoop(commandContext) } + recordSessionSuccess() } catch { case e: Exception => handleSessionError( @@ -364,6 +367,7 @@ object FlintREPL extends Logging with FlintJobExecutor { FlintInstance.serializeWithoutJobId(flintJob, currentTime) } 
flintSessionIndexUpdater.upsert(sessionId, serializedFlintInstance) + incrementCounter(MetricConstants.REPL_RUNNING_METRIC) logInfo( s"""Updated job: {"jobid": ${flintJob.jobId}, "sessionId": ${flintJob.sessionId}} from $sessionIndex""") } @@ -384,6 +388,9 @@ object FlintREPL extends Logging with FlintJobExecutor { .getOrElse(createFailedFlintInstance(applicationId, jobId, sessionId, jobStartTime, error)) updateFlintInstance(flintInstance, flintSessionIndexUpdater, sessionId) + if (flintInstance.state.equals("fail")) { + recordSessionFailed() + } } private def getExistingFlintInstance( @@ -567,6 +574,7 @@ object FlintREPL extends Logging with FlintJobExecutor { flintCommand.complete() } updateSessionIndex(flintCommand, flintSessionIndexUpdater) + recordCommandStateChange(flintCommand) } catch { // e.g., maybe due to authentication service connection issue // or invalid catalog (e.g., we are operating on data not defined in provided data source) @@ -575,6 +583,7 @@ object FlintREPL extends Logging with FlintJobExecutor { logError(error, e) flintCommand.fail() updateSessionIndex(flintCommand, flintSessionIndexUpdater) + recordCommandStateChange(flintCommand) } } @@ -770,6 +779,7 @@ object FlintREPL extends Logging with FlintJobExecutor { flintCommand.running() logDebug(s"command running: $flintCommand") updateSessionIndex(flintCommand, flintSessionIndexUpdater) + incrementCounter(MetricConstants.STATEMENT_RUNNING_METRIC) flintCommand } @@ -862,6 +872,7 @@ object FlintREPL extends Logging with FlintJobExecutor { currentTimeProvider.currentEpochMillis()), getResponse.getSeqNo, getResponse.getPrimaryTerm) + recordSessionSuccess() } /** @@ -1023,4 +1034,23 @@ object FlintREPL extends Logging with FlintJobExecutor { result.getOrElse(throw new RuntimeException("Failed after retries")) } + + private def recordSessionSuccess(): Unit = { + decrementCounter(MetricConstants.REPL_RUNNING_METRIC) + incrementCounter(MetricConstants.REPL_SUCCESS_METRIC) + } + + private def 
recordSessionFailed(): Unit = { + decrementCounter(MetricConstants.REPL_RUNNING_METRIC) + incrementCounter(MetricConstants.REPL_FAILED_METRIC) + } + + private def recordCommandStateChange(flintCommand: FlintCommand): Unit = { + decrementCounter(MetricConstants.STATEMENT_RUNNING_METRIC) + if (flintCommand.isComplete()) { + incrementCounter(MetricConstants.STATEMENT_SUCCESS_METRIC) + } else if (flintCommand.isFailed()) { + incrementCounter(MetricConstants.STATEMENT_FAILED_METRIC) + } + } }