From bd68653f1ac21272093c5ca764f3aed779729376 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 20 Sep 2023 10:04:47 -0700 Subject: [PATCH] Add create, drop and refresh covering index SQL support (#32) * Separate out skipping index ast builder Signed-off-by: Chen Dai * Add create index statement and IT Signed-off-by: Chen Dai * Refactor Flint sql IT Signed-off-by: Chen Dai * Refactor Flint AST builder for mix-in Signed-off-by: Chen Dai * Extract util methods from each AST builder Signed-off-by: Chen Dai * Add refresh and drop statement support Signed-off-by: Chen Dai * Update doc Signed-off-by: Chen Dai * Fix full table name in AST builder Signed-off-by: Chen Dai --------- Signed-off-by: Chen Dai --- docs/index.md | 49 +++++++- .../main/antlr4/FlintSparkSqlExtensions.g4 | 21 ++++ .../src/main/antlr4/SparkSqlBase.g4 | 8 ++ .../spark/sql/FlintSparkSqlAstBuilder.scala | 105 +++++------------- .../FlintSparkCoveringIndexAstBuilder.scala | 75 +++++++++++++ .../FlintSparkSkippingIndexAstBuilder.scala | 91 +++++++++++++++ .../FlintSparkCoveringIndexSqlITSuite.scala | 79 +++++++++++++ ...> FlintSparkSkippingIndexSqlITSuite.scala} | 53 ++------- .../flint/spark/FlintSparkSuite.scala | 7 +- 9 files changed, 361 insertions(+), 127 deletions(-) create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala create mode 100644 integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala rename integ-test/src/test/scala/org/opensearch/flint/spark/{FlintSparkSqlITSuite.scala => FlintSparkSkippingIndexSqlITSuite.scala} (72%) diff --git a/docs/index.md b/docs/index.md index 40ea21892..7698f98ac 100644 --- a/docs/index.md +++ b/docs/index.md @@ -10,10 +10,11 @@ A Flint index is ... 
### Feature Highlights -- Skipping Index +- Skipping Index: accelerate data scan by maintaining a compact aggregate data structure, which includes - Partition: skip data scan by maintaining and filtering partitioned column value per file. - MinMax: skip data scan by maintaining lower and upper bound of the indexed column per file. - ValueSet: skip data scan by building a unique value set of the indexed column per file. +- Covering Index: create an index on selected columns of the source dataset to improve query performance Please see the following example in which Index Building Logic and Query Rewrite Logic column shows the basic idea behind each skipping index implementation. @@ -117,7 +118,7 @@ High level API is dependent on query engine implementation. Please see Query Eng ### SQL -DDL statement: +#### Skipping Index ```sql CREATE SKIPPING INDEX @@ -128,7 +129,7 @@ WITH (auto_refresh = (true|false)) REFRESH SKIPPING INDEX ON <object> -DESCRIBE SKIPPING INDEX ON <object> +[DESC|DESCRIBE] SKIPPING INDEX ON <object> DROP SKIPPING INDEX ON <object> @@ -157,6 +158,38 @@ DESCRIBE SKIPPING INDEX ON alb_logs DROP SKIPPING INDEX ON alb_logs ``` +#### Covering Index + +```sql +CREATE INDEX name ON <object> +( column [, ...]
) +WHERE <filter_predicate> +WITH (auto_refresh = (true|false)) + +REFRESH INDEX name ON <object> + +SHOW [INDEX|INDEXES] ON <object> + +[DESC|DESCRIBE] INDEX name ON <object> + +DROP INDEX name ON <object> +``` + +Example: + +```sql +CREATE INDEX elb_and_requestUri +ON alb_logs ( elb, requestUri ) + +REFRESH INDEX elb_and_requestUri ON alb_logs + +SHOW INDEX ON alb_logs + +DESCRIBE INDEX elb_and_requestUri ON alb_logs + +DROP INDEX elb_and_requestUri ON alb_logs +``` + ## Index Store ### OpenSearch @@ -264,6 +297,7 @@ Here is an example for Flint Spark integration: ```scala val flint = new FlintSpark(spark) +// Skipping index flint.skippingIndex() .onTable("alb_logs") .filterBy("time > 2023-04-01 00:00:00") @@ -273,6 +307,15 @@ flint.skippingIndex() .create() flint.refresh("flint_alb_logs_skipping_index", FULL) + +// Covering index +flint.coveringIndex() + .name("elb_and_requestUri") + .onTable("alb_logs") + .addIndexColumns("elb", "requestUri") + .create() + +flint.refresh("flint_alb_logs_elb_and_requestUri_index", FULL) ``` #### Skipping Index Provider SPI diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index 0ee976cb7..303084970 100644 --- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -16,6 +16,7 @@ singleStatement statement : skippingIndexStatement + | coveringIndexStatement ; skippingIndexStatement @@ -43,6 +44,26 @@ dropSkippingIndexStatement : DROP SKIPPING INDEX ON tableName=multipartIdentifier ; +coveringIndexStatement + : createCoveringIndexStatement + | refreshCoveringIndexStatement + | dropCoveringIndexStatement + ; + +createCoveringIndexStatement + : CREATE INDEX indexName=identifier ON tableName=multipartIdentifier + LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN + (WITH LEFT_PAREN propertyList RIGHT_PAREN)?
+ ; + +refreshCoveringIndexStatement + : REFRESH INDEX indexName=identifier ON tableName=multipartIdentifier + ; + +dropCoveringIndexStatement + : DROP INDEX indexName=identifier ON tableName=multipartIdentifier + ; + indexColTypeList : indexColType (COMMA indexColType)* ; diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 index a777cc59f..17627c190 100644 --- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 +++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 @@ -85,6 +85,14 @@ grammar SparkSqlBase; } +multipartIdentifierPropertyList + : multipartIdentifierProperty (COMMA multipartIdentifierProperty)* + ; + +multipartIdentifierProperty + : multipartIdentifier (options=propertyList)? + ; + propertyList : property (COMMA property)* ; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala index 36af20fbd..5c03c1a26 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala @@ -7,84 +7,34 @@ package org.opensearch.flint.spark.sql import org.antlr.v4.runtime.tree.RuleNode import org.opensearch.flint.spark.FlintSpark -import org.opensearch.flint.spark.FlintSpark.RefreshMode -import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex -import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind -import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET} -import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._ +import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.PropertyListContext +import 
org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder +import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder -import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical.Command -import org.apache.spark.sql.types.StringType /** * Flint Spark AST builder that builds Spark command for Flint index statement. + * This class mix-in all other AST builders and provides util methods. */ -class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command] { +class FlintSparkSqlAstBuilder + extends FlintSparkSqlExtensionsBaseVisitor[Command] + with FlintSparkSkippingIndexAstBuilder + with FlintSparkCoveringIndexAstBuilder { - override def visitCreateSkippingIndexStatement( - ctx: CreateSkippingIndexStatementContext): Command = - FlintSparkSqlCommand() { flint => - // Create skipping index - val indexBuilder = flint - .skippingIndex() - .onTable(getFullTableName(flint, ctx.tableName)) - - ctx.indexColTypeList().indexColType().forEach { colTypeCtx => - val colName = colTypeCtx.identifier().getText - val skipType = SkippingKind.withName(colTypeCtx.skipType.getText) - skipType match { - case PARTITION => indexBuilder.addPartitions(colName) - case VALUE_SET => indexBuilder.addValueSet(colName) - case MIN_MAX => indexBuilder.addMinMax(colName) - } - } - indexBuilder.create() - - // Trigger auto refresh if enabled - if (isAutoRefreshEnabled(ctx.propertyList())) { - val indexName = getSkippingIndexName(flint, ctx.tableName) - flint.refreshIndex(indexName, RefreshMode.INCREMENTAL) - } - Seq.empty - } - - override def visitRefreshSkippingIndexStatement( - ctx: RefreshSkippingIndexStatementContext): Command = - FlintSparkSqlCommand() { flint => - val indexName = getSkippingIndexName(flint, ctx.tableName) - flint.refreshIndex(indexName, RefreshMode.FULL) - Seq.empty - } - - override def visitDescribeSkippingIndexStatement( - ctx: 
DescribeSkippingIndexStatementContext): Command = { - val outputSchema = Seq( - AttributeReference("indexed_col_name", StringType, nullable = false)(), - AttributeReference("data_type", StringType, nullable = false)(), - AttributeReference("skip_type", StringType, nullable = false)()) - - FlintSparkSqlCommand(outputSchema) { flint => - val indexName = getSkippingIndexName(flint, ctx.tableName) - flint - .describeIndex(indexName) - .map { case index: FlintSparkSkippingIndex => - index.indexedColumns.map(strategy => - Row(strategy.columnName, strategy.columnType, strategy.kind.toString)) - } - .getOrElse(Seq.empty) - } - } + override def aggregateResult(aggregate: Command, nextResult: Command): Command = + if (nextResult != null) nextResult else aggregate +} - override def visitDropSkippingIndexStatement(ctx: DropSkippingIndexStatementContext): Command = - FlintSparkSqlCommand() { flint => - val indexName = getSkippingIndexName(flint, ctx.tableName) - flint.deleteIndex(indexName) - Seq.empty - } +object FlintSparkSqlAstBuilder { - private def isAutoRefreshEnabled(ctx: PropertyListContext): Boolean = { + /** + * Check if auto_refresh is true in property list. + * + * @param ctx + * property list + */ + def isAutoRefreshEnabled(ctx: PropertyListContext): Boolean = { if (ctx == null) { false } else { @@ -99,10 +49,16 @@ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command } } - private def getSkippingIndexName(flint: FlintSpark, tableNameCtx: RuleNode): String = - FlintSparkSkippingIndex.getSkippingIndexName(getFullTableName(flint, tableNameCtx)) - - private def getFullTableName(flint: FlintSpark, tableNameCtx: RuleNode): String = { + /** + * Get full table name if database not specified. 
+ * + * @param flint + * Flint Spark which has access to Spark Catalog + * @param tableNameCtx + * table name + * @return + */ + def getFullTableName(flint: FlintSpark, tableNameCtx: RuleNode): String = { val tableName = tableNameCtx.getText if (tableName.contains(".")) { tableName @@ -111,7 +67,4 @@ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command s"$db.$tableName" } } - - override def aggregateResult(aggregate: Command, nextResult: Command): Command = - if (nextResult != null) nextResult else aggregate } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala new file mode 100644 index 000000000..c412b6eb6 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala @@ -0,0 +1,75 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.sql.covering + +import org.antlr.v4.runtime.tree.RuleNode +import org.opensearch.flint.spark.FlintSpark +import org.opensearch.flint.spark.FlintSpark.RefreshMode +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex +import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor} +import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, isAutoRefreshEnabled} +import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateCoveringIndexStatementContext, DropCoveringIndexStatementContext, RefreshCoveringIndexStatementContext} + +import org.apache.spark.sql.catalyst.plans.logical.Command + +/** + * Flint Spark AST builder that builds Spark command for Flint covering index statement. 
+ */
+trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[Command] {
+
+  override def visitCreateCoveringIndexStatement(
+      ctx: CreateCoveringIndexStatementContext): Command = {
+    FlintSparkSqlCommand() { flint =>
+      // Use the database-qualified table name so that create sees the same table name that
+      // getFlintIndexName resolves for auto refresh, REFRESH INDEX and DROP INDEX
+      val indexBuilder =
+        flint
+          .coveringIndex()
+          .name(ctx.indexName.getText)
+          .onTable(getFullTableName(flint, ctx.tableName))
+
+      ctx.indexColumns.multipartIdentifierProperty().forEach { indexColCtx =>
+        val colName = indexColCtx.multipartIdentifier().getText
+        indexBuilder.addIndexColumns(colName)
+      }
+      indexBuilder.create()
+
+      // Trigger auto refresh if enabled
+      if (isAutoRefreshEnabled(ctx.propertyList())) {
+        val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
+        flint.refreshIndex(flintIndexName, RefreshMode.INCREMENTAL)
+      }
+      Seq.empty
+    }
+  }
+
+  override def visitRefreshCoveringIndexStatement(
+      ctx: RefreshCoveringIndexStatementContext): Command = {
+    FlintSparkSqlCommand() { flint =>
+      val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
+      flint.refreshIndex(flintIndexName, RefreshMode.FULL)
+      Seq.empty
+    }
+  }
+
+  override def visitDropCoveringIndexStatement(
+      ctx: DropCoveringIndexStatementContext): Command = {
+    FlintSparkSqlCommand() { flint =>
+      val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
+      flint.deleteIndex(flintIndexName)
+      Seq.empty
+    }
+  }
+
+  private def getFlintIndexName(
+      flint: FlintSpark,
+      indexNameCtx: RuleNode,
+      tableNameCtx: RuleNode): String = {
+    val indexName = indexNameCtx.getText
+    val fullTableName = getFullTableName(flint, tableNameCtx)
+    FlintSparkCoveringIndex.getFlintIndexName(indexName, fullTableName)
+  }
+}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
new file
mode 100644 index 000000000..c58972f30 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala @@ -0,0 +1,91 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.sql.skipping + +import org.antlr.v4.runtime.tree.RuleNode +import org.opensearch.flint.spark.FlintSpark +import org.opensearch.flint.spark.FlintSpark.RefreshMode +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET} +import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor} +import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, isAutoRefreshEnabled} +import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._ + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.Command +import org.apache.spark.sql.types.StringType + +/** + * Flint Spark AST builder that builds Spark command for Flint skipping index statement. 
+ */ +trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[Command] { + + override def visitCreateSkippingIndexStatement( + ctx: CreateSkippingIndexStatementContext): Command = + FlintSparkSqlCommand() { flint => + // Create skipping index + val indexBuilder = flint + .skippingIndex() + .onTable(getFullTableName(flint, ctx.tableName)) + + ctx.indexColTypeList().indexColType().forEach { colTypeCtx => + val colName = colTypeCtx.identifier().getText + val skipType = SkippingKind.withName(colTypeCtx.skipType.getText) + skipType match { + case PARTITION => indexBuilder.addPartitions(colName) + case VALUE_SET => indexBuilder.addValueSet(colName) + case MIN_MAX => indexBuilder.addMinMax(colName) + } + } + indexBuilder.create() + + // Trigger auto refresh if enabled + if (isAutoRefreshEnabled(ctx.propertyList())) { + val indexName = getSkippingIndexName(flint, ctx.tableName) + flint.refreshIndex(indexName, RefreshMode.INCREMENTAL) + } + Seq.empty + } + + override def visitRefreshSkippingIndexStatement( + ctx: RefreshSkippingIndexStatementContext): Command = + FlintSparkSqlCommand() { flint => + val indexName = getSkippingIndexName(flint, ctx.tableName) + flint.refreshIndex(indexName, RefreshMode.FULL) + Seq.empty + } + + override def visitDescribeSkippingIndexStatement( + ctx: DescribeSkippingIndexStatementContext): Command = { + val outputSchema = Seq( + AttributeReference("indexed_col_name", StringType, nullable = false)(), + AttributeReference("data_type", StringType, nullable = false)(), + AttributeReference("skip_type", StringType, nullable = false)()) + + FlintSparkSqlCommand(outputSchema) { flint => + val indexName = getSkippingIndexName(flint, ctx.tableName) + flint + .describeIndex(indexName) + .map { case index: FlintSparkSkippingIndex => + index.indexedColumns.map(strategy => + Row(strategy.columnName, strategy.columnType, strategy.kind.toString)) + } + .getOrElse(Seq.empty) + } + } + + override def visitDropSkippingIndexStatement(ctx: 
DropSkippingIndexStatementContext): Command = + FlintSparkSqlCommand() { flint => + val indexName = getSkippingIndexName(flint, ctx.tableName) + flint.deleteIndex(indexName) + Seq.empty + } + + private def getSkippingIndexName(flint: FlintSpark, tableNameCtx: RuleNode): String = + FlintSparkSkippingIndex.getSkippingIndexName(getFullTableName(flint, tableNameCtx)) +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala new file mode 100644 index 000000000..04b3ed0c8 --- /dev/null +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import scala.Option.empty + +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName +import org.scalatest.matchers.must.Matchers.defined +import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper + +class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { + + /** Test table and index name */ + private val testTable = "default.covering_sql_test" + private val testIndex = "name_and_age" + private val testFlintIndex = getFlintIndexName(testIndex, testTable) + + override def beforeAll(): Unit = { + super.beforeAll() + + createPartitionedTable(testTable) + } + + override def afterEach(): Unit = { + super.afterEach() + + // Delete all test indices + flint.deleteIndex(testFlintIndex) + } + + test("create covering index with auto refresh") { + sql(s""" + | CREATE INDEX $testIndex ON $testTable + | (name, age) + | WITH (auto_refresh = true) + |""".stripMargin) + + // Wait for streaming job complete current micro batch + val job = spark.streams.active.find(_.name == testFlintIndex) + job shouldBe defined + failAfter(streamingTimeout) { + 
job.get.processAllAvailable() + } + + val indexData = flint.queryIndex(testFlintIndex) + indexData.count() shouldBe 2 + } + + test("create covering index with manual refresh") { + sql(s""" + | CREATE INDEX $testIndex ON $testTable + | (name, age) + |""".stripMargin) + + val indexData = flint.queryIndex(testFlintIndex) + + flint.describeIndex(testFlintIndex) shouldBe defined + indexData.count() shouldBe 0 + + sql(s"REFRESH INDEX $testIndex ON $testTable") + indexData.count() shouldBe 2 + } + + test("drop covering index") { + flint + .coveringIndex() + .name(testIndex) + .onTable(testTable) + .addIndexColumns("name", "age") + .create() + + sql(s"DROP INDEX $testIndex ON $testTable") + + flint.describeIndex(testFlintIndex) shouldBe empty + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala similarity index 72% rename from integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlITSuite.scala rename to integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index ffcedd991..e01850b4f 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -18,61 +18,22 @@ import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY} import org.apache.spark.sql.streaming.StreamTest -class FlintSparkSqlITSuite - extends QueryTest - with FlintSuite - with OpenSearchSuite - with StreamTest { - - /** Flint Spark high level API for assertion */ - private lazy val flint: FlintSpark = new FlintSpark(spark) +class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { /** Test table and index name */ - private val testTable = "default.flint_sql_test" + private val 
testTable = "default.skipping_sql_test" private val testIndex = getSkippingIndexName(testTable) override def beforeAll(): Unit = { super.beforeAll() - // Configure for FlintSpark explicit created above and the one behind Flint SQL - setFlintSparkConf(HOST_ENDPOINT, openSearchHost) - setFlintSparkConf(HOST_PORT, openSearchPort) - setFlintSparkConf(REFRESH_POLICY, true) - - // Create test table - sql(s""" - | CREATE TABLE $testTable - | ( - | name STRING, - | age INT - | ) - | USING CSV - | OPTIONS ( - | header 'false', - | delimiter '\t' - | ) - | PARTITIONED BY ( - | year INT, - | month INT - | ) - |""".stripMargin) - - sql(s""" - | INSERT INTO $testTable - | PARTITION (year=2023, month=4) - | VALUES ('Hello', 30) - | """.stripMargin) + createPartitionedTable(testTable) } protected override def afterEach(): Unit = { super.afterEach() - flint.deleteIndex(testIndex) - // Stop all streaming jobs if any - spark.streams.active.foreach { job => - job.stop() - job.awaitTermination() - } + flint.deleteIndex(testIndex) } test("create skipping index with auto refresh") { @@ -95,7 +56,7 @@ class FlintSparkSqlITSuite val indexData = spark.read.format(FLINT_DATASOURCE).load(testIndex) flint.describeIndex(testIndex) shouldBe defined - indexData.count() shouldBe 1 + indexData.count() shouldBe 2 } test("create skipping index with manual refresh") { @@ -114,7 +75,7 @@ class FlintSparkSqlITSuite indexData.count() shouldBe 0 sql(s"REFRESH SKIPPING INDEX ON $testTable") - indexData.count() shouldBe 1 + indexData.count() shouldBe 2 } test("describe skipping index") { @@ -138,7 +99,7 @@ class FlintSparkSqlITSuite test("create skipping index on table without database name") { sql(s""" - | CREATE SKIPPING INDEX ON flint_sql_test + | CREATE SKIPPING INDEX ON skipping_sql_test | ( | year PARTITION, | name VALUE_SET, diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 
3ee6deda1..edbf5935a 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -22,11 +22,14 @@ trait FlintSparkSuite with StreamTest { /** Flint Spark high level API being tested */ - lazy protected val flint: FlintSpark = { + lazy protected val flint: FlintSpark = new FlintSpark(spark) + + override def beforeAll(): Unit = { + super.beforeAll() + setFlintSparkConf(HOST_ENDPOINT, openSearchHost) setFlintSparkConf(HOST_PORT, openSearchPort) setFlintSparkConf(REFRESH_POLICY, "true") - new FlintSpark(spark) } protected def createPartitionedTable(testTable: String): Unit = {