Commit 01a8f34: Add show index job statement
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Oct 20, 2023 (1 parent: f8ec62e)
Showing 5 changed files with 160 additions and 0 deletions.
@@ -18,6 +18,7 @@ statement
    : skippingIndexStatement
    | coveringIndexStatement
    | materializedViewStatement
    | indexJobManagementStatement
    ;

skippingIndexStatement
@@ -102,6 +103,14 @@ dropMaterializedViewStatement
    : DROP MATERIALIZED VIEW mvName=multipartIdentifier
    ;

indexJobManagementStatement
    : showIndexJobStatement
    ;

showIndexJobStatement
    : SHOW INDEX (JOB|JOBS)
    ;
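// Note: both keyword forms parse, i.e. SHOW INDEX JOB and SHOW INDEX JOBS.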

/*
 * Match all remaining tokens in non-greedy way
 * so WITH clause won't be captured by this rule.
2 changes: 2 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -165,6 +165,8 @@ IF: 'IF';
IN: 'IN';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
JOB: 'JOB';
JOBS: 'JOBS';
MATERIALIZED: 'MATERIALIZED';
NOT: 'NOT';
ON: 'ON';
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.sql
import org.antlr.v4.runtime.tree.{ParseTree, RuleNode}
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
import org.opensearch.flint.spark.sql.job.FlintSparkIndexJobAstBuilder
import org.opensearch.flint.spark.sql.mv.FlintSparkMaterializedViewAstBuilder
import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder

@@ -23,6 +24,7 @@ class FlintSparkSqlAstBuilder
    with FlintSparkSkippingIndexAstBuilder
    with FlintSparkCoveringIndexAstBuilder
    with FlintSparkMaterializedViewAstBuilder
    with FlintSparkIndexJobAstBuilder
    with SparkSqlAstBuilder {

  override def visit(tree: ParseTree): LogicalPlan = {
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.sql.job

import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.spark.FlintSparkIndex
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.ShowIndexJobStatementContext

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.StringType

/**
* Flint Spark AST builder that builds Spark command for Flint index job management statement.
*/
trait FlintSparkIndexJobAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
  self: SparkSqlAstBuilder =>

  override def visitShowIndexJobStatement(ctx: ShowIndexJobStatementContext): AnyRef = {
    val outputSchema = Seq(
      AttributeReference("job name", StringType, nullable = false)(),
      AttributeReference("index name", StringType, nullable = false)(),
      AttributeReference("source", StringType, nullable = false)(),
      AttributeReference("properties", StringType, nullable = false)())

    FlintSparkSqlCommand(outputSchema) { flint =>
      val indexNamePattern = "flint_*"
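      // Only indexes created with auto_refresh enabled are backed by a refresh job,
      // so anything else is filtered out by the guard in the collect below.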
      flint
        .describeIndexes(indexNamePattern)
        .collect {
          case index: FlintSparkIndex if index.options.autoRefresh() =>
            val metadata = index.metadata()
            Row(index.name(), metadata.name, metadata.source, jsonify(metadata.properties))
        }
    }
  }

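  // Serializes the index properties map into a JSON string via json4s native serialization.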
  private def jsonify(map: java.util.Map[String, AnyRef]): String = {
    implicit val formats: Formats = Serialization.formats(NoTypeHints)
    Serialization.write(map)
  }
}
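To make the command's output concrete, here is a minimal usage sketch. It is illustrative only, not part of the commit: it assumes a SparkSession named `spark` with the Flint SQL extension registered and at least one index created with auto_refresh = true, as in the integration test below.

// Each row carries: job name, index name, source (table name or MV query),
// and the index properties rendered as a JSON string.
val jobs = spark.sql("SHOW INDEX JOBS")
jobs.show(truncate = false)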
@@ -0,0 +1,100 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import java.io.File

import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex

import org.apache.spark.sql.Row

class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite {

  private val testTable = "spark_catalog.default.index_job_test"
  private val testSkippingIndex = FlintSparkSkippingIndex.getSkippingIndexName(testTable)

  /** Covering index names */
  private val testIndex = "test"
  private val testCoveringIndex = FlintSparkCoveringIndex.getFlintIndexName(testIndex, testTable)

  /** Materialized view names and query */
  private val testMv = "spark_catalog.default.mv_test"
  private val testMvQuery = s"SELECT name, age FROM $testTable"
  private val testMvIndex = FlintSparkMaterializedView.getFlintIndexName(testMv)

  override def beforeAll(): Unit = {
    super.beforeAll()

    createTimeSeriesTable(testTable)
  }

  protected override def afterEach(): Unit = {
    super.afterEach()

    flint.deleteIndex(testSkippingIndex)
    flint.deleteIndex(testCoveringIndex)
    flint.deleteIndex(testMvIndex)
  }

test("show all index jobs") {
startSkippingIndexJob()
checkAnswer(
sql("SHOW INDEX JOBS"),
Seq(Row(testSkippingIndex, testSkippingIndex, testTable, "{}")))

startCoveringIndexJob()
checkAnswer(
sql("SHOW INDEX JOBS"),
Seq(
Row(testSkippingIndex, testSkippingIndex, testTable, "{}"),
Row(testCoveringIndex, testIndex, testTable, "{}")))

withTempDir { checkpointDir =>
startMaterializedViewIndexJob(checkpointDir)
checkAnswer(
sql("SHOW INDEX JOBS"),
Seq(
Row(testSkippingIndex, testSkippingIndex, testTable, "{}"),
Row(testCoveringIndex, testIndex, testTable, "{}"),
Row(testMvIndex, testMv, testMvQuery, "{}")))
}
}

  // An empty result currently causes an exception; fixed in another PR.
  ignore("should return empty if no index job") {
    checkAnswer(sql("SHOW INDEX JOBS"), Seq.empty)
  }

  private def startSkippingIndexJob(): Unit = {
    sql(s"""
         | CREATE SKIPPING INDEX ON $testTable
         | (name VALUE_SET)
         | WITH (auto_refresh = true)
         |""".stripMargin)
  }

  private def startCoveringIndexJob(): Unit = {
    sql(s"""
         | CREATE INDEX $testIndex ON $testTable
         | (time, name)
         | WITH (auto_refresh = true)
         |""".stripMargin)
  }

  private def startMaterializedViewIndexJob(checkpointDir: File): Unit = {
    sql(s"""
         | CREATE MATERIALIZED VIEW $testMv
         | AS
         | $testMvQuery
         | WITH (
         |   auto_refresh = true,
         |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
         | )
         |""".stripMargin)
  }
}
