diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index c4af2779d..4042bd40b 100644 --- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -18,6 +18,7 @@ statement : skippingIndexStatement | coveringIndexStatement | materializedViewStatement + | indexJobManagementStatement ; skippingIndexStatement @@ -102,6 +103,14 @@ dropMaterializedViewStatement : DROP MATERIALIZED VIEW mvName=multipartIdentifier ; +indexJobManagementStatement + : showIndexJobStatement + ; + +showIndexJobStatement + : SHOW INDEX (JOB|JOBS) + ; + /* * Match all remaining tokens in non-greedy way * so WITH clause won't be captured by this rule. diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 index 533d851ba..694bc5dfc 100644 --- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 +++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 @@ -165,6 +165,8 @@ IF: 'IF'; IN: 'IN'; INDEX: 'INDEX'; INDEXES: 'INDEXES'; +JOB: 'JOB'; +JOBS: 'JOBS'; MATERIALIZED: 'MATERIALIZED'; NOT: 'NOT'; ON: 'ON'; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index eb2075b63..9f895803a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -60,7 +60,7 @@ case class FlintSparkSkippingIndex( val schemaJson = generateSchemaJSON(fieldTypes) metadataBuilder(this) - .name(name()) + .name("") // skipping index is unique per table without name .source(tableName) .indexedColumns(indexColumnMaps) 
.schema(schemaJson) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala index a56d99f14..1fa4b5f88 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala @@ -8,6 +8,7 @@ package org.opensearch.flint.spark.sql import org.antlr.v4.runtime.tree.{ParseTree, RuleNode} import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder +import org.opensearch.flint.spark.sql.job.FlintSparkIndexJobAstBuilder import org.opensearch.flint.spark.sql.mv.FlintSparkMaterializedViewAstBuilder import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder @@ -23,6 +24,7 @@ class FlintSparkSqlAstBuilder with FlintSparkSkippingIndexAstBuilder with FlintSparkCoveringIndexAstBuilder with FlintSparkMaterializedViewAstBuilder + with FlintSparkIndexJobAstBuilder with SparkSqlAstBuilder { override def visit(tree: ParseTree): LogicalPlan = { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/job/FlintSparkIndexJobAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/job/FlintSparkIndexJobAstBuilder.scala new file mode 100644 index 000000000..7492e8edb --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/job/FlintSparkIndexJobAstBuilder.scala @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.sql.job + +import org.json4s.{Formats, NoTypeHints} +import org.json4s.native.Serialization +import org.opensearch.flint.spark.FlintSparkIndex +import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, 
FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} +import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.ShowIndexJobStatementContext + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.types.StringType + +/** + * Flint Spark AST builder that builds Spark command for Flint index job management statement. + */ +trait FlintSparkIndexJobAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] { + self: SparkSqlAstBuilder => + + override def visitShowIndexJobStatement(ctx: ShowIndexJobStatementContext): AnyRef = { + val outputSchema = Seq( + AttributeReference("job_name", StringType, nullable = false)(), + AttributeReference("index_type", StringType, nullable = false)(), + AttributeReference("index_name", StringType, nullable = false)(), + AttributeReference("source", StringType, nullable = false)(), + AttributeReference("properties", StringType, nullable = false)()) + + FlintSparkSqlCommand(outputSchema) { flint => + val indexNamePattern = "flint_*" + flint + .describeIndexes(indexNamePattern) + .collect { + case index: FlintSparkIndex if index.options.autoRefresh() => + val metadata = index.metadata() + Row( + index.name(), + metadata.kind, + metadata.name, + metadata.source, + jsonify(metadata.properties)) + } + } + } + + private def jsonify(map: java.util.Map[String, AnyRef]): String = { + implicit val formats: Formats = Serialization.formats(NoTypeHints) + Serialization.write(map) + } +} diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index d52c43842..37e9e4395 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ 
b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -40,7 +40,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { val metadata = index.metadata() metadata.kind shouldBe SKIPPING_INDEX_TYPE - metadata.name shouldBe index.name() + metadata.name shouldBe "" metadata.source shouldBe testTable metadata.indexedColumns shouldBe Array( Map( diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala new file mode 100644 index 000000000..c94f47eff --- /dev/null +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala @@ -0,0 +1,103 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import java.io.File + +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE + +import org.apache.spark.sql.Row + +class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite { + + private val testTable = "spark_catalog.default.index_job_test" + private val testSkippingIndex = FlintSparkSkippingIndex.getSkippingIndexName(testTable) + + /** Covering index names */ + private val testIndex = "test" + private val testCoveringIndex = FlintSparkCoveringIndex.getFlintIndexName(testIndex, testTable) + + /** Materialized view names and query */ + private val testMv = "spark_catalog.default.mv_test" + private val testMvQuery = s"SELECT name, age FROM $testTable" + private val testMvIndex = 
FlintSparkMaterializedView.getFlintIndexName(testMv) + + override def beforeAll(): Unit = { + super.beforeAll() + + createTimeSeriesTable(testTable) + } + + protected override def afterEach(): Unit = { + super.afterEach() + + flint.deleteIndex(testSkippingIndex) + flint.deleteIndex(testCoveringIndex) + flint.deleteIndex(testMvIndex) + } + + test("show all index jobs") { + startSkippingIndexJob() + checkAnswer( + sql("SHOW INDEX JOBS"), + Seq(Row(testSkippingIndex, SKIPPING_INDEX_TYPE, "", testTable, "{}"))) + + startCoveringIndexJob() + checkAnswer( + sql("SHOW INDEX JOBS"), + Seq( + Row(testSkippingIndex, SKIPPING_INDEX_TYPE, "", testTable, "{}"), + Row(testCoveringIndex, COVERING_INDEX_TYPE, testIndex, testTable, "{}"))) + + withTempDir { checkpointDir => + startMaterializedViewIndexJob(checkpointDir) + checkAnswer( + sql("SHOW INDEX JOBS"), + Seq( + Row(testSkippingIndex, SKIPPING_INDEX_TYPE, "", testTable, "{}"), + Row(testCoveringIndex, COVERING_INDEX_TYPE, testIndex, testTable, "{}"), + Row(testMvIndex, MV_INDEX_TYPE, testMv, testMvQuery, "{}"))) + } + } + + // An empty result currently causes an exception; fixed in a separate PR. 
+ ignore("should return empty if no index job") { + checkAnswer(sql("SHOW INDEX JOBS"), Seq.empty) + } + + private def startSkippingIndexJob(): Unit = { + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | (name VALUE_SET) + | WITH (auto_refresh = true) + |""".stripMargin) + } + + private def startCoveringIndexJob(): Unit = { + sql(s""" + | CREATE INDEX $testIndex ON $testTable + | (time, name) + | WITH (auto_refresh = true) + |""".stripMargin) + } + + private def startMaterializedViewIndexJob(checkpointDir: File): Unit = { + sql(s""" + | CREATE MATERIALIZED VIEW $testMv + | AS + | $testMvQuery + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin) + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index e3fb467e6..d917e5c6d 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -54,7 +54,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { index shouldBe defined index.get.metadata().getContent should matchJson(s"""{ | "_meta": { - | "name": "flint_spark_catalog_default_test_skipping_index", + | "name": "", | "version": "${current()}", | "kind": "skipping", | "indexedColumns": [ @@ -452,7 +452,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { index.get.metadata().getContent should matchJson( s"""{ | "_meta": { - | "name": "flint_spark_catalog_default_data_type_table_skipping_index", + | "name": "", | "version": "${current()}", | "kind": "skipping", | "indexedColumns": [