diff --git a/.github/workflows/test-and-build-workflow.yml b/.github/workflows/test-and-build-workflow.yml
index f8d9bd682..e3b2b20f4 100644
--- a/.github/workflows/test-and-build-workflow.yml
+++ b/.github/workflows/test-and-build-workflow.yml
@@ -25,5 +25,16 @@ jobs:
       - name: Style check
         run: sbt scalafmtCheckAll
 
+      - name: Set SBT_OPTS
+        # Needed to extend the JVM memory size to avoid OutOfMemoryError for HTML test report
+        run: echo "SBT_OPTS=-Xmx2G" >> $GITHUB_ENV
+
       - name: Integ Test
         run: sbt integtest/integration
+
+      - name: Upload test report
+        if: always() # Ensures the artifact is saved even if tests fail
+        uses: actions/upload-artifact@v3
+        with:
+          name: test-reports
+          path: target/test-reports # Adjust this path if necessary
\ No newline at end of file
diff --git a/build.sbt b/build.sbt
index 781b4f51f..724d348ae 100644
--- a/build.sbt
+++ b/build.sbt
@@ -82,7 +82,11 @@ lazy val commonSettings = Seq(
     compileScalastyle := (Compile / scalastyle).toTask("").value,
     Compile / compile := ((Compile / compile) dependsOn compileScalastyle).value,
     testScalastyle := (Test / scalastyle).toTask("").value,
+    // Enable HTML report and output to separate folder per package
+    Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-h", s"target/test-reports/${name.value}"),
     Test / test := ((Test / test) dependsOn testScalastyle).value,
+    // Needed for HTML report
+    libraryDependencies += "com.vladsch.flexmark" % "flexmark-all" % "0.64.8" % "test",
     dependencyOverrides ++= Seq(
       "com.fasterxml.jackson.core" % "jackson-core" % jacksonVersion,
       "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
index ef4d01652..978950b3c 100644
--- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
+++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
@@ -70,6 +70,11 @@ public final class MetricConstants {
      */
     public static final String REPL_PROCESSING_TIME_METRIC = "session.processingTime";
 
+    /**
+     * Metric name for counting the number of queries executed per session.
+     */
+    public static final String REPL_QUERY_COUNT_METRIC = "session.query.count";
+
     /**
      * Prefix for metrics related to the request metadata read operations.
     */
@@ -135,6 +140,17 @@ public final class MetricConstants {
      */
     public static final String QUERY_EXECUTION_TIME_METRIC = "query.execution.processingTime";
 
+    /**
+     * Metric for query count of each query type (DROP/VACUUM/ALTER/REFRESH/CREATE INDEX)
+     */
+    public static final String QUERY_DROP_COUNT_METRIC = "query.drop.count";
+    public static final String QUERY_VACUUM_COUNT_METRIC = "query.vacuum.count";
+    public static final String QUERY_ALTER_COUNT_METRIC = "query.alter.count";
+    public static final String QUERY_REFRESH_COUNT_METRIC = "query.refresh.count";
+    public static final String QUERY_CREATE_INDEX_COUNT_METRIC = "query.createIndex.count";
+    public static final String QUERY_CREATE_INDEX_AUTO_REFRESH_COUNT_METRIC = "query.createIndex.autoRefresh.count";
+    public static final String QUERY_CREATE_INDEX_MANUAL_REFRESH_COUNT_METRIC = "query.createIndex.manualRefresh.count";
+
     /**
      * Metric for tracking the total bytes read from input
      */
@@ -175,6 +191,16 @@
      */
     public static final String INITIAL_CONDITION_CHECK_FAILED_PREFIX = "initialConditionCheck.failed.";
 
+    /**
+     * Metric for tracking the JVM GC time per task
+     */
+    public static final String TASK_JVM_GC_TIME_METRIC = "task.jvmGCTime.count";
+
+    /**
+     * Metric for tracking the total JVM GC time for query
+     */
+    public static final String TOTAL_JVM_GC_TIME_METRIC = "query.totalJvmGCTime.count";
+
     private MetricConstants() {
         // Private constructor to prevent instantiation
     }
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metrics/ReadWriteBytesSparkListener.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metrics/MetricsSparkListener.scala
similarity index 74%
rename from flint-core/src/main/scala/org/opensearch/flint/core/metrics/ReadWriteBytesSparkListener.scala
rename to flint-core/src/main/scala/org/opensearch/flint/core/metrics/MetricsSparkListener.scala
index bfafd3eb3..2ee941260 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/metrics/ReadWriteBytesSparkListener.scala
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/metrics/MetricsSparkListener.scala
@@ -6,17 +6,18 @@
 package org.opensearch.flint.core.metrics
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerTaskEnd}
 import org.apache.spark.sql.SparkSession
 
 /**
- * Collect and emit bytesRead/Written and recordsRead/Written metrics
+ * Collect and emit metrics by listening to Spark events
  */
-class ReadWriteBytesSparkListener extends SparkListener with Logging {
+class MetricsSparkListener extends SparkListener with Logging {
   var bytesRead: Long = 0
   var recordsRead: Long = 0
   var bytesWritten: Long = 0
   var recordsWritten: Long = 0
+  var totalJvmGcTime: Long = 0
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
     val inputMetrics = taskEnd.taskMetrics.inputMetrics
@@ -31,21 +32,28 @@ class ReadWriteBytesSparkListener extends SparkListener with Logging {
     recordsRead += inputMetrics.recordsRead
     bytesWritten += outputMetrics.bytesWritten
     recordsWritten += outputMetrics.recordsWritten
+    totalJvmGcTime += taskEnd.taskMetrics.jvmGCTime
+
+    MetricsUtil.addHistoricGauge(
+      MetricConstants.TASK_JVM_GC_TIME_METRIC,
+      taskEnd.taskMetrics.jvmGCTime)
   }
 
   def emitMetrics(): Unit = {
     logInfo(s"Input: totalBytesRead=${bytesRead}, totalRecordsRead=${recordsRead}")
     logInfo(s"Output: totalBytesWritten=${bytesWritten}, totalRecordsWritten=${recordsWritten}")
+    logInfo(s"totalJvmGcTime=${totalJvmGcTime}")
     MetricsUtil.addHistoricGauge(MetricConstants.INPUT_TOTAL_BYTES_READ, bytesRead)
     MetricsUtil.addHistoricGauge(MetricConstants.INPUT_TOTAL_RECORDS_READ, recordsRead)
     MetricsUtil.addHistoricGauge(MetricConstants.OUTPUT_TOTAL_BYTES_WRITTEN, bytesWritten)
     MetricsUtil.addHistoricGauge(MetricConstants.OUTPUT_TOTAL_RECORDS_WRITTEN, recordsWritten)
+    MetricsUtil.addHistoricGauge(MetricConstants.TOTAL_JVM_GC_TIME_METRIC, totalJvmGcTime)
   }
 }
 
-object ReadWriteBytesSparkListener {
+object MetricsSparkListener {
   def withMetrics[T](spark: SparkSession, lambda: () => T): T = {
-    val listener = new ReadWriteBytesSparkListener()
+    val listener = new MetricsSparkListener()
     spark.sparkContext.addSparkListener(listener)
 
     val result = lambda()
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala
index bedeeba54..ba605d3bf 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala
@@ -7,7 +7,7 @@ package org.opensearch.flint.spark.refresh
 
 import java.util.Collections
 
-import org.opensearch.flint.core.metrics.ReadWriteBytesSparkListener
+import org.opensearch.flint.core.metrics.MetricsSparkListener
 import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions, FlintSparkValidationHelper}
 import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode}
@@ -68,7 +68,7 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
       // Flint index has specialized logic and capability for incremental refresh
       case refresh: StreamingRefresh =>
         logInfo("Start refreshing index in streaming style")
-        val job = ReadWriteBytesSparkListener.withMetrics(
+        val job = MetricsSparkListener.withMetrics(
           spark,
           () =>
             refresh
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/IndexMetricHelper.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/IndexMetricHelper.scala
new file mode 100644
index 000000000..45b439ff0
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/IndexMetricHelper.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.sql
+
+import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}
+
+trait IndexMetricHelper {
+  def emitCreateIndexMetric(autoRefresh: Boolean): Unit = {
+    MetricsUtil.incrementCounter(MetricConstants.QUERY_CREATE_INDEX_COUNT_METRIC)
+    if (autoRefresh) {
+      MetricsUtil.incrementCounter(MetricConstants.QUERY_CREATE_INDEX_AUTO_REFRESH_COUNT_METRIC)
+    } else {
+      MetricsUtil.incrementCounter(MetricConstants.QUERY_CREATE_INDEX_MANUAL_REFRESH_COUNT_METRIC)
+    }
+  }
+
+  def emitRefreshIndexMetric(): Unit = {
+    MetricsUtil.incrementCounter(MetricConstants.QUERY_REFRESH_COUNT_METRIC)
+  }
+
+  def emitAlterIndexMetric(): Unit = {
+    MetricsUtil.incrementCounter(MetricConstants.QUERY_ALTER_COUNT_METRIC)
+  }
+
+  def emitDropIndexMetric(): Unit = {
+    MetricsUtil.incrementCounter(MetricConstants.QUERY_DROP_COUNT_METRIC)
+  }
+
+  def emitVacuumIndexMetric(): Unit = {
+    MetricsUtil.incrementCounter(MetricConstants.QUERY_VACUUM_COUNT_METRIC)
+  }
+}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
index 4a8f9018e..35a780020 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
@@ -6,9 +6,10 @@
 package org.opensearch.flint.spark.sql.covering
 
 import org.antlr.v4.runtime.tree.RuleNode
+import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}
 import org.opensearch.flint.spark.FlintSpark
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
-import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
+import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, IndexMetricHelper, SparkSqlAstBuilder}
 import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
 import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
 
@@ -20,7 +21,9 @@ import org.apache.spark.sql.types.StringType
 /**
  * Flint Spark AST builder that builds Spark command for Flint covering index statement.
  */
-trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
+trait FlintSparkCoveringIndexAstBuilder
+    extends FlintSparkSqlExtensionsVisitor[AnyRef]
+    with IndexMetricHelper {
   self: SparkSqlAstBuilder =>
 
   override def visitCreateCoveringIndexStatement(
@@ -49,6 +52,8 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
         .options(indexOptions, indexName)
         .create(ignoreIfExists)
 
+      emitCreateIndexMetric(indexOptions.autoRefresh())
+
       // Trigger auto refresh if enabled and not using external scheduler
       if (indexOptions
           .autoRefresh() && !indexBuilder.isExternalSchedulerEnabled()) {
@@ -62,6 +67,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
   override def visitRefreshCoveringIndexStatement(
      ctx: RefreshCoveringIndexStatementContext): Command = {
     FlintSparkSqlCommand() { flint =>
+      MetricsUtil.incrementCounter(MetricConstants.QUERY_REFRESH_COUNT_METRIC)
       val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
       flint.refreshIndex(flintIndexName)
       Seq.empty
@@ -107,6 +113,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
   override def visitAlterCoveringIndexStatement(
      ctx: AlterCoveringIndexStatementContext): Command = {
     FlintSparkSqlCommand() { flint =>
+      emitAlterIndexMetric()
       val indexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
       val indexOptions = visitPropertyList(ctx.propertyList())
       val index = flint
@@ -121,6 +128,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
   override def visitDropCoveringIndexStatement(
      ctx: DropCoveringIndexStatementContext): Command = {
     FlintSparkSqlCommand() { flint =>
+      emitDropIndexMetric()
       val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
       flint.deleteIndex(flintIndexName)
       Seq.empty
@@ -130,6 +138,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
   override def visitVacuumCoveringIndexStatement(
      ctx: VacuumCoveringIndexStatementContext): Command = {
     FlintSparkSqlCommand() { flint =>
+      emitVacuumIndexMetric()
       val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
       flint.vacuumIndex(flintIndexName)
       Seq.empty
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
index 8f3aa9917..9c8d2da0b 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
@@ -10,7 +10,7 @@ import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
 import org.antlr.v4.runtime.tree.RuleNode
 import org.opensearch.flint.spark.FlintSpark
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
-import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
+import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, IndexMetricHelper, SparkSqlAstBuilder}
 import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText, IndexBelongsTo}
 import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
 
@@ -22,7 +22,9 @@ import org.apache.spark.sql.types.StringType
 /**
  * Flint Spark AST builder that builds Spark command for Flint materialized view statement.
  */
-trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
+trait FlintSparkMaterializedViewAstBuilder
+    extends FlintSparkSqlExtensionsVisitor[AnyRef]
+    with IndexMetricHelper {
   self: SparkSqlAstBuilder =>
 
   override def visitCreateMaterializedViewStatement(
@@ -40,6 +42,8 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
       val indexOptions = visitPropertyList(ctx.propertyList())
       val flintIndexName = getFlintIndexName(flint, ctx.mvName)
 
+      emitCreateIndexMetric(indexOptions.autoRefresh())
+
       mvBuilder
         .options(indexOptions, flintIndexName)
         .create(ignoreIfExists)
@@ -56,6 +60,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
   override def visitRefreshMaterializedViewStatement(
      ctx: RefreshMaterializedViewStatementContext): Command = {
     FlintSparkSqlCommand() { flint =>
+      emitRefreshIndexMetric()
       val flintIndexName = getFlintIndexName(flint, ctx.mvName)
       flint.refreshIndex(flintIndexName)
       Seq.empty
@@ -106,6 +111,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
   override def visitAlterMaterializedViewStatement(
      ctx: AlterMaterializedViewStatementContext): Command = {
     FlintSparkSqlCommand() { flint =>
+      emitAlterIndexMetric()
       val indexName = getFlintIndexName(flint, ctx.mvName)
       val indexOptions = visitPropertyList(ctx.propertyList())
       val index = flint
@@ -120,6 +126,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
   override def visitDropMaterializedViewStatement(
      ctx: DropMaterializedViewStatementContext): Command = {
     FlintSparkSqlCommand() { flint =>
+      emitDropIndexMetric()
       flint.deleteIndex(getFlintIndexName(flint, ctx.mvName))
       Seq.empty
     }
@@ -128,6 +135,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
   override def visitVacuumMaterializedViewStatement(
      ctx: VacuumMaterializedViewStatementContext): Command = {
     FlintSparkSqlCommand() { flint =>
+      emitVacuumIndexMetric()
       flint.vacuumIndex(getFlintIndexName(flint, ctx.mvName))
       Seq.empty
     }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
index 67f6bc3d4..9ed06f6b0 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
@@ -14,7 +14,7 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, MIN_MAX, PARTITION, VALUE_SET}
 import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY
-import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
+import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, IndexMetricHelper, SparkSqlAstBuilder}
 import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
 import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
 
@@ -26,7 +26,9 @@ import org.apache.spark.sql.types.StringType
 /**
  * Flint Spark AST builder that builds Spark command for Flint skipping index statement.
  */
-trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
+trait FlintSparkSkippingIndexAstBuilder
+    extends FlintSparkSqlExtensionsVisitor[AnyRef]
+    with IndexMetricHelper {
   self: SparkSqlAstBuilder =>
 
   override def visitCreateSkippingIndexStatement(
@@ -73,6 +75,8 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
       val indexOptions = visitPropertyList(ctx.propertyList())
       val indexName = getSkippingIndexName(flint, ctx.tableName)
 
+      emitCreateIndexMetric(indexOptions.autoRefresh())
+
       indexBuilder
         .options(indexOptions, indexName)
         .create(ignoreIfExists)
@@ -88,6 +92,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
   override def visitRefreshSkippingIndexStatement(
      ctx: RefreshSkippingIndexStatementContext): Command =
     FlintSparkSqlCommand() { flint =>
+      emitRefreshIndexMetric()
       val indexName = getSkippingIndexName(flint, ctx.tableName)
       flint.refreshIndex(indexName)
       Seq.empty
@@ -115,6 +120,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
   override def visitAlterSkippingIndexStatement(
      ctx: AlterSkippingIndexStatementContext): Command = {
     FlintSparkSqlCommand() { flint =>
+      emitAlterIndexMetric()
       val indexName = getSkippingIndexName(flint, ctx.tableName)
       val indexOptions = visitPropertyList(ctx.propertyList())
       val index = flint
@@ -142,6 +148,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
 
   override def visitDropSkippingIndexStatement(ctx: DropSkippingIndexStatementContext): Command =
     FlintSparkSqlCommand() { flint =>
+      emitDropIndexMetric()
       val indexName = getSkippingIndexName(flint, ctx.tableName)
       flint.deleteIndex(indexName)
       Seq.empty
@@ -150,6 +157,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
   override def visitVacuumSkippingIndexStatement(
      ctx: VacuumSkippingIndexStatementContext): Command = {
     FlintSparkSqlCommand() { flint =>
+      emitVacuumIndexMetric()
       val indexName = getSkippingIndexName(flint, ctx.tableName)
       flint.vacuumIndex(indexName)
       Seq.empty
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
index ef0e76557..869541cc8 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
@@ -17,7 +17,7 @@ import com.codahale.metrics.Timer
 import org.opensearch.flint.common.model.{FlintStatement, InteractiveSession, SessionStates}
 import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.logging.CustomLogging
-import org.opensearch.flint.core.metrics.{MetricConstants, ReadWriteBytesSparkListener}
+import org.opensearch.flint.core.metrics.{MetricConstants, MetricsSparkListener}
 import org.opensearch.flint.core.metrics.MetricsUtil.{getTimerContext, incrementCounter, registerGauge, stopTimer}
 
 import org.apache.spark.SparkConf
@@ -57,6 +57,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
 
   private val sessionRunningCount = new AtomicInteger(0)
   private val statementRunningCount = new AtomicInteger(0)
+  private var queryCountMetric = 0
 
   def main(args: Array[String]) {
     val (queryOption, resultIndexOption) = parseArgs(args)
@@ -365,6 +366,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
       if (threadPool != null) {
         threadPoolFactory.shutdownThreadPool(threadPool)
       }
+      MetricsUtil.addHistoricGauge(MetricConstants.REPL_QUERY_COUNT_METRIC, queryCountMetric)
     }
   }
 
@@ -521,11 +523,12 @@ object FlintREPL extends Logging with FlintJobExecutor {
     flintStatement.running()
     statementExecutionManager.updateStatement(flintStatement)
     statementRunningCount.incrementAndGet()
+    queryCountMetric += 1
 
     val statementTimerContext = getTimerContext(
       MetricConstants.STATEMENT_PROCESSING_TIME_METRIC)
     val (dataToWrite, returnedVerificationResult) =
-      ReadWriteBytesSparkListener.withMetrics(
+      MetricsSparkListener.withMetrics(
         spark,
         () => {
           processStatementOnVerification(
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
index 6cdbdb16d..8582d3037 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
@@ -14,7 +14,7 @@ import scala.util.{Failure, Success, Try}
 
 import org.opensearch.flint.common.model.FlintStatement
 import org.opensearch.flint.common.scheduler.model.LangType
-import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil, ReadWriteBytesSparkListener}
+import org.opensearch.flint.core.metrics.{MetricConstants, MetricsSparkListener, MetricsUtil}
 import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter
 import org.opensearch.flint.spark.FlintSpark
 
@@ -70,7 +70,7 @@ case class JobOperator(
     val statementExecutionManager =
       instantiateStatementExecutionManager(commandContext, resultIndex, osClient)
 
-    val readWriteBytesSparkListener = new ReadWriteBytesSparkListener()
+    val readWriteBytesSparkListener = new MetricsSparkListener()
     sparkSession.sparkContext.addSparkListener(readWriteBytesSparkListener)
 
     val statement =
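For reference, the renamed listener is consumed through a wrap-run-emit pattern around each query or streaming job. Below is a minimal illustrative sketch of that pattern; it assumes the pre-existing emitMetrics()/removeSparkListener tail of withMetrics, which lies outside the diff context shown above and is not changed by this patch.

```scala
// Illustrative sketch only (assumed, not part of this patch): how withMetrics
// wraps a unit of work so the listener's gauges are emitted exactly once.
import org.apache.spark.sql.SparkSession

object MetricsSparkListener {
  def withMetrics[T](spark: SparkSession, lambda: () => T): T = {
    val listener = new MetricsSparkListener()
    spark.sparkContext.addSparkListener(listener)

    val result = lambda()

    // Publish the accumulated bytes/records/GC-time gauges, then detach the listener.
    listener.emitMetrics()
    spark.sparkContext.removeSparkListener(listener)
    result
  }
}
```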