Exactly-once guarantee for covering index and MV incremental refresh #143

Closed
Changes from 1 commit
Refactor build logic
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Nov 1, 2023
commit 4bc67a6119edb015c542a7c6ce11b76ee9efe82b
@@ -66,21 +66,19 @@ case class FlintSparkCoveringIndex(
     var job = df.getOrElse(spark.read.table(tableName))
 
     // Add optional ID column
-    if (options.idExpression().isDefined) {
-      val idExpr = options.idExpression().get
-      logInfo(s"Generate ID column based on expression $idExpr")
-
-      job = job.withColumn(ID_COLUMN, expr(idExpr))
+    val idColumn =
+      options
+        .idExpression()
+        .map(idExpr => Some(expr(idExpr)))
+        .getOrElse(findTimestampColumn(job)
+          .map(tsCol => sha1(concat(input_file_name(), col(tsCol)))))
+
+    if (idColumn.isDefined) {
+      logInfo(s"Generate ID column based on expression $idColumn")
       colNames = colNames :+ ID_COLUMN
+      job = job.withColumn(ID_COLUMN, idColumn.get)
     } else {
-      val idColNames = job.columns.toSet.intersect(Set("timestamp", "@timestamp"))
-      if (idColNames.isEmpty) {
-        logWarning("Cannot generate ID column which may cause duplicate data when restart")
-      } else {
-        logInfo(s"Generate ID column based on first column in $idColNames")
-        job = job.withColumn(ID_COLUMN, sha1(concat(input_file_name(), col(idColNames.head))))
-        colNames = colNames :+ ID_COLUMN
-      }
+      logWarning("Cannot generate ID column which may cause duplicate data when restart")
     }
 
     // Add optional filtering condition
@@ -89,6 +87,10 @@ case class FlintSparkCoveringIndex(
       .getOrElse(job)
       .select(colNames.head, colNames.tail: _*)
   }
+
+  private def findTimestampColumn(df: DataFrame): Option[String] = {
+    df.columns.toSet.intersect(Set("timestamp", "@timestamp")).headOption
+  }
 }
 
 object FlintSparkCoveringIndex {
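
The refactor collapses the nested if/else into a single Option chain: an explicit ID expression from the index options takes priority, and otherwise the build falls back to sha1(input_file_name() + timestamp) via the new findTimestampColumn helper, with one shared warning path when neither source is available. Because the fallback hash is a pure function of the source file name and the row's timestamp, replaying the same input after a restart regenerates the same ID, which is what lets the sink overwrite rather than duplicate. Below is a minimal self-contained sketch of that derivation; the object name, the "logs" table, and the "__id__" placeholder column are illustrative only (the real code uses its ID_COLUMN constant), so treat it as a sketch of the technique, not the project's implementation:

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, concat, expr, input_file_name, sha1}

object IdColumnSketch {

  // Same lookup as the new findTimestampColumn helper: use a column named
  // "timestamp" or "@timestamp" if the source table has one.
  def findTimestampColumn(df: DataFrame): Option[String] =
    df.columns.toSet.intersect(Set("timestamp", "@timestamp")).headOption

  // An explicit ID expression wins; otherwise hash the input file name
  // together with the timestamp value into a deterministic document ID.
  def idColumn(df: DataFrame, idExpression: Option[String]): Option[Column] =
    idExpression
      .map(e => Some(expr(e)))
      .getOrElse(findTimestampColumn(df)
        .map(tsCol => sha1(concat(input_file_name(), col(tsCol)))))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val df = spark.read.table("logs") // hypothetical table with a timestamp column

    idColumn(df, idExpression = None)
      .map(id => df.withColumn("__id__", id)) // "__id__" stands in for ID_COLUMN
      .getOrElse(df)
      .show()
  }
}
```

One consequence of this Option-based shape is that both ID sources now flow through a single logInfo/colNames/withColumn path, so the column bookkeeping can no longer drift between the two branches.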
@@ -121,6 +121,25 @@ class FlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
     }
   }
 
+  test("should build with filtering condition") {
+    withTable(testTable) {
+      sql(s"CREATE TABLE $testTable (timestamp TIMESTAMP, name STRING) USING JSON")
+      val index = FlintSparkCoveringIndex(
+        "name_idx",
+        testTable,
+        Map("name" -> "string"),
+        Some("name = 'test'"))
+
+      assertDataFrameEquals(
+        index.build(spark, None),
+        spark
+          .table(testTable)
+          .withColumn(ID_COLUMN, sha1(concat(input_file_name(), col("timestamp"))))
+          .where("name = 'test'")
+          .select(col("name"), col(ID_COLUMN)))
+    }
+  }
+
   /* Assert unresolved logical plan in DataFrame equals without semantic analysis */
   private def assertDataFrameEquals(df1: DataFrame, df2: DataFrame): Unit = {
     comparePlans(df1.queryExecution.logical, df2.queryExecution.logical, checkAnalysis = false)
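
The new test pins build's unresolved plan against a hand-written equivalent (filter plus sha1-based ID column). As a companion, the exactly-once claim can also be exercised as plan determinism: building twice from the same source must produce identical plans, and therefore identical IDs on replay. A hedged sketch of such a test, reusing the suite's fixtures; the test name and the double-build idea are mine, not part of this commit, and it assumes the constructor accepts None for the filter condition:

```scala
test("should build the same ID column when invoked twice") {
  withTable(testTable) {
    sql(s"CREATE TABLE $testTable (timestamp TIMESTAMP, name STRING) USING JSON")
    val index = FlintSparkCoveringIndex("name_idx", testTable, Map("name" -> "string"), None)

    // Two builds over the same table must yield the same unresolved plan,
    // hence the same sha1(input_file_name() + timestamp) IDs on replay.
    assertDataFrameEquals(index.build(spark, None), index.build(spark, None))
  }
}
```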