Skip to content

Commit

Permalink
Add pit for join queries (#2703)
Browse files Browse the repository at this point in the history
* Add search after for join

Signed-off-by: Rupal Mahajan <[email protected]>

* Enable search after by default

Signed-off-by: Rupal Mahajan <[email protected]>

* Add pit

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix tests

Signed-off-by: Rupal Mahajan <[email protected]>

* ignore joinWithGeoIntersectNL

Signed-off-by: Rupal Mahajan <[email protected]>

* Rerun CI with scroll

Signed-off-by: Rupal Mahajan <[email protected]>

* Remove unused code and retrigger CI with search_after true

Signed-off-by: Rupal Mahajan <[email protected]>

* Address comments

Signed-off-by: Rupal Mahajan <[email protected]>

* Remove unused code change

Signed-off-by: Rupal Mahajan <[email protected]>

* Update pit keep alive time with SQL_CURSOR_KEEP_ALIVE

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix scroll condition

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Add pit before query execution

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Move pit from join request builder to executor

Signed-off-by: Rupal Mahajan <[email protected]>

* Remove unused methods

Signed-off-by: Rupal Mahajan <[email protected]>

* Add pit in parent class's run()

Signed-off-by: Rupal Mahajan <[email protected]>

* Add comment for fetching subsequent result in NestedLoopsElasticExecutor

Signed-off-by: Rupal Mahajan <[email protected]>

* Update comment

Signed-off-by: Rupal Mahajan <[email protected]>

* Add javadoc for pit handler

Signed-off-by: Rupal Mahajan <[email protected]>

* Add pit interface

Signed-off-by: Rupal Mahajan <[email protected]>

* Add pit handler unit test

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix failed unit test CI

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix spotless error

Signed-off-by: Rupal Mahajan <[email protected]>

* Rename pit class and add logs

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix pit delete unit test

Signed-off-by: Rupal Mahajan <[email protected]>

---------

Signed-off-by: Rupal Mahajan <[email protected]>
  • Loading branch information
rupal-bq authored Jun 26, 2024
1 parent b2403ca commit 154cb26
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public enum Key {
SQL_SLOWLOG("plugins.sql.slowlog"),
SQL_CURSOR_KEEP_ALIVE("plugins.sql.cursor.keep_alive"),
SQL_DELETE_ENABLED("plugins.sql.delete.enabled"),
SQL_PAGINATION_API_SEARCH_AFTER("plugins.sql.pagination.api"),

/** PPL Settings. */
PPL_ENABLED("plugins.ppl.enabled"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ public void hintMultiSearchCanRunFewTimesNL() throws IOException {
Assert.assertThat(hits.length(), equalTo(42));
}

// TODO: Fix joinWithGeoIntersectNL test when SQL_PAGINATION_API_SEARCH_AFTER is true
@Ignore
@Test
public void joinWithGeoIntersectNL() throws IOException {

Expand Down Expand Up @@ -455,7 +457,7 @@ public void joinParseCheckSelectedFieldsSplitNLConditionOrderGT() throws IOExcep
"SELECT /*! USE_NL*/ a.firstname, a.lastname, a.gender, d.firstname, d.age FROM %s a"
+ " JOIN %s d on a.age < d.age WHERE (d.firstname = 'Lynn' OR d.firstname ="
+ " 'Obrien') AND a.firstname = 'Mcgee'",
TEST_INDEX_PEOPLE,
TEST_INDEX_PEOPLE2,
TEST_INDEX_ACCOUNT);

JSONObject result = executeQuery(query);
Expand Down Expand Up @@ -501,7 +503,7 @@ public void joinParseCheckSelectedFieldsSplitNLConditionOrderLT() throws IOExcep
"SELECT /*! USE_NL*/ a.firstname, a.lastname, a.gender, d.firstname, d.age FROM %s a"
+ " JOIN %s d on a.age > d.age WHERE (d.firstname = 'Sandoval' OR d.firstname ="
+ " 'Hewitt') AND a.firstname = 'Fulton'",
TEST_INDEX_PEOPLE,
TEST_INDEX_PEOPLE2,
TEST_INDEX_ACCOUNT);

JSONObject result = executeQuery(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@

package org.opensearch.sql.legacy.executor.join;

import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.TotalHits;
Expand All @@ -28,11 +32,15 @@
import org.opensearch.rest.RestChannel;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.sql.legacy.domain.Field;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.executor.ElasticHitsExecutor;
import org.opensearch.sql.legacy.pit.PointInTimeHandler;
import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl;
import org.opensearch.sql.legacy.query.SqlElasticRequestBuilder;
import org.opensearch.sql.legacy.query.join.HashJoinElasticRequestBuilder;
import org.opensearch.sql.legacy.query.join.JoinRequestBuilder;
Expand All @@ -49,15 +57,20 @@ public abstract class ElasticJoinExecutor implements ElasticHitsExecutor {
protected final int MAX_RESULTS_ON_ONE_FETCH = 10000;
private Set<String> aliasesOnReturn;
private boolean allFieldsReturn;
protected Client client;
protected String[] indices;
protected PointInTimeHandler pit;

protected ElasticJoinExecutor(JoinRequestBuilder requestBuilder) {
protected ElasticJoinExecutor(Client client, JoinRequestBuilder requestBuilder) {
metaResults = new MetaSearchResult();
aliasesOnReturn = new HashSet<>();
List<Field> firstTableReturnedField = requestBuilder.getFirstTable().getReturnedFields();
List<Field> secondTableReturnedField = requestBuilder.getSecondTable().getReturnedFields();
allFieldsReturn =
(firstTableReturnedField == null || firstTableReturnedField.size() == 0)
&& (secondTableReturnedField == null || secondTableReturnedField.size() == 0);
indices = getIndices(requestBuilder);
this.client = client;
}

public void sendResponse(RestChannel channel) throws IOException {
Expand Down Expand Up @@ -85,10 +98,22 @@ public void sendResponse(RestChannel channel) throws IOException {
}

public void run() throws IOException, SqlParseException {
long timeBefore = System.currentTimeMillis();
results = innerRun();
long joinTimeInMilli = System.currentTimeMillis() - timeBefore;
this.metaResults.setTookImMilli(joinTimeInMilli);
try {
long timeBefore = System.currentTimeMillis();
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
pit = new PointInTimeHandlerImpl(client, indices);
pit.create();
}
results = innerRun();
long joinTimeInMilli = System.currentTimeMillis() - timeBefore;
this.metaResults.setTookImMilli(joinTimeInMilli);
} catch (Exception e) {
LOG.error("Failed during join query run.", e);
} finally {
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
pit.delete();
}
}
}

protected abstract List<SearchHit> innerRun() throws IOException, SqlParseException;
Expand All @@ -103,7 +128,7 @@ public SearchHits getHits() {
public static ElasticJoinExecutor createJoinExecutor(
Client client, SqlElasticRequestBuilder requestBuilder) {
if (requestBuilder instanceof HashJoinQueryPlanRequestBuilder) {
return new QueryPlanElasticExecutor((HashJoinQueryPlanRequestBuilder) requestBuilder);
return new QueryPlanElasticExecutor(client, (HashJoinQueryPlanRequestBuilder) requestBuilder);
} else if (requestBuilder instanceof HashJoinElasticRequestBuilder) {
HashJoinElasticRequestBuilder hashJoin = (HashJoinElasticRequestBuilder) requestBuilder;
return new HashJoinElasticExecutor(client, hashJoin);
Expand Down Expand Up @@ -256,23 +281,47 @@ protected void updateMetaSearchResults(SearchResponse searchResponse) {
this.metaResults.updateTimeOut(searchResponse.isTimedOut());
}

protected SearchResponse scrollOneTimeWithMax(
Client client, TableInJoinRequestBuilder tableRequest) {
SearchRequestBuilder scrollRequest =
tableRequest
.getRequestBuilder()
.setScroll(new TimeValue(60000))
.setSize(MAX_RESULTS_ON_ONE_FETCH);
boolean ordered = tableRequest.getOriginalSelect().isOrderdSelect();
if (!ordered) {
scrollRequest.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC);
public SearchResponse getResponseWithHits(
TableInJoinRequestBuilder tableRequest, int size, SearchResponse previousResponse) {
// Set Size
SearchRequestBuilder request = tableRequest.getRequestBuilder().setSize(size);
SearchResponse responseWithHits;
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
// Set sort field for search_after
boolean ordered = tableRequest.getOriginalSelect().isOrderdSelect();
if (!ordered) {
request.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC);
}
// Set PIT
request.setPointInTime(new PointInTimeBuilder(pit.getPitId()));
if (previousResponse != null) {
request.searchAfter(previousResponse.getHits().getSortFields());
}
responseWithHits = request.get();
} else {
// Set scroll
TimeValue keepAlive = LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE);
if (previousResponse != null) {
responseWithHits =
client
.prepareSearchScroll(previousResponse.getScrollId())
.setScroll(keepAlive)
.execute()
.actionGet();
} else {
request.setScroll(keepAlive);
responseWithHits = request.get();
}
}
SearchResponse responseWithHits = scrollRequest.get();
// on ordered select - not using SCAN , elastic returns hits on first scroll
// es5.0 elastic always return docs on scan
// if(!ordered)
// responseWithHits = client.prepareSearchScroll(responseWithHits.getScrollId())
// .setScroll(new TimeValue(600000)).get();

return responseWithHits;
}

public String[] getIndices(JoinRequestBuilder joinRequestBuilder) {
return Stream.concat(
Stream.of(joinRequestBuilder.getFirstTable().getOriginalSelect().getIndexArr()),
Stream.of(joinRequestBuilder.getSecondTable().getOriginalSelect().getIndexArr()))
.distinct()
.toArray(String[]::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
Expand All @@ -36,16 +35,13 @@
/** Created by Eliran on 22/8/2015. */
public class HashJoinElasticExecutor extends ElasticJoinExecutor {
private HashJoinElasticRequestBuilder requestBuilder;

private Client client;
private boolean useQueryTermsFilterOptimization = false;
private final int MAX_RESULTS_FOR_FIRST_TABLE = 100000;
HashJoinComparisonStructure hashJoinComparisonStructure;
private Set<String> alreadyMatched;

public HashJoinElasticExecutor(Client client, HashJoinElasticRequestBuilder requestBuilder) {
super(requestBuilder);
this.client = client;
super(client, requestBuilder);
this.requestBuilder = requestBuilder;
this.useQueryTermsFilterOptimization = requestBuilder.isUseTermFiltersOptimization();
this.hashJoinComparisonStructure =
Expand All @@ -54,7 +50,6 @@ public HashJoinElasticExecutor(Client client, HashJoinElasticRequestBuilder requ
}

public List<SearchHit> innerRun() throws IOException, SqlParseException {

Map<String, Map<String, List<Object>>> optimizationTermsFilterStructure =
initOptimizationStructure();

Expand Down Expand Up @@ -124,16 +119,12 @@ private List<SearchHit> createCombinedResults(TableInJoinRequestBuilder secondTa
Integer hintLimit = secondTableRequest.getHintLimit();
SearchResponse searchResponse;
boolean finishedScrolling;

if (hintLimit != null && hintLimit < MAX_RESULTS_ON_ONE_FETCH) {
searchResponse = secondTableRequest.getRequestBuilder().setSize(hintLimit).get();
searchResponse = getResponseWithHits(secondTableRequest, hintLimit, null);
finishedScrolling = true;
} else {
searchResponse =
secondTableRequest
.getRequestBuilder()
.setScroll(new TimeValue(60000))
.setSize(MAX_RESULTS_ON_ONE_FETCH)
.get();
searchResponse = getResponseWithHits(secondTableRequest, MAX_RESULTS_ON_ONE_FETCH, null);
// es5.0 no need to scroll again!
// searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
// .setScroll(new TimeValue(600000)).get();
Expand Down Expand Up @@ -214,11 +205,7 @@ private List<SearchHit> createCombinedResults(TableInJoinRequestBuilder secondTa
if (secondTableHits.length > 0
&& (hintLimit == null || fetchedSoFarFromSecondTable >= hintLimit)) {
searchResponse =
client
.prepareSearchScroll(searchResponse.getScrollId())
.setScroll(new TimeValue(600000))
.execute()
.actionGet();
getResponseWithHits(secondTableRequest, MAX_RESULTS_ON_ONE_FETCH, searchResponse);
} else {
break;
}
Expand Down Expand Up @@ -292,12 +279,13 @@ private List<SearchHit> fetchAllHits(TableInJoinRequestBuilder tableInJoinReques

private List<SearchHit> scrollTillLimit(
TableInJoinRequestBuilder tableInJoinRequest, Integer hintLimit) {
SearchResponse scrollResp = scrollOneTimeWithMax(client, tableInJoinRequest);
SearchResponse response =
getResponseWithHits(tableInJoinRequest, MAX_RESULTS_ON_ONE_FETCH, null);

updateMetaSearchResults(scrollResp);
updateMetaSearchResults(response);
List<SearchHit> hitsWithScan = new ArrayList<>();
int curentNumOfResults = 0;
SearchHit[] hits = scrollResp.getHits().getHits();
SearchHit[] hits = response.getHits().getHits();

if (hintLimit == null) {
hintLimit = MAX_RESULTS_FOR_FIRST_TABLE;
Expand All @@ -311,13 +299,8 @@ private List<SearchHit> scrollTillLimit(
System.out.println("too many results for first table, stoping at:" + curentNumOfResults);
break;
}
scrollResp =
client
.prepareSearchScroll(scrollResp.getScrollId())
.setScroll(new TimeValue(600000))
.execute()
.actionGet();
hits = scrollResp.getHits().getHits();
response = getResponseWithHits(tableInJoinRequest, MAX_RESULTS_FOR_FIRST_TABLE, response);
hits = response.getHits().getHits();
}
return hitsWithScan;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
Expand All @@ -39,11 +38,9 @@ public class NestedLoopsElasticExecutor extends ElasticJoinExecutor {
private static final Logger LOG = LogManager.getLogger();

private final NestedLoopsElasticRequestBuilder nestedLoopsRequest;
private final Client client;

public NestedLoopsElasticExecutor(Client client, NestedLoopsElasticRequestBuilder nestedLoops) {
super(nestedLoops);
this.client = client;
super(client, nestedLoops);
this.nestedLoopsRequest = nestedLoops;
}

Expand Down Expand Up @@ -111,11 +108,26 @@ protected List<SearchHit> innerRun() throws SqlParseException {
if (!BackOffRetryStrategy.isHealthy()) {
throw new IllegalStateException("Memory circuit is broken");
}
firstTableResponse =
client
.prepareSearchScroll(firstTableResponse.getScrollId())
.setScroll(new TimeValue(600000))
.get();
/* Fetching next result page.
Using scroll api - only scrollId from previous response is required for scroll request.
Using pit with search_after - we need to recreate search request along with pitId and
sort fields from previous response.
Here we are finding required size for recreating search request with pit and search after.
Conditions for size are similar as firstFetch().
In case of scroll, this size will be ignored and size from first request will be used.
*/
Integer hintLimit = nestedLoopsRequest.getFirstTable().getHintLimit();
if (hintLimit != null && hintLimit < MAX_RESULTS_ON_ONE_FETCH) {
firstTableResponse =
getResponseWithHits(
nestedLoopsRequest.getFirstTable(), hintLimit, firstTableResponse);
} else {
firstTableResponse =
getResponseWithHits(
nestedLoopsRequest.getFirstTable(),
MAX_RESULTS_ON_ONE_FETCH,
firstTableResponse);
}
} else {
finishedWithFirstTable = true;
}
Expand Down Expand Up @@ -287,12 +299,11 @@ private FetchWithScrollResponse firstFetch(TableInJoinRequestBuilder tableReques
boolean needScrollForFirstTable = false;
SearchResponse responseWithHits;
if (hintLimit != null && hintLimit < MAX_RESULTS_ON_ONE_FETCH) {

responseWithHits = tableRequest.getRequestBuilder().setSize(hintLimit).get();
needScrollForFirstTable = false;
} else {
// scroll request with max.
responseWithHits = scrollOneTimeWithMax(client, tableRequest);
responseWithHits = getResponseWithHits(tableRequest, MAX_RESULTS_ON_ONE_FETCH, null);
if (responseWithHits.getHits().getTotalHits() != null
&& responseWithHits.getHits().getTotalHits().value < MAX_RESULTS_ON_ONE_FETCH) {
needScrollForFirstTable = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.legacy.executor.join;

import java.util.List;
import org.opensearch.client.Client;
import org.opensearch.search.SearchHit;
import org.opensearch.sql.legacy.query.planner.HashJoinQueryPlanRequestBuilder;
import org.opensearch.sql.legacy.query.planner.core.QueryPlanner;
Expand All @@ -19,8 +20,8 @@ class QueryPlanElasticExecutor extends ElasticJoinExecutor {

private final QueryPlanner queryPlanner;

QueryPlanElasticExecutor(HashJoinQueryPlanRequestBuilder request) {
super(request);
QueryPlanElasticExecutor(Client client, HashJoinQueryPlanRequestBuilder request) {
super(client, request);
this.queryPlanner = request.plan();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.legacy.pit;

/** Point In Time */
public interface PointInTimeHandler {
/** Create Point In Time */
void create();

/** Delete Point In Time */
void delete();

/** Get Point In Time Identifier */
String getPitId();
}
Loading

0 comments on commit 154cb26

Please sign in to comment.