From 84e5eecc6c9a510e5a4b7f5ff1601348714e63f9 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Tue, 31 Oct 2023 17:17:40 -0700
Subject: [PATCH 1/8] Add optional ID column

Signed-off-by: Chen Dai
---
 .../covering/FlintSparkCoveringIndex.scala | 27 ++++++++++++++++---
 1 file changed, 23 insertions(+), 4 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index 91272309f..e87c2ebda 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -9,11 +9,13 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
 
 import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark._
-import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder}
+import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, ID_COLUMN}
 import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
+import org.apache.spark.sql.functions.{col, concat, input_file_name, sha1}
 
 /**
  * Flint covering index in Spark.
@@ -31,7 +33,8 @@ case class FlintSparkCoveringIndex(
     indexedColumns: Map[String, String],
     filterCondition: Option[String] = None,
     override val options: FlintSparkIndexOptions = empty)
-    extends FlintSparkIndex {
+    extends FlintSparkIndex
+    with Logging {
 
   require(indexedColumns.nonEmpty, "indexed columns must not be empty")
 
@@ -60,13 +63,29 @@ case class FlintSparkCoveringIndex(
 
   override def build(spark: SparkSession, df: Option[DataFrame]): DataFrame = {
     val colNames = indexedColumns.keys.toSeq
-    val job = df.getOrElse(spark.read.table(tableName))
+    var job = df.getOrElse(spark.read.table(tableName))
 
     // Add optional filtering condition
-    filterCondition
+    job = filterCondition
      .map(job.where)
      .getOrElse(job)
      .select(colNames.head, colNames.tail: _*)
+
+    // Add optional ID column
+    val uniqueColNames =
+      spark
+        .table(tableName)
+        .columns
+        .toSet
+        .intersect(Set("timestamp", "@timestamp"))
+
+    if (uniqueColNames.nonEmpty) {
+      logInfo(s"Generate ID column based on first column in $uniqueColNames")
+      job = job.withColumn(ID_COLUMN, sha1(concat(input_file_name(), col(uniqueColNames.head))))
+    } else {
+      logWarning("Cannot generate ID column which may cause duplicate data when restart")
+    }
+    job
   }
 }
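For reference, the default ID expression introduced above can be exercised in isolation. A minimal sketch, not part of the patch: it assumes a local SparkSession and an in-memory DataFrame, for which `input_file_name()` returns an empty string, so the hash degenerates to the timestamp alone; for a real file-based table the file path disambiguates rows across files.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat, input_file_name, sha1}

object IdColumnSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("id-sketch").getOrCreate()
  import spark.implicits._

  val df = Seq(("2023-10-01 00:01:00", "A"), ("2023-10-01 00:10:00", "B"))
    .toDF("timestamp", "name")

  // Same shape of expression the patch generates. Note that two rows from the same
  // file with equal timestamps still collide, so this is only a best-effort default.
  df.withColumn("__id__", sha1(concat(input_file_name(), col("timestamp")))).show(false)
}
```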
From 9feae2ac5a5d25b03728f43799755305736d1dbd Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Tue, 31 Oct 2023 17:31:56 -0700
Subject: [PATCH 2/8] Add id expression index option

Signed-off-by: Chen Dai
---
 .../flint/spark/FlintSparkIndexOptions.scala      | 11 ++++++++++-
 .../flint/spark/FlintSparkIndexOptionsSuite.scala |  4 ++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
index ffb479b54..2751c8762 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
@@ -8,7 +8,7 @@ package org.opensearch.flint.spark
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.JsonMethods._
 import org.json4s.native.Serialization
-import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY}
+import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, ID_EXPRESSION, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY}
 import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
 
 /**
@@ -70,6 +70,14 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
    */
   def indexSettings(): Option[String] = getOptionValue(INDEX_SETTINGS)
 
+  /**
+   * An expression that generates a unique value as the source data row ID.
+   *
+   * @return
+   *   ID expression
+   */
+  def idExpression(): Option[String] = getOptionValue(ID_EXPRESSION)
+
   /**
    * Extra streaming source options that can be simply passed to DataStreamReader or
    * Relation.options
@@ -136,6 +144,7 @@ object FlintSparkIndexOptions {
     val OUTPUT_MODE: OptionName.Value = Value("output_mode")
     val INDEX_SETTINGS: OptionName.Value = Value("index_settings")
     val EXTRA_OPTIONS: OptionName.Value = Value("extra_options")
+    val ID_EXPRESSION: OptionName.Value = Value("id_expression")
   }
 
 /**
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
index b678096ca..1f37c8547 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
@@ -19,6 +19,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
     WATERMARK_DELAY.toString shouldBe "watermark_delay"
     OUTPUT_MODE.toString shouldBe "output_mode"
     INDEX_SETTINGS.toString shouldBe "index_settings"
+    ID_EXPRESSION.toString shouldBe "id_expression"
     EXTRA_OPTIONS.toString shouldBe "extra_options"
   }
 
@@ -31,6 +32,7 @@
       "watermark_delay" -> "30 Seconds",
       "output_mode" -> "complete",
       "index_settings" -> """{"number_of_shards": 3}""",
+      "id_expression" -> """sha1(col("timestamp"))""",
       "extra_options" ->
         """ {
          |   "alb_logs": {
@@ -48,6 +50,7 @@
     options.watermarkDelay() shouldBe Some("30 Seconds")
     options.outputMode() shouldBe Some("complete")
     options.indexSettings() shouldBe Some("""{"number_of_shards": 3}""")
+    options.idExpression() shouldBe Some("""sha1(col("timestamp"))""")
     options.extraSourceOptions("alb_logs") shouldBe Map("opt1" -> "val1")
     options.extraSinkOptions() shouldBe Map("opt2" -> "val2", "opt3" -> "val3")
   }
@@ -75,6 +78,7 @@
     options.watermarkDelay() shouldBe empty
     options.outputMode() shouldBe empty
     options.indexSettings() shouldBe empty
+    options.idExpression() shouldBe empty
     options.extraSourceOptions("alb_logs") shouldBe empty
     options.extraSinkOptions() shouldBe empty
     options.optionsWithDefault should contain("auto_refresh" -> "false")
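The new accessor can be exercised directly; an illustrative snippet whose values mirror the suite above (the expression string is opaque to the option class and only parsed later at index build time):

```scala
import org.opensearch.flint.spark.FlintSparkIndexOptions

val options = FlintSparkIndexOptions(Map("id_expression" -> """sha1(col("timestamp"))"""))
assert(options.idExpression() == Some("""sha1(col("timestamp"))"""))

// An unset option yields None, leaving the ID decision to the index build logic.
assert(FlintSparkIndexOptions(Map.empty).idExpression().isEmpty)
```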
From 7fbe4874361d74d66f14f058c570052ed251a2ab Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 1 Nov 2023 14:03:48 -0700
Subject: [PATCH 3/8] Add UT for covering index ID column

Signed-off-by: Chen Dai
---
 .../covering/FlintSparkCoveringIndex.scala | 46 +++++++++++--------
 .../FlintSparkCoveringIndexSuite.scala     | 30 ++++++++++--
 2 files changed, 53 insertions(+), 23 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index e87c2ebda..3aa4701d5 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -15,7 +15,7 @@ import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintInde
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.functions.{col, concat, input_file_name, sha1}
+import org.apache.spark.sql.functions._
 
 /**
  * Flint covering index in Spark.
@@ -62,30 +62,38 @@ case class FlintSparkCoveringIndex(
   }
 
   override def build(spark: SparkSession, df: Option[DataFrame]): DataFrame = {
-    val colNames = indexedColumns.keys.toSeq
+    var colNames = indexedColumns.keys.toSeq
     var job = df.getOrElse(spark.read.table(tableName))
 
+    // Add optional ID column
+    if (options.idExpression().isDefined) {
+      val idExpr = options.idExpression().get
+
+      logInfo(s"Generate ID column based on expression $idExpr")
+      job = job.withColumn(ID_COLUMN, expr(idExpr))
+      colNames = colNames :+ ID_COLUMN
+    } else {
+      val idColNames =
+        spark
+          .table(tableName)
+          .columns
+          .toSet
+          .intersect(Set("timestamp", "@timestamp"))
+
+      if (idColNames.isEmpty) {
+        logWarning("Cannot generate ID column which may cause duplicate data when restart")
+      } else {
+        logInfo(s"Generate ID column based on first column in $idColNames")
+        job = job.withColumn(ID_COLUMN, sha1(concat(input_file_name(), col(idColNames.head))))
+        colNames = colNames :+ ID_COLUMN
+      }
+    }
+
     // Add optional filtering condition
-    job = filterCondition
+    filterCondition
      .map(job.where)
      .getOrElse(job)
      .select(colNames.head, colNames.tail: _*)
-
-    // Add optional ID column
-    val uniqueColNames =
-      spark
-        .table(tableName)
-        .columns
-        .toSet
-        .intersect(Set("timestamp", "@timestamp"))
-
-    if (uniqueColNames.nonEmpty) {
-      logInfo(s"Generate ID column based on first column in $uniqueColNames")
-      job = job.withColumn(ID_COLUMN, sha1(concat(input_file_name(), col(uniqueColNames.head))))
-    } else {
-      logWarning("Cannot generate ID column which may cause duplicate data when restart")
-    }
-    job
   }
 }
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
index 8c144b46b..fe7df433b 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
@@ -5,15 +5,18 @@
 
 package org.opensearch.flint.spark.covering
 
-import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
+import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
+import org.scalatest.matchers.should.Matchers
 
 import org.apache.spark.FlintSuite
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions.{col, concat, input_file_name, sha1}
 
-class FlintSparkCoveringIndexSuite extends FlintSuite {
+class FlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
 
   test("get covering index name") {
     val index =
-      new FlintSparkCoveringIndex("ci", "spark_catalog.default.test", Map("name" -> "string"))
+      FlintSparkCoveringIndex("ci", "spark_catalog.default.test", Map("name" -> "string"))
 
     index.name() shouldBe "flint_spark_catalog_default_test_ci_index"
   }
@@ -26,7 +29,26 @@
 
   test("should fail if no indexed column given") {
     assertThrows[IllegalArgumentException] {
-      new FlintSparkCoveringIndex("ci", "default.test", Map.empty)
+      FlintSparkCoveringIndex("ci", "default.test", Map.empty)
     }
   }
+
+  test("should generate id column based on timestamp column") {
+    val testTable = "spark_catalog.default.ci_test"
+    withTable(testTable) {
+      sql(s"CREATE TABLE $testTable (timestamp TIMESTAMP, name STRING) USING JSON")
+      val index = FlintSparkCoveringIndex("name_idx", testTable, Map("name" -> "string"))
+
+      assertDataFrameEquals(
+        index.build(spark, None),
+        spark
+          .table(testTable)
+          .withColumn(ID_COLUMN, sha1(concat(input_file_name(), col("timestamp"))))
+          .select(col("name"), col(ID_COLUMN)))
+    }
+  }
+
+  private def assertDataFrameEquals(df1: DataFrame, df2: DataFrame): Unit = {
+    comparePlans(df1.queryExecution.logical, df2.queryExecution.logical, checkAnalysis = false)
+  }
 }
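Worth noting how the option string becomes a column in the branch above: Spark's `expr()` parses any SQL expression, so option values such as `now()` or a hash over several columns are accepted. A hedged sketch, assuming a `spark` session in scope and a hypothetical table with `name` and `age` columns:

```scala
import org.apache.spark.sql.functions.expr

// Illustrative id_expression value; any valid Spark SQL expression string works here.
val idColumn = expr("sha1(concat_ws(',', name, age))")
val withId = spark.read
  .table("spark_catalog.default.ci_test") // hypothetical table
  .withColumn("__id__", idColumn)
```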
From 24f578bdeb44f6d41a68d56ded2ebf57745d2f1c Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 1 Nov 2023 14:31:06 -0700
Subject: [PATCH 4/8] Add more UT

Signed-off-by: Chen Dai
---
 .../covering/FlintSparkCoveringIndex.scala | 10 +--
 .../FlintSparkCoveringIndexSuite.scala     | 80 ++++++++++++++++++-
 2 files changed, 79 insertions(+), 11 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index 3aa4701d5..27d971fc0 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -68,18 +68,12 @@ case class FlintSparkCoveringIndex(
     // Add optional ID column
     if (options.idExpression().isDefined) {
       val idExpr = options.idExpression().get
-
       logInfo(s"Generate ID column based on expression $idExpr")
+
       job = job.withColumn(ID_COLUMN, expr(idExpr))
       colNames = colNames :+ ID_COLUMN
     } else {
-      val idColNames =
-        spark
-          .table(tableName)
-          .columns
-          .toSet
-          .intersect(Set("timestamp", "@timestamp"))
-
+      val idColNames = job.columns.toSet.intersect(Set("timestamp", "@timestamp"))
       if (idColNames.isEmpty) {
         logWarning("Cannot generate ID column which may cause duplicate data when restart")
       } else {
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
index fe7df433b..f4cacd385 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
@@ -6,14 +6,18 @@
 package org.opensearch.flint.spark.covering
 
 import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
+import org.opensearch.flint.spark.FlintSparkIndexOptions
 import org.scalatest.matchers.should.Matchers
 
 import org.apache.spark.FlintSuite
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.functions.{col, concat, input_file_name, sha1}
+import org.apache.spark.sql.functions._
 
 class FlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
 
+  /** Test table name */
+  val testTable = "spark_catalog.default.ci_test"
+
   test("get covering index name") {
     val index =
       FlintSparkCoveringIndex("ci", "spark_catalog.default.test", Map("name" -> "string"))
@@ -33,21 +37,91 @@
     }
   }
 
-  test("should generate id column based on timestamp column") {
-    val testTable = "spark_catalog.default.ci_test"
+  test("should generate id column based on ID expression in index options") {
     withTable(testTable) {
       sql(s"CREATE TABLE $testTable (timestamp TIMESTAMP, name STRING) USING JSON")
+      val index =
+        FlintSparkCoveringIndex(
+          "name_idx",
+          testTable,
+          Map("name" -> "string"),
+          options = FlintSparkIndexOptions(Map("id_expression" -> "now()")))
+
+      assertDataFrameEquals(
+        index.build(spark, None),
+        spark
+          .table(testTable)
+          .withColumn(ID_COLUMN, expr("now()"))
+          .select(col("name"), col(ID_COLUMN)))
+    }
+  }
+
+  test("should generate id column based on timestamp column if found") {
+    withTable(testTable) {
+      sql(s"CREATE TABLE $testTable (timestamp TIMESTAMP, name STRING) USING JSON")
+      val index = FlintSparkCoveringIndex("name_idx", testTable, Map("name" -> "string"))
+
+      assertDataFrameEquals(
+        index.build(spark, None),
+        spark
+          .table(testTable)
+          .withColumn(ID_COLUMN, sha1(concat(input_file_name(), col("timestamp"))))
+          .select(col("name"), col(ID_COLUMN)))
+    }
+  }
+
+  test("should generate id column based on @timestamp column if found") {
+    withTable(testTable) {
+      sql(s"CREATE TABLE $testTable (`@timestamp` TIMESTAMP, name STRING) USING JSON")
+      val index = FlintSparkCoveringIndex("name_idx", testTable, Map("name" -> "string"))
+
+      assertDataFrameEquals(
+        index.build(spark, None),
+        spark
+          .table(testTable)
+          .withColumn(ID_COLUMN, sha1(concat(input_file_name(), col("@timestamp"))))
+          .select(col("name"), col(ID_COLUMN)))
+    }
+  }
+
+  test("should not generate id column if no ID expression or timestamp column") {
+    withTable(testTable) {
+      sql(s"CREATE TABLE $testTable (name STRING, age INTEGER) USING JSON")
       val index = FlintSparkCoveringIndex("name_idx", testTable, Map("name" -> "string"))
 
       assertDataFrameEquals(
         index.build(spark, None),
         spark
           .table(testTable)
+          .select(col("name")))
+    }
+  }
+
+  test("should generate id column if micro batch has timestamp column") {
+    withTable(testTable) {
+      sql(s"CREATE TABLE $testTable (timestamp TIMESTAMP, name STRING) USING JSON")
+      val index = FlintSparkCoveringIndex("name_idx", testTable, Map("name" -> "string"))
+      val batch = spark.read.table(testTable).select("timestamp", "name")
+
+      assertDataFrameEquals(
+        index.build(spark, Some(batch)),
+        batch
          .withColumn(ID_COLUMN, sha1(concat(input_file_name(), col("timestamp"))))
          .select(col("name"), col(ID_COLUMN)))
     }
   }
 
+  test("should not generate id column if micro batch doesn't have timestamp column") {
+    withTable(testTable) {
+      sql(s"CREATE TABLE $testTable (timestamp TIMESTAMP, name STRING) USING JSON")
+      val index = FlintSparkCoveringIndex("name_idx", testTable, Map("name" -> "string"))
+      val batch = spark.read.table(testTable).select("name")
+
+      assertDataFrameEquals(index.build(spark, Some(batch)), batch.select(col("name")))
+    }
+  }
+
+  /* Assert that the unresolved logical plans of two DataFrames are equal, without semantic analysis */
   private def assertDataFrameEquals(df1: DataFrame, df2: DataFrame): Unit = {
     comparePlans(df1.queryExecution.logical, df2.queryExecution.logical, checkAnalysis = false)
   }
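The key behavioral change in this patch is that the timestamp lookup now inspects the incoming micro batch (`job.columns`) rather than the source table schema. A sketch of why that matters, assuming the `ci_test` table from the suite:

```scala
// A micro batch that projects away the timestamp column no longer gets an ID column,
// even though the underlying table still has one.
val batch = spark.read.table("spark_catalog.default.ci_test").select("name")
val idColNames = batch.columns.toSet.intersect(Set("timestamp", "@timestamp"))
assert(idColNames.isEmpty) // build() logs a warning and adds no ID column
```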
From 4bc67a6119edb015c542a7c6ce11b76ee9efe82b Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 1 Nov 2023 15:03:28 -0700
Subject: [PATCH 5/8] Refactor build logic

Signed-off-by: Chen Dai
---
 .../covering/FlintSparkCoveringIndex.scala | 28 ++++++++++---------
 .../FlintSparkCoveringIndexSuite.scala     | 19 +++++++++++++
 2 files changed, 34 insertions(+), 13 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index 27d971fc0..802f4d818 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -66,21 +66,19 @@ case class FlintSparkCoveringIndex(
     var job = df.getOrElse(spark.read.table(tableName))
 
     // Add optional ID column
-    if (options.idExpression().isDefined) {
-      val idExpr = options.idExpression().get
-      logInfo(s"Generate ID column based on expression $idExpr")
-
-      job = job.withColumn(ID_COLUMN, expr(idExpr))
+    val idColumn =
+      options
+        .idExpression()
+        .map(idExpr => Some(expr(idExpr)))
+        .getOrElse(findTimestampColumn(job)
+          .map(tsCol => sha1(concat(input_file_name(), col(tsCol)))))
+
+    if (idColumn.isDefined) {
+      logInfo(s"Generate ID column based on expression $idColumn")
       colNames = colNames :+ ID_COLUMN
+      job = job.withColumn(ID_COLUMN, idColumn.get)
     } else {
-      val idColNames = job.columns.toSet.intersect(Set("timestamp", "@timestamp"))
-      if (idColNames.isEmpty) {
-        logWarning("Cannot generate ID column which may cause duplicate data when restart")
-      } else {
-        logInfo(s"Generate ID column based on first column in $idColNames")
-        job = job.withColumn(ID_COLUMN, sha1(concat(input_file_name(), col(idColNames.head))))
-        colNames = colNames :+ ID_COLUMN
-      }
+      logWarning("Cannot generate ID column which may cause duplicate data when restart")
     }
 
     // Add optional filtering condition
@@ -89,6 +87,10 @@
      .getOrElse(job)
      .select(colNames.head, colNames.tail: _*)
   }
+
+  private def findTimestampColumn(df: DataFrame): Option[String] = {
+    df.columns.toSet.intersect(Set("timestamp", "@timestamp")).headOption
+  }
 }
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
index f4cacd385..f1f10fe26 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
@@ -121,6 +121,25 @@
     }
   }
 
+  test("should build with filtering condition") {
+    withTable(testTable) {
+      sql(s"CREATE TABLE $testTable (timestamp TIMESTAMP, name STRING) USING JSON")
+      val index = FlintSparkCoveringIndex(
+        "name_idx",
+        testTable,
+        Map("name" -> "string"),
+        Some("name = 'test'"))
+
+      assertDataFrameEquals(
+        index.build(spark, None),
+        spark
+          .table(testTable)
+          .withColumn(ID_COLUMN, sha1(concat(input_file_name(), col("timestamp"))))
+          .where("name = 'test'")
+          .select(col("name"), col(ID_COLUMN)))
+    }
+  }
+
   /* Assert that the unresolved logical plans of two DataFrames are equal, without semantic analysis */
   private def assertDataFrameEquals(df1: DataFrame, df2: DataFrame): Unit = {
     comparePlans(df1.queryExecution.logical, df2.queryExecution.logical, checkAnalysis = false)
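The refactor collapses the branches into an `Option[Column]` pipeline. The `.map(idExpr => Some(expr(idExpr))).getOrElse(fallback)` combination is equivalent to the slightly simpler `.map(expr).orElse(fallback)`; a sketch of the same shape, not the patch itself:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, concat, expr, input_file_name, sha1}

// Equivalent formulation of the refactored logic (illustrative only).
def idColumnFor(idExprOpt: Option[String], tsColOpt: Option[String]): Option[Column] =
  idExprOpt
    .map(expr) // an explicit id_expression option wins
    .orElse(tsColOpt.map(ts => sha1(concat(input_file_name(), col(ts)))))
```

Using `orElse` avoids wrapping the first branch in a nested `Some`.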
From 1c2f3b20173fb9e3b7cefec3a42002eecdb4409f Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 1 Nov 2023 15:53:04 -0700
Subject: [PATCH 6/8] Add more IT

Signed-off-by: Chen Dai
---
 docs/index.md                                 |  2 +
 .../FlintSparkCoveringIndexSqlITSuite.scala   | 76 ++++++++++++++++++-
 .../FlintSparkMaterializedViewITSuite.scala   |  6 +-
 ...FlintSparkMaterializedViewSqlITSuite.scala |  4 +-
 .../flint/spark/FlintSparkSuite.scala         |  2 +-
 5 files changed, 81 insertions(+), 9 deletions(-)

diff --git a/docs/index.md b/docs/index.md
index 05a38dbeb..f9ed36436 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -234,6 +234,7 @@ User can provide the following options in `WITH` clause of create statement:
 + `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on materialized view if it has aggregation in the query.
 + `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied.
 + `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
++ `id_expression`: an expression string that generates an ID column to avoid duplicate data when an incremental refresh job restarts, which matters especially for a covering index. If unspecified, an ID column is generated from the source file path and the `timestamp` or `@timestamp` column. If neither column exists, no ID column is generated and duplicate data may occur when the refresh job restarts.
 + `extra_options`: a JSON string as extra options that can be passed to Spark streaming source and sink API directly. Use qualified source table name (because there could be multiple) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}'
 
 Note that the index option name is case-sensitive. Here is an example:
@@ -246,6 +247,7 @@ WITH (
    watermark_delay = '1 Second',
    output_mode = 'complete',
    index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}',
+   id_expression = 'id_col_name',
    extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}'
 )
 ```
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
index 27419b616..ea1ea5029 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
@@ -15,7 +15,7 @@ import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.storage.FlintOpenSearchClient
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
-import org.scalatest.matchers.must.Matchers.{defined, have}
+import org.scalatest.matchers.must.Matchers.defined
 import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}
 
 import org.apache.spark.sql.Row
@@ -24,14 +24,17 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
 
   /** Test table and index name */
-  private val testTable = "spark_catalog.default.covering_sql_test"
   private val testIndex = "name_and_age"
+  private val testTable = "spark_catalog.default.covering_sql_test"
   private val testFlintIndex = getFlintIndexName(testIndex, testTable)
+  private val testTimeSeriesTable = "spark_catalog.default.ci_time_test"
+  private val testFlintTimeSeriesIndex = getFlintIndexName(testIndex, testTimeSeriesTable)
 
   override def beforeAll(): Unit = {
     super.beforeAll()
 
     createPartitionedTable(testTable)
+    createTimeSeriesTable(testTimeSeriesTable)
   }
 
   override def afterEach(): Unit = {
@@ -39,6 +42,7 @@
 
     // Delete all test indices
     flint.deleteIndex(testFlintIndex)
+    flint.deleteIndex(testFlintTimeSeriesIndex)
   }
 
   test("create covering index with auto refresh") {
@@ -211,7 +215,7 @@
     |""".stripMargin)
   }
 
-  test("create skipping index with quoted index, table and column name") {
+  test("create covering index with quoted index, table and column name") {
     sql(s"""
            | CREATE INDEX `$testIndex` ON `spark_catalog`.`default`.`covering_sql_test`
            | (`name`, `age`)
@@ -226,6 +230,72 @@
     metadata.indexedColumns.map(_.asScala("columnName")) shouldBe Seq("name", "age")
   }
 
+  test("create covering index on time series table with timestamp column") {
+    sql(s"""
+           | CREATE INDEX $testIndex ON $testTimeSeriesTable
+           | (timestamp, age)
+           | WITH (auto_refresh = true)
+           |""".stripMargin)
+
+    val job = spark.streams.active.find(_.name == testFlintTimeSeriesIndex)
+    awaitStreamingComplete(job.get.id.toString)
+
+    val indexData = flint.queryIndex(testFlintTimeSeriesIndex)
+    indexData.count() shouldBe 5
+  }
+
+  test("create covering index on time series table without indexing timestamp column") {
+    sql(s"""
+           | CREATE INDEX $testIndex ON $testTimeSeriesTable
+           | (name)
+           | WITH (auto_refresh = true)
+           |""".stripMargin)
+
+    val job = spark.streams.active.find(_.name == testFlintTimeSeriesIndex)
+    awaitStreamingComplete(job.get.id.toString)
+
+    val indexData = flint.queryIndex(testFlintTimeSeriesIndex)
+    indexData.count() shouldBe 5
+  }
+
+  test("create covering index on time series table with @timestamp column") {
+    val testTimeSeriesTable2 = "spark_catalog.default.ci_time_table2"
+    val testFlintTimeSeriesIndex2 = getFlintIndexName(testIndex, testTimeSeriesTable2)
+    withTable(testTimeSeriesTable2) {
+      sql(s"CREATE TABLE $testTimeSeriesTable2 (`@timestamp` TIMESTAMP, name STRING) USING JSON")
+      sql(s"INSERT INTO $testTimeSeriesTable2 VALUES (TIMESTAMP '2023-10-01 00:01:00', 'A')")
+      sql(s"INSERT INTO $testTimeSeriesTable2 VALUES (TIMESTAMP '2023-10-01 00:10:00', 'B')")
+      sql(s"""
+             | CREATE INDEX $testIndex ON $testTimeSeriesTable2
+             | (`@timestamp`, name)
+             | WITH (auto_refresh = true)
+             |""".stripMargin)
+
+      val job = spark.streams.active.find(_.name == testFlintTimeSeriesIndex2)
+      awaitStreamingComplete(job.get.id.toString)
+
+      val indexData = flint.queryIndex(testFlintTimeSeriesIndex2)
+      indexData.count() shouldBe 2
+    }
+  }
+
+  test("create covering index on time series table with ID expression") {
+    sql(s"""
+           | CREATE INDEX $testIndex ON $testTimeSeriesTable
+           | (timestamp, age)
+           | WITH (
+           |   auto_refresh = true,
+           |   id_expression = 'address'
+           | )
+           |""".stripMargin)
+
+    val job = spark.streams.active.find(_.name == testFlintTimeSeriesIndex)
+    awaitStreamingComplete(job.get.id.toString)
+
+    val indexData = flint.queryIndex(testFlintTimeSeriesIndex)
+    indexData.count() shouldBe 3 // only 3 rows left due to same ID
+  }
+
   test("show all covering index on the source table") {
     flint
      .coveringIndex()
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
index 1b16a9e16..17a29d7e9 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
@@ -28,7 +28,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
       | window.start AS startTime,
       | COUNT(*) AS count
       | FROM $testTable
-      | GROUP BY TUMBLE(time, '10 Minutes')
+      | GROUP BY TUMBLE(timestamp, '10 Minutes')
       |""".stripMargin
 
   override def beforeAll(): Unit = {
@@ -134,7 +134,7 @@
       | window.start AS startTime,
       | COUNT(*) AS count
       | FROM $testTable
-      | GROUP BY TUMBLE(time, '1 Hour')
+      | GROUP BY TUMBLE(timestamp, '1 Hour')
       |""".stripMargin
 
     withIncrementalMaterializedView(largeWindowQuery) { indexData =>
@@ -159,7 +159,7 @@
       | COUNT(*) AS count
       | FROM $testTable
       | WHERE address = 'Seattle'
-      | GROUP BY TUMBLE(time, '5 Minutes')
+      | GROUP BY TUMBLE(timestamp, '5 Minutes')
       |""".stripMargin
 
     withIncrementalMaterializedView(filterQuery) { indexData =>
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
index 872939e5f..5d6258860 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
@@ -34,7 +34,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
       | window.start AS startTime,
       | COUNT(*) AS count
       | FROM $testTable
-      | GROUP BY TUMBLE(time, '10 Minutes')
+      | GROUP BY TUMBLE(timestamp, '10 Minutes')
       |""".stripMargin
 
   override def beforeAll(): Unit = {
@@ -163,7 +163,7 @@
       | window.start AS `start.time`,
       | COUNT(*) AS `count`
       | FROM `spark_catalog`.`default`.`mv_test`
-      | GROUP BY TUMBLE(`time`, '10 Minutes')""".stripMargin.trim
+      | GROUP BY TUMBLE(`timestamp`, '10 Minutes')""".stripMargin.trim
 
     sql(s"""
            | CREATE MATERIALIZED VIEW `spark_catalog`.`default`.`mv_test_metrics`
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
index 168279eb3..77f39b776 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
@@ -74,7 +74,7 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
     sql(s"""
       | CREATE TABLE $testTable
      | (
-      |   time TIMESTAMP,
+      |   timestamp TIMESTAMP,
      |   name STRING,
      |   age INT,
      |   address STRING
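Why the last covering index IT expects 3 documents from 5 source rows: with `id_expression = 'address'`, rows sharing an address are presumably written with the same OpenSearch document `_id`, so later writes overwrite earlier ones. Assuming the `createTimeSeriesTable` fixture (not shown in this diff) inserts 5 rows spanning 3 distinct addresses, the surviving document count can be cross-checked like this:

```scala
// Hypothetical cross-check against the fixture: distinct IDs == surviving documents.
spark
  .sql("SELECT COUNT(DISTINCT address) FROM spark_catalog.default.ci_time_test")
  .show() // expected: 3
```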
From 0164173ae4d5a650d5294d28ee5a0fd6f7d416b2 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 1 Nov 2023 16:58:13 -0700
Subject: [PATCH 7/8] Extract common generate ID column method

Signed-off-by: Chen Dai
---
 .../flint/spark/FlintSparkIndex.scala      | 34 +++++++++++++++++--
 .../covering/FlintSparkCoveringIndex.scala | 19 ++---------
 2 files changed, 34 insertions(+), 19 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
index fe5329739..220b11a1e 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
@@ -9,14 +9,16 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
 
 import org.opensearch.flint.core.metadata.FlintMetadata
 
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.apache.spark.sql.flint.datatype.FlintDataType
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StructType
 
 /**
  * Flint index interface in Spark.
  */
-trait FlintSparkIndex {
+trait FlintSparkIndex extends Logging {
 
   /**
    * Index type
@@ -55,7 +57,7 @@ trait FlintSparkIndex {
   def build(spark: SparkSession, df: Option[DataFrame]): DataFrame
 }
 
-object FlintSparkIndex {
+object FlintSparkIndex extends Logging {
 
   /**
    * Interface indicates a Flint index has custom streaming refresh capability other than foreach
@@ -79,6 +81,32 @@
    */
  val ID_COLUMN: String = "__id__"
 
+  /**
+   * Generate an ID column, in the precedence below: (1) use the ID expression directly if
+   * provided in the index option; (2) SHA-1 based on all aggregated columns if found; (3) SHA-1
+   * based on the source file path and timestamp column; (4) otherwise no ID column is generated.
+   *
+   * @param df
+   *   data frame to generate ID column for
+   * @param idExpr
+   *   ID expression option
+   * @return
+   *   optional ID column expression
+   */
+  def generateIdColumn(df: DataFrame, idExpr: Option[String]): Option[Column] = {
+    def timestampColumn: Option[String] = {
+      df.columns.toSet.intersect(Set("timestamp", "@timestamp")).headOption
+    }
+
+    if (idExpr.isDefined) {
+      Some(expr(idExpr.get))
+    } else if (timestampColumn.isDefined) {
+      Some(sha1(concat(input_file_name(), col(timestampColumn.get))))
+    } else {
+      None
+    }
+  }
+
   /**
    * Common prefix of Flint index name which is "flint_database_table_"
    *
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index 802f4d818..5fbf581ce 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -9,13 +9,11 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
 
 import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark._
-import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, ID_COLUMN}
+import org.opensearch.flint.spark.FlintSparkIndex._
 import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}
 
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.functions._
 
 /**
  * Flint covering index in Spark.
@@ -33,8 +31,7 @@
     indexedColumns: Map[String, String],
     filterCondition: Option[String] = None,
     override val options: FlintSparkIndexOptions = empty)
-    extends FlintSparkIndex
-    with Logging {
+    extends FlintSparkIndex {
 
   require(indexedColumns.nonEmpty, "indexed columns must not be empty")
 
@@ -66,13 +63,7 @@
     var job = df.getOrElse(spark.read.table(tableName))
 
     // Add optional ID column
-    val idColumn =
-      options
-        .idExpression()
-        .map(idExpr => Some(expr(idExpr)))
-        .getOrElse(findTimestampColumn(job)
-          .map(tsCol => sha1(concat(input_file_name(), col(tsCol)))))
-
+    val idColumn = generateIdColumn(job, options.idExpression())
     if (idColumn.isDefined) {
       logInfo(s"Generate ID column based on expression $idColumn")
       colNames = colNames :+ ID_COLUMN
@@ -87,10 +78,6 @@
      .getOrElse(job)
      .select(colNames.head, colNames.tail: _*)
   }
-
-  private def findTimestampColumn(df: DataFrame): Option[String] = {
-    df.columns.toSet.intersect(Set("timestamp", "@timestamp")).headOption
-  }
 }
 
 object FlintSparkCoveringIndex {
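The extracted helper's contract, exercised in isolation (illustrative; assumes a `spark` session and the `ci_test` table from the unit tests, which has a `timestamp` column):

```scala
import org.opensearch.flint.spark.FlintSparkIndex.generateIdColumn

val df = spark.read.table("spark_catalog.default.ci_test")

generateIdColumn(df, Some("uuid()"))      // Some(expr("uuid()")): explicit option wins
generateIdColumn(df, None)                // Some(sha1(concat(input_file_name(), timestamp)))
generateIdColumn(df.select("name"), None) // None: the caller logs a warning and adds no ID
```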
From e2d3f809376aa892e609c69cb301360aadbb4012 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Thu, 2 Nov 2023 09:23:30 -0700
Subject: [PATCH 8/8] Add more logging

Signed-off-by: Chen Dai
---
 docs/index.md                                    |  2 +-
 .../opensearch/flint/spark/FlintSparkIndex.scala |  2 +-
 .../spark/covering/FlintSparkCoveringIndex.scala | 14 ++++++++++----
 3 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/docs/index.md b/docs/index.md
index f9ed36436..16af00ed8 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -247,7 +247,7 @@ WITH (
    watermark_delay = '1 Second',
    output_mode = 'complete',
    index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}',
-   id_expression = 'id_col_name',
+   id_expression = 'uuid()',
    extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}'
 )
 ```
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
index 220b11a1e..9dcd43843 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
@@ -57,7 +57,7 @@ trait FlintSparkIndex extends Logging {
   def build(spark: SparkSession, df: Option[DataFrame]): DataFrame
 }
 
-object FlintSparkIndex extends Logging {
+object FlintSparkIndex {
 
   /**
    * Interface indicates a Flint index has custom streaming refresh capability other than foreach
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index 5fbf581ce..3dcc6a13f 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -14,6 +14,7 @@ import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.SimpleMode
 
 /**
  * Flint covering index in Spark.
@@ -73,10 +74,15 @@
     }
 
     // Add optional filtering condition
-    filterCondition
-      .map(job.where)
-      .getOrElse(job)
-      .select(colNames.head, colNames.tail: _*)
+    if (filterCondition.isDefined) {
+      job = job.where(filterCondition.get)
+    }
+
+    // Add indexed columns
+    job = job.select(colNames.head, colNames.tail: _*)
+
+    logInfo("Building covering index with logical plan: " + job.queryExecution.explainString(SimpleMode))
+    job
   }
 }
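Taken together, the series can be exercised end to end as below — a usage sketch mirroring the updated docs example; the table and index names are illustrative and a `spark` session is assumed:

```scala
// The id_expression value populates the generated __id__ column, which presumably maps
// to the OpenSearch document _id; note that a deterministic expression (unlike uuid())
// is what actually prevents duplicates across refresh job restarts.
spark.sql("""
  CREATE INDEX name_and_age ON spark_catalog.default.alb_logs (name, age)
  WITH (
    auto_refresh = true,
    id_expression = 'uuid()'
  )
""")
```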