From 873cb9cce356f4369070174cdab673527ad659bf Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Mon, 6 Nov 2023 17:15:00 -0800 Subject: [PATCH 1/2] Bug fix, handle close scroll exception Signed-off-by: Peng Huo --- docs/index.md | 4 ++-- .../org/opensearch/flint/core/FlintOptions.java | 2 +- .../flint/core/storage/OpenSearchScrollReader.java | 13 +++++++++---- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/docs/index.md b/docs/index.md index 05a38dbeb..c5728c592 100644 --- a/docs/index.md +++ b/docs/index.md @@ -340,8 +340,8 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i - `spark.datasource.flint.write.id_name`: no default value. - `spark.datasource.flint.ignore.id_column` : default value is true. - `spark.datasource.flint.write.batch_size`: default value is 1000. -- `spark.datasource.flint.write.refresh_policy`: default value is false. valid values [NONE(false), - IMMEDIATE(true), WAIT_UNTIL(wait_for)] +- `spark.datasource.flint.write.refresh_policy`: default value is wait_for. valid values [NONE + (false), IMMEDIATE(true), WAIT_UNTIL(wait_for)] - `spark.datasource.flint.read.scroll_size`: default value is 100. - `spark.flint.optimizer.enabled`: default is true. - `spark.flint.index.hybridscan.enabled`: default is false. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 60b4bfc8c..c44dd939f 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -58,7 +58,7 @@ public class FlintOptions implements Serializable { * * WAIT_UNTIL("wait_for") */ - public static final String DEFAULT_REFRESH_POLICY = "false"; + public static final String DEFAULT_REFRESH_POLICY = "wait_for"; public FlintOptions(Map options) { this.options = options; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java index d916c8ad6..1e49def5a 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java @@ -5,6 +5,7 @@ package org.opensearch.flint.core.storage; +import org.opensearch.OpenSearchStatusException; import org.opensearch.action.search.ClearScrollRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; @@ -54,10 +55,14 @@ SearchResponse search(SearchRequest request) throws IOException { * clean the scroll context. */ void clean() throws IOException { - if (!Strings.isNullOrEmpty(scrollId)) { - ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(scrollId); - client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); + try { + if (!Strings.isNullOrEmpty(scrollId)) { + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(scrollId); + client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); + } + } catch (OpenSearchStatusException e) { + // OpenSearch throw exception if scroll already closed. https://github.com/opensearch-project/OpenSearch/issues/11121 } } } From 71ccd859d5a863caa777208262bad552c7125523 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Mon, 6 Nov 2023 21:19:29 -0800 Subject: [PATCH 2/2] address comments Signed-off-by: Peng Huo --- .../flint/core/storage/OpenSearchScrollReader.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java index 1e49def5a..af44bb6a6 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java @@ -18,12 +18,16 @@ import org.opensearch.search.builder.SearchSourceBuilder; import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; /** * {@link OpenSearchReader} using scroll search. https://opensearch.org/docs/latest/api-reference/scroll/ */ public class OpenSearchScrollReader extends OpenSearchReader { + private static final Logger LOG = Logger.getLogger(OpenSearchScrollReader.class.getName()); + /** Default scroll context timeout in minutes. */ public static final TimeValue DEFAULT_SCROLL_TIMEOUT = TimeValue.timeValueMinutes(5L); @@ -63,6 +67,8 @@ void clean() throws IOException { } } catch (OpenSearchStatusException e) { // OpenSearch throw exception if scroll already closed. https://github.com/opensearch-project/OpenSearch/issues/11121 + LOG.log(Level.WARNING, "close scroll exception, it is a known bug https://github" + + ".com/opensearch-project/OpenSearch/issues/11121.", e); } } }