From 31438dad6afd864f326eebd2ffed3d2fe8654a0d Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Thu, 12 Oct 2023 17:09:59 -0700
Subject: [PATCH] Add javadoc and remove useless BatchRefresh interface

Signed-off-by: Chen Dai
---
 .../opensearch/flint/spark/FlintSpark.scala   |  2 +-
 .../flint/spark/FlintSparkIndex.scala         | 35 ++++++++++---------
 .../spark/mv/FlintSparkMaterializedView.scala |  2 +-
 .../mv/FlintSparkMaterializedViewSuite.scala  | 14 +++++++-
 4 files changed, 34 insertions(+), 19 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 844b9f3b1..15684f004 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -152,7 +152,7 @@ class FlintSpark(val spark: SparkSession) {
 
       val job = index
         .asInstanceOf[StreamingRefresh]
-        .build(spark)
+        .buildStream(spark)
         .writeStream
         .queryName(indexName)
         .outputMode(Append())
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
index eaf25021b..0e6fe3eee 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
@@ -8,7 +8,6 @@ package org.opensearch.flint.spark
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 
 import org.opensearch.flint.core.metadata.FlintMetadata
-import org.opensearch.flint.spark.FlintSparkIndex.BatchRefresh
 
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.flint.datatype.FlintDataType
@@ -17,7 +16,7 @@ import org.apache.spark.sql.types.StructType
 /**
  * Flint index interface in Spark.
  */
-trait FlintSparkIndex extends BatchRefresh {
+trait FlintSparkIndex {
 
   /**
    * Index type
@@ -45,30 +44,34 @@ trait FlintSparkIndex extends BatchRefresh {
    * Build a data frame to represent index data computation logic. Upper level code decides how to
    * use this, ex. batch or streaming, fully or incremental refresh.
    *
+   * @param spark
+   *   Spark session for implementation class to use as needed
    * @param df
-   *   data frame to append building logic
+   *   data frame to append building logic. If none, implementation class create source data frame
+   *   on its own
    * @return
    *   index building data frame
    */
-  /*
-  def build(df: DataFrame): DataFrame
-
-  def buildBatch(spark: SparkSession): DataFrameWriter[Row]
-
-  def buildStream(spark: SparkSession): DataStreamWriter[Row]
-   */
+  def build(spark: SparkSession, df: Option[DataFrame]): DataFrame
 }
 
 object FlintSparkIndex {
 
-  trait BatchRefresh {
-
-    def build(spark: SparkSession, df: Option[DataFrame]): DataFrame
-  }
-
+  /**
+   * Interface indicates a Flint index has custom streaming refresh capability other than foreach
+   * batch streaming.
+   */
   trait StreamingRefresh {
 
-    def build(spark: SparkSession): DataFrame
+    /**
+     * Build streaming refresh data frame.
+     *
+     * @param spark
+     *   Spark session
+     * @return
+     *   data frame represents streaming logic
+     */
+    def buildStream(spark: SparkSession): DataFrame
   }
 
   /**
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
index a057e814d..c66cb43c7 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
@@ -58,7 +58,7 @@ case class FlintSparkMaterializedView(
     spark.sql(query)
   }
 
-  override def build(spark: SparkSession): DataFrame = {
+  override def buildStream(spark: SparkSession): DataFrame = {
     val batchPlan = spark.sql(query).queryExecution.logical
 
     val streamingPlan = batchPlan transform {
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
index 41e3dac64..75317fd64 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
@@ -9,9 +9,11 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
 
 import org.opensearch.flint.spark.FlintSparkIndexOptions
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
-import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
+import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}
+import org.scalatestplus.mockito.MockitoSugar.mock
 
 import org.apache.spark.FlintSuite
+import org.apache.spark.sql.{DataFrame, Row}
 
 class FlintSparkMaterializedViewSuite extends FlintSuite {
 
@@ -57,4 +59,14 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
         "index_settings" -> indexSettings).asJava
     mv.metadata().indexSettings shouldBe Some(indexSettings)
   }
+
+  test("build batch data frame") {
+    val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty)
+    mv.build(spark, None).collect() shouldBe Array(Row(1))
+  }
+
+  test("should fail if build given other source data frame") {
+    val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty)
+    the[IllegalArgumentException] thrownBy mv.build(spark, Some(mock[DataFrame]))
+  }
 }