From 19bc8dfdc0060387be09639895359ad7f95d3dae Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Wed, 13 Nov 2024 11:28:40 -0800 Subject: [PATCH] Add query count per session metric (#890) (#900) (cherry picked from commit 33208858ff3c13828b213a7c75898845403e0428) Signed-off-by: Tomoyuki Morita Signed-off-by: Tomoyuki MORITA Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- .../org/opensearch/flint/core/metrics/MetricConstants.java | 5 +++++ .../src/main/scala/org/apache/spark/sql/FlintREPL.scala | 3 +++ 2 files changed, 8 insertions(+) diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index 1ab8cf073..2fdecadd3 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -70,6 +70,11 @@ public final class MetricConstants { */ public static final String REPL_PROCESSING_TIME_METRIC = "session.processingTime"; + /** + * Metric name for counting the number of queries executed per session. + */ + public static final String REPL_QUERY_COUNT_METRIC = "session.query.count"; + /** * Prefix for metrics related to the request metadata read operations. */ diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala index 7f819415c..869541cc8 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala @@ -57,6 +57,7 @@ object FlintREPL extends Logging with FlintJobExecutor { private val sessionRunningCount = new AtomicInteger(0) private val statementRunningCount = new AtomicInteger(0) + private var queryCountMetric = 0 def main(args: Array[String]) { val (queryOption, resultIndexOption) = parseArgs(args) @@ -365,6 +366,7 @@ object FlintREPL extends Logging with FlintJobExecutor { if (threadPool != null) { threadPoolFactory.shutdownThreadPool(threadPool) } + MetricsUtil.addHistoricGauge(MetricConstants.REPL_QUERY_COUNT_METRIC, queryCountMetric) } } @@ -521,6 +523,7 @@ object FlintREPL extends Logging with FlintJobExecutor { flintStatement.running() statementExecutionManager.updateStatement(flintStatement) statementRunningCount.incrementAndGet() + queryCountMetric += 1 val statementTimerContext = getTimerContext( MetricConstants.STATEMENT_PROCESSING_TIME_METRIC)