Merge branch 'main' into local-test-instructions
YANG-DB committed Nov 14, 2024
2 parents ee47109 + 78f2fbe commit 593386b
Showing 11 changed files with 129 additions and 17 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/test-and-build-workflow.yml
@@ -25,5 +25,16 @@ jobs:
- name: Style check
run: sbt scalafmtCheckAll

- name: Set SBT_OPTS
# Needed to extend the JVM memory size to avoid OutOfMemoryError for HTML test report
run: echo "SBT_OPTS=-Xmx2G" >> $GITHUB_ENV

- name: Integ Test
run: sbt integtest/integration

- name: Upload test report
if: always() # Ensures the artifact is saved even if tests fail
uses: actions/upload-artifact@v3
with:
name: test-reports
path: target/test-reports # Adjust this path if necessary
4 changes: 4 additions & 0 deletions build.sbt
@@ -82,7 +82,11 @@ lazy val commonSettings = Seq(
compileScalastyle := (Compile / scalastyle).toTask("").value,
Compile / compile := ((Compile / compile) dependsOn compileScalastyle).value,
testScalastyle := (Test / scalastyle).toTask("").value,
// Enable the HTML test report and output it to a separate folder per package
Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-h", s"target/test-reports/${name.value}"),
Test / test := ((Test / test) dependsOn testScalastyle).value,
// Needed for HTML report
libraryDependencies += "com.vladsch.flexmark" % "flexmark-all" % "0.64.8" % "test",
dependencyOverrides ++= Seq(
"com.fasterxml.jackson.core" % "jackson-core" % jacksonVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion
@@ -70,6 +70,11 @@ public final class MetricConstants {
*/
public static final String REPL_PROCESSING_TIME_METRIC = "session.processingTime";

/**
* Metric name for counting the number of queries executed per session.
*/
public static final String REPL_QUERY_COUNT_METRIC = "session.query.count";

/**
* Prefix for metrics related to the request metadata read operations.
*/
@@ -135,6 +140,17 @@ public final class MetricConstants {
*/
public static final String QUERY_EXECUTION_TIME_METRIC = "query.execution.processingTime";

/**
* Metric names for the query count of each query type (DROP/VACUUM/ALTER/REFRESH/CREATE INDEX)
*/
public static final String QUERY_DROP_COUNT_METRIC = "query.drop.count";
public static final String QUERY_VACUUM_COUNT_METRIC = "query.vacuum.count";
public static final String QUERY_ALTER_COUNT_METRIC = "query.alter.count";
public static final String QUERY_REFRESH_COUNT_METRIC = "query.refresh.count";
public static final String QUERY_CREATE_INDEX_COUNT_METRIC = "query.createIndex.count";
public static final String QUERY_CREATE_INDEX_AUTO_REFRESH_COUNT_METRIC = "query.createIndex.autoRefresh.count";
public static final String QUERY_CREATE_INDEX_MANUAL_REFRESH_COUNT_METRIC = "query.createIndex.manualRefresh.count";

/**
* Metric for tracking the total bytes read from input
*/
@@ -175,6 +191,16 @@ public final class MetricConstants {
*/
public static final String INITIAL_CONDITION_CHECK_FAILED_PREFIX = "initialConditionCheck.failed.";

/**
* Metric for tracking the JVM GC time per task
*/
public static final String TASK_JVM_GC_TIME_METRIC = "task.jvmGCTime.count";

/**
* Metric for tracking the total JVM GC time for a query
*/
public static final String TOTAL_JVM_GC_TIME_METRIC = "query.totalJvmGCTime.count";

private MetricConstants() {
// Private constructor to prevent instantiation
}
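
For context, a minimal sketch (not part of this commit) of how the new constants are consumed elsewhere in the diff — counters via MetricsUtil.incrementCounter and gauges via MetricsUtil.addHistoricGauge; the object and method names below are illustrative only:

import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}

// Illustrative helper, not from the commit.
object MetricEmissionSketch {
  // Count one DROP INDEX statement (counter metric).
  def onDropStatement(): Unit =
    MetricsUtil.incrementCounter(MetricConstants.QUERY_DROP_COUNT_METRIC)

  // Record the accumulated JVM GC time for a query (gauge metric).
  def onQueryFinished(totalJvmGcTimeMs: Long): Unit =
    MetricsUtil.addHistoricGauge(MetricConstants.TOTAL_JVM_GC_TIME_METRIC, totalJvmGcTimeMs)
}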
@@ -6,17 +6,18 @@
package org.opensearch.flint.core.metrics

import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

/**
* Collect and emit bytesRead/Written and recordsRead/Written metrics
* Collect and emit metrics by listening to Spark events
*/
class ReadWriteBytesSparkListener extends SparkListener with Logging {
class MetricsSparkListener extends SparkListener with Logging {
var bytesRead: Long = 0
var recordsRead: Long = 0
var bytesWritten: Long = 0
var recordsWritten: Long = 0
var totalJvmGcTime: Long = 0

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
val inputMetrics = taskEnd.taskMetrics.inputMetrics
@@ -31,21 +32,28 @@ class ReadWriteBytesSparkListener extends SparkListener with Logging {
recordsRead += inputMetrics.recordsRead
bytesWritten += outputMetrics.bytesWritten
recordsWritten += outputMetrics.recordsWritten
totalJvmGcTime += taskEnd.taskMetrics.jvmGCTime

MetricsUtil.addHistoricGauge(
MetricConstants.TASK_JVM_GC_TIME_METRIC,
taskEnd.taskMetrics.jvmGCTime)
}

def emitMetrics(): Unit = {
logInfo(s"Input: totalBytesRead=${bytesRead}, totalRecordsRead=${recordsRead}")
logInfo(s"Output: totalBytesWritten=${bytesWritten}, totalRecordsWritten=${recordsWritten}")
logInfo(s"totalJvmGcTime=${totalJvmGcTime}")
MetricsUtil.addHistoricGauge(MetricConstants.INPUT_TOTAL_BYTES_READ, bytesRead)
MetricsUtil.addHistoricGauge(MetricConstants.INPUT_TOTAL_RECORDS_READ, recordsRead)
MetricsUtil.addHistoricGauge(MetricConstants.OUTPUT_TOTAL_BYTES_WRITTEN, bytesWritten)
MetricsUtil.addHistoricGauge(MetricConstants.OUTPUT_TOTAL_RECORDS_WRITTEN, recordsWritten)
MetricsUtil.addHistoricGauge(MetricConstants.TOTAL_JVM_GC_TIME_METRIC, totalJvmGcTime)
}
}

object ReadWriteBytesSparkListener {
object MetricsSparkListener {
def withMetrics[T](spark: SparkSession, lambda: () => T): T = {
val listener = new ReadWriteBytesSparkListener()
val listener = new MetricsSparkListener()
spark.sparkContext.addSparkListener(listener)

val result = lambda()
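A minimal usage sketch (not part of this commit) of the renamed listener's withMetrics wrapper, mirroring how AutoIndexRefresh uses it in the next diff; the Spark session and the workload inside the lambda are illustrative:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.opensearch.flint.core.metrics.MetricsSparkListener

// Illustrative: run any Spark job under the listener so task-level
// bytes/records/GC-time metrics are aggregated while it executes.
def runWithJobMetrics(spark: SparkSession): DataFrame =
  MetricsSparkListener.withMetrics(spark, () => spark.range(100).toDF())
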
@@ -7,7 +7,7 @@ package org.opensearch.flint.spark.refresh

import java.util.Collections

import org.opensearch.flint.core.metrics.ReadWriteBytesSparkListener
import org.opensearch.flint.core.metrics.MetricsSparkListener
import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions, FlintSparkValidationHelper}
import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode}
@@ -68,7 +68,7 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
// Flint index has specialized logic and capability for incremental refresh
case refresh: StreamingRefresh =>
logInfo("Start refreshing index in streaming style")
val job = ReadWriteBytesSparkListener.withMetrics(
val job = MetricsSparkListener.withMetrics(
spark,
() =>
refresh
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.sql

import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}

trait IndexMetricHelper {
def emitCreateIndexMetric(autoRefresh: Boolean): Unit = {
MetricsUtil.incrementCounter(MetricConstants.QUERY_CREATE_INDEX_COUNT_METRIC)
if (autoRefresh) {
MetricsUtil.incrementCounter(MetricConstants.QUERY_CREATE_INDEX_AUTO_REFRESH_COUNT_METRIC)
} else {
MetricsUtil.incrementCounter(MetricConstants.QUERY_CREATE_INDEX_MANUAL_REFRESH_COUNT_METRIC)
}
}

def emitRefreshIndexMetric(): Unit = {
MetricsUtil.incrementCounter(MetricConstants.QUERY_REFRESH_COUNT_METRIC)
}

def emitAlterIndexMetric(): Unit = {
MetricsUtil.incrementCounter(MetricConstants.QUERY_ALTER_COUNT_METRIC)
}

def emitDropIndexMetric(): Unit = {
MetricsUtil.incrementCounter(MetricConstants.QUERY_DROP_COUNT_METRIC)
}

def emitVacuumIndexMetric(): Unit = {
MetricsUtil.incrementCounter(MetricConstants.QUERY_VACUUM_COUNT_METRIC)
}
}
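
Where it's used: the AST builder diffs that follow mix this trait in and call the emit* methods in each statement's visit handler. A condensed sketch (illustrative object and method names; not part of this commit):

import org.opensearch.flint.spark.sql.IndexMetricHelper

// Illustrative: anything can mix in the helper to emit the new query counters.
object IndexMetricSketch extends IndexMetricHelper {
  def onCreateAndDrop(): Unit = {
    emitCreateIndexMetric(autoRefresh = true) // create-index counter + auto-refresh variant
    emitDropIndexMetric()                     // drop-index counter
  }
}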
@@ -6,9 +6,10 @@
package org.opensearch.flint.spark.sql.covering

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, IndexMetricHelper, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

@@ -20,7 +21,9 @@ import org.apache.spark.sql.types.StringType
/**
* Flint Spark AST builder that builds Spark command for Flint covering index statement.
*/
trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
trait FlintSparkCoveringIndexAstBuilder
extends FlintSparkSqlExtensionsVisitor[AnyRef]
with IndexMetricHelper {
self: SparkSqlAstBuilder =>

override def visitCreateCoveringIndexStatement(
@@ -49,6 +52,8 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
.options(indexOptions, indexName)
.create(ignoreIfExists)

emitCreateIndexMetric(indexOptions.autoRefresh())

// Trigger auto refresh if enabled and not using external scheduler
if (indexOptions
.autoRefresh() && !indexBuilder.isExternalSchedulerEnabled()) {
Expand All @@ -62,6 +67,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitRefreshCoveringIndexStatement(
ctx: RefreshCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
MetricsUtil.incrementCounter(MetricConstants.QUERY_REFRESH_COUNT_METRIC)
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
flint.refreshIndex(flintIndexName)
Seq.empty
@@ -107,6 +113,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitAlterCoveringIndexStatement(
ctx: AlterCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitAlterIndexMetric()
val indexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
val indexOptions = visitPropertyList(ctx.propertyList())
val index = flint
@@ -121,6 +128,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitDropCoveringIndexStatement(
ctx: DropCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitDropIndexMetric()
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
flint.deleteIndex(flintIndexName)
Seq.empty
@@ -130,6 +138,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitVacuumCoveringIndexStatement(
ctx: VacuumCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitVacuumIndexMetric()
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
flint.vacuumIndex(flintIndexName)
Seq.empty
@@ -10,7 +10,7 @@ import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, IndexMetricHelper, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText, IndexBelongsTo}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

@@ -22,7 +22,9 @@ import org.apache.spark.sql.types.StringType
/**
* Flint Spark AST builder that builds Spark command for Flint materialized view statement.
*/
trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
trait FlintSparkMaterializedViewAstBuilder
extends FlintSparkSqlExtensionsVisitor[AnyRef]
with IndexMetricHelper {
self: SparkSqlAstBuilder =>

override def visitCreateMaterializedViewStatement(
@@ -40,6 +42,8 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
val indexOptions = visitPropertyList(ctx.propertyList())
val flintIndexName = getFlintIndexName(flint, ctx.mvName)

emitCreateIndexMetric(indexOptions.autoRefresh())

mvBuilder
.options(indexOptions, flintIndexName)
.create(ignoreIfExists)
@@ -56,6 +60,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
override def visitRefreshMaterializedViewStatement(
ctx: RefreshMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitRefreshIndexMetric()
val flintIndexName = getFlintIndexName(flint, ctx.mvName)
flint.refreshIndex(flintIndexName)
Seq.empty
@@ -106,6 +111,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
override def visitAlterMaterializedViewStatement(
ctx: AlterMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitAlterIndexMetric()
val indexName = getFlintIndexName(flint, ctx.mvName)
val indexOptions = visitPropertyList(ctx.propertyList())
val index = flint
@@ -120,6 +126,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
override def visitDropMaterializedViewStatement(
ctx: DropMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitDropIndexMetric()
flint.deleteIndex(getFlintIndexName(flint, ctx.mvName))
Seq.empty
}
@@ -128,6 +135,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
override def visitVacuumMaterializedViewStatement(
ctx: VacuumMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitVacuumIndexMetric()
flint.vacuumIndex(getFlintIndexName(flint, ctx.mvName))
Seq.empty
}
@@ -14,7 +14,7 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, MIN_MAX, PARTITION, VALUE_SET}
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, IndexMetricHelper, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

@@ -26,7 +26,9 @@ import org.apache.spark.sql.types.StringType
/**
* Flint Spark AST builder that builds Spark command for Flint skipping index statement.
*/
trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
trait FlintSparkSkippingIndexAstBuilder
extends FlintSparkSqlExtensionsVisitor[AnyRef]
with IndexMetricHelper {
self: SparkSqlAstBuilder =>

override def visitCreateSkippingIndexStatement(
@@ -73,6 +75,8 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
val indexOptions = visitPropertyList(ctx.propertyList())
val indexName = getSkippingIndexName(flint, ctx.tableName)

emitCreateIndexMetric(indexOptions.autoRefresh())

indexBuilder
.options(indexOptions, indexName)
.create(ignoreIfExists)
@@ -88,6 +92,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitRefreshSkippingIndexStatement(
ctx: RefreshSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
emitRefreshIndexMetric()
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.refreshIndex(indexName)
Seq.empty
@@ -115,6 +120,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitAlterSkippingIndexStatement(
ctx: AlterSkippingIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitAlterIndexMetric()
val indexName = getSkippingIndexName(flint, ctx.tableName)
val indexOptions = visitPropertyList(ctx.propertyList())
val index = flint
@@ -142,6 +148,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A

override def visitDropSkippingIndexStatement(ctx: DropSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
emitDropIndexMetric()
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.deleteIndex(indexName)
Seq.empty
@@ -150,6 +157,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitVacuumSkippingIndexStatement(
ctx: VacuumSkippingIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitVacuumIndexMetric()
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.vacuumIndex(indexName)
Seq.empty