
Add pit for join queries #2703

Merged 35 commits on Jun 26, 2024.
e225977
Add search after for join
rupal-bq May 29, 2024
d69feda
Enable search after by default
rupal-bq May 29, 2024
b75d782
Add pit
rupal-bq Jun 2, 2024
a336e3b
Merge branch 'opensearch-project:main' into join-search-after
rupal-bq Jun 2, 2024
afd24d8
nit
rupal-bq Jun 3, 2024
8f840ae
Fix tests
rupal-bq Jun 4, 2024
070b40f
ignore joinWithGeoIntersectNL
rupal-bq Jun 4, 2024
58c96f5
Rerun CI with scroll
rupal-bq Jun 4, 2024
3dad30d
Merge branch 'opensearch-project:main' into join-search-after
rupal-bq Jun 5, 2024
1404cd5
Remove unused code and retrigger CI with search_after true
rupal-bq Jun 5, 2024
fd09367
Merge branch 'opensearch-project:main' into join-search-after
rupal-bq Jun 5, 2024
fcb584a
Address comments
rupal-bq Jun 7, 2024
12f5abb
Merge branch 'join-search-after' of github.com:rupal-bq/opensearch-sq…
rupal-bq Jun 7, 2024
08a6a29
Remove unused code change
rupal-bq Jun 7, 2024
030f4b5
Merge branch 'main' into join-search-after
rupal-bq Jun 10, 2024
39f727a
Update pit keep alive time with SQL_CURSOR_KEEP_ALIVE
rupal-bq Jun 10, 2024
be8d986
Fix scroll condition
rupal-bq Jun 12, 2024
7bb89d8
nit
rupal-bq Jun 12, 2024
60bd8fe
Add pit before query execution
rupal-bq Jun 13, 2024
17d1d0e
Merge branch 'opensearch-project:main' into join-search-after
rupal-bq Jun 13, 2024
3af87c4
nit
rupal-bq Jun 14, 2024
bee4863
Move pit from join request builder to executor
rupal-bq Jun 16, 2024
0857a6b
Remove unused methods
rupal-bq Jun 16, 2024
8e1c101
Merge branch 'opensearch-project:main' into join-search-after
rupal-bq Jun 16, 2024
617d1db
Add pit in parent class's run()
rupal-bq Jun 19, 2024
841db6b
Add comment for fetching subsequent result in NestedLoopsElasticExecutor
rupal-bq Jun 19, 2024
3185eda
Update comment
rupal-bq Jun 19, 2024
2070f68
Add javadoc for pit handler
rupal-bq Jun 24, 2024
0bf9792
Add pit interface
rupal-bq Jun 24, 2024
e13c50f
Add pit handler unit test
rupal-bq Jun 25, 2024
1366663
Merge branch 'opensearch-project:main' into join-search-after
rupal-bq Jun 25, 2024
1408ccf
Fix failed unit test CI
rupal-bq Jun 25, 2024
dd10d77
Fix spotless error
rupal-bq Jun 25, 2024
b487abf
Rename pit class and add logs
rupal-bq Jun 26, 2024
f79d89e
Fix pit delete unit test
rupal-bq Jun 26, 2024
@@ -23,6 +23,7 @@ public enum Key {
SQL_SLOWLOG("plugins.sql.slowlog"),
SQL_CURSOR_KEEP_ALIVE("plugins.sql.cursor.keep_alive"),
SQL_DELETE_ENABLED("plugins.sql.delete.enabled"),
SQL_PAGINATION_API_SEARCH_AFTER("plugins.sql.pagination.api"),
Collaborator: So we're not switching to PIT completely, but supporting both PIT and scroll?

Contributor (author): The current plan is to add search_after for all pagination, then remove scroll after a performance comparison.

Collaborator: So we will raise another PR for this after the performance test?

Contributor (author): Yes, I was planning to add search_after for all queries (join, multi query, pagination), then open a separate PR to remove scroll after the performance test.

Member: What if PIT turns out to be bad? Will we leave the setting to the customer? What will be the default?

Contributor (author): The current default is true, i.e. PIT with search_after is used. After benchmarking we can decide whether that needs to change; this setting can be removed along with scroll.


/** PPL Settings. */
PPL_ENABLED("plugins.ppl.enabled"),
@@ -288,6 +288,8 @@ public void hintMultiSearchCanRunFewTimesNL() throws IOException {
Assert.assertThat(hits.length(), equalTo(42));
}

// TODO: Fix joinWithGeoIntersectNL test when SQL_PAGINATION_API_SEARCH_AFTER is true
@Ignore
(rupal-bq marked this conversation as resolved.)
@Test
public void joinWithGeoIntersectNL() throws IOException {

@@ -455,7 +457,7 @@ public void joinParseCheckSelectedFieldsSplitNLConditionOrderGT() throws IOExcep
"SELECT /*! USE_NL*/ a.firstname, a.lastname, a.gender, d.firstname, d.age FROM %s a"
+ " JOIN %s d on a.age < d.age WHERE (d.firstname = 'Lynn' OR d.firstname ="
+ " 'Obrien') AND a.firstname = 'Mcgee'",
TEST_INDEX_PEOPLE,
TEST_INDEX_PEOPLE2,
(dai-chen marked this conversation as resolved.)
TEST_INDEX_ACCOUNT);

JSONObject result = executeQuery(query);
@@ -501,7 +503,7 @@ public void joinParseCheckSelectedFieldsSplitNLConditionOrderLT() throws IOExcep
"SELECT /*! USE_NL*/ a.firstname, a.lastname, a.gender, d.firstname, d.age FROM %s a"
+ " JOIN %s d on a.age > d.age WHERE (d.firstname = 'Sandoval' OR d.firstname ="
+ " 'Hewitt') AND a.firstname = 'Fulton'",
TEST_INDEX_PEOPLE,
TEST_INDEX_PEOPLE2,
TEST_INDEX_ACCOUNT);

JSONObject result = executeQuery(query);
@@ -5,13 +5,17 @@

package org.opensearch.sql.legacy.executor.join;

import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.TotalHits;
@@ -28,11 +28,15 @@
import org.opensearch.rest.RestChannel;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.sql.legacy.domain.Field;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.executor.ElasticHitsExecutor;
import org.opensearch.sql.legacy.pit.PointInTimeHandler;
import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl;
import org.opensearch.sql.legacy.query.SqlElasticRequestBuilder;
import org.opensearch.sql.legacy.query.join.HashJoinElasticRequestBuilder;
import org.opensearch.sql.legacy.query.join.JoinRequestBuilder;
@@ -49,15 +57,20 @@ public abstract class ElasticJoinExecutor implements ElasticHitsExecutor {
protected final int MAX_RESULTS_ON_ONE_FETCH = 10000;
private Set<String> aliasesOnReturn;
private boolean allFieldsReturn;
protected Client client;
protected String[] indices;
protected PointInTimeHandler pit;

protected ElasticJoinExecutor(JoinRequestBuilder requestBuilder) {
protected ElasticJoinExecutor(Client client, JoinRequestBuilder requestBuilder) {
metaResults = new MetaSearchResult();
aliasesOnReturn = new HashSet<>();
List<Field> firstTableReturnedField = requestBuilder.getFirstTable().getReturnedFields();
List<Field> secondTableReturnedField = requestBuilder.getSecondTable().getReturnedFields();
allFieldsReturn =
(firstTableReturnedField == null || firstTableReturnedField.size() == 0)
&& (secondTableReturnedField == null || secondTableReturnedField.size() == 0);
indices = getIndices(requestBuilder);
this.client = client;
}

public void sendResponse(RestChannel channel) throws IOException {
@@ -85,10 +98,22 @@ public void sendResponse(RestChannel channel) throws IOException {
}

public void run() throws IOException, SqlParseException {
long timeBefore = System.currentTimeMillis();
results = innerRun();
long joinTimeInMilli = System.currentTimeMillis() - timeBefore;
this.metaResults.setTookImMilli(joinTimeInMilli);
try {
long timeBefore = System.currentTimeMillis();
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
pit = new PointInTimeHandlerImpl(client, indices);
pit.create();
}
results = innerRun();
long joinTimeInMilli = System.currentTimeMillis() - timeBefore;
this.metaResults.setTookImMilli(joinTimeInMilli);
} catch (Exception e) {
LOG.error("Failed during join query run.", e);
} finally {
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
pit.delete();
}
}
}

protected abstract List<SearchHit> innerRun() throws IOException, SqlParseException;
@@ -103,7 +128,7 @@ public SearchHits getHits() {
public static ElasticJoinExecutor createJoinExecutor(
Client client, SqlElasticRequestBuilder requestBuilder) {
if (requestBuilder instanceof HashJoinQueryPlanRequestBuilder) {
return new QueryPlanElasticExecutor((HashJoinQueryPlanRequestBuilder) requestBuilder);
return new QueryPlanElasticExecutor(client, (HashJoinQueryPlanRequestBuilder) requestBuilder);
} else if (requestBuilder instanceof HashJoinElasticRequestBuilder) {
HashJoinElasticRequestBuilder hashJoin = (HashJoinElasticRequestBuilder) requestBuilder;
return new HashJoinElasticExecutor(client, hashJoin);
@@ -256,23 +281,47 @@ protected void updateMetaSearchResults(SearchResponse searchResponse) {
this.metaResults.updateTimeOut(searchResponse.isTimedOut());
}

protected SearchResponse scrollOneTimeWithMax(
Client client, TableInJoinRequestBuilder tableRequest) {
SearchRequestBuilder scrollRequest =
tableRequest
.getRequestBuilder()
.setScroll(new TimeValue(60000))
.setSize(MAX_RESULTS_ON_ONE_FETCH);
boolean ordered = tableRequest.getOriginalSelect().isOrderdSelect();
if (!ordered) {
scrollRequest.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC);
public SearchResponse getResponseWithHits(
(dai-chen marked this conversation as resolved.)
TableInJoinRequestBuilder tableRequest, int size, SearchResponse previousResponse) {
// Set Size
SearchRequestBuilder request = tableRequest.getRequestBuilder().setSize(size);
SearchResponse responseWithHits;
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
// Set sort field for search_after
boolean ordered = tableRequest.getOriginalSelect().isOrderdSelect();
if (!ordered) {
request.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC);
}
// Set PIT
request.setPointInTime(new PointInTimeBuilder(pit.getPitId()));
Member: What if the PIT request hasn't completed by the time code execution reaches this line?

Member: As commented in PointInTimeHandler, we are actually listening for the response in a listener instead of waiting.

Contributor (author), Jun 24, 2024: Tried printing Thread.currentThread().getId() in run() and while getting the id from pitResponse; the thread id is the same, so it is waiting. Looks like the PIT request will complete.

Member: Ok. Strange if it is working... I am good, but I am still not sure. Can you add logs somewhere?

Contributor (author): Added logs in the PIT handler impl.

Member: I actually meant adding the thread id logs to a comment or the description. I am fine either way.

if (previousResponse != null) {
request.searchAfter(previousResponse.getHits().getSortFields());
}
responseWithHits = request.get();
} else {
// Set scroll
TimeValue keepAlive = LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE);
if (previousResponse != null) {
responseWithHits =
client
.prepareSearchScroll(previousResponse.getScrollId())
.setScroll(keepAlive)
.execute()
.actionGet();
} else {
request.setScroll(keepAlive);
responseWithHits = request.get();
}
}
SearchResponse responseWithHits = scrollRequest.get();
// on ordered select - not using SCAN , elastic returns hits on first scroll
// es5.0 elastic always return docs on scan
// if(!ordered)
// responseWithHits = client.prepareSearchScroll(responseWithHits.getScrollId())
// .setScroll(new TimeValue(600000)).get();

return responseWithHits;
}

public String[] getIndices(JoinRequestBuilder joinRequestBuilder) {
return Stream.concat(
Stream.of(joinRequestBuilder.getFirstTable().getOriginalSelect().getIndexArr()),
Stream.of(joinRequestBuilder.getSecondTable().getOriginalSelect().getIndexArr()))
.distinct()
.toArray(String[]::new);
}
}
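The refactored getResponseWithHits above unifies both pagination paths: with PIT plus search_after, the client re-issues the full query carrying the sort values of the previous page's last hit, whereas scroll keeps a server-side cursor and only needs the scroll id. The loop below is a minimal, self-contained sketch of the search_after pattern; fetchPage is a hypothetical stand-in for one sorted PIT query and does not use the OpenSearch client.

```java
import java.util.ArrayList;
import java.util.List;

public class SearchAfterDemo {
  // Hypothetical stand-in for one sorted PIT + search_after page fetch:
  // returns up to `size` ids strictly greater than `after` (-1 = first page).
  static List<Integer> fetchPage(List<Integer> sortedIndex, int size, int after) {
    List<Integer> page = new ArrayList<>();
    for (int id : sortedIndex) {
      if (id > after && page.size() < size) {
        page.add(id);
      }
    }
    return page;
  }

  public static void main(String[] args) {
    List<Integer> index = List.of(1, 2, 3, 4, 5, 6, 7);
    int after = -1; // no previous response yet, like the first call in the executor
    int pages = 0;
    int hits = 0;
    while (true) {
      List<Integer> page = fetchPage(index, 3, after);
      if (page.isEmpty()) {
        break; // no more hits: stop fetching
      }
      pages++;
      hits += page.size();
      // Carry the "sort value" of the last hit into the next request, as
      // request.searchAfter(previousResponse.getHits().getSortFields()) does.
      after = page.get(page.size() - 1);
    }
    System.out.println(pages + " pages, " + hits + " hits");
  }
}
```

Unlike scroll, no per-query server-side cursor is held between calls (only the PIT itself), which is why the executor can recreate the full search request for every page.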
@@ -20,7 +20,6 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
@@ -36,16 +35,13 @@
/** Created by Eliran on 22/8/2015. */
public class HashJoinElasticExecutor extends ElasticJoinExecutor {
private HashJoinElasticRequestBuilder requestBuilder;

private Client client;
private boolean useQueryTermsFilterOptimization = false;
private final int MAX_RESULTS_FOR_FIRST_TABLE = 100000;
HashJoinComparisonStructure hashJoinComparisonStructure;
private Set<String> alreadyMatched;

public HashJoinElasticExecutor(Client client, HashJoinElasticRequestBuilder requestBuilder) {
super(requestBuilder);
this.client = client;
super(client, requestBuilder);
this.requestBuilder = requestBuilder;
this.useQueryTermsFilterOptimization = requestBuilder.isUseTermFiltersOptimization();
this.hashJoinComparisonStructure =
@@ -54,7 +50,6 @@ public HashJoinElasticExecutor(Client client, HashJoinElasticRequestBuilder requ
}

public List<SearchHit> innerRun() throws IOException, SqlParseException {

Map<String, Map<String, List<Object>>> optimizationTermsFilterStructure =
initOptimizationStructure();

@@ -124,16 +119,12 @@ private List<SearchHit> createCombinedResults(TableInJoinRequestBuilder secondTa
Integer hintLimit = secondTableRequest.getHintLimit();
SearchResponse searchResponse;
boolean finishedScrolling;

if (hintLimit != null && hintLimit < MAX_RESULTS_ON_ONE_FETCH) {
searchResponse = secondTableRequest.getRequestBuilder().setSize(hintLimit).get();
searchResponse = getResponseWithHits(secondTableRequest, hintLimit, null);
finishedScrolling = true;
} else {
searchResponse =
secondTableRequest
.getRequestBuilder()
.setScroll(new TimeValue(60000))
.setSize(MAX_RESULTS_ON_ONE_FETCH)
.get();
searchResponse = getResponseWithHits(secondTableRequest, MAX_RESULTS_ON_ONE_FETCH, null);
// es5.0 no need to scroll again!
// searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
// .setScroll(new TimeValue(600000)).get();
@@ -214,11 +205,7 @@ private List<SearchHit> createCombinedResults(TableInJoinRequestBuilder secondTa
if (secondTableHits.length > 0
&& (hintLimit == null || fetchedSoFarFromSecondTable >= hintLimit)) {
searchResponse =
client
.prepareSearchScroll(searchResponse.getScrollId())
.setScroll(new TimeValue(600000))
.execute()
.actionGet();
getResponseWithHits(secondTableRequest, MAX_RESULTS_ON_ONE_FETCH, searchResponse);
} else {
break;
}
@@ -292,12 +279,13 @@ private List<SearchHit> fetchAllHits(TableInJoinRequestBuilder tableInJoinReques

private List<SearchHit> scrollTillLimit(
TableInJoinRequestBuilder tableInJoinRequest, Integer hintLimit) {
SearchResponse scrollResp = scrollOneTimeWithMax(client, tableInJoinRequest);
SearchResponse response =
getResponseWithHits(tableInJoinRequest, MAX_RESULTS_ON_ONE_FETCH, null);

updateMetaSearchResults(scrollResp);
updateMetaSearchResults(response);
List<SearchHit> hitsWithScan = new ArrayList<>();
int curentNumOfResults = 0;
SearchHit[] hits = scrollResp.getHits().getHits();
SearchHit[] hits = response.getHits().getHits();

if (hintLimit == null) {
hintLimit = MAX_RESULTS_FOR_FIRST_TABLE;
@@ -311,13 +299,8 @@ private List<SearchHit> scrollTillLimit(
System.out.println("too many results for first table, stoping at:" + curentNumOfResults);
break;
}
scrollResp =
client
.prepareSearchScroll(scrollResp.getScrollId())
.setScroll(new TimeValue(600000))
.execute()
.actionGet();
hits = scrollResp.getHits().getHits();
response = getResponseWithHits(tableInJoinRequest, MAX_RESULTS_FOR_FIRST_TABLE, response);
hits = response.getHits().getHits();
}
return hitsWithScan;
}
@@ -18,7 +18,6 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
@@ -39,11 +38,9 @@ public class NestedLoopsElasticExecutor extends ElasticJoinExecutor {
private static final Logger LOG = LogManager.getLogger();

private final NestedLoopsElasticRequestBuilder nestedLoopsRequest;
private final Client client;

public NestedLoopsElasticExecutor(Client client, NestedLoopsElasticRequestBuilder nestedLoops) {
super(nestedLoops);
this.client = client;
super(client, nestedLoops);
this.nestedLoopsRequest = nestedLoops;
}

@@ -111,11 +108,26 @@ protected List<SearchHit> innerRun() throws SqlParseException {
if (!BackOffRetryStrategy.isHealthy()) {
throw new IllegalStateException("Memory circuit is broken");
}
firstTableResponse =
client
.prepareSearchScroll(firstTableResponse.getScrollId())
.setScroll(new TimeValue(600000))
.get();
/* Fetching next result page.
Using scroll api - only scrollId from previous response is required for scroll request.
Using pit with search_after - we need to recreate search request along with pitId and
sort fields from previous response.
Here we are finding required size for recreating search request with pit and search after.
Conditions for size are similar as firstFetch().
In case of scroll, this size will be ignored and size from first request will be used.
*/
Integer hintLimit = nestedLoopsRequest.getFirstTable().getHintLimit();
(dai-chen marked this conversation as resolved.)
if (hintLimit != null && hintLimit < MAX_RESULTS_ON_ONE_FETCH) {
firstTableResponse =
getResponseWithHits(
nestedLoopsRequest.getFirstTable(), hintLimit, firstTableResponse);
} else {
firstTableResponse =
getResponseWithHits(
nestedLoopsRequest.getFirstTable(),
MAX_RESULTS_ON_ONE_FETCH,
firstTableResponse);
}
} else {
finishedWithFirstTable = true;
}
@@ -287,12 +299,11 @@ private FetchWithScrollResponse firstFetch(TableInJoinRequestBuilder tableReques
boolean needScrollForFirstTable = false;
SearchResponse responseWithHits;
if (hintLimit != null && hintLimit < MAX_RESULTS_ON_ONE_FETCH) {

responseWithHits = tableRequest.getRequestBuilder().setSize(hintLimit).get();
needScrollForFirstTable = false;
} else {
// scroll request with max.
responseWithHits = scrollOneTimeWithMax(client, tableRequest);
responseWithHits = getResponseWithHits(tableRequest, MAX_RESULTS_ON_ONE_FETCH, null);
if (responseWithHits.getHits().getTotalHits() != null
&& responseWithHits.getHits().getTotalHits().value < MAX_RESULTS_ON_ONE_FETCH) {
needScrollForFirstTable = true;
@@ -6,6 +6,7 @@
package org.opensearch.sql.legacy.executor.join;

import java.util.List;
import org.opensearch.client.Client;
import org.opensearch.search.SearchHit;
import org.opensearch.sql.legacy.query.planner.HashJoinQueryPlanRequestBuilder;
import org.opensearch.sql.legacy.query.planner.core.QueryPlanner;
@@ -19,8 +20,8 @@ class QueryPlanElasticExecutor extends ElasticJoinExecutor {

private final QueryPlanner queryPlanner;

QueryPlanElasticExecutor(HashJoinQueryPlanRequestBuilder request) {
super(request);
QueryPlanElasticExecutor(Client client, HashJoinQueryPlanRequestBuilder request) {
super(client, request);
this.queryPlanner = request.plan();
}

@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.legacy.pit;

/** Point In Time */
public interface PointInTimeHandler {
/** Create Point In Time */
void create();

/** Delete Point In Time */
void delete();

/** Get Point In Time Identifier */
String getPitId();
}
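The handler's lifecycle in this PR is: create() before the join query executes, delete() in the finally block of run(). That contract can be illustrated with an in-memory stub; InMemoryPointInTimeHandler below is hypothetical and only mimics the interface, whereas the real PointInTimeHandlerImpl issues create/delete PIT requests against the cluster through the OpenSearch Client.

```java
import java.util.UUID;

public class PitLifecycleDemo {
  // Mirrors the PointInTimeHandler interface added by this PR.
  interface PointInTimeHandler {
    void create();
    void delete();
    String getPitId();
  }

  // Hypothetical in-memory stub: generates a fake PIT id instead of
  // calling the cluster, to show the create -> use -> delete lifecycle.
  static class InMemoryPointInTimeHandler implements PointInTimeHandler {
    private String pitId;

    @Override
    public void create() {
      pitId = UUID.randomUUID().toString();
    }

    @Override
    public void delete() {
      pitId = null;
    }

    @Override
    public String getPitId() {
      return pitId;
    }
  }

  public static void main(String[] args) {
    PointInTimeHandler pit = new InMemoryPointInTimeHandler();
    try {
      pit.create(); // created before query execution, as in run()
      System.out.println(pit.getPitId() != null);
    } finally {
      pit.delete(); // always released, as in the finally block of run()
    }
    System.out.println(pit.getPitId() == null);
  }
}
```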