Skip to content

Commit

Permalink
Update remaining scroll search calls
Browse files Browse the repository at this point in the history
Signed-off-by: Rupal Mahajan <[email protected]>
  • Loading branch information
rupal-bq committed Jun 13, 2024
1 parent 281153f commit 5bcd134
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ public void execute(
Client client, Map<String, String> params, QueryAction queryAction, RestChannel channel)
throws Exception {
ActionRequest request = requestBuilder.request();
boolean isSearchAfter =
LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER);

if (requestBuilder instanceof JoinRequestBuilder) {
boolean isSearchAfter =
LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER);
if (isSearchAfter) {
((JoinRequestBuilder) requestBuilder).updateRequestWithPit(client);
}
Expand All @@ -73,18 +73,13 @@ public void execute(
}
executor.sendResponse(channel);
} else if (requestBuilder instanceof MultiQueryRequestBuilder) {

boolean isSearchAfter =
LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER);
if (isSearchAfter) {
((MultiQueryRequestBuilder) requestBuilder).updateRequestWithPit(client);
}

ElasticHitsExecutor executor =
MultiRequestExecutorFactory.createExecutor(
client, (MultiQueryRequestBuilder) requestBuilder);
executor.run();

if (isSearchAfter) {
((MultiQueryRequestBuilder) requestBuilder).getPit().deletePointInTime(client);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.executor.ElasticHitsExecutor;
import org.opensearch.sql.legacy.executor.join.ElasticUtils;
import org.opensearch.sql.legacy.query.DefaultQueryAction;
import org.opensearch.sql.legacy.query.multi.MultiQueryRequestBuilder;
import org.opensearch.sql.legacy.utils.Util;
Expand Down Expand Up @@ -192,7 +191,7 @@ private void fillMinusHitsFromResults(Set<ComperableHitResult> comperableHitResu

private Set<ComperableHitResult> runWithScrollings() {

SearchResponse response =
SearchResponse scrollResp =
getResponseWithHits(
client,
builder.getFirstSearchRequest(),
Expand All @@ -202,7 +201,7 @@ private Set<ComperableHitResult> runWithScrollings() {
builder.getPit().getPitId());
Set<ComperableHitResult> results = new HashSet<>();

SearchHit[] hits = response.getHits().getHits();
SearchHit[] hits = scrollResp.getHits().getHits();
if (hits == null || hits.length == 0) {
return new HashSet<>();
}
Expand All @@ -214,24 +213,26 @@ private Set<ComperableHitResult> runWithScrollings() {
if (totalDocsFetchedFromFirstTable > this.maxDocsToFetchOnFirstTable) {
break;
}
response =
scrollResp =
getResponseWithHits(
client,
builder.getFirstSearchRequest(),
builder.getOriginalSelect(true),
maxDocsToFetchOnEachScrollShard,
response,
scrollResp,
builder.getPit().getPitId());
hits = response.getHits().getHits();
hits = scrollResp.getHits().getHits();
}
response =
ElasticUtils.scrollOneTimeWithHits(
scrollResp =
getResponseWithHits(
this.client,
this.builder.getSecondSearchRequest(),
builder.getOriginalSelect(false),
this.maxDocsToFetchOnEachScrollShard);
this.maxDocsToFetchOnEachScrollShard,
null,
builder.getPit().getPitId());

hits = response.getHits().getHits();
hits = scrollResp.getHits().getHits();
if (hits == null || hits.length == 0) {
return results;
}
Expand All @@ -242,15 +243,15 @@ private Set<ComperableHitResult> runWithScrollings() {
if (totalDocsFetchedFromSecondTable > this.maxDocsToFetchOnSecondTable) {
break;
}
response =
scrollResp =
getResponseWithHits(
client,
builder.getSecondSearchRequest(),
builder.getOriginalSelect(false),
maxDocsToFetchOnEachScrollShard,
response,
scrollResp,
builder.getPit().getPitId());
hits = response.getHits().getHits();
hits = scrollResp.getHits().getHits();
}

return results;
Expand Down Expand Up @@ -348,11 +349,13 @@ private MinusOneFieldAndOptimizationResult runWithScrollingAndAddFilter(
break;
}
SearchResponse responseForSecondTable =
ElasticUtils.scrollOneTimeWithHits(
getResponseWithHits(
this.client,
queryAction.getRequestBuilder(),
secondQuerySelect,
this.maxDocsToFetchOnEachScrollShard);
this.maxDocsToFetchOnEachScrollShard,
null,
builder.getPit().getPitId());
SearchHits secondQuerySearchHits = responseForSecondTable.getHits();

SearchHit[] secondQueryHits = secondQuerySearchHits.getHits();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public SqlElasticRequestBuilder explain() throws SqlParseException {
requestBuilder.fillTableAliases(
this.multiQuerySelect.getFirstSelect().getFields(),
this.multiQuerySelect.getSecondSelect().getFields());

return requestBuilder;
}

Expand Down

0 comments on commit 5bcd134

Please sign in to comment.