From 63fa7ff46544be6616f97b4db440b3f053dd3e01 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Mon, 16 Oct 2023 12:31:59 -0700
Subject: [PATCH] Add more javadoc and comment

Signed-off-by: Chen Dai
---
 .../org/opensearch/flint/spark/FlintSpark.scala      |  1 +
 .../flint/spark/mv/FlintSparkMaterializedView.scala  | 13 +++++++++----
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index bb22d5691..64f103d56 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -254,6 +254,7 @@ class FlintSpark(val spark: SparkSession) {
       .getOrElse(Trigger.ProcessingTime(0L))
   }
 
+  // TODO: move this to Flint index factory
   private def deserialize(metadata: FlintMetadata): FlintSparkIndex = {
     val indexOptions = FlintSparkIndexOptions(
       metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap)
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
index 7b9c92c34..92906f6ab 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
@@ -76,9 +76,11 @@ case class FlintSparkMaterializedView(
   override def buildStream(spark: SparkSession): DataFrame = {
     val batchPlan = spark.sql(query).queryExecution.logical
 
-    // Convert unresolved batch plan to streaming plan by:
-    //  1.Insert Watermark operator below Aggregate (required by Spark streaming)
-    //  2.Set isStreaming flag to true in Relation operator
+    /*
+     * Convert unresolved batch plan to streaming plan by:
+     *   1. Insert Watermark operator below Aggregate (required by Spark streaming)
+     *   2. Set isStreaming flag to true in Relation operator
+     */
     val streamingPlan = batchPlan transform {
       case WindowingAggregate(agg, timeCol) =>
         agg.copy(child = watermark(timeCol, watermarkDelay, agg.child))
@@ -92,10 +94,13 @@ case class FlintSparkMaterializedView(
   private def watermark(timeCol: Attribute, delay: String, child: LogicalPlan) = {
     EventTimeWatermark(
       timeCol,
-      IntervalUtils.stringToInterval(UTF8String.fromString(watermarkDelay)),
+      IntervalUtils.stringToInterval(UTF8String.fromString(delay)),
       child)
   }
 
+  /**
+   * Extractor that extracts the event time column from an Aggregate operator.
+   */
   private object WindowingAggregate {
 
     def unapply(agg: Aggregate): Option[(Aggregate, Attribute)] = {
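
Note (illustrative, not part of the patch): besides adding comments, the second hunk fixes a latent bug in watermark(), which previously parsed the enclosing class's watermarkDelay field instead of its own delay parameter. Below is a minimal, self-contained sketch of the same technique shown in that hunk -- parsing a delay string and wrapping a child logical plan in an EventTimeWatermark operator -- assuming Spark 3.x Catalyst APIs; the object and method names (WatermarkSketch, wrapWithWatermark) are hypothetical.

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical sketch; mirrors the fixed watermark() helper in the patch above.
object WatermarkSketch {

  // Wrap `child` in an EventTimeWatermark operator so Spark Structured Streaming
  // can bound state for windowed aggregations. The delay string passed by the
  // caller (e.g. "1 minute") is the one that gets parsed, not a captured field.
  def wrapWithWatermark(timeCol: Attribute, delay: String, child: LogicalPlan): LogicalPlan =
    EventTimeWatermark(
      timeCol,
      IntervalUtils.stringToInterval(UTF8String.fromString(delay)),
      child)
}

Taking the delay as a parameter keeps the helper self-contained, which is what the fix in the hunk restores.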