
Commit

Update with doc and UT
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Nov 22, 2024
1 parent d7e535c commit 0fb985a
Showing 2 changed files with 106 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/index.md
@@ -394,6 +394,7 @@ User can provide the following options in `WITH` clause of create statement:
+ `watermark_delay`: a string time expression for how late data can arrive and still be processed, e.g. 1 minute, 10 seconds. This is required by auto and incremental refresh on a materialized view if its query contains aggregation.
+ `output_mode`: a mode string that describes how data will be written to the streaming sink. If unspecified, the default append mode will be applied.
+ `index_settings`: a JSON string of index settings for the OpenSearch index that will be created. Please follow the format in the OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
+ `id_expression`: an expression string that generates an ID column, used to avoid duplicate data when the index refresh job restarts or retries. If an empty string is provided, no ID column will be generated (see the additional example below).
+ `extra_options`: a JSON string of extra options passed directly to the Spark streaming source and sink APIs. Use the qualified source table name as the key (because there could be multiple source tables) and "sink" for the sink, e.g. '{"sink": "{key: val}", "table1": {key: val}}'

Note that the index option name is case-sensitive. Here is an example:
@@ -406,6 +407,7 @@ WITH (
watermark_delay = '1 Second',
output_mode = 'complete',
index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}',
id_expression = 'uuid()',
extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}'
)
```
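
The ID expression can also be derived from the row's own columns so that the same row always maps to the same ID across refresh retries. A minimal sketch, assuming hypothetical columns `time` and `client_ip` together identify a record:

```
WITH (
  id_expression = 'sha1(concat(cast(time as string), client_ip))'
)
```

A stable hash of identifying columns produces the same ID when a refresh job restarts or a retry re-processes data, which is what prevents duplicate documents in the OpenSearch index.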
@@ -0,0 +1,104 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndex.{generateIdColumn, ID_COLUMN}
import org.scalatest.matchers.should.Matchers

import org.apache.spark.FlintSuite
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.types.StructType

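/** Unit tests for FlintSparkIndex.generateIdColumn, covering explicit, empty, and absent ID expressions on plain and aggregated DataFrames. */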
class FlintSparkIndexSuite extends QueryTest with FlintSuite with Matchers {

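// An explicit ID expression ("id + 10") is evaluated per row to populate the generated ID column.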
test("should generate ID column if ID expression is provided") {
val df = spark.createDataFrame(Seq((1, "Alice"), (2, "Bob"))).toDF("id", "name")
val options = new FlintSparkIndexOptions(Map("id_expression" -> "id + 10"))

val resultDf = generateIdColumn(df, options)
checkAnswer(resultDf.select(ID_COLUMN), Seq(Row(11), Row(12)))
}

test("should not generate ID column if ID expression is empty") {
val df = spark.createDataFrame(Seq((1, "Alice"), (2, "Bob"))).toDF("id", "name")
val options = new FlintSparkIndexOptions(Map("id_expression" -> ""))

val resultDf = generateIdColumn(df, options)
resultDf.columns should not contain ID_COLUMN
}

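// Even without an ID expression, an aggregated query gets an ID column with one distinct value per group (Alice and Bob here).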
test("should generate ID column for aggregated query") {
val df = spark
.createDataFrame(Seq((1, "Alice"), (2, "Bob"), (3, "Alice")))
.toDF("id", "name")
.groupBy("name")
.count()
val options = FlintSparkIndexOptions.empty

val resultDf = generateIdColumn(df, options)
resultDf.select(ID_COLUMN).distinct().count() shouldBe 2
}

test("should not generate ID column for aggregated query if ID expression is empty") {
val df = spark
.createDataFrame(Seq((1, "Alice"), (2, "Bob"), (3, "Alice")))
.toDF("id", "name")
.groupBy("name")
.count()
val options = new FlintSparkIndexOptions(Map("id_expression" -> ""))

val resultDf = generateIdColumn(df, options)
resultDf.columns should not contain ID_COLUMN
}

test("should not generate ID column if ID expression is not provided") {
val df = spark.createDataFrame(Seq((1, "Alice"), (2, "Bob"))).toDF("id", "name")
val options = FlintSparkIndexOptions.empty

val resultDf = generateIdColumn(df, options)
resultDf.columns should not contain ID_COLUMN
}

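// A single input row grouped by many column types (including struct subfields) should produce exactly one distinct ID.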
test("should generate ID column for aggregated query with multiple columns") {
val schema = StructType.fromDDL("""
boolean_col BOOLEAN,
string_col STRING,
long_col LONG,
int_col INT,
double_col DOUBLE,
float_col FLOAT,
timestamp_col TIMESTAMP,
date_col DATE,
struct_col STRUCT<subfield1: STRING, subfield2: INT>
""")
val data = Seq(
Row(
true,
"Alice",
100L,
10,
10.5,
3.14f,
java.sql.Timestamp.valueOf("2024-01-01 10:00:00"),
java.sql.Date.valueOf("2024-01-01"),
Row("sub1", 1)))

val aggregatedDf = spark
.createDataFrame(sparkContext.parallelize(data), schema)
.groupBy(
"boolean_col",
"string_col",
"long_col",
"int_col",
"double_col",
"float_col",
"timestamp_col",
"date_col",
"struct_col.subfield1",
"struct_col.subfield2")
.count()

val options = FlintSparkIndexOptions.empty
val resultDf = generateIdColumn(aggregatedDf, options)
resultDf.select(ID_COLUMN).distinct().count() shouldBe 1
}
}
