diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 9858ffd1e..a0fde6212 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -74,7 +74,7 @@ public class FlintOptions implements Serializable { * * WAIT_UNTIL("wait_for") */ - public static final String DEFAULT_REFRESH_POLICY = "wait_for"; + public static final String DEFAULT_REFRESH_POLICY = "false"; public static final String SOCKET_TIMEOUT_MILLIS = "socket_timeout_millis"; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala index 30dd45fa1..dd565fd18 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala @@ -74,7 +74,6 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) .queryName(indexName) .format(FLINT_DATASOURCE) .options(flintSparkConf.properties) - .option(FlintSparkConf.REFRESH_POLICY.optionKey, "false") .addSinkOptions(options, flintSparkConf) .start(indexName) Some(job.id.toString) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala index c0ac7d98a..b2ce2ad34 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala @@ -5,7 +5,6 @@ package org.opensearch.flint.spark.refresh -import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.spark.FlintSparkIndex import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{FULL, RefreshMode} @@ -44,7 +43,6 @@ class FullIndexRefresh( .write .format(FLINT_DATASOURCE) .options(flintSparkConf.properties) - .option(FlintSparkConf.REFRESH_POLICY.optionKey, "false") .mode(Overwrite) .save(indexName) None diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala index c1d2bf79b..fd91bd0b2 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala @@ -17,6 +17,7 @@ import play.api.libs.json._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY import org.apache.spark.sql.types._ import org.apache.spark.sql.util._ @@ -97,6 +98,7 @@ trait FlintJobExecutor { try { resultData.write .format("flint") + .option(REFRESH_POLICY.optionKey, "wait_for") .mode("append") .save(resultIndex) IRestHighLevelClient.recordOperationSuccess(