From c8fb0faff3335d799e564ccfe1e66e676c07a889 Mon Sep 17 00:00:00 2001 From: Tomoyuki MORITA Date: Wed, 13 Nov 2024 11:29:29 -0800 Subject: [PATCH] Add metrics for query types (#891) Signed-off-by: Tomoyuki Morita --- .../flint/core/metrics/MetricConstants.java | 11 ++++++ .../flint/spark/sql/IndexMetricHelper.scala | 35 +++++++++++++++++++ .../FlintSparkCoveringIndexAstBuilder.scala | 13 +++++-- ...FlintSparkMaterializedViewAstBuilder.scala | 12 +++++-- .../FlintSparkSkippingIndexAstBuilder.scala | 12 +++++-- 5 files changed, 77 insertions(+), 6 deletions(-) create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/IndexMetricHelper.scala diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index 2fdecadd3..978950b3c 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -140,6 +140,17 @@ public final class MetricConstants { */ public static final String QUERY_EXECUTION_TIME_METRIC = "query.execution.processingTime"; + /** + * Metric for query count of each query type (DROP/VACUUM/ALTER/REFRESH/CREATE INDEX) + */ + public static final String QUERY_DROP_COUNT_METRIC = "query.drop.count"; + public static final String QUERY_VACUUM_COUNT_METRIC = "query.vacuum.count"; + public static final String QUERY_ALTER_COUNT_METRIC = "query.alter.count"; + public static final String QUERY_REFRESH_COUNT_METRIC = "query.refresh.count"; + public static final String QUERY_CREATE_INDEX_COUNT_METRIC = "query.createIndex.count"; + public static final String QUERY_CREATE_INDEX_AUTO_REFRESH_COUNT_METRIC = "query.createIndex.autoRefresh.count"; + public static final String QUERY_CREATE_INDEX_MANUAL_REFRESH_COUNT_METRIC = "query.createIndex.manualRefresh.count"; + /** * Metric for tracking the total bytes read from input */ diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/IndexMetricHelper.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/IndexMetricHelper.scala new file mode 100644 index 000000000..45b439ff0 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/IndexMetricHelper.scala @@ -0,0 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.sql + +import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil} + +trait IndexMetricHelper { + def emitCreateIndexMetric(autoRefresh: Boolean): Unit = { + MetricsUtil.incrementCounter(MetricConstants.QUERY_CREATE_INDEX_COUNT_METRIC) + if (autoRefresh) { + MetricsUtil.incrementCounter(MetricConstants.QUERY_CREATE_INDEX_AUTO_REFRESH_COUNT_METRIC) + } else { + MetricsUtil.incrementCounter(MetricConstants.QUERY_CREATE_INDEX_MANUAL_REFRESH_COUNT_METRIC) + } + } + + def emitRefreshIndexMetric(): Unit = { + MetricsUtil.incrementCounter(MetricConstants.QUERY_REFRESH_COUNT_METRIC) + } + + def emitAlterIndexMetric(): Unit = { + MetricsUtil.incrementCounter(MetricConstants.QUERY_ALTER_COUNT_METRIC) + } + + def emitDropIndexMetric(): Unit = { + MetricsUtil.incrementCounter(MetricConstants.QUERY_DROP_COUNT_METRIC) + } + + def emitVacuumIndexMetric(): Unit = { + MetricsUtil.incrementCounter(MetricConstants.QUERY_VACUUM_COUNT_METRIC) + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala index 4a8f9018e..35a780020 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala @@ -6,9 +6,10 @@ package org.opensearch.flint.spark.sql.covering import org.antlr.v4.runtime.tree.RuleNode +import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil} import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex -import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} +import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, IndexMetricHelper, SparkSqlAstBuilder} import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText} import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._ @@ -20,7 +21,9 @@ import org.apache.spark.sql.types.StringType /** * Flint Spark AST builder that builds Spark command for Flint covering index statement. */ -trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] { +trait FlintSparkCoveringIndexAstBuilder + extends FlintSparkSqlExtensionsVisitor[AnyRef] + with IndexMetricHelper { self: SparkSqlAstBuilder => override def visitCreateCoveringIndexStatement( @@ -49,6 +52,8 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A .options(indexOptions, indexName) .create(ignoreIfExists) + emitCreateIndexMetric(indexOptions.autoRefresh()) + // Trigger auto refresh if enabled and not using external scheduler if (indexOptions .autoRefresh() && !indexBuilder.isExternalSchedulerEnabled()) { @@ -62,6 +67,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A override def visitRefreshCoveringIndexStatement( ctx: RefreshCoveringIndexStatementContext): Command = { FlintSparkSqlCommand() { flint => + MetricsUtil.incrementCounter(MetricConstants.QUERY_REFRESH_COUNT_METRIC) val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName) flint.refreshIndex(flintIndexName) Seq.empty @@ -107,6 +113,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A override def visitAlterCoveringIndexStatement( ctx: AlterCoveringIndexStatementContext): Command = { FlintSparkSqlCommand() { flint => + emitAlterIndexMetric() val indexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName) val indexOptions = visitPropertyList(ctx.propertyList()) val index = flint @@ -121,6 +128,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A override def visitDropCoveringIndexStatement( ctx: DropCoveringIndexStatementContext): Command = { FlintSparkSqlCommand() { flint => + emitDropIndexMetric() val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName) flint.deleteIndex(flintIndexName) Seq.empty @@ -130,6 +138,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A override def visitVacuumCoveringIndexStatement( ctx: VacuumCoveringIndexStatementContext): Command = { FlintSparkSqlCommand() { flint => + emitVacuumIndexMetric() val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName) flint.vacuumIndex(flintIndexName) Seq.empty diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala index 8f3aa9917..9c8d2da0b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala @@ -10,7 +10,7 @@ import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` import org.antlr.v4.runtime.tree.RuleNode import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.mv.FlintSparkMaterializedView -import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} +import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, IndexMetricHelper, SparkSqlAstBuilder} import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText, IndexBelongsTo} import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._ @@ -22,7 +22,9 @@ import org.apache.spark.sql.types.StringType /** * Flint Spark AST builder that builds Spark command for Flint materialized view statement. */ -trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] { +trait FlintSparkMaterializedViewAstBuilder + extends FlintSparkSqlExtensionsVisitor[AnyRef] + with IndexMetricHelper { self: SparkSqlAstBuilder => override def visitCreateMaterializedViewStatement( @@ -40,6 +42,8 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito val indexOptions = visitPropertyList(ctx.propertyList()) val flintIndexName = getFlintIndexName(flint, ctx.mvName) + emitCreateIndexMetric(indexOptions.autoRefresh()) + mvBuilder .options(indexOptions, flintIndexName) .create(ignoreIfExists) @@ -56,6 +60,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito override def visitRefreshMaterializedViewStatement( ctx: RefreshMaterializedViewStatementContext): Command = { FlintSparkSqlCommand() { flint => + emitRefreshIndexMetric() val flintIndexName = getFlintIndexName(flint, ctx.mvName) flint.refreshIndex(flintIndexName) Seq.empty @@ -106,6 +111,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito override def visitAlterMaterializedViewStatement( ctx: AlterMaterializedViewStatementContext): Command = { FlintSparkSqlCommand() { flint => + emitAlterIndexMetric() val indexName = getFlintIndexName(flint, ctx.mvName) val indexOptions = visitPropertyList(ctx.propertyList()) val index = flint @@ -120,6 +126,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito override def visitDropMaterializedViewStatement( ctx: DropMaterializedViewStatementContext): Command = { FlintSparkSqlCommand() { flint => + emitDropIndexMetric() flint.deleteIndex(getFlintIndexName(flint, ctx.mvName)) Seq.empty } @@ -128,6 +135,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito override def visitVacuumMaterializedViewStatement( ctx: VacuumMaterializedViewStatementContext): Command = { FlintSparkSqlCommand() { flint => + emitVacuumIndexMetric() flint.vacuumIndex(getFlintIndexName(flint, ctx.mvName)) Seq.empty } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala index 67f6bc3d4..9ed06f6b0 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala @@ -14,7 +14,7 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, MIN_MAX, PARTITION, VALUE_SET} import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY -import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} +import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, IndexMetricHelper, SparkSqlAstBuilder} import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText} import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._ @@ -26,7 +26,9 @@ import org.apache.spark.sql.types.StringType /** * Flint Spark AST builder that builds Spark command for Flint skipping index statement. */ -trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] { +trait FlintSparkSkippingIndexAstBuilder + extends FlintSparkSqlExtensionsVisitor[AnyRef] + with IndexMetricHelper { self: SparkSqlAstBuilder => override def visitCreateSkippingIndexStatement( @@ -73,6 +75,8 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A val indexOptions = visitPropertyList(ctx.propertyList()) val indexName = getSkippingIndexName(flint, ctx.tableName) + emitCreateIndexMetric(indexOptions.autoRefresh()) + indexBuilder .options(indexOptions, indexName) .create(ignoreIfExists) @@ -88,6 +92,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A override def visitRefreshSkippingIndexStatement( ctx: RefreshSkippingIndexStatementContext): Command = FlintSparkSqlCommand() { flint => + emitRefreshIndexMetric() val indexName = getSkippingIndexName(flint, ctx.tableName) flint.refreshIndex(indexName) Seq.empty @@ -115,6 +120,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A override def visitAlterSkippingIndexStatement( ctx: AlterSkippingIndexStatementContext): Command = { FlintSparkSqlCommand() { flint => + emitAlterIndexMetric() val indexName = getSkippingIndexName(flint, ctx.tableName) val indexOptions = visitPropertyList(ctx.propertyList()) val index = flint @@ -142,6 +148,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A override def visitDropSkippingIndexStatement(ctx: DropSkippingIndexStatementContext): Command = FlintSparkSqlCommand() { flint => + emitDropIndexMetric() val indexName = getSkippingIndexName(flint, ctx.tableName) flint.deleteIndex(indexName) Seq.empty @@ -150,6 +157,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A override def visitVacuumSkippingIndexStatement( ctx: VacuumSkippingIndexStatementContext): Command = { FlintSparkSqlCommand() { flint => + emitVacuumIndexMetric() val indexName = getSkippingIndexName(flint, ctx.tableName) flint.vacuumIndex(indexName) Seq.empty