Refactor build logic
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Nov 1, 2023
1 parent 24f578b commit 4bc67a6
Showing 2 changed files with 34 additions and 13 deletions.
FlintSparkCoveringIndex.scala
@@ -66,21 +66,19 @@ case class FlintSparkCoveringIndex(
     var job = df.getOrElse(spark.read.table(tableName))
 
     // Add optional ID column
-    if (options.idExpression().isDefined) {
-      val idExpr = options.idExpression().get
-      logInfo(s"Generate ID column based on expression $idExpr")
-
-      job = job.withColumn(ID_COLUMN, expr(idExpr))
+    val idColumn =
+      options
+        .idExpression()
+        .map(idExpr => Some(expr(idExpr)))
+        .getOrElse(findTimestampColumn(job)
+          .map(tsCol => sha1(concat(input_file_name(), col(tsCol)))))
+
+    if (idColumn.isDefined) {
+      logInfo(s"Generate ID column based on expression $idColumn")
       colNames = colNames :+ ID_COLUMN
+      job = job.withColumn(ID_COLUMN, idColumn.get)
     } else {
-      val idColNames = job.columns.toSet.intersect(Set("timestamp", "@timestamp"))
-      if (idColNames.isEmpty) {
-        logWarning("Cannot generate ID column which may cause duplicate data when restart")
-      } else {
-        logInfo(s"Generate ID column based on first column in $idColNames")
-        job = job.withColumn(ID_COLUMN, sha1(concat(input_file_name(), col(idColNames.head))))
-        colNames = colNames :+ ID_COLUMN
-      }
+      logWarning("Cannot generate ID column which may cause duplicate data when restart")
     }
 
     // Add optional filtering condition
@@ -89,6 +89,10 @@
       .getOrElse(job)
       .select(colNames.head, colNames.tail: _*)
   }
+
+  private def findTimestampColumn(df: DataFrame): Option[String] = {
+    df.columns.toSet.intersect(Set("timestamp", "@timestamp")).headOption
+  }
 }
 
 object FlintSparkCoveringIndex {
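An aside for readers of the diff: the refactor leans on the Option pattern opt.map(x => Some(f(x))).getOrElse(fallback), which reads as "explicit expression wins, else derive one, else none". A minimal sketch in plain Scala of that selection order (the names choose, explicitExpr, and timestampCol are illustrative, not from the commit):

object IdColumnChoiceSketch extends App {
  // Explicit ID expression wins; otherwise derive one from a timestamp
  // column if present; otherwise no ID column at all.
  def choose(explicitExpr: Option[String], timestampCol: Option[String]): Option[String] =
    explicitExpr
      .map(e => Some(s"expr($e)"))                            // Option[Option[String]]
      .getOrElse(timestampCol.map(ts => s"sha1(file + $ts)")) // flattened by getOrElse

  println(choose(Some("uuid()"), Some("timestamp"))) // Some(expr(uuid()))
  println(choose(None, Some("@timestamp")))          // Some(sha1(file + @timestamp))
  println(choose(None, None))                        // None
}

The same chain could also be written explicitExpr.map(f).orElse(timestampCol.map(g)), which expresses the same priority order without the nested Option.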
FlintSparkCoveringIndexSuite.scala
@@ -121,6 +121,25 @@ class FlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
     }
   }
 
+  test("should build with filtering condition") {
+    withTable(testTable) {
+      sql(s"CREATE TABLE $testTable (timestamp TIMESTAMP, name STRING) USING JSON")
+      val index = FlintSparkCoveringIndex(
+        "name_idx",
+        testTable,
+        Map("name" -> "string"),
+        Some("name = 'test'"))
+
+      assertDataFrameEquals(
+        index.build(spark, None),
+        spark
+          .table(testTable)
+          .withColumn(ID_COLUMN, sha1(concat(input_file_name(), col("timestamp"))))
+          .where("name = 'test'")
+          .select(col("name"), col(ID_COLUMN)))
+    }
+  }
+
   /* Assert unresolved logical plan in DataFrame equals without semantic analysis */
   private def assertDataFrameEquals(df1: DataFrame, df2: DataFrame): Unit = {
     comparePlans(df1.queryExecution.logical, df2.queryExecution.logical, checkAnalysis = false)
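For context on what the test asserts, a hedged, self-contained sketch of the fallback ID expression in a local Spark session (the session setup, sample data, and the __id__ column name are assumptions for illustration; the real code uses the ID_COLUMN constant):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat, input_file_name, sha1}

object IdExpressionSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("id-expr-sketch").getOrCreate()
  import spark.implicits._

  // For file-backed sources, file name + event timestamp is stable across job
  // restarts, so sha1(concat(input_file_name(), col("timestamp"))) yields a
  // deterministic row ID and re-ingested rows overwrite instead of duplicating.
  // (On this in-memory DataFrame input_file_name() is empty; it is populated
  // only when rows are read from files.)
  val df = Seq(("2023-11-01 00:00:00", "test")).toDF("timestamp", "name")
  df.withColumn("__id__", sha1(concat(input_file_name(), col("timestamp")))).show(false)

  spark.stop()
}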
