From 834eaa82cdb81306017e689584a47e16db7ad111 Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Mon, 11 Nov 2024 19:56:06 -0800 Subject: [PATCH] Add query count per session metric Signed-off-by: Tomoyuki Morita --- .../org/opensearch/flint/core/metrics/MetricConstants.java | 5 +++++ .../src/main/scala/org/apache/spark/sql/FlintREPL.scala | 5 ++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index ef4d01652..b2a3af864 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -70,6 +70,11 @@ public final class MetricConstants { */ public static final String REPL_PROCESSING_TIME_METRIC = "session.processingTime"; + /** + * Metric name for counting the number of queries executed per session. + */ + public static final String REPL_QUERY_COUNT_METRIC = "session.query.count"; + /** * Prefix for metrics related to the request metadata read operations. */ diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala index ef0e76557..2cdb038ed 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala @@ -17,7 +17,7 @@ import com.codahale.metrics.Timer import org.opensearch.flint.common.model.{FlintStatement, InteractiveSession, SessionStates} import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.logging.CustomLogging -import org.opensearch.flint.core.metrics.{MetricConstants, ReadWriteBytesSparkListener} +import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil, ReadWriteBytesSparkListener} import org.opensearch.flint.core.metrics.MetricsUtil.{getTimerContext, incrementCounter, registerGauge, stopTimer} import org.apache.spark.SparkConf @@ -57,6 +57,7 @@ object FlintREPL extends Logging with FlintJobExecutor { private val sessionRunningCount = new AtomicInteger(0) private val statementRunningCount = new AtomicInteger(0) + private var queryCountMetric = 0 def main(args: Array[String]) { val (queryOption, resultIndexOption) = parseArgs(args) @@ -365,6 +366,7 @@ object FlintREPL extends Logging with FlintJobExecutor { if (threadPool != null) { threadPoolFactory.shutdownThreadPool(threadPool) } + MetricsUtil.addHistoricGauge(MetricConstants.REPL_QUERY_COUNT_METRIC, queryCountMetric) } } @@ -521,6 +523,7 @@ object FlintREPL extends Logging with FlintJobExecutor { flintStatement.running() statementExecutionManager.updateStatement(flintStatement) statementRunningCount.incrementAndGet() + queryCountMetric += 1 val statementTimerContext = getTimerContext( MetricConstants.STATEMENT_PROCESSING_TIME_METRIC)