Add more IT
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Nov 1, 2023
1 parent 4bc67a6 commit 1c2f3b2
Showing 5 changed files with 81 additions and 9 deletions.
2 changes: 2 additions & 0 deletions docs/index.md
@@ -234,6 +234,7 @@ User can provide the following options in `WITH` clause of create statement:
+ `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on materialized view if it has aggregation in the query.
+ `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied.
+ `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
+ `id_expression`: an expression string that generates an ID column to avoid duplicate data when the incremental refresh job restarts, which is especially important for covering index. If unspecified, an ID column is derived from the source file path and the `timestamp` or `@timestamp` column. If neither timestamp column is present, no ID column is generated and a refresh job restart may produce duplicate data. See the sketch after the example below.
+ `extra_options`: a JSON string as extra options that can be passed to Spark streaming source and sink API directly. Use qualified source table name (because there could be multiple) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}'

Note that the index option name is case-sensitive. Here is an example:
@@ -246,6 +247,7 @@ WITH (
watermark_delay = '1 Second',
output_mode = 'complete',
index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}',
id_expression = 'id_col_name',
extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}'
)
```
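
Beyond a plain column name, `id_expression` can also be a Spark SQL expression that combines several columns into a unique ID. The following is a minimal sketch only; the index name and the `client_ip` and `request_ts` columns are illustrative and not part of the example above:

```sql
CREATE INDEX request_id_idx ON spark_catalog.default.alb_logs
(client_ip, request_ts)
WITH (
  auto_refresh = true,
  id_expression = 'sha1(concat(client_ip, cast(request_ts AS STRING)))'
)
```

Hashing the concatenated values keeps the generated ID compact while staying deterministic, so re-processed rows map back to the same document after a refresh job restart.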
FlintSparkCoveringIndexSqlITSuite.scala
@@ -15,7 +15,7 @@ import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
-import org.scalatest.matchers.must.Matchers.{defined, have}
+import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row
@@ -24,21 +24,25 @@ import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY
class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {

/** Test table and index name */
-private val testTable = "spark_catalog.default.covering_sql_test"
private val testIndex = "name_and_age"
+private val testTable = "spark_catalog.default.covering_sql_test"
private val testFlintIndex = getFlintIndexName(testIndex, testTable)
private val testTimeSeriesTable = "spark_catalog.default.ci_time_test"
private val testFlintTimeSeriesIndex = getFlintIndexName(testIndex, testTimeSeriesTable)

override def beforeAll(): Unit = {
super.beforeAll()

createPartitionedTable(testTable)
createTimeSeriesTable(testTimeSeriesTable)
}

override def afterEach(): Unit = {
super.afterEach()

// Delete all test indices
flint.deleteIndex(testFlintIndex)
flint.deleteIndex(testFlintTimeSeriesIndex)
}

test("create covering index with auto refresh") {
@@ -211,7 +215,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
|""".stripMargin)
}

test("create skipping index with quoted index, table and column name") {
test("create covering index with quoted index, table and column name") {
sql(s"""
| CREATE INDEX `$testIndex` ON `spark_catalog`.`default`.`covering_sql_test`
| (`name`, `age`)
@@ -226,6 +230,72 @@
metadata.indexedColumns.map(_.asScala("columnName")) shouldBe Seq("name", "age")
}

test("create covering index on time series time with timestamp column") {
sql(s"""
| CREATE INDEX $testIndex ON $testTimeSeriesTable
| (timestamp, age)
| WITH (auto_refresh = true)
|""".stripMargin)

val job = spark.streams.active.find(_.name == testFlintTimeSeriesIndex)
awaitStreamingComplete(job.get.id.toString)

val indexData = flint.queryIndex(testFlintTimeSeriesIndex)
indexData.count() shouldBe 5
}

test("create covering index on time series time without indexing timestamp column") {
sql(s"""
| CREATE INDEX $testIndex ON $testTimeSeriesTable
| (name)
| WITH (auto_refresh = true)
|""".stripMargin)

val job = spark.streams.active.find(_.name == testFlintTimeSeriesIndex)
awaitStreamingComplete(job.get.id.toString)

val indexData = flint.queryIndex(testFlintTimeSeriesIndex)
indexData.count() shouldBe 5
}

test("create covering index on time series time with @timestamp column") {
val testTimeSeriesTable2 = "spark_catalog.default.ci_time_table2"
val testFlintTimeSeriesIndex2 = getFlintIndexName(testIndex, testTimeSeriesTable2)
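// Use a dedicated table here: the shared time series test table defines a `timestamp` column, not `@timestamp`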
withTable(testTimeSeriesTable2) {
sql(s"CREATE TABLE $testTimeSeriesTable2 (`@timestamp` TIMESTAMP, name STRING) USING JSON")
sql(s"INSERT INTO $testTimeSeriesTable2 VALUES (TIMESTAMP '2023-10-01 00:01:00', 'A')")
sql(s"INSERT INTO $testTimeSeriesTable2 VALUES (TIMESTAMP '2023-10-01 00:10:00', 'B')")
sql(s"""
| CREATE INDEX $testIndex ON $testTimeSeriesTable2
| (`@timestamp`, name)
| WITH (auto_refresh = true)
|""".stripMargin)

val job = spark.streams.active.find(_.name == testFlintTimeSeriesIndex2)
awaitStreamingComplete(job.get.id.toString)

val indexData = flint.queryIndex(testFlintTimeSeriesIndex2)
indexData.count() shouldBe 2
}
}

test("create covering index on time series time with ID expression") {
sql(s"""
| CREATE INDEX $testIndex ON $testTimeSeriesTable
| (timestamp, age)
| WITH (
| auto_refresh = true,
| id_expression = 'address'
| )
|""".stripMargin)

val job = spark.streams.active.find(_.name == testFlintTimeSeriesIndex)
awaitStreamingComplete(job.get.id.toString)

val indexData = flint.queryIndex(testFlintTimeSeriesIndex)
indexData.count() shouldBe 3 // only 3 distinct address values, so rows with duplicate IDs collapse to 3 documents
}

test("show all covering index on the source table") {
flint
.coveringIndex()
FlintSparkMaterializedViewITSuite.scala
@@ -28,7 +28,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
| window.start AS startTime,
| COUNT(*) AS count
| FROM $testTable
-| GROUP BY TUMBLE(time, '10 Minutes')
+| GROUP BY TUMBLE(timestamp, '10 Minutes')
|""".stripMargin

override def beforeAll(): Unit = {
@@ -134,7 +134,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
| window.start AS startTime,
| COUNT(*) AS count
| FROM $testTable
-| GROUP BY TUMBLE(time, '1 Hour')
+| GROUP BY TUMBLE(timestamp, '1 Hour')
|""".stripMargin

withIncrementalMaterializedView(largeWindowQuery) { indexData =>
@@ -159,7 +159,7 @@
| COUNT(*) AS count
| FROM $testTable
| WHERE address = 'Seattle'
-| GROUP BY TUMBLE(time, '5 Minutes')
+| GROUP BY TUMBLE(timestamp, '5 Minutes')
|""".stripMargin

withIncrementalMaterializedView(filterQuery) { indexData =>
FlintSparkMaterializedViewSqlITSuite.scala
@@ -34,7 +34,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
| window.start AS startTime,
| COUNT(*) AS count
| FROM $testTable
-| GROUP BY TUMBLE(time, '10 Minutes')
+| GROUP BY TUMBLE(timestamp, '10 Minutes')
|""".stripMargin

override def beforeAll(): Unit = {
@@ -163,7 +163,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
| window.start AS `start.time`,
| COUNT(*) AS `count`
| FROM `spark_catalog`.`default`.`mv_test`
-| GROUP BY TUMBLE(`time`, '10 Minutes')""".stripMargin.trim
+| GROUP BY TUMBLE(`timestamp`, '10 Minutes')""".stripMargin.trim

sql(s"""
| CREATE MATERIALIZED VIEW `spark_catalog`.`default`.`mv_test_metrics`
FlintSparkSuite.scala
@@ -74,7 +74,7 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
sql(s"""
| CREATE TABLE $testTable
| (
-time TIMESTAMP,
+timestamp TIMESTAMP,
| name STRING,
| age INT,
| address STRING