diff --git a/docs/index.md b/docs/index.md index 411877c30..d0228cceb 100644 --- a/docs/index.md +++ b/docs/index.md @@ -357,6 +357,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i - `spark.datasource.flint.write.refresh_policy`: default value is wait_for. valid values [NONE (false), IMMEDIATE(true), WAIT_UNTIL(wait_for)] - `spark.datasource.flint.read.scroll_size`: default value is 100. +- `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration. - `spark.flint.optimizer.enabled`: default is true. - `spark.flint.index.hybridscan.enabled`: default is false. - `spark.flint.index.checkpoint.mandatory`: default is true. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index c44dd939f..52ba61192 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -50,6 +50,12 @@ public class FlintOptions implements Serializable { public static final String SCROLL_SIZE = "read.scroll_size"; public static final int DEFAULT_SCROLL_SIZE = 100; + public static final String SCROLL_DURATION = "read.scroll_duration"; + /** + * 5 minutes; + */ + public static final int DEFAULT_SCROLL_DURATION = 5; + public static final String REFRESH_POLICY = "write.refresh_policy"; /** * NONE("false") @@ -76,6 +82,10 @@ public int getScrollSize() { return Integer.parseInt(options.getOrDefault(SCROLL_SIZE, String.valueOf(DEFAULT_SCROLL_SIZE))); } + public int getScrollDuration() { + return Integer.parseInt(options.getOrDefault(SCROLL_DURATION, String.valueOf(DEFAULT_SCROLL_DURATION))); + } + public String getRefreshPolicy() {return options.getOrDefault(REFRESH_POLICY, DEFAULT_REFRESH_POLICY);} public String getRegion() { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java index 96ec74bc2..472431bf1 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java @@ -14,6 +14,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Optional; /** * Abstract OpenSearch Reader. @@ -38,8 +39,12 @@ public OpenSearchReader(RestHighLevelClient client, SearchRequest searchRequest) @Override public boolean hasNext() { try { if (iterator == null || !iterator.hasNext()) { - SearchResponse response = search(searchRequest); - List searchHits = Arrays.asList(response.getHits().getHits()); + Optional response = search(searchRequest); + if (response.isEmpty()) { + iterator = null; + return false; + } + List searchHits = Arrays.asList(response.get().getHits().getHits()); iterator = searchHits.iterator(); } return iterator.hasNext(); @@ -72,7 +77,7 @@ public OpenSearchReader(RestHighLevelClient client, SearchRequest searchRequest) /** * search. */ - abstract SearchResponse search(SearchRequest request) throws IOException; + abstract Optional search(SearchRequest request) throws IOException; /** * clean. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java index af44bb6a6..d71014c20 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java @@ -18,6 +18,7 @@ import org.opensearch.search.builder.SearchSourceBuilder; import java.io.IOException; +import java.util.Optional; import java.util.logging.Level; import java.util.logging.Logger; @@ -28,30 +29,37 @@ public class OpenSearchScrollReader extends OpenSearchReader { private static final Logger LOG = Logger.getLogger(OpenSearchScrollReader.class.getName()); - /** Default scroll context timeout in minutes. */ - public static final TimeValue DEFAULT_SCROLL_TIMEOUT = TimeValue.timeValueMinutes(5L); - private final FlintOptions options; + private final TimeValue scrollDuration; + private String scrollId = null; public OpenSearchScrollReader(RestHighLevelClient client, String indexName, SearchSourceBuilder searchSourceBuilder, FlintOptions options) { super(client, new SearchRequest().indices(indexName).source(searchSourceBuilder.size(options.getScrollSize()))); this.options = options; + this.scrollDuration = TimeValue.timeValueMinutes(options.getScrollDuration()); } /** * search. */ - SearchResponse search(SearchRequest request) throws IOException { + Optional search(SearchRequest request) throws IOException { if (Strings.isNullOrEmpty(scrollId)) { - // add scroll timeout making the request as scroll search request. - request.scroll(DEFAULT_SCROLL_TIMEOUT); + request.scroll(scrollDuration); SearchResponse response = client.search(request, RequestOptions.DEFAULT); scrollId = response.getScrollId(); - return response; + return Optional.of(response); } else { - return client.scroll(new SearchScrollRequest().scroll(DEFAULT_SCROLL_TIMEOUT).scrollId(scrollId), RequestOptions.DEFAULT); + try { + return Optional + .of(client.scroll(new SearchScrollRequest().scroll(scrollDuration).scrollId(scrollId), + RequestOptions.DEFAULT)); + } catch (OpenSearchStatusException e) { + LOG.log(Level.WARNING, "scroll context not exist", e); + scrollId = null; + return Optional.empty(); + } } } @@ -69,6 +77,15 @@ void clean() throws IOException { // OpenSearch throw exception if scroll already closed. https://github.com/opensearch-project/OpenSearch/issues/11121 LOG.log(Level.WARNING, "close scroll exception, it is a known bug https://github" + ".com/opensearch-project/OpenSearch/issues/11121.", e); + } finally { + scrollId = null; } } + + /** + * Public for testing. + */ + public String getScrollId() { + return scrollId; + } } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index 3edeaa01a..2c42f9f20 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -106,6 +106,11 @@ object FlintSparkConf { .doc("scroll read size") .createWithDefault("100") + val SCROLL_DURATION = FlintConfig(s"spark.datasource.flint.${FlintOptions.SCROLL_DURATION}") + .datasourceOption() + .doc("scroll duration in minutes") + .createWithDefault(String.valueOf(FlintOptions.DEFAULT_SCROLL_DURATION)) + val OPTIMIZER_RULE_ENABLED = FlintConfig("spark.flint.optimizer.enabled") .doc("Enable Flint optimizer rule for query rewrite with Flint index") .createWithDefault("true") @@ -158,6 +163,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable HOST_PORT, REFRESH_POLICY, SCROLL_SIZE, + SCROLL_DURATION, SCHEME, AUTH, REGION, diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 060c57940..9a762d9d6 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -17,12 +17,12 @@ import org.opensearch.client.transport.rest_client.RestClientTransport import org.opensearch.flint.OpenSearchSuite import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NoOptimisticTransaction -import org.opensearch.flint.core.storage.FlintOpenSearchClient +import org.opensearch.flint.core.storage.{FlintOpenSearchClient, OpenSearchScrollReader} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import org.scalatestplus.mockito.MockitoSugar.mock -import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY +import org.apache.spark.sql.flint.config.FlintSparkConf.{REFRESH_POLICY, SCROLL_DURATION, SCROLL_SIZE} class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with Matchers { @@ -176,6 +176,30 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M reader.hasNext shouldBe false reader.close() + reader.asInstanceOf[OpenSearchScrollReader].getScrollId shouldBe null + scrollShouldClosed() + } + } + + it should "no item return after scroll timeout" in { + val indexName = "t0001" + withIndexName(indexName) { + multipleDocIndex(indexName, 2) + + val options = + openSearchOptions + (s"${SCROLL_DURATION.optionKey}" -> "1", s"${SCROLL_SIZE.optionKey}" -> "1") + val flintClient = new FlintOpenSearchClient(new FlintOptions(options.asJava)) + val match_all = null + val reader = flintClient.createReader(indexName, match_all) + + reader.hasNext shouldBe true + reader.next + // scroll context expired after 1 minutes + Thread.sleep(60 * 1000 * 2) + reader.hasNext shouldBe false + reader.close() + + reader.asInstanceOf[OpenSearchScrollReader].getScrollId shouldBe null scrollShouldClosed() } }