diff --git a/docs/index.md b/docs/index.md index 05a38dbeb..c5728c592 100644 --- a/docs/index.md +++ b/docs/index.md @@ -340,8 +340,8 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i - `spark.datasource.flint.write.id_name`: no default value. - `spark.datasource.flint.ignore.id_column` : default value is true. - `spark.datasource.flint.write.batch_size`: default value is 1000. -- `spark.datasource.flint.write.refresh_policy`: default value is false. valid values [NONE(false), - IMMEDIATE(true), WAIT_UNTIL(wait_for)] +- `spark.datasource.flint.write.refresh_policy`: default value is wait_for. valid values [NONE + (false), IMMEDIATE(true), WAIT_UNTIL(wait_for)] - `spark.datasource.flint.read.scroll_size`: default value is 100. - `spark.flint.optimizer.enabled`: default is true. - `spark.flint.index.hybridscan.enabled`: default is false. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 60b4bfc8c..c44dd939f 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -58,7 +58,7 @@ public class FlintOptions implements Serializable { * * WAIT_UNTIL("wait_for") */ - public static final String DEFAULT_REFRESH_POLICY = "false"; + public static final String DEFAULT_REFRESH_POLICY = "wait_for"; public FlintOptions(Map options) { this.options = options; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java index d916c8ad6..af44bb6a6 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java @@ -5,6 +5,7 @@ package org.opensearch.flint.core.storage; +import org.opensearch.OpenSearchStatusException; import org.opensearch.action.search.ClearScrollRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; @@ -17,12 +18,16 @@ import org.opensearch.search.builder.SearchSourceBuilder; import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; /** * {@link OpenSearchReader} using scroll search. https://opensearch.org/docs/latest/api-reference/scroll/ */ public class OpenSearchScrollReader extends OpenSearchReader { + private static final Logger LOG = Logger.getLogger(OpenSearchScrollReader.class.getName()); + /** Default scroll context timeout in minutes. */ public static final TimeValue DEFAULT_SCROLL_TIMEOUT = TimeValue.timeValueMinutes(5L); @@ -54,10 +59,16 @@ SearchResponse search(SearchRequest request) throws IOException { * clean the scroll context. */ void clean() throws IOException { - if (!Strings.isNullOrEmpty(scrollId)) { - ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(scrollId); - client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); + try { + if (!Strings.isNullOrEmpty(scrollId)) { + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(scrollId); + client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); + } + } catch (OpenSearchStatusException e) { + // OpenSearch throw exception if scroll already closed. https://github.com/opensearch-project/OpenSearch/issues/11121 + LOG.log(Level.WARNING, "close scroll exception, it is a known bug https://github" + + ".com/opensearch-project/OpenSearch/issues/11121.", e); } } }