Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pit for join queries #2703

Merged
merged 35 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e225977
Add search after for join
rupal-bq May 29, 2024
d69feda
Enable search after by default
rupal-bq May 29, 2024
b75d782
Add pit
rupal-bq Jun 2, 2024
a336e3b
Merge branch 'opensearch-project:main' into join-search-after
rupal-bq Jun 2, 2024
afd24d8
nit
rupal-bq Jun 3, 2024
8f840ae
Fix tests
rupal-bq Jun 4, 2024
070b40f
ignore joinWithGeoIntersectNL
rupal-bq Jun 4, 2024
58c96f5
Rerun CI with scroll
rupal-bq Jun 4, 2024
3dad30d
Merge branch 'opensearch-project:main' into join-search-after
rupal-bq Jun 5, 2024
1404cd5
Remove unused code and retrigger CI with search_after true
rupal-bq Jun 5, 2024
fd09367
Merge branch 'opensearch-project:main' into join-search-after
rupal-bq Jun 5, 2024
fcb584a
Address comments
rupal-bq Jun 7, 2024
12f5abb
Merge branch 'join-search-after' of github.com:rupal-bq/opensearch-sq…
rupal-bq Jun 7, 2024
08a6a29
Remove unused code change
rupal-bq Jun 7, 2024
030f4b5
Merge branch 'main' into join-search-after
rupal-bq Jun 10, 2024
39f727a
Update pit keep alive time with SQL_CURSOR_KEEP_ALIVE
rupal-bq Jun 10, 2024
be8d986
Fix scroll condition
rupal-bq Jun 12, 2024
7bb89d8
nit
rupal-bq Jun 12, 2024
60bd8fe
Add pit before query execution
rupal-bq Jun 13, 2024
17d1d0e
Merge branch 'opensearch-project:main' into join-search-after
rupal-bq Jun 13, 2024
3af87c4
nit
rupal-bq Jun 14, 2024
bee4863
Move pit from join request builder to executor
rupal-bq Jun 16, 2024
0857a6b
Remove unused methods
rupal-bq Jun 16, 2024
8e1c101
Merge branch 'opensearch-project:main' into join-search-after
rupal-bq Jun 16, 2024
617d1db
Add pit in parent class's run()
rupal-bq Jun 19, 2024
841db6b
Add comment for fetching subsequent result in NestedLoopsElasticExecutor
rupal-bq Jun 19, 2024
3185eda
Update comment
rupal-bq Jun 19, 2024
2070f68
Add javadoc for pit handler
rupal-bq Jun 24, 2024
0bf9792
Add pit interface
rupal-bq Jun 24, 2024
e13c50f
Add pit handler unit test
rupal-bq Jun 25, 2024
1366663
Merge branch 'opensearch-project:main' into join-search-after
rupal-bq Jun 25, 2024
1408ccf
Fix failed unit test CI
rupal-bq Jun 25, 2024
dd10d77
Fix spotless error
rupal-bq Jun 25, 2024
b487abf
Rename pit class and add logs
rupal-bq Jun 26, 2024
f79d89e
Fix pit delete unit test
rupal-bq Jun 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public enum Key {
SQL_SLOWLOG("plugins.sql.slowlog"),
SQL_CURSOR_KEEP_ALIVE("plugins.sql.cursor.keep_alive"),
SQL_DELETE_ENABLED("plugins.sql.delete.enabled"),
SQL_PAGINATION_API_SEARCH_AFTER("plugins.sql.pagination.api"),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we're not switching to PIT completely but support both PIT and Scroll?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Current plan is to add for all pagination then remove scroll after performance comparison.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we will raise another PR for this after performance test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I was planning to add search_after for all queries (join, multi query, pagination) then a separate PR to remove scroll after performance test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the PIT turns out be bad? We will leave setting to customer? What will be the default setting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Current default setting is true for using PIT with search after. After benchmark, we can decide if this needs to be changed. We can remove this while removing scroll.


/** PPL Settings. */
PPL_ENABLED("plugins.ppl.enabled"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ public void hintMultiSearchCanRunFewTimesNL() throws IOException {
Assert.assertThat(hits.length(), equalTo(42));
}

// TODO: Fix joinWithGeoIntersectNL test when SQL_PAGINATION_API_SEARCH_AFTER is true
@Ignore
rupal-bq marked this conversation as resolved.
Show resolved Hide resolved
@Test
public void joinWithGeoIntersectNL() throws IOException {

Expand Down Expand Up @@ -455,7 +457,7 @@ public void joinParseCheckSelectedFieldsSplitNLConditionOrderGT() throws IOExcep
"SELECT /*! USE_NL*/ a.firstname, a.lastname, a.gender, d.firstname, d.age FROM %s a"
+ " JOIN %s d on a.age < d.age WHERE (d.firstname = 'Lynn' OR d.firstname ="
+ " 'Obrien') AND a.firstname = 'Mcgee'",
TEST_INDEX_PEOPLE,
TEST_INDEX_PEOPLE2,
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
TEST_INDEX_ACCOUNT);

JSONObject result = executeQuery(query);
Expand Down Expand Up @@ -501,7 +503,7 @@ public void joinParseCheckSelectedFieldsSplitNLConditionOrderLT() throws IOExcep
"SELECT /*! USE_NL*/ a.firstname, a.lastname, a.gender, d.firstname, d.age FROM %s a"
+ " JOIN %s d on a.age > d.age WHERE (d.firstname = 'Sandoval' OR d.firstname ="
+ " 'Hewitt') AND a.firstname = 'Fulton'",
TEST_INDEX_PEOPLE,
TEST_INDEX_PEOPLE2,
TEST_INDEX_ACCOUNT);

JSONObject result = executeQuery(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public static SearchHits executeJoinSearchAction(
ElasticJoinExecutor executor =
ElasticJoinExecutor.createJoinExecutor(client, joinRequestBuilder);
executor.run();
joinQueryAction.getPointInTimeHandler().deletePointInTime();
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
return executor.getHits();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.sql.legacy.executor.join;

import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -28,9 +31,11 @@
import org.opensearch.rest.RestChannel;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.sql.legacy.domain.Field;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.executor.ElasticHitsExecutor;
import org.opensearch.sql.legacy.query.SqlElasticRequestBuilder;
Expand Down Expand Up @@ -256,23 +261,44 @@ protected void updateMetaSearchResults(SearchResponse searchResponse) {
this.metaResults.updateTimeOut(searchResponse.isTimedOut());
}

protected SearchResponse scrollOneTimeWithMax(
Client client, TableInJoinRequestBuilder tableRequest) {
SearchRequestBuilder scrollRequest =
tableRequest
.getRequestBuilder()
.setScroll(new TimeValue(60000))
.setSize(MAX_RESULTS_ON_ONE_FETCH);
public SearchResponse getResponseWithHits(
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
Client client,
TableInJoinRequestBuilder tableRequest,
int size,
SearchResponse previousResponse) {
// Set Size
SearchRequestBuilder request = tableRequest.getRequestBuilder().setSize(size);

// Set sort field for search_after
boolean ordered = tableRequest.getOriginalSelect().isOrderdSelect();
if (!ordered) {
scrollRequest.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC);
request.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC);
}
SearchResponse responseWithHits = scrollRequest.get();
// on ordered select - not using SCAN , elastic returns hits on first scroll
// es5.0 elastic always return docs on scan
// if(!ordered)
// responseWithHits = client.prepareSearchScroll(responseWithHits.getScrollId())
// .setScroll(new TimeValue(600000)).get();

SearchResponse responseWithHits;
// Set PIT or scroll
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
request.setPointInTime(new PointInTimeBuilder(tableRequest.getPitId()));
if (previousResponse != null) {
request.searchAfter(previousResponse.getHits().getSortFields());
}
responseWithHits = request.get();
} else {
if (previousResponse == null) {
responseWithHits =
client
.prepareSearchScroll(previousResponse.getScrollId())
.setScroll(
new TimeValue(LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE)))
.execute()
.actionGet();
} else {
request.setScroll(
new TimeValue(LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE)));
responseWithHits = request.get();
}
}

return responseWithHits;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -124,16 +123,13 @@ private List<SearchHit> createCombinedResults(TableInJoinRequestBuilder secondTa
Integer hintLimit = secondTableRequest.getHintLimit();
SearchResponse searchResponse;
boolean finishedScrolling;

if (hintLimit != null && hintLimit < MAX_RESULTS_ON_ONE_FETCH) {
searchResponse = secondTableRequest.getRequestBuilder().setSize(hintLimit).get();
searchResponse = getResponseWithHits(client, secondTableRequest, hintLimit, null);
finishedScrolling = true;
} else {
searchResponse =
secondTableRequest
.getRequestBuilder()
.setScroll(new TimeValue(60000))
.setSize(MAX_RESULTS_ON_ONE_FETCH)
.get();
getResponseWithHits(client, secondTableRequest, MAX_RESULTS_ON_ONE_FETCH, null);
// es5.0 no need to scroll again!
// searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
// .setScroll(new TimeValue(600000)).get();
Expand Down Expand Up @@ -214,11 +210,8 @@ private List<SearchHit> createCombinedResults(TableInJoinRequestBuilder secondTa
if (secondTableHits.length > 0
&& (hintLimit == null || fetchedSoFarFromSecondTable >= hintLimit)) {
searchResponse =
client
.prepareSearchScroll(searchResponse.getScrollId())
.setScroll(new TimeValue(600000))
.execute()
.actionGet();
getResponseWithHits(
client, secondTableRequest, MAX_RESULTS_ON_ONE_FETCH, searchResponse);
} else {
break;
}
Expand Down Expand Up @@ -292,12 +285,13 @@ private List<SearchHit> fetchAllHits(TableInJoinRequestBuilder tableInJoinReques

private List<SearchHit> scrollTillLimit(
TableInJoinRequestBuilder tableInJoinRequest, Integer hintLimit) {
SearchResponse scrollResp = scrollOneTimeWithMax(client, tableInJoinRequest);
SearchResponse response =
getResponseWithHits(client, tableInJoinRequest, MAX_RESULTS_ON_ONE_FETCH, null);

updateMetaSearchResults(scrollResp);
updateMetaSearchResults(response);
List<SearchHit> hitsWithScan = new ArrayList<>();
int curentNumOfResults = 0;
SearchHit[] hits = scrollResp.getHits().getHits();
SearchHit[] hits = response.getHits().getHits();

if (hintLimit == null) {
hintLimit = MAX_RESULTS_FOR_FIRST_TABLE;
Expand All @@ -311,13 +305,9 @@ private List<SearchHit> scrollTillLimit(
System.out.println("too many results for first table, stoping at:" + curentNumOfResults);
break;
}
scrollResp =
client
.prepareSearchScroll(scrollResp.getScrollId())
.setScroll(new TimeValue(600000))
.execute()
.actionGet();
hits = scrollResp.getHits().getHits();
response =
getResponseWithHits(client, tableInJoinRequest, MAX_RESULTS_FOR_FIRST_TABLE, response);
hits = response.getHits().getHits();
}
return hitsWithScan;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
Expand Down Expand Up @@ -111,11 +110,19 @@ protected List<SearchHit> innerRun() throws SqlParseException {
if (!BackOffRetryStrategy.isHealthy()) {
throw new IllegalStateException("Memory circuit is broken");
}
firstTableResponse =
client
.prepareSearchScroll(firstTableResponse.getScrollId())
.setScroll(new TimeValue(600000))
.get();
Integer hintLimit = nestedLoopsRequest.getFirstTable().getHintLimit();
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
if (hintLimit != null && hintLimit < MAX_RESULTS_ON_ONE_FETCH) {
firstTableResponse =
getResponseWithHits(
client, nestedLoopsRequest.getFirstTable(), hintLimit, firstTableResponse);
} else {
firstTableResponse =
getResponseWithHits(
client,
nestedLoopsRequest.getFirstTable(),
MAX_RESULTS_ON_ONE_FETCH,
firstTableResponse);
}
} else {
finishedWithFirstTable = true;
}
Expand Down Expand Up @@ -287,12 +294,11 @@ private FetchWithScrollResponse firstFetch(TableInJoinRequestBuilder tableReques
boolean needScrollForFirstTable = false;
SearchResponse responseWithHits;
if (hintLimit != null && hintLimit < MAX_RESULTS_ON_ONE_FETCH) {

responseWithHits = tableRequest.getRequestBuilder().setSize(hintLimit).get();
needScrollForFirstTable = false;
} else {
// scroll request with max.
responseWithHits = scrollOneTimeWithMax(client, tableRequest);
responseWithHits = getResponseWithHits(client, tableRequest, MAX_RESULTS_ON_ONE_FETCH, null);
if (responseWithHits.getHits().getTotalHits() != null
&& responseWithHits.getHits().getTotalHits().value < MAX_RESULTS_ON_ONE_FETCH) {
needScrollForFirstTable = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.legacy.pit;

import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;

public class PointInTimeHandler {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we write the classes to interface?
Also add java documentation to the class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

private Client client;
@Getter private String pitId;
private static final Logger LOG = LogManager.getLogger();

public PointInTimeHandler(Client client, String[] indices) {
this.client = client;

CreatePitRequest createPitRequest = new CreatePitRequest(new TimeValue(600000), false, indices);
rupal-bq marked this conversation as resolved.
Show resolved Hide resolved
client.createPit(
createPitRequest,
new ActionListener<>() {
@Override
public void onResponse(CreatePitResponse createPitResponse) {
pitId = createPitResponse.getId();
}

@Override
public void onFailure(Exception e) {
LOG.error("Error occurred while creating PIT", e);
}
});
}

public void deletePointInTime() {
DeletePitRequest deletePitRequest = new DeletePitRequest(pitId);
client.deletePits(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have synchronous calls?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no

deletePitRequest,
new ActionListener<>() {
@Override
public void onResponse(org.opensearch.action.search.DeletePitResponse deletePitResponse) {
LOG.debug(deletePitResponse);
}

@Override
public void onFailure(Exception e) {
LOG.error("Error occurred while deleting PIT", e);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.alibaba.druid.sql.ast.statement.SQLJoinTableSource;
import java.io.IOException;
import lombok.Setter;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestBuilder;
import org.opensearch.action.search.MultiSearchRequest;
Expand All @@ -25,6 +26,7 @@ public class JoinRequestBuilder implements SqlElasticRequestBuilder {
private TableInJoinRequestBuilder secondTable;
private SQLJoinTableSource.JoinType joinType;
private int totalLimit;
@Setter private String pitId;

public JoinRequestBuilder() {
firstTable = new TableInJoinRequestBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@

import java.util.List;
import org.opensearch.client.Client;
import org.opensearch.common.util.ArrayUtils;
import org.opensearch.sql.legacy.domain.Field;
import org.opensearch.sql.legacy.domain.JoinSelect;
import org.opensearch.sql.legacy.domain.Select;
import org.opensearch.sql.legacy.domain.TableOnJoinSelect;
import org.opensearch.sql.legacy.domain.hints.Hint;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.pit.PointInTimeHandler;
import org.opensearch.sql.legacy.query.DefaultQueryAction;
import org.opensearch.sql.legacy.query.QueryAction;
import org.opensearch.sql.legacy.query.SqlElasticRequestBuilder;
Expand All @@ -23,6 +25,7 @@
public abstract class OpenSearchJoinQueryAction extends QueryAction {

protected JoinSelect joinSelect;
private PointInTimeHandler pointInTimeHandler;

public OpenSearchJoinQueryAction(Client client, JoinSelect joinSelect) {
super(client, joinSelect);
Expand All @@ -34,9 +37,14 @@ public SqlElasticRequestBuilder explain() throws SqlParseException {
JoinRequestBuilder requestBuilder = createSpecificBuilder();
fillBasicJoinRequestBuilder(requestBuilder);
fillSpecificRequestBuilder(requestBuilder);
updateRequestWithPitId(requestBuilder);
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
return requestBuilder;
}

public PointInTimeHandler getPointInTimeHandler() {
return pointInTimeHandler;
}

protected abstract void fillSpecificRequestBuilder(JoinRequestBuilder requestBuilder)
throws SqlParseException;

Expand Down Expand Up @@ -92,6 +100,18 @@ protected void updateRequestWithHints(JoinRequestBuilder requestBuilder) {
}
}

private void updateRequestWithPitId(JoinRequestBuilder requestBuilder) {
String[] indices =
ArrayUtils.concat(
requestBuilder.getFirstTable().getOriginalSelect().getIndexArr(),
requestBuilder.getSecondTable().getOriginalSelect().getIndexArr());
this.pointInTimeHandler = new PointInTimeHandler(client, indices);
String pitId = pointInTimeHandler.getPitId();
requestBuilder.setPitId(pitId);
requestBuilder.getFirstTable().setPitId(pitId);
requestBuilder.getSecondTable().setPitId(pitId);
}

private Config queryPlannerConfig(JoinRequestBuilder requestBuilder) {
return ((HashJoinQueryPlanRequestBuilder) requestBuilder).getConfig();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class TableInJoinRequestBuilder {
private List<Field> returnedFields;
private Select originalSelect;
private Integer hintLimit;
private String pitId;

public TableInJoinRequestBuilder() {}

Expand Down Expand Up @@ -59,4 +60,12 @@ public Integer getHintLimit() {
public void setHintLimit(Integer hintLimit) {
this.hintLimit = hintLimit;
}

public String getPitId() {
return pitId;
}

public void setPitId(String pitId) {
this.pitId = pitId;
}
}
Loading
Loading