diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
index b89c97191..818d9a143 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
@@ -88,15 +88,20 @@ object FlintSparkSqlAstBuilder {
    * @param updateOptions
    *   options to update
    */
-  def updateIndex(flint: FlintSpark, indexName: String, updateOptions: Map[String, String]): Option[String] = {
-    val oldIndex = flint.describeIndex(indexName)
+  def updateIndex(
+      flint: FlintSpark,
+      indexName: String,
+      updateOptions: Map[String, String]): Option[String] = {
+    val oldIndex = flint
+      .describeIndex(indexName)
       .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
     val oldOptions = oldIndex.options.options
     validateUpdateOptions(oldOptions, updateOptions, oldIndex.kind)

     val mergedOptions = oldOptions ++ updateOptions
-    val newMetadata = oldIndex.metadata().copy(options = mergedOptions.mapValues(_.asInstanceOf[AnyRef]).asJava)
+    val newMetadata =
+      oldIndex.metadata().copy(options = mergedOptions.mapValues(_.asInstanceOf[AnyRef]).asJava)
     val newIndex = FlintSparkIndexFactory.create(newMetadata)

     val updateMode = newIndex.options.autoRefresh() match {
@@ -108,9 +113,8 @@ object FlintSparkSqlAstBuilder {
   }

   /**
-   * Validate update options.
-   * These are rules specific for updating index, validating the update is allowed.
-   * It doesn't check whether the resulting index options will be valid.
+   * Validate update options. These rules are specific to updating an index and verify that the
+   * update is allowed. They don't check whether the resulting index options will be valid.
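For orientation: the validation changed above reduces to a small decision table over the merged `auto_refresh` and `incremental_refresh` values. The sketch below restates that table as standalone Scala. It is illustrative only: plain strings stand in for the `FlintSparkIndexOptions` option names, and the `("true", "false") => AUTO` arm is assumed, since it sits in unchanged context outside the hunk.

```scala
// Standalone sketch of the refresh-mode selection in validateUpdateOptions.
// Illustrative only; not an excerpt from the patch.
object RefreshModeSketch {
  sealed trait RefreshMode
  case object AUTO extends RefreshMode
  case object FULL extends RefreshMode
  case object INCREMENTAL extends RefreshMode

  def refreshMode(autoRefresh: String, incrementalRefresh: String): RefreshMode =
    (autoRefresh, incrementalRefresh) match {
      case ("true", "false") => AUTO // assumed: unchanged context outside the hunk
      case ("false", "false") => FULL
      case ("false", "true") => INCREMENTAL
      case ("true", "true") =>
        throw new IllegalArgumentException(
          "auto_refresh and incremental_refresh options cannot both be true")
      case other => // sketch-only guard; the patch matches only the four cases above
        throw new IllegalArgumentException(s"Invalid refresh options: $other")
    }

  // Options an ALTER statement may carry, keyed by the target refresh mode.
  def allowedOptions(mode: RefreshMode): Set[String] = mode match {
    case FULL => Set("auto_refresh", "incremental_refresh")
    case AUTO | INCREMENTAL =>
      Set(
        "auto_refresh",
        "incremental_refresh",
        "refresh_interval",
        "checkpoint_location",
        "watermark_delay")
  }
}
```

So converting to full refresh accepts only the two refresh flags, which is exactly what the `IllegalArgumentException` assertions in the integration suites below exercise.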
    *
    * @param oldOptions
    *   existing options
@@ -119,7 +123,10 @@
    * @param indexKind
    *   index kind
    */
-  private def validateUpdateOptions(oldOptions: Map[String, String], updateOptions: Map[String, String], indexKind: String): Unit = {
+  private def validateUpdateOptions(
+      oldOptions: Map[String, String],
+      updateOptions: Map[String, String],
+      indexKind: String): Unit = {
     val mergedOptions = oldOptions ++ updateOptions
     val newAutoRefresh = mergedOptions.getOrElse(AUTO_REFRESH.toString, "false")
     val oldAutoRefresh = oldOptions.getOrElse(AUTO_REFRESH.toString, "false")
@@ -135,17 +142,24 @@ object FlintSparkSqlAstBuilder {
       case ("false", "false") => FULL
       case ("false", "true") => INCREMENTAL
       case ("true", "true") =>
-        throw new IllegalArgumentException("auto_refresh and incremental_refresh options cannot both be true")
+        throw new IllegalArgumentException(
+          "auto_refresh and incremental_refresh options cannot both be true")
     }

     // validate allowed options depending on refresh mode
     val allowedOptions = refreshMode match {
       case FULL => Set(AUTO_REFRESH, INCREMENTAL_REFRESH)
       case AUTO | INCREMENTAL =>
-        Set(AUTO_REFRESH, INCREMENTAL_REFRESH, REFRESH_INTERVAL, CHECKPOINT_LOCATION, WATERMARK_DELAY)
+        Set(
+          AUTO_REFRESH,
+          INCREMENTAL_REFRESH,
+          REFRESH_INTERVAL,
+          CHECKPOINT_LOCATION,
+          WATERMARK_DELAY)
     }
     if (!updateOptions.keys.forall(allowedOptions.map(_.toString).contains)) {
-      throw new IllegalArgumentException(s"Altering ${indexKind} index to ${refreshMode} refresh only allows options: ${allowedOptions}")
+      throw new IllegalArgumentException(
+        s"Altering ${indexKind} index to ${refreshMode} refresh only allows options: ${allowedOptions}")
     }
   }
 }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
index 1dc9c771d..6991e60d8 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
@@ -308,8 +308,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
     flint.describeIndex(testFlintIndex) shouldBe defined
     flint.queryIndex(testFlintIndex).count() shouldBe 0

-    sql(
-      s"""
+    sql(s"""
          | ALTER INDEX $testIndex ON $testTable
          | WITH (auto_refresh = true)
          | """.stripMargin)
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
index ddd38ed61..906523696 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
@@ -296,8 +296,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
     flint.describeIndex(testFlintIndex) shouldBe defined
     flint.queryIndex(testFlintIndex).count() shouldBe 0

-    sql(
-      s"""
+    sql(s"""
          | ALTER MATERIALIZED VIEW $testMvName
          | WITH (
          |   auto_refresh = true,
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
index e82266c37..53d08bda7 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
@@ -355,8 +355,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
   }

   test("update full refresh skipping index to auto refresh") {
-    sql(
-      s"""
+    sql(s"""
          | CREATE SKIPPING INDEX ON $testTable
          | (
          |   year PARTITION,
@@ -368,8 +367,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
     flint.describeIndex(testIndex) shouldBe defined
     flint.queryIndex(testIndex).count() shouldBe 0

-    sql(
-      s"""
+    sql(s"""
          | ALTER SKIPPING INDEX ON $testTable
          | WITH (auto_refresh = true)
          | """.stripMargin)
@@ -382,8 +380,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {

   test("update incremental refresh skipping index to auto refresh") {
     withTempDir { checkpointDir =>
-      sql(
-        s"""
+      sql(s"""
            | CREATE SKIPPING INDEX ON $testTable
            | ( year PARTITION )
            | WITH (
@@ -397,15 +394,13 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
       flint.queryIndex(testIndex).count() shouldBe 2

       // New data will be refreshed after updating index to auto refresh
-      sql(
-        s"""
+      sql(s"""
            | INSERT INTO $testTable
            | PARTITION (year=2023, month=5)
            | VALUES ('Hello', 50, 'Vancouver')
            |""".stripMargin)

-      sql(
-        s"""
+      sql(s"""
            | ALTER SKIPPING INDEX ON $testTable
            | WITH (
            |   auto_refresh = true,
@@ -421,8 +416,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
   }

   test("update auto refresh skipping index to full refresh") {
-    sql(
-      s"""
+    sql(s"""
          | CREATE SKIPPING INDEX ON $testTable
          | ( year PARTITION )
          | WITH (auto_refresh = true)
@@ -435,8 +429,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
     flint.describeIndex(testIndex) shouldBe defined
     flint.queryIndex(testIndex).count() shouldBe 2

-    sql(
-      s"""
+    sql(s"""
          | ALTER SKIPPING INDEX ON $testTable
          | WITH (auto_refresh = false)
          | """.stripMargin)
@@ -444,8 +437,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
     spark.streams.active.find(_.name == testIndex) shouldBe empty

     // New data won't be refreshed until refresh statement triggered
-    sql(
-      s"""
+    sql(s"""
          | INSERT INTO $testTable
          | PARTITION (year=2023, month=5)
          | VALUES ('Hello', 50, 'Vancouver')
@@ -458,8 +450,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {

   test("update auto refresh skipping index to incremental refresh") {
     withTempDir { checkpointDir =>
-      sql(
-        s"""
+      sql(s"""
            | CREATE SKIPPING INDEX ON $testTable
            | ( year PARTITION )
            | WITH (
@@ -475,8 +466,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
       flint.describeIndex(testIndex) shouldBe defined
       flint.queryIndex(testIndex).count() shouldBe 2

-      sql(
-        s"""
+      sql(s"""
            | ALTER SKIPPING INDEX ON $testTable
            | WITH (
            |   auto_refresh = false,
@@ -487,8 +477,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
       spark.streams.active.find(_.name == testIndex) shouldBe empty

       // New data won't be refreshed until refresh statement triggered
-      sql(
-        s"""
+      sql(s"""
            | INSERT INTO $testTable
            | PARTITION (year=2023, month=5)
            | VALUES ('Hello', 50, 'Vancouver')
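The next suite, FlintSparkUpdateIndexITSuite, drives the `updateIndex` helper directly rather than through ALTER statements. As a quick sketch of the call shape, assuming `flint: FlintSpark`, `testIndex: String`, and `spark: SparkSession` are in scope as they are in the suite (illustrative, not an excerpt):

```scala
// Illustrative call shape for the helper under test; not an excerpt.
// Converting to auto refresh starts a streaming job and returns its id;
// converting away from auto refresh returns None and stops the job.
val jobId: Option[String] =
  updateIndex(flint, testIndex, Map("auto_refresh" -> "true"))

jobId.foreach { id =>
  val job = spark.streams.get(id) // same lookup the suite performs
  job.processAllAvailable() // block until pending source data is indexed
}
```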
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala
index b1a0278ae..13c93617a 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala
@@ -49,18 +49,15 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
       .create()
     flint.describeIndex(testIndex) shouldBe defined

-    val updateOptions = Map(
-      "auto_refresh" -> "true",
-      "incremental_refresh" -> "false"
-    )
+    val updateOptions = Map("auto_refresh" -> "true", "incremental_refresh" -> "false")
     updateIndex(flint, testIndex, updateOptions)

     val readNewIndex = flint.describeIndex(testIndex)
     readNewIndex shouldBe defined

-    val optionJson = compact(render(parse(readNewIndex.get.metadata().getContent) \ "_meta" \ "options"))
-    optionJson should matchJson(
-      s"""
+    val optionJson =
+      compact(render(parse(readNewIndex.get.metadata().getContent) \ "_meta" \ "options"))
+    optionJson should matchJson(s"""
       | {
       |   "auto_refresh": "true",
       |   "incremental_refresh": "false",
@@ -97,9 +94,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
       updateIndex(
         flint,
         testIndex,
-        Map(
-          "auto_refresh" -> "true",
-          "checkpoint_location" -> "s3a://test/"))
+        Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"))

     // auto_refresh not provided
     the[IllegalArgumentException] thrownBy
@@ -123,9 +118,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
       updateIndex(
         flint,
         testIndex,
-        Map(
-          "auto_refresh" -> "false",
-          "checkpoint_location" -> "s3a://test/"))
+        Map("auto_refresh" -> "false", "checkpoint_location" -> "s3a://test/"))

     // auto_refresh not provided
     the[IllegalArgumentException] thrownBy
@@ -146,9 +139,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
     updateIndex(
       flint,
       testIndex,
-      Map(
-        "auto_refresh" -> "false",
-        "incremental_refresh" -> "false"))
+      Map("auto_refresh" -> "false", "incremental_refresh" -> "false"))

     val readNewIndex = flint.describeIndex(testIndex)
     readNewIndex shouldBe defined
@@ -169,23 +160,17 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
       updateIndex(
         flint,
         testIndex,
-        Map(
-          "auto_refresh" -> "false",
-          "checkpoint_location" -> "s3a://test/"))
+        Map("auto_refresh" -> "false", "checkpoint_location" -> "s3a://test/"))
     the[IllegalArgumentException] thrownBy
       updateIndex(
         flint,
         testIndex,
-        Map(
-          "auto_refresh" -> "false",
-          "refresh_interval" -> "5 Minute"))
+        Map("auto_refresh" -> "false", "refresh_interval" -> "5 Minute"))
     the[IllegalArgumentException] thrownBy
       updateIndex(
         flint,
         testIndex,
-        Map(
-          "auto_refresh" -> "false",
-          "watermark_delay" -> "1 Minute"))
+        Map("auto_refresh" -> "false", "watermark_delay" -> "1 Minute"))
   }

   test("should succeed if convert to incremental refresh with refresh_interval") {
@@ -285,12 +270,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
       .addPartitions("year", "month")
       .create()

-    updateIndex(
-      flint,
-      testIndex,
-      Map(
-        "auto_refresh" -> "true",
-        "refresh_interval" -> "5 Minute"))
+    updateIndex(flint, testIndex, Map("auto_refresh" -> "true", "refresh_interval" -> "5 Minute"))

     val readNewIndex = flint.describeIndex(testIndex)
     readNewIndex shouldBe defined
@@ -311,9 +291,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
       updateIndex(
         flint,
         testIndex,
-        Map(
-          "auto_refresh" -> "true",
-          "checkpoint_location" -> checkpointDir.getAbsolutePath))
+        Map("auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath))

       val readNewIndex = flint.describeIndex(testIndex)
       readNewIndex shouldBe defined
@@ -331,12 +309,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
       .addPartitions("year", "month")
       .create()

-    updateIndex(
-      flint,
-      testIndex,
-      Map(
-        "auto_refresh" -> "true",
-        "watermark_delay" -> "5 Minute"))
+    updateIndex(flint, testIndex, Map("auto_refresh" -> "true", "watermark_delay" -> "5 Minute"))

     val readNewIndex = flint.describeIndex(testIndex)
     readNewIndex shouldBe defined
@@ -354,12 +327,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
       .create()

     the[IllegalArgumentException] thrownBy
-      updateIndex(
-        flint,
-        testIndex,
-        Map(
-          "auto_refresh" -> "true",
-          "output_mode" -> "complete"))
+      updateIndex(flint, testIndex, Map("auto_refresh" -> "true", "output_mode" -> "complete"))
   }

   test("should fail if convert to invalid refresh mode") {
@@ -373,9 +341,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
       updateIndex(
         flint,
         testIndex,
-        Map(
-          "auto_refresh" -> "true",
-          "incremental_refresh" -> "true"))
+        Map("auto_refresh" -> "true", "incremental_refresh" -> "true"))

     deleteTestIndex(testIndex)

@@ -446,8 +412,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
     openSearchClient.deleteByQuery(
       new DeleteByQueryRequest(testIndex).setQuery(QueryBuilders.matchAllQuery()),
       RequestOptions.DEFAULT)
-    sql(
-      s"""
+    sql(s"""
        | INSERT INTO $testTable
        | PARTITION (year=2023, month=4)
        | VALUES ('Hello', 35, 'Vancouver')
@@ -457,9 +422,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
     val jobId = updateIndex(
       flint,
       testIndex,
-      Map(
-        "auto_refresh" -> "true",
-        "incremental_refresh" -> "false"))
+      Map("auto_refresh" -> "true", "incremental_refresh" -> "false"))
     jobId shouldBe defined

     val job = spark.streams.get(jobId.get)
@@ -496,8 +459,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
     spark.streams.active.find(_.name == testIndex) shouldBe empty

     // Generate a new source file
-    sql(
-      s"""
+    sql(s"""
        | INSERT INTO $testTable
        | PARTITION (year=2023, month=4)
        | VALUES ('Hello', 35, 'Vancouver')
@@ -518,11 +480,8 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
       .skippingIndex()
       .onTable(testTable)
       .addPartitions("year", "month")
-      .options(
-        FlintSparkIndexOptions(
-          Map(
-            "auto_refresh" -> "true",
-            "checkpoint_location" -> checkpointDir.getAbsolutePath)))
+      .options(FlintSparkIndexOptions(
+        Map("auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath)))
       .create()

     val jobId = flint.refreshIndex(testIndex)
@@ -537,16 +496,13 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
     updateIndex(
       flint,
       testIndex,
-      Map(
-        "auto_refresh" -> "false",
-        "incremental_refresh" -> "true")) shouldBe empty
+      Map("auto_refresh" -> "false", "incremental_refresh" -> "true")) shouldBe empty

     // Expect refresh job to be stopped
     spark.streams.active.find(_.name == testIndex) shouldBe empty

     // Generate a new source file
-    sql(
-      s"""
+    sql(s"""
        | INSERT INTO $testTable
        | PARTITION (year=2023, month=4)
        | VALUES ('Hello', 35, 'Vancouver')