diff --git a/docs/index.md b/docs/index.md index 82c147de2..46ce2ddab 100644 --- a/docs/index.md +++ b/docs/index.md @@ -394,6 +394,7 @@ User can provide the following options in `WITH` clause of create statement: + `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by auto and incremental refresh on materialized view if it has aggregation in the query. + `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied. + `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied. ++ `id_expression`: an expression string that generates an ID column to avoid duplicate data when index refresh job restart or any retry attempt during an index refresh. If an empty string is provided, no ID column will be generated. + `extra_options`: a JSON string as extra options that can be passed to Spark streaming source and sink API directly. Use qualified source table name (because there could be multiple) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}' Note that the index option name is case-sensitive. Here is an example: @@ -406,6 +407,7 @@ WITH ( watermark_delay = '1 Second', output_mode = 'complete', index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}', + id_expression = 'uuid()', extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}' ) ``` diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSuite.scala new file mode 100644 index 000000000..1ab0f8e02 --- /dev/null +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSuite.scala @@ -0,0 +1,104 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import org.opensearch.flint.spark.FlintSparkIndex.{generateIdColumn, ID_COLUMN} +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.FlintSuite +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.types.StructType + +class FlintSparkIndexSuite extends QueryTest with FlintSuite with Matchers { + + test("should generate ID column if ID expression is provided") { + val df = spark.createDataFrame(Seq((1, "Alice"), (2, "Bob"))).toDF("id", "name") + val options = new FlintSparkIndexOptions(Map("id_expression" -> "id + 10")) + + val resultDf = generateIdColumn(df, options) + checkAnswer(resultDf.select(ID_COLUMN), Seq(Row(11), Row(12))) + } + + test("should not generate ID column if ID expression is empty") { + val df = spark.createDataFrame(Seq((1, "Alice"), (2, "Bob"))).toDF("id", "name") + val options = FlintSparkIndexOptions.empty + + val resultDf = generateIdColumn(df, options) + resultDf.columns should not contain ID_COLUMN + } + + test("should generate ID column for aggregated query") { + val df = spark + .createDataFrame(Seq((1, "Alice"), (2, "Bob"), (3, "Alice"))) + .toDF("id", "name") + .groupBy("name") + .count() + val options = FlintSparkIndexOptions.empty + + val resultDf = generateIdColumn(df, options) + resultDf.select(ID_COLUMN).distinct().count() shouldBe 2 + } + + test("should not generate ID column for aggregated query if ID expression is empty") { + val df = spark.createDataFrame(Seq((1, "Alice"), (2, "Bob"))).toDF("id", "name") + val options = FlintSparkIndexOptions.empty + + val resultDf = generateIdColumn(df, options) + resultDf.columns should not contain ID_COLUMN + } + + test("should not generate ID column if ID expression is not provided") { + val df = spark.createDataFrame(Seq((1, "Alice"), (2, "Bob"))).toDF("id", "name") + val options = FlintSparkIndexOptions.empty + + val resultDf = generateIdColumn(df, options) + resultDf.columns should not contain ID_COLUMN + } + + test("should generate ID column for aggregated query with multiple columns") { + val schema = StructType.fromDDL(""" + boolean_col BOOLEAN, + string_col STRING, + long_col LONG, + int_col INT, + double_col DOUBLE, + float_col FLOAT, + timestamp_col TIMESTAMP, + date_col DATE, + struct_col STRUCT + """) + val data = Seq( + Row( + true, + "Alice", + 100L, + 10, + 10.5, + 3.14f, + java.sql.Timestamp.valueOf("2024-01-01 10:00:00"), + java.sql.Date.valueOf("2024-01-01"), + Row("sub1", 1))) + + val aggregatedDf = spark + .createDataFrame(sparkContext.parallelize(data), schema) + .groupBy( + "boolean_col", + "string_col", + "long_col", + "int_col", + "double_col", + "float_col", + "timestamp_col", + "date_col", + "struct_col.subfield1", + "struct_col.subfield2") + .count() + + val options = FlintSparkIndexOptions.empty + val resultDf = generateIdColumn(aggregatedDf, options) + resultDf.select(ID_COLUMN).distinct().count() shouldBe 1 + } +}