From 01b48facc6de8702905df3652982b328e8f3f730 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Fri, 20 Dec 2024 14:43:41 -0800
Subject: [PATCH] Update user manual and scaladoc

Signed-off-by: Chen Dai
---
 docs/index.md                                         |  4 ++--
 .../org/opensearch/flint/spark/FlintSparkIndex.scala  |  7 +------
 .../src/test/scala/org/apache/spark/FlintSuite.scala  | 12 ++++++++++++
 3 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/docs/index.md b/docs/index.md
index 1c02f0cb0..684ba7da6 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -394,7 +394,7 @@ User can provide the following options in `WITH` clause of create statement:
 + `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by auto and incremental refresh on materialized view if it has aggregation in the query.
 + `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied.
 + `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
-+ `id_expression`: an expression string that generates an ID column to avoid duplicate data when index refresh job restart or any retry attempt during an index refresh. If an empty string is provided, no ID column will be generated.
++ `id_expression`: an expression string that generates an ID column to guarantee idempotency when the index refresh job restarts or retries. If an empty string is provided, no ID column will be generated.
 + `extra_options`: a JSON string as extra options that can be passed to Spark streaming source and sink API directly. Use qualified source table name (because there could be multiple) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}'
 
 Note that the index option name is case-sensitive. Here is an example:
@@ -407,7 +407,7 @@ WITH (
   watermark_delay = '1 Second',
   output_mode = 'complete',
   index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}',
-  id_expression = 'uuid()',
+  id_expression = "sha1(concat_ws('\0',startTime,status))",
   extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}'
 )
 ```
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
index 32988a5b2..300233777 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
@@ -120,12 +120,7 @@ object FlintSparkIndex extends Logging {
   }
 
   /**
-   * Generate an ID column in the precedence below:
-   * ```
-   * 1. Use ID expression provided in the index option;
-   * 2. SHA-1 based on all output columns if aggregated;
-   * 3. Otherwise, no ID column generated.
-   * ```
+   * Generate an ID column using the ID expression provided in the index option.
    *
    * @param df
    *   which DataFrame to generate ID column
diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala
index a6d771534..78debda35 100644
--- a/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala
@@ -72,8 +72,20 @@ trait FlintSuite extends SharedSparkSession {
     }
   }
 
+  /**
+   * Implicit class to extend DataFrame functionality with additional utilities.
+   *
+   * @param df
+   *   the DataFrame to which the additional methods are added
+   */
   protected implicit class DataFrameExtensions(val df: DataFrame) {
 
+    /**
+     * Retrieves the ID column expression from the logical plan of the DataFrame, if it exists.
+     *
+     * @return
+     *   an `Option` containing the `Expression` for the ID column if present, or `None` otherwise
+     */
     def idColumn(): Option[Expression] = {
       df.queryExecution.logical.collectFirst { case Project(projectList, _) =>
         projectList.collectFirst { case Alias(child, ID_COLUMN) =>
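
For reviewers who want to see the documented option in action, here is a minimal sketch of how an `id_expression` value such as sha1(concat_ws('\0',startTime,status)) could be turned into an ID column on the DataFrame. The object name IdColumnSketch, the column name "__id__", and the option plumbing are assumptions for illustration only, not the exact FlintSparkIndex implementation; withColumn and functions.expr are standard Spark APIs.

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.expr

  object IdColumnSketch {
    val ID_COLUMN = "__id__" // assumed name of the generated ID column

    // Apply the user-provided id_expression, if any, as a per-row ID column.
    def addIdColumn(df: DataFrame, idExpression: Option[String]): DataFrame =
      idExpression match {
        // Non-empty expression: evaluate it per row as the document ID.
        case Some(exprStr) if exprStr.trim.nonEmpty => df.withColumn(ID_COLUMN, expr(exprStr))
        // Empty or missing expression: no ID column is generated.
        case _ => df
      }
  }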
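
The new DataFrameExtensions helper in FlintSuite is only partially visible in the hunk above. A possible way a test could exercise it, assuming the ID_COLUMN constant and the session `spark` are in scope as in other FlintSuite tests (the data and expression below are made up for illustration):

  import org.apache.spark.sql.functions.expr

  val df = spark
    .range(3)
    .toDF("startTime")
    .withColumn(ID_COLUMN, expr("sha1(cast(startTime AS STRING))"))

  // idColumn() walks the logical plan and returns the aliased ID expression, if any.
  assert(df.idColumn().isDefined)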