Bug Fix, handle scroll timeout exception #147

Merged 2 commits on Nov 10, 2023
Changes from 1 commit
1 change: 1 addition & 0 deletions docs/index.md
@@ -357,6 +357,7 @@ In the index mapping, the `_meta` and `properties` fields store meta and schema info
- `spark.datasource.flint.write.refresh_policy`: default value is wait_for. valid values [NONE
(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. Scroll context keep-alive duration.
- `spark.flint.optimizer.enabled`: default is true.
- `spark.flint.index.hybridscan.enabled`: default is false.
- `spark.flint.index.checkpoint.mandatory`: default is true.
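A minimal sketch of supplying these read options from Spark (not part of this change; the SparkSession setup and app name are assumptions):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("flint-scroll-options").getOrCreate()

// Page size of each scroll fetch (spark.datasource.flint.read.scroll_size).
spark.conf.set("spark.datasource.flint.read.scroll_size", "100")

// Scroll context keep-alive in minutes (spark.datasource.flint.read.scroll_duration); default is 5.
spark.conf.set("spark.datasource.flint.read.scroll_duration", "5")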
@@ -50,6 +50,12 @@ public class FlintOptions implements Serializable {
public static final String SCROLL_SIZE = "read.scroll_size";
public static final int DEFAULT_SCROLL_SIZE = 100;

public static final String SCROLL_DURATION = "read.scroll_duration";
/**
* 5 minutes;
*/
public static final int DEFAULT_SCROLL_DURATION = 5;

public static final String REFRESH_POLICY = "write.refresh_policy";
/**
* NONE("false")
@@ -76,6 +82,10 @@ public int getScrollSize() {
return Integer.parseInt(options.getOrDefault(SCROLL_SIZE, String.valueOf(DEFAULT_SCROLL_SIZE)));
}

public int getScrollDuration() {
return Integer.parseInt(options.getOrDefault(SCROLL_DURATION, String.valueOf(DEFAULT_SCROLL_DURATION)));
}

public String getRefreshPolicy() {return options.getOrDefault(REFRESH_POLICY, DEFAULT_REFRESH_POLICY);}

public String getRegion() {
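A small usage sketch for the new getter (an assumption-level example: FlintOptions is taken to live in org.opensearch.flint.core and, as in the test suite below, to accept a java.util.Map of option keys):

import scala.collection.JavaConverters._
import org.opensearch.flint.core.FlintOptions

// "read.scroll_duration" is the key defined by SCROLL_DURATION above; the value is in minutes.
val custom = new FlintOptions(Map("read.scroll_duration" -> "10").asJava)
assert(custom.getScrollDuration == 10)

// Without the key, getScrollDuration() falls back to DEFAULT_SCROLL_DURATION (5).
val defaults = new FlintOptions(Map.empty[String, String].asJava)
assert(defaults.getScrollDuration == 5)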
@@ -14,6 +14,7 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

/**
* Abstract OpenSearch Reader.
@@ -38,8 +39,12 @@ public OpenSearchReader(RestHighLevelClient client, SearchRequest searchRequest)
@Override public boolean hasNext() {
try {
if (iterator == null || !iterator.hasNext()) {
- SearchResponse response = search(searchRequest);
- List<SearchHit> searchHits = Arrays.asList(response.getHits().getHits());
Optional<SearchResponse> response = search(searchRequest);
if (response.isEmpty()) {
iterator = null;
return false;
}
List<SearchHit> searchHits = Arrays.asList(response.get().getHits().getHits());
iterator = searchHits.iterator();
}
return iterator.hasNext();
@@ -72,7 +77,7 @@ public OpenSearchReader(RestHighLevelClient client, SearchRequest searchRequest)
/**
* search.
*/
- abstract SearchResponse search(SearchRequest request) throws IOException;
abstract Optional<SearchResponse> search(SearchRequest request) throws IOException;

/**
* clean.
@@ -18,6 +18,7 @@
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

@@ -28,30 +29,37 @@ public class OpenSearchScrollReader extends OpenSearchReader {

private static final Logger LOG = Logger.getLogger(OpenSearchScrollReader.class.getName());

- /** Default scroll context timeout in minutes. */
- public static final TimeValue DEFAULT_SCROLL_TIMEOUT = TimeValue.timeValueMinutes(5L);

private final FlintOptions options;

private final TimeValue scrollDuration;

private String scrollId = null;

public OpenSearchScrollReader(RestHighLevelClient client, String indexName, SearchSourceBuilder searchSourceBuilder, FlintOptions options) {
super(client, new SearchRequest().indices(indexName).source(searchSourceBuilder.size(options.getScrollSize())));
this.options = options;
this.scrollDuration = TimeValue.timeValueMinutes(options.getScrollDuration());
}

/**
* search.
*/
- SearchResponse search(SearchRequest request) throws IOException {
Optional<SearchResponse> search(SearchRequest request) throws IOException {
if (Strings.isNullOrEmpty(scrollId)) {
// add scroll timeout making the request as scroll search request.
- request.scroll(DEFAULT_SCROLL_TIMEOUT);
request.scroll(scrollDuration);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
scrollId = response.getScrollId();
- return response;
return Optional.of(response);
} else {
- return client.scroll(new SearchScrollRequest().scroll(DEFAULT_SCROLL_TIMEOUT).scrollId(scrollId), RequestOptions.DEFAULT);
try {
return Optional
.of(client.scroll(new SearchScrollRequest().scroll(scrollDuration).scrollId(scrollId),
RequestOptions.DEFAULT));
} catch (OpenSearchStatusException e) {
LOG.log(Level.WARNING, "scroll context not exist", e);
scrollId = null;
return Optional.empty();
}
}
}

@@ -69,6 +77,15 @@ void clean() throws IOException {
// OpenSearch throw exception if scroll already closed. https://github.com/opensearch-project/OpenSearch/issues/11121
LOG.log(Level.WARNING, "close scroll exception, it is a known bug https://github" +
".com/opensearch-project/OpenSearch/issues/11121.", e);
} finally {
scrollId = null;
}
}

/**
* Public for testing.
*/
public String getScrollId() {
return scrollId;
}
}
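For context, a consumption sketch mirroring the test below (the index name and connection defaults are assumptions): once the scroll context expires server-side, search() returns Optional.empty(), hasNext() reports false, and the read loop simply ends instead of surfacing an OpenSearchStatusException.

import scala.collection.JavaConverters._
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient

// One-minute scroll keep-alive, one document per scroll page.
val options = Map("read.scroll_duration" -> "1", "read.scroll_size" -> "1")
val flintClient = new FlintOpenSearchClient(new FlintOptions(options.asJava))

// A null query means match_all, as in the test below.
val reader = flintClient.createReader("my-index", null)
while (reader.hasNext) {
  println(reader.next())
}
reader.close()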
@@ -106,6 +106,11 @@ object FlintSparkConf {
.doc("scroll read size")
.createWithDefault("100")

val SCROLL_DURATION = FlintConfig(s"spark.datasource.flint.${FlintOptions.SCROLL_DURATION}")
.datasourceOption()
.doc("scroll duration in minutes")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_SCROLL_DURATION))

val OPTIMIZER_RULE_ENABLED = FlintConfig("spark.flint.optimizer.enabled")
.doc("Enable Flint optimizer rule for query rewrite with Flint index")
.createWithDefault("true")
@@ -158,6 +163,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
HOST_PORT,
REFRESH_POLICY,
SCROLL_SIZE,
SCROLL_DURATION,
SCHEME,
AUTH,
REGION,
@@ -17,12 +17,12 @@ import org.opensearch.client.transport.rest_client.RestClientTransport
import org.opensearch.flint.OpenSearchSuite
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NoOptimisticTransaction
- import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.core.storage.{FlintOpenSearchClient, OpenSearchScrollReader}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar.mock

- import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY
import org.apache.spark.sql.flint.config.FlintSparkConf.{REFRESH_POLICY, SCROLL_DURATION, SCROLL_SIZE, SCROLL_TIMEOUT}

class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with Matchers {

@@ -176,6 +176,30 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with Matchers {
reader.hasNext shouldBe false
reader.close()

reader.asInstanceOf[OpenSearchScrollReader].getScrollId shouldBe null
scrollShouldClosed()
}
}

it should "no item return after scroll timeout" in {
val indexName = "t0001"
withIndexName(indexName) {
multipleDocIndex(indexName, 2)

val options =
openSearchOptions + (s"${SCROLL_DURATION.optionKey}" -> "1", s"${SCROLL_SIZE.optionKey}" -> "1")
val flintClient = new FlintOpenSearchClient(new FlintOptions(options.asJava))
val match_all = null
val reader = flintClient.createReader(indexName, match_all)

reader.hasNext shouldBe true
reader.next
// scroll context expires after 1 minute
Thread.sleep(60 * 1000 * 2)
reader.hasNext shouldBe false
reader.close()

reader.asInstanceOf[OpenSearchScrollReader].getScrollId shouldBe null
scrollShouldClosed()
}
}