diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index b564582fb..118067068 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -426,8 +426,8 @@ class FlintSpark(val spark: SparkSession) extends Logging { } // Get the changed option names - val updateOptionNames = updatedOptions.options.filterNot { - case (k, v) => originalOptions.options.get(k).contains(v) + val updateOptionNames = updatedOptions.options.filterNot { case (k, v) => + originalOptions.options.get(k).contains(v) }.keys if (!updateOptionNames.forall(allowedOptionNames.map(_.toString).contains)) { throw new IllegalArgumentException( diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala index 9ed759804..c597adade 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala @@ -71,9 +71,12 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { * @return * updated Flint index */ - def copyWithUpdate(index: FlintSparkIndex, updateOptions: FlintSparkIndexOptions): FlintSparkIndex = { + def copyWithUpdate( + index: FlintSparkIndex, + updateOptions: FlintSparkIndexOptions): FlintSparkIndex = { val originalOptions = index.options - val updatedOptions = originalOptions.copy(options = originalOptions.options ++ updateOptions.options) + val updatedOptions = + originalOptions.copy(options = originalOptions.options ++ updateOptions.options) val updatedMetadata = index .metadata() .copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala index fb59ab326..8fa6d629f 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala @@ -83,30 +83,18 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { ( "update index without changing auto_refresh option", Seq( - ( - Map("auto_refresh" -> "true"), - Map("auto_refresh" -> "true")), + (Map("auto_refresh" -> "true"), Map("auto_refresh" -> "true")), ( Map("auto_refresh" -> "true"), Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/")), - ( - Map("auto_refresh" -> "true"), - Map("checkpoint_location" -> "s3a://test/")), - ( - Map("auto_refresh" -> "true"), - Map("watermark_delay" -> "1 Minute")), - ( - Map.empty[String, String], - Map("auto_refresh" -> "false")), + (Map("auto_refresh" -> "true"), Map("checkpoint_location" -> "s3a://test/")), + (Map("auto_refresh" -> "true"), Map("watermark_delay" -> "1 Minute")), + (Map.empty[String, String], Map("auto_refresh" -> "false")), ( Map.empty[String, String], Map("auto_refresh" -> "false", "checkpoint_location" -> "s3a://test/")), - ( - Map.empty[String, String], - Map("incremental_refresh" -> "true")), - ( - Map.empty[String, String], - Map("checkpoint_location" -> "s3a://test/")))), + (Map.empty[String, String], Map("incremental_refresh" -> "true")), + (Map.empty[String, String], Map("checkpoint_location" -> "s3a://test/")))), ( "convert to full refresh with disallowed options", Seq( @@ -131,22 +119,17 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { ( "convert to auto refresh with disallowed options", Seq( - ( - Map.empty[String, String], - Map("auto_refresh" -> "true", "output_mode" -> "complete")))), + (Map.empty[String, String], Map("auto_refresh" -> "true", "output_mode" -> "complete")))), ( "convert to invalid refresh mode", Seq( ( Map.empty[String, String], Map("auto_refresh" -> "true", "incremental_refresh" -> "true")), - ( - Map("auto_refresh" -> "true"), - Map("incremental_refresh" -> "true")), + (Map("auto_refresh" -> "true"), Map("incremental_refresh" -> "true")), ( Map("incremental_refresh" -> "true", "checkpoint_location" -> "s3a://test/"), - Map("auto_refresh" -> "true")))) - ).foreach { case (testName, testCases) => + Map("auto_refresh" -> "true"))))).foreach { case (testName, testCases) => test(s"should fail if $testName") { withTempDir { checkpointDir => testCases.foreach { case (initialOptionsMap, updateOptionsMap) => @@ -154,24 +137,27 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .skippingIndex() .onTable(testTable) .addPartitions("year", "month") - .options(FlintSparkIndexOptions( - initialOptionsMap + .options( + FlintSparkIndexOptions(initialOptionsMap .get("checkpoint_location") - .map(_ => initialOptionsMap.updated("checkpoint_location", checkpointDir.getAbsolutePath)) - .getOrElse(initialOptionsMap) - )) + .map(_ => + initialOptionsMap.updated("checkpoint_location", checkpointDir.getAbsolutePath)) + .getOrElse(initialOptionsMap))) .create() flint.refreshIndex(testIndex) val index = flint.describeIndex(testIndex).get the[IllegalArgumentException] thrownBy { val updatedIndex = flint .skippingIndex() - .copyWithUpdate(index, FlintSparkIndexOptions( - updateOptionsMap - .get("checkpoint_location") - .map(_ => updateOptionsMap.updated("checkpoint_location", checkpointDir.getAbsolutePath)) - .getOrElse(updateOptionsMap) - )) + .copyWithUpdate( + index, + FlintSparkIndexOptions( + updateOptionsMap + .get("checkpoint_location") + .map(_ => + updateOptionsMap + .updated("checkpoint_location", checkpointDir.getAbsolutePath)) + .getOrElse(updateOptionsMap))) flint.updateIndex(updatedIndex) } deleteTestIndex(testIndex) @@ -377,17 +363,24 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { val indexInitial = flint.describeIndex(testIndex).get val updatedIndexObsolete = flint .skippingIndex() - .copyWithUpdate(indexInitial, FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath))) + .copyWithUpdate( + indexInitial, + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath))) // This other update finishes first, converting to auto refresh - flint.updateIndex(flint - .skippingIndex() - .copyWithUpdate(indexInitial, FlintSparkIndexOptions(Map("auto_refresh" -> "true")))) + flint.updateIndex( + flint + .skippingIndex() + .copyWithUpdate(indexInitial, FlintSparkIndexOptions(Map("auto_refresh" -> "true")))) // Adding another update to convert to full refresh, so the obsolete update doesn't fail for option validation or state validation val indexUpdated = flint.describeIndex(testIndex).get - flint.updateIndex(flint - .skippingIndex() - .copyWithUpdate(indexUpdated, FlintSparkIndexOptions(Map("auto_refresh" -> "false")))) + flint.updateIndex( + flint + .skippingIndex() + .copyWithUpdate(indexUpdated, FlintSparkIndexOptions(Map("auto_refresh" -> "false")))) // This update trying to update an obsolete index should fail the[IllegalStateException] thrownBy