Skip to content

Commit

Permalink
Add more javadoc and comment
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 17, 2023
1 parent ede5a84 commit e327590
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ class FlintSpark(val spark: SparkSession) {
.getOrElse(Trigger.ProcessingTime(0L))
}

// TODO: move this to Flint index factory
private def deserialize(metadata: FlintMetadata): FlintSparkIndex = {
val indexOptions = FlintSparkIndexOptions(
metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ case class FlintSparkMaterializedView(
override def buildStream(spark: SparkSession): DataFrame = {
val batchPlan = spark.sql(query).queryExecution.logical

// Convert unresolved batch plan to streaming plan by:
// 1.Insert Watermark operator below Aggregate (required by Spark streaming)
// 2.Set isStreaming flag to true in Relation operator
/*
* Convert unresolved batch plan to streaming plan by:
* 1.Insert Watermark operator below Aggregate (required by Spark streaming)
* 2.Set isStreaming flag to true in Relation operator
*/
val streamingPlan = batchPlan transform {
case WindowingAggregate(agg, timeCol) =>
agg.copy(child = watermark(timeCol, watermarkDelay, agg.child))
Expand All @@ -92,10 +94,13 @@ case class FlintSparkMaterializedView(
private def watermark(timeCol: Attribute, delay: String, child: LogicalPlan) = {
EventTimeWatermark(
timeCol,
IntervalUtils.stringToInterval(UTF8String.fromString(watermarkDelay)),
IntervalUtils.stringToInterval(UTF8String.fromString(delay)),
child)
}

/**
* Extractor that extract event time column out of Aggregate operator.
*/
private object WindowingAggregate {

def unapply(agg: Aggregate): Option[(Aggregate, Attribute)] = {
Expand Down

0 comments on commit e327590

Please sign in to comment.