Skip to content

Commit

Permalink
Merge branch 'main' into support-covering-index-and-mv-idempotency
Browse files Browse the repository at this point in the history
  • Loading branch information
dai-chen committed Nov 10, 2023
2 parents 65ba58c + 7ebbc9d commit a5066ed
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 15 deletions.
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.datasource.flint.write.refresh_policy`: default value is wait_for. valid values [NONE
(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration.
- `spark.flint.optimizer.enabled`: default is true.
- `spark.flint.index.hybridscan.enabled`: default is false.
- `spark.flint.index.checkpoint.mandatory`: default is true.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public class FlintOptions implements Serializable {
public static final String SCROLL_SIZE = "read.scroll_size";
public static final int DEFAULT_SCROLL_SIZE = 100;

public static final String SCROLL_DURATION = "read.scroll_duration";
/**
* 5 minutes;
*/
public static final int DEFAULT_SCROLL_DURATION = 5;

public static final String REFRESH_POLICY = "write.refresh_policy";
/**
* NONE("false")
Expand All @@ -76,6 +82,10 @@ public int getScrollSize() {
return Integer.parseInt(options.getOrDefault(SCROLL_SIZE, String.valueOf(DEFAULT_SCROLL_SIZE)));
}

public int getScrollDuration() {
return Integer.parseInt(options.getOrDefault(SCROLL_DURATION, String.valueOf(DEFAULT_SCROLL_DURATION)));
}

public String getRefreshPolicy() {return options.getOrDefault(REFRESH_POLICY, DEFAULT_REFRESH_POLICY);}

public String getRegion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

/**
* Abstract OpenSearch Reader.
Expand All @@ -38,8 +39,12 @@ public OpenSearchReader(RestHighLevelClient client, SearchRequest searchRequest)
@Override public boolean hasNext() {
try {
if (iterator == null || !iterator.hasNext()) {
SearchResponse response = search(searchRequest);
List<SearchHit> searchHits = Arrays.asList(response.getHits().getHits());
Optional<SearchResponse> response = search(searchRequest);
if (response.isEmpty()) {
iterator = null;
return false;
}
List<SearchHit> searchHits = Arrays.asList(response.get().getHits().getHits());
iterator = searchHits.iterator();
}
return iterator.hasNext();
Expand Down Expand Up @@ -72,7 +77,7 @@ public OpenSearchReader(RestHighLevelClient client, SearchRequest searchRequest)
/**
* search.
*/
abstract SearchResponse search(SearchRequest request) throws IOException;
abstract Optional<SearchResponse> search(SearchRequest request) throws IOException;

/**
* clean.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -28,30 +29,37 @@ public class OpenSearchScrollReader extends OpenSearchReader {

private static final Logger LOG = Logger.getLogger(OpenSearchScrollReader.class.getName());

/** Default scroll context timeout in minutes. */
public static final TimeValue DEFAULT_SCROLL_TIMEOUT = TimeValue.timeValueMinutes(5L);

private final FlintOptions options;

private final TimeValue scrollDuration;

private String scrollId = null;

public OpenSearchScrollReader(RestHighLevelClient client, String indexName, SearchSourceBuilder searchSourceBuilder, FlintOptions options) {
super(client, new SearchRequest().indices(indexName).source(searchSourceBuilder.size(options.getScrollSize())));
this.options = options;
this.scrollDuration = TimeValue.timeValueMinutes(options.getScrollDuration());
}

/**
* search.
*/
SearchResponse search(SearchRequest request) throws IOException {
Optional<SearchResponse> search(SearchRequest request) throws IOException {
if (Strings.isNullOrEmpty(scrollId)) {
// add scroll timeout making the request as scroll search request.
request.scroll(DEFAULT_SCROLL_TIMEOUT);
request.scroll(scrollDuration);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
scrollId = response.getScrollId();
return response;
return Optional.of(response);
} else {
return client.scroll(new SearchScrollRequest().scroll(DEFAULT_SCROLL_TIMEOUT).scrollId(scrollId), RequestOptions.DEFAULT);
try {
return Optional
.of(client.scroll(new SearchScrollRequest().scroll(scrollDuration).scrollId(scrollId),
RequestOptions.DEFAULT));
} catch (OpenSearchStatusException e) {
LOG.log(Level.WARNING, "scroll context not exist", e);
scrollId = null;
return Optional.empty();
}
}
}

Expand All @@ -69,6 +77,15 @@ void clean() throws IOException {
// OpenSearch throw exception if scroll already closed. https://github.com/opensearch-project/OpenSearch/issues/11121
LOG.log(Level.WARNING, "close scroll exception, it is a known bug https://github" +
".com/opensearch-project/OpenSearch/issues/11121.", e);
} finally {
scrollId = null;
}
}

/**
* Public for testing.
*/
public String getScrollId() {
return scrollId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,18 @@ object FlintSparkConf {
val REFRESH_POLICY = FlintConfig("spark.datasource.flint.write.refresh_policy")
.datasourceOption()
.doc("refresh_policy, possible value are NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)")
.createWithDefault("false")
.createWithDefault(FlintOptions.DEFAULT_REFRESH_POLICY)

val SCROLL_SIZE = FlintConfig("spark.datasource.flint.read.scroll_size")
.datasourceOption()
.doc("scroll read size")
.createWithDefault("100")

val SCROLL_DURATION = FlintConfig(s"spark.datasource.flint.${FlintOptions.SCROLL_DURATION}")
.datasourceOption()
.doc("scroll duration in minutes")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_SCROLL_DURATION))

val OPTIMIZER_RULE_ENABLED = FlintConfig("spark.flint.optimizer.enabled")
.doc("Enable Flint optimizer rule for query rewrite with Flint index")
.createWithDefault("true")
Expand Down Expand Up @@ -158,6 +163,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
HOST_PORT,
REFRESH_POLICY,
SCROLL_SIZE,
SCROLL_DURATION,
SCHEME,
AUTH,
REGION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class FlintSparkConfSuite extends FlintSuite {

// default value
assert(flintOptions.getPort == 9200)
assert(flintOptions.getRefreshPolicy == "false")
assert(flintOptions.getRefreshPolicy == "wait_for")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import org.opensearch.client.transport.rest_client.RestClientTransport
import org.opensearch.flint.OpenSearchSuite
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NoOptimisticTransaction
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.core.storage.{FlintOpenSearchClient, OpenSearchScrollReader}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY
import org.apache.spark.sql.flint.config.FlintSparkConf.{REFRESH_POLICY, SCROLL_DURATION, SCROLL_SIZE}

class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with Matchers {

Expand Down Expand Up @@ -176,6 +176,30 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
reader.hasNext shouldBe false
reader.close()

reader.asInstanceOf[OpenSearchScrollReader].getScrollId shouldBe null
scrollShouldClosed()
}
}

it should "no item return after scroll timeout" in {
val indexName = "t0001"
withIndexName(indexName) {
multipleDocIndex(indexName, 2)

val options =
openSearchOptions + (s"${SCROLL_DURATION.optionKey}" -> "1", s"${SCROLL_SIZE.optionKey}" -> "1")
val flintClient = new FlintOpenSearchClient(new FlintOptions(options.asJava))
val match_all = null
val reader = flintClient.createReader(indexName, match_all)

reader.hasNext shouldBe true
reader.next
// scroll context expired after 1 minutes
Thread.sleep(60 * 1000 * 2)
reader.hasNext shouldBe false
reader.close()

reader.asInstanceOf[OpenSearchScrollReader].getScrollId shouldBe null
scrollShouldClosed()
}
}
Expand Down

0 comments on commit a5066ed

Please sign in to comment.