Skip to content

Commit

Permalink
Add metrics for query types (#891)
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 authored Nov 13, 2024
1 parent 3320885 commit 9d504ea
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,17 @@ public final class MetricConstants {
*/
public static final String QUERY_EXECUTION_TIME_METRIC = "query.execution.processingTime";

/**
* Metric for query count of each query type (DROP/VACUUM/ALTER/REFRESH/CREATE INDEX)
*/
public static final String QUERY_DROP_COUNT_METRIC = "query.drop.count";
public static final String QUERY_VACUUM_COUNT_METRIC = "query.vacuum.count";
public static final String QUERY_ALTER_COUNT_METRIC = "query.alter.count";
public static final String QUERY_REFRESH_COUNT_METRIC = "query.refresh.count";
public static final String QUERY_CREATE_INDEX_COUNT_METRIC = "query.createIndex.count";
public static final String QUERY_CREATE_INDEX_AUTO_REFRESH_COUNT_METRIC = "query.createIndex.autoRefresh.count";
public static final String QUERY_CREATE_INDEX_MANUAL_REFRESH_COUNT_METRIC = "query.createIndex.manualRefresh.count";

/**
* Metric for tracking the total bytes read from input
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.sql

import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}

trait IndexMetricHelper {
def emitCreateIndexMetric(autoRefresh: Boolean): Unit = {
MetricsUtil.incrementCounter(MetricConstants.QUERY_CREATE_INDEX_COUNT_METRIC)
if (autoRefresh) {
MetricsUtil.incrementCounter(MetricConstants.QUERY_CREATE_INDEX_AUTO_REFRESH_COUNT_METRIC)
} else {
MetricsUtil.incrementCounter(MetricConstants.QUERY_CREATE_INDEX_MANUAL_REFRESH_COUNT_METRIC)
}
}

def emitRefreshIndexMetric(): Unit = {
MetricsUtil.incrementCounter(MetricConstants.QUERY_REFRESH_COUNT_METRIC)
}

def emitAlterIndexMetric(): Unit = {
MetricsUtil.incrementCounter(MetricConstants.QUERY_ALTER_COUNT_METRIC)
}

def emitDropIndexMetric(): Unit = {
MetricsUtil.incrementCounter(MetricConstants.QUERY_DROP_COUNT_METRIC)
}

def emitVacuumIndexMetric(): Unit = {
MetricsUtil.incrementCounter(MetricConstants.QUERY_VACUUM_COUNT_METRIC)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
package org.opensearch.flint.spark.sql.covering

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, IndexMetricHelper, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

Expand All @@ -20,7 +21,9 @@ import org.apache.spark.sql.types.StringType
/**
* Flint Spark AST builder that builds Spark command for Flint covering index statement.
*/
trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
trait FlintSparkCoveringIndexAstBuilder
extends FlintSparkSqlExtensionsVisitor[AnyRef]
with IndexMetricHelper {
self: SparkSqlAstBuilder =>

override def visitCreateCoveringIndexStatement(
Expand Down Expand Up @@ -49,6 +52,8 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
.options(indexOptions, indexName)
.create(ignoreIfExists)

emitCreateIndexMetric(indexOptions.autoRefresh())

// Trigger auto refresh if enabled and not using external scheduler
if (indexOptions
.autoRefresh() && !indexBuilder.isExternalSchedulerEnabled()) {
Expand All @@ -62,6 +67,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitRefreshCoveringIndexStatement(
ctx: RefreshCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
MetricsUtil.incrementCounter(MetricConstants.QUERY_REFRESH_COUNT_METRIC)
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
flint.refreshIndex(flintIndexName)
Seq.empty
Expand Down Expand Up @@ -107,6 +113,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitAlterCoveringIndexStatement(
ctx: AlterCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitAlterIndexMetric()
val indexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
val indexOptions = visitPropertyList(ctx.propertyList())
val index = flint
Expand All @@ -121,6 +128,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitDropCoveringIndexStatement(
ctx: DropCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitDropIndexMetric()
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
flint.deleteIndex(flintIndexName)
Seq.empty
Expand All @@ -130,6 +138,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitVacuumCoveringIndexStatement(
ctx: VacuumCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitVacuumIndexMetric()
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
flint.vacuumIndex(flintIndexName)
Seq.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, IndexMetricHelper, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText, IndexBelongsTo}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

Expand All @@ -22,7 +22,9 @@ import org.apache.spark.sql.types.StringType
/**
* Flint Spark AST builder that builds Spark command for Flint materialized view statement.
*/
trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
trait FlintSparkMaterializedViewAstBuilder
extends FlintSparkSqlExtensionsVisitor[AnyRef]
with IndexMetricHelper {
self: SparkSqlAstBuilder =>

override def visitCreateMaterializedViewStatement(
Expand All @@ -40,6 +42,8 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
val indexOptions = visitPropertyList(ctx.propertyList())
val flintIndexName = getFlintIndexName(flint, ctx.mvName)

emitCreateIndexMetric(indexOptions.autoRefresh())

mvBuilder
.options(indexOptions, flintIndexName)
.create(ignoreIfExists)
Expand All @@ -56,6 +60,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
override def visitRefreshMaterializedViewStatement(
ctx: RefreshMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitRefreshIndexMetric()
val flintIndexName = getFlintIndexName(flint, ctx.mvName)
flint.refreshIndex(flintIndexName)
Seq.empty
Expand Down Expand Up @@ -106,6 +111,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
override def visitAlterMaterializedViewStatement(
ctx: AlterMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitAlterIndexMetric()
val indexName = getFlintIndexName(flint, ctx.mvName)
val indexOptions = visitPropertyList(ctx.propertyList())
val index = flint
Expand All @@ -120,6 +126,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
override def visitDropMaterializedViewStatement(
ctx: DropMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitDropIndexMetric()
flint.deleteIndex(getFlintIndexName(flint, ctx.mvName))
Seq.empty
}
Expand All @@ -128,6 +135,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
override def visitVacuumMaterializedViewStatement(
ctx: VacuumMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitVacuumIndexMetric()
flint.vacuumIndex(getFlintIndexName(flint, ctx.mvName))
Seq.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, MIN_MAX, PARTITION, VALUE_SET}
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, IndexMetricHelper, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

Expand All @@ -26,7 +26,9 @@ import org.apache.spark.sql.types.StringType
/**
* Flint Spark AST builder that builds Spark command for Flint skipping index statement.
*/
trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
trait FlintSparkSkippingIndexAstBuilder
extends FlintSparkSqlExtensionsVisitor[AnyRef]
with IndexMetricHelper {
self: SparkSqlAstBuilder =>

override def visitCreateSkippingIndexStatement(
Expand Down Expand Up @@ -73,6 +75,8 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
val indexOptions = visitPropertyList(ctx.propertyList())
val indexName = getSkippingIndexName(flint, ctx.tableName)

emitCreateIndexMetric(indexOptions.autoRefresh())

indexBuilder
.options(indexOptions, indexName)
.create(ignoreIfExists)
Expand All @@ -88,6 +92,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitRefreshSkippingIndexStatement(
ctx: RefreshSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
emitRefreshIndexMetric()
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.refreshIndex(indexName)
Seq.empty
Expand Down Expand Up @@ -115,6 +120,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitAlterSkippingIndexStatement(
ctx: AlterSkippingIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitAlterIndexMetric()
val indexName = getSkippingIndexName(flint, ctx.tableName)
val indexOptions = visitPropertyList(ctx.propertyList())
val index = flint
Expand Down Expand Up @@ -142,6 +148,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A

override def visitDropSkippingIndexStatement(ctx: DropSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
emitDropIndexMetric()
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.deleteIndex(indexName)
Seq.empty
Expand All @@ -150,6 +157,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitVacuumSkippingIndexStatement(
ctx: VacuumSkippingIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitVacuumIndexMetric()
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.vacuumIndex(indexName)
Seq.empty
Expand Down

0 comments on commit 9d504ea

Please sign in to comment.