diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index 1ab8cf073..2fdecadd3 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -70,6 +70,11 @@ public final class MetricConstants { */ public static final String REPL_PROCESSING_TIME_METRIC = "session.processingTime"; + /** + * Metric name for counting the number of queries executed per session. + */ + public static final String REPL_QUERY_COUNT_METRIC = "session.query.count"; + /** * Prefix for metrics related to the request metadata read operations. */ diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala index 7f819415c..869541cc8 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala @@ -57,6 +57,7 @@ object FlintREPL extends Logging with FlintJobExecutor { private val sessionRunningCount = new AtomicInteger(0) private val statementRunningCount = new AtomicInteger(0) + private var queryCountMetric = 0 def main(args: Array[String]) { val (queryOption, resultIndexOption) = parseArgs(args) @@ -365,6 +366,7 @@ object FlintREPL extends Logging with FlintJobExecutor { if (threadPool != null) { threadPoolFactory.shutdownThreadPool(threadPool) } + MetricsUtil.addHistoricGauge(MetricConstants.REPL_QUERY_COUNT_METRIC, queryCountMetric) } } @@ -521,6 +523,7 @@ object FlintREPL extends Logging with FlintJobExecutor { flintStatement.running() statementExecutionManager.updateStatement(flintStatement) statementRunningCount.incrementAndGet() + queryCountMetric += 1 val statementTimerContext = getTimerContext( MetricConstants.STATEMENT_PROCESSING_TIME_METRIC)