Skip to content

Commit

Permalink
Change to auto refresh and checkpoint location provided
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Nov 10, 2023
1 parent c73906a commit 507e72a
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 25 deletions.
2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ User can provide the following options in `WITH` clause of create statement:
+ `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on materialized view if it has aggregation in the query.
+ `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied.
+ `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
+ `id_expression`: an expression string that generates an ID column to avoid duplicate data when incremental refresh job restart, especially for covering index. If unspecified, no ID column generated and may cause duplicate data when refresh job restart. This is mandatory for covering index or materialized view without aggregation.
+ `id_expression`: an expression string that generates an ID column to avoid duplicate data when incremental refresh job restart. This is mandatory for covering index or materialized view without aggregation if auto refresh enabled and checkpoint location provided.
+ `extra_options`: a JSON string as extra options that can be passed to Spark streaming source and sink API directly. Use qualified source table name (because there could be multiple) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}'

Note that the index option name is case-sensitive. Here is an example:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,40 +83,35 @@ object FlintSparkIndex extends Logging {
val ID_COLUMN: String = "__id__"

/**
* Generate an ID column in the precedence below: (1) Use ID expression directly if provided in
* index option; (2) SHA-1 based on all aggregated columns if found; (3) SHA-1 based on source
* file path and timestamp column; 4) No ID column generated
* Generate an ID column in the precedence below: 1) Use ID expression provided in the index
* option; 2) SHA-1 based on all columns if aggregated; 3) Throw exception if auto refresh and
* checkpoint location provided 4) Otherwise, no ID column generated.
*
* @param df
* data frame to generate ID column for
* @param idExpr
* ID expression option
* @param options
* Flint index options
* @return
* optional ID column expression
*/
def generateIdColumn(
df: DataFrame,
idExpr: Option[String],
isAutoRefresh: Boolean): Option[Column] = {
def generateIdColumn(df: DataFrame, options: FlintSparkIndexOptions): Option[Column] = {
def isAggregated: Boolean = {
df.queryExecution.logical.exists(_.isInstanceOf[Aggregate])
}

val idColumn =
if (idExpr.isDefined) {
Some(expr(idExpr.get))
if (options.idExpression().isDefined) {
Some(expr(options.idExpression().get))
} else if (isAggregated) {
Some(concat(df.columns.map(col): _*))
} else if (options.autoRefresh() && options.checkpointLocation().isDefined) {
throw new IllegalStateException(
"ID expression is required to avoid duplicate data when index refresh job restart")
} else {
if (isAutoRefresh) {
throw new IllegalStateException(
"ID expression is required to avoid duplicate data when index refresh job restart")
} else {
None
}
None
}

logInfo(s"Generate ID column based on expression $idColumn")
logInfo(s"Generated ID column based on expression $idColumn")
idColumn
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ case class FlintSparkCoveringIndex(
var job = df.getOrElse(spark.read.table(tableName))

// Add ID column
val idColumn = generateIdColumn(job, options.idExpression(), options.autoRefresh())
val idColumn = generateIdColumn(job, options)
if (idColumn.isDefined) {
colNames = colNames :+ ID_COLUMN
job = job.withColumn(ID_COLUMN, idColumn.get)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ case class FlintSparkMaterializedView(
val streamDf = logicalPlanToDataFrame(spark, streamingPlan)

// Add ID column
val idColumn = generateIdColumn(streamDf, options.idExpression(), options.autoRefresh())
val idColumn = generateIdColumn(streamDf, options)
if (idColumn.isDefined) {
streamDf.withColumn(ID_COLUMN, idColumn.get)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,16 @@ class FlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
}
}

test("should build failed if auto refresh but no ID expression provided") {
test(
"should build failed if auto refresh and checkpoint location provided but no ID generated") {
withTable(testTable) {
sql(s"CREATE TABLE $testTable (name STRING, age INTEGER) USING JSON")
val index = FlintSparkCoveringIndex(
"name_idx",
testTable,
Map("name" -> "string"),
options = FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
options = FlintSparkIndexOptions(
Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/")))

assertThrows[IllegalStateException] {
index.build(spark, None)
Expand All @@ -92,14 +94,15 @@ class FlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
}

test(
"should build failed if auto refresh but micro batch doesn't have ID expression provided") {
"should build failed if auto refresh and checkpoint location provided but micro batch doesn't have ID generated") {
withTable(testTable) {
sql(s"CREATE TABLE $testTable (timestamp TIMESTAMP, name STRING) USING JSON")
val index = FlintSparkCoveringIndex(
"name_idx",
testTable,
Map("name" -> "string"),
options = FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
options = FlintSparkIndexOptions(
Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/")))
val batch = spark.read.table(testTable).select("name")

assertThrows[IllegalStateException] {
Expand Down

0 comments on commit 507e72a

Please sign in to comment.