
Update user manual and scaladoc
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Dec 20, 2024
1 parent 15ed31b commit 01b48fa
Showing 3 changed files with 15 additions and 8 deletions.
4 changes: 2 additions & 2 deletions docs/index.md
@@ -394,7 +394,7 @@ User can provide the following options in `WITH` clause of create statement:
+ `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by auto and incremental refresh on materialized view if it has aggregation in the query.
+ `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied.
+ `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
- + `id_expression`: an expression string that generates an ID column to avoid duplicate data when index refresh job restart or any retry attempt during an index refresh. If an empty string is provided, no ID column will be generated.
+ + `id_expression`: an expression string that generates an ID column to guarantee idempotency when the index refresh job restarts or a refresh attempt is retried. If an empty string is provided, no ID column will be generated.
+ `extra_options`: a JSON string as extra options that can be passed to Spark streaming source and sink API directly. Use qualified source table name (because there could be multiple) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}'

Note that the index option name is case-sensitive. Here is an example:
@@ -407,7 +407,7 @@ WITH (
watermark_delay = '1 Second',
output_mode = 'complete',
index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}',
- id_expression = 'uuid()',
+ id_expression = "sha1(concat_ws('\0',startTime,status))",
extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}'
)
```
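To make the documentation change above concrete, here is a minimal, self-contained Spark sketch (not taken from the Flint codebase) of why a content-derived expression such as `sha1(concat_ws('\0', ...))` suits `id_expression` better than `uuid()`: the hash is recomputed identically when a refresh restarts or retries, so re-indexed documents keep the same ID, whereas `uuid()` mints a new ID on every attempt. The column names `startTime` and `status` and the output column name `id` are illustrative assumptions.

```scala
// Illustrative sketch only; column names and the "id" output column are assumptions.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

object IdExpressionSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("id-expression-sketch").getOrCreate()
  import spark.implicits._

  val logs = Seq(("2024-12-20T00:00:00", 200), ("2024-12-20T00:00:01", 404))
    .toDF("startTime", "status")

  // Deterministic ID: the same rows hash to the same value on every refresh attempt,
  // so a restarted or retried refresh overwrites documents instead of duplicating them.
  logs.withColumn("id", expr("sha1(concat_ws('\\0', startTime, status))")).show(false)

  // Non-deterministic ID: uuid() yields different values on each attempt,
  // which is why the manual no longer uses it as the example expression.
  logs.withColumn("id", expr("uuid()")).show(false)

  spark.stop()
}
```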
@@ -120,12 +120,7 @@ object FlintSparkIndex extends Logging {
}

/**
- * Generate an ID column in the precedence below:
- * ```
- * 1. Use ID expression provided in the index option;
- * 2. SHA-1 based on all output columns if aggregated;
- * 3. Otherwise, no ID column generated.
- * ```
+ * Generate an ID column using ID expression provided in the index option.
*
* @param df
* which DataFrame to generate ID column
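For readers without the surrounding Scala file open, here is a hedged sketch of the behavior the updated scaladoc describes: the ID column now comes only from the `id_expression` index option, with no SHA-1 fallback as in the removed precedence list. This is not the actual `FlintSparkIndex` code; the method signature, the option parameter, and the `ID_COLUMN` value are assumptions for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.expr

object GenerateIdColumnSketch {
  val ID_COLUMN = "__id__" // hypothetical constant; Flint defines its own

  // If the index option carries a non-empty ID expression, add it as a column;
  // otherwise leave the DataFrame untouched (no ID column is generated).
  def generateIdColumn(df: DataFrame, idExpression: Option[String]): DataFrame =
    idExpression.filter(_.trim.nonEmpty) match {
      case Some(idExpr) => df.withColumn(ID_COLUMN, expr(idExpr))
      case None         => df
    }
}
```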
@@ -72,8 +72,20 @@ trait FlintSuite extends SharedSparkSession {
}
}

+ /**
+ * Implicit class to extend DataFrame functionality with additional utilities.
+ *
+ * @param df
+ * the DataFrame to which the additional methods are added
+ */
protected implicit class DataFrameExtensions(val df: DataFrame) {

+ /**
+ * Retrieves the ID column expression from the logical plan of the DataFrame, if it exists.
+ *
+ * @return
+ * an `Option` containing the `Expression` for the ID column if present, or `None` otherwise
+ */
def idColumn(): Option[Expression] = {
df.queryExecution.logical.collectFirst { case Project(projectList, _) =>
projectList.collectFirst { case Alias(child, ID_COLUMN) =>
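Below is a hedged example of how the newly documented `idColumn()` helper might be exercised in a test. It assumes `ID_COLUMN` is in scope (e.g. imported from `FlintSparkIndex`) and that `FlintSuite` brings in the usual ScalaTest `test`/`assert` utilities; the column names mirror the documentation example above and are otherwise arbitrary.

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.functions.expr

// ID_COLUMN is assumed to be imported (e.g. from FlintSparkIndex) so the helper can match it.
class IdColumnExtensionSketch extends FlintSuite {
  test("id expression shows up as the ID column in the logical plan") {
    val df = spark
      .createDataFrame(Seq(("2024-12-20T00:00:00", 200)))
      .toDF("startTime", "status")
      .withColumn(ID_COLUMN, expr("sha1(concat_ws('\\0', startTime, status))"))

    // idColumn() (from DataFrameExtensions above) looks for an Alias named ID_COLUMN
    // in the plan's Project and returns its child expression.
    val idExpr: Option[Expression] = df.idColumn()
    assert(idExpr.isDefined)
  }
}
```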
