Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add show index job SQL statement #82

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ statement
: skippingIndexStatement
| coveringIndexStatement
| materializedViewStatement
| indexJobManagementStatement
;

skippingIndexStatement
Expand Down Expand Up @@ -102,6 +103,14 @@ dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;

// Entry rule for Flint index job management statements (currently only SHOW INDEX JOB/JOBS).
indexJobManagementStatement
: showIndexJobStatement
;

// SHOW INDEX JOB | SHOW INDEX JOBS — both keyword forms are accepted.
showIndexJobStatement
: SHOW INDEX (JOB|JOBS)
;

/*
* Match all remaining tokens in non-greedy way
* so WITH clause won't be captured by this rule.
Expand Down
2 changes: 2 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ IF: 'IF';
IN: 'IN';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
JOB: 'JOB';
JOBS: 'JOBS';
MATERIALIZED: 'MATERIALIZED';
NOT: 'NOT';
ON: 'ON';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ case class FlintSparkSkippingIndex(
val schemaJson = generateSchemaJSON(fieldTypes)

metadataBuilder(this)
.name(name())
.name("") // skipping index is unique per table without name
.source(tableName)
.indexedColumns(indexColumnMaps)
.schema(schemaJson)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.flint.spark.sql
import org.antlr.v4.runtime.tree.{ParseTree, RuleNode}
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
import org.opensearch.flint.spark.sql.job.FlintSparkIndexJobAstBuilder
import org.opensearch.flint.spark.sql.mv.FlintSparkMaterializedViewAstBuilder
import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder

Expand All @@ -23,6 +24,7 @@ class FlintSparkSqlAstBuilder
with FlintSparkSkippingIndexAstBuilder
with FlintSparkCoveringIndexAstBuilder
with FlintSparkMaterializedViewAstBuilder
with FlintSparkIndexJobAstBuilder
with SparkSqlAstBuilder {

override def visit(tree: ParseTree): LogicalPlan = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.sql.job

import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.spark.FlintSparkIndex
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.ShowIndexJobStatementContext

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.StringType

/**
* Flint Spark AST builder that builds Spark command for Flint index job management statement.
*/
trait FlintSparkIndexJobAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
  self: SparkSqlAstBuilder =>

  /**
   * Builds the Spark command for `SHOW INDEX JOBS`: one output row per Flint index
   * whose auto-refresh option is enabled (i.e. per running index refresh job).
   */
  override def visitShowIndexJobStatement(ctx: ShowIndexJobStatementContext): AnyRef = {
    // Output columns of the SHOW INDEX JOBS result set, all strings and non-nullable.
    val columnNames = Seq("job_name", "index_type", "index_name", "source", "properties")
    val schema = columnNames.map { col =>
      AttributeReference(col, StringType, nullable = false)()
    }

    FlintSparkSqlCommand(schema) { flint =>
      // Enumerate all Flint indexes; only those with auto refresh on have a job behind them.
      val described = flint.describeIndexes("flint_*")
      described.collect {
        case idx: FlintSparkIndex if idx.options.autoRefresh() =>
          val meta = idx.metadata()
          Row(idx.name(), meta.kind, meta.name, meta.source, toJsonString(meta.properties))
      }
    }
  }

  /** Serializes a Java map of index properties to a JSON string. */
  private def toJsonString(props: java.util.Map[String, AnyRef]): String = {
    implicit val formats: Formats = Serialization.formats(NoTypeHints)
    Serialization.write(props)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {

val metadata = index.metadata()
metadata.kind shouldBe SKIPPING_INDEX_TYPE
metadata.name shouldBe index.name()
metadata.name shouldBe ""
metadata.source shouldBe testTable
metadata.indexedColumns shouldBe Array(
Map(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import java.io.File

import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE

import org.apache.spark.sql.Row

/**
 * Integration test suite for the `SHOW INDEX JOBS` SQL statement. Starts refresh jobs
 * for a skipping index, a covering index and a materialized view, then verifies each
 * job appears in the statement's output.
 */
class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite {

  private val testTable = "spark_catalog.default.index_job_test"
  private val testSkippingIndex = FlintSparkSkippingIndex.getSkippingIndexName(testTable)

  /** Covering index names */
  private val testIndex = "test"
  private val testCoveringIndex = FlintSparkCoveringIndex.getFlintIndexName(testIndex, testTable)

  /** Materialized view names and query */
  private val testMv = "spark_catalog.default.mv_test"
  private val testMvQuery = s"SELECT name, age FROM $testTable"
  private val testMvIndex = FlintSparkMaterializedView.getFlintIndexName(testMv)

  override def beforeAll(): Unit = {
    super.beforeAll()

    createTimeSeriesTable(testTable)
  }

  protected override def afterEach(): Unit = {
    super.afterEach()

    // Drop all indexes created by a test so suites stay isolated.
    flint.deleteIndex(testSkippingIndex)
    flint.deleteIndex(testCoveringIndex)
    flint.deleteIndex(testMvIndex)
  }

  test("show all index jobs") {
    // Each assertion checks the cumulative job list after starting one more job.
    startSkippingIndexJob()
    checkAnswer(
      sql("SHOW INDEX JOBS"),
      Seq(Row(testSkippingIndex, SKIPPING_INDEX_TYPE, "", testTable, "{}")))

    startCoveringIndexJob()
    checkAnswer(
      sql("SHOW INDEX JOBS"),
      Seq(
        Row(testSkippingIndex, SKIPPING_INDEX_TYPE, "", testTable, "{}"),
        Row(testCoveringIndex, COVERING_INDEX_TYPE, testIndex, testTable, "{}")))

    withTempDir { checkpointDir =>
      startMaterializedViewIndexJob(checkpointDir)
      checkAnswer(
        sql("SHOW INDEX JOBS"),
        Seq(
          Row(testSkippingIndex, SKIPPING_INDEX_TYPE, "", testTable, "{}"),
          Row(testCoveringIndex, COVERING_INDEX_TYPE, testIndex, testTable, "{}"),
          Row(testMvIndex, MV_INDEX_TYPE, testMv, testMvQuery, "{}")))
    }
  }

  // An empty result currently causes an exception; this will be fixed in another PR.
  ignore("should return empty if no index job") {
    checkAnswer(sql("SHOW INDEX JOBS"), Seq.empty)
  }

  /** Creates a skipping index with auto refresh on, which starts its refresh job. */
  private def startSkippingIndexJob(): Unit = {
    sql(s"""
         | CREATE SKIPPING INDEX ON $testTable
         | (name VALUE_SET)
         | WITH (auto_refresh = true)
         |""".stripMargin)
  }

  /** Creates a covering index with auto refresh on, which starts its refresh job. */
  private def startCoveringIndexJob(): Unit = {
    sql(s"""
         | CREATE INDEX $testIndex ON $testTable
         | (time, name)
         | WITH (auto_refresh = true)
         |""".stripMargin)
  }

  /**
   * Creates a materialized view with auto refresh on, which starts its refresh job.
   * MV streaming requires an explicit checkpoint location.
   */
  private def startMaterializedViewIndexJob(checkpointDir: File): Unit = {
    sql(s"""
         | CREATE MATERIALIZED VIEW $testMv
         | AS
         | $testMvQuery
         | WITH (
         |   auto_refresh = true,
         |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
         | )
         |""".stripMargin)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
index shouldBe defined
index.get.metadata().getContent should matchJson(s"""{
| "_meta": {
| "name": "flint_spark_catalog_default_test_skipping_index",
| "name": "",
| "version": "${current()}",
| "kind": "skipping",
| "indexedColumns": [
Expand Down Expand Up @@ -452,7 +452,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
index.get.metadata().getContent should matchJson(
s"""{
| "_meta": {
| "name": "flint_spark_catalog_default_data_type_table_skipping_index",
| "name": "",
| "version": "${current()}",
| "kind": "skipping",
| "indexedColumns": [
Expand Down
Loading