diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
index c4af2779d..4042bd40b 100644
--- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
+++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -18,6 +18,7 @@ statement
     : skippingIndexStatement
     | coveringIndexStatement
     | materializedViewStatement
+    | indexJobManagementStatement
     ;
 
 skippingIndexStatement
@@ -102,6 +103,14 @@ dropMaterializedViewStatement
     : DROP MATERIALIZED VIEW mvName=multipartIdentifier
     ;
 
+indexJobManagementStatement
+    : showIndexJobStatement
+    ;
+
+showIndexJobStatement
+    : SHOW INDEX (JOB|JOBS)
+    ;
+
 /*
  * Match all remaining tokens in non-greedy way
  * so WITH clause won't be captured by this rule.
diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
index 533d851ba..694bc5dfc 100644
--- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
+++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -165,6 +165,8 @@ IF: 'IF';
 IN: 'IN';
 INDEX: 'INDEX';
 INDEXES: 'INDEXES';
+JOB: 'JOB';
+JOBS: 'JOBS';
 MATERIALIZED: 'MATERIALIZED';
 NOT: 'NOT';
 ON: 'ON';
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
index a56d99f14..1fa4b5f88 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.sql
 import org.antlr.v4.runtime.tree.{ParseTree, RuleNode}
 import org.opensearch.flint.spark.FlintSpark
 import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
+import org.opensearch.flint.spark.sql.job.FlintSparkIndexJobAstBuilder
 import org.opensearch.flint.spark.sql.mv.FlintSparkMaterializedViewAstBuilder
 import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder
 
@@ -23,6 +24,7 @@ class FlintSparkSqlAstBuilder
     with FlintSparkSkippingIndexAstBuilder
     with FlintSparkCoveringIndexAstBuilder
     with FlintSparkMaterializedViewAstBuilder
+    with FlintSparkIndexJobAstBuilder
     with SparkSqlAstBuilder {
 
   override def visit(tree: ParseTree): LogicalPlan = {
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/job/FlintSparkIndexJobAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/job/FlintSparkIndexJobAstBuilder.scala
new file mode 100644
index 000000000..b05dd4603
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/job/FlintSparkIndexJobAstBuilder.scala
@@ -0,0 +1,47 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.sql.job
+
+import org.json4s.{Formats, NoTypeHints}
+import org.json4s.native.Serialization
+import org.opensearch.flint.spark.FlintSparkIndex
+import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
+import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.ShowIndexJobStatementContext
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.types.StringType
+
+/**
+ * Flint Spark AST builder that builds a Spark command for the Flint index job management statement.
+ */
+trait FlintSparkIndexJobAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
+  self: SparkSqlAstBuilder =>
+
+  override def visitShowIndexJobStatement(ctx: ShowIndexJobStatementContext): AnyRef = {
+    val outputSchema = Seq(
+      AttributeReference("job name", StringType, nullable = false)(),
+      AttributeReference("index name", StringType, nullable = false)(),
+      AttributeReference("source", StringType, nullable = false)(),
+      AttributeReference("properties", StringType, nullable = false)())
+
+    FlintSparkSqlCommand(outputSchema) { flint =>
+      val indexNamePattern = "flint_*"
+      flint
+        .describeIndexes(indexNamePattern)
+        .collect {
+          case index: FlintSparkIndex if index.options.autoRefresh() =>
+            val metadata = index.metadata()
+            Row(index.name(), metadata.name, metadata.source, jsonify(metadata.properties))
+        }
+    }
+  }
+
+  private def jsonify(map: java.util.Map[String, AnyRef]): String = {
+    implicit val formats: Formats = Serialization.formats(NoTypeHints)
+    Serialization.write(map)
+  }
+}
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala
new file mode 100644
index 000000000..e741c5710
--- /dev/null
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala
@@ -0,0 +1,100 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+import java.io.File
+
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
+
+import org.apache.spark.sql.Row
+
+class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite {
+
+  private val testTable = "spark_catalog.default.index_job_test"
+  private val testSkippingIndex = FlintSparkSkippingIndex.getSkippingIndexName(testTable)
+
+  /** Covering index names */
+  private val testIndex = "test"
+  private val testCoveringIndex = FlintSparkCoveringIndex.getFlintIndexName(testIndex, testTable)
+
+  /** Materialized view names and query */
+  private val testMv = "spark_catalog.default.mv_test"
+  private val testMvQuery = s"SELECT name, age FROM $testTable"
+  private val testMvIndex = FlintSparkMaterializedView.getFlintIndexName(testMv)
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    createTimeSeriesTable(testTable)
+  }
+
+  protected override def afterEach(): Unit = {
+    super.afterEach()
+
+    flint.deleteIndex(testSkippingIndex)
+    flint.deleteIndex(testCoveringIndex)
+    flint.deleteIndex(testMvIndex)
+  }
+
+  test("show all index jobs") {
+    startSkippingIndexJob()
+    checkAnswer(
+      sql("SHOW INDEX JOBS"),
+      Seq(Row(testSkippingIndex, testSkippingIndex, testTable, "{}")))
+
+    startCoveringIndexJob()
+    checkAnswer(
+      sql("SHOW INDEX JOBS"),
+      Seq(
+        Row(testSkippingIndex, testSkippingIndex, testTable, "{}"),
+        Row(testCoveringIndex, testIndex, testTable, "{}")))
+
+    withTempDir { checkpointDir =>
+      startMaterializedViewIndexJob(checkpointDir)
+      checkAnswer(
+        sql("SHOW INDEX JOBS"),
+        Seq(
+          Row(testSkippingIndex, testSkippingIndex, testTable, "{}"),
+          Row(testCoveringIndex, testIndex, testTable, "{}"),
+          Row(testMvIndex, testMv, testMvQuery, "{}")))
+    }
+  }
+
+  // Empty result causes an exception; fixed in another PR.
+  ignore("should return empty if no index job") {
+    checkAnswer(sql("SHOW INDEX JOBS"), Seq.empty)
+  }
+
+  private def startSkippingIndexJob(): Unit = {
+    sql(s"""
+         | CREATE SKIPPING INDEX ON $testTable
+         | (name VALUE_SET)
+         | WITH (auto_refresh = true)
+         |""".stripMargin)
+  }
+
+  private def startCoveringIndexJob(): Unit = {
+    sql(s"""
+         | CREATE INDEX $testIndex ON $testTable
+         | (time, name)
+         | WITH (auto_refresh = true)
+         |""".stripMargin)
+  }
+
+  private def startMaterializedViewIndexJob(checkpointDir: File): Unit = {
+    sql(s"""
+         | CREATE MATERIALIZED VIEW $testMv
+         | AS
+         | $testMvQuery
+         | WITH (
+         |   auto_refresh = true,
+         |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
+         | )
+         |""".stripMargin)
+  }
+}