Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/feature/pit' into pit-merge
Browse files Browse the repository at this point in the history
  • Loading branch information
manasvinibs committed Aug 14, 2024
2 parents 05c961e + 8c6dc0c commit 0b095f9
Show file tree
Hide file tree
Showing 13 changed files with 473 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public enum Key {
SQL_SLOWLOG("plugins.sql.slowlog"),
SQL_CURSOR_KEEP_ALIVE("plugins.sql.cursor.keep_alive"),
SQL_DELETE_ENABLED("plugins.sql.delete.enabled"),
SQL_PAGINATION_API_SEARCH_AFTER("plugins.sql.pagination.api"),

/** PPL Settings. */
PPL_ENABLED("plugins.ppl.enabled"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ public void hintMultiSearchCanRunFewTimesNL() throws IOException {
Assert.assertThat(hits.length(), equalTo(42));
}

// TODO: Fix joinWithGeoIntersectNL test when SQL_PAGINATION_API_SEARCH_AFTER is true
@Ignore
@Test
public void joinWithGeoIntersectNL() throws IOException {

Expand Down Expand Up @@ -455,7 +457,7 @@ public void joinParseCheckSelectedFieldsSplitNLConditionOrderGT() throws IOExcep
"SELECT /*! USE_NL*/ a.firstname, a.lastname, a.gender, d.firstname, d.age FROM %s a"
+ " JOIN %s d on a.age < d.age WHERE (d.firstname = 'Lynn' OR d.firstname ="
+ " 'Obrien') AND a.firstname = 'Mcgee'",
TEST_INDEX_PEOPLE,
TEST_INDEX_PEOPLE2,
TEST_INDEX_ACCOUNT);

JSONObject result = executeQuery(query);
Expand Down Expand Up @@ -501,7 +503,7 @@ public void joinParseCheckSelectedFieldsSplitNLConditionOrderLT() throws IOExcep
"SELECT /*! USE_NL*/ a.firstname, a.lastname, a.gender, d.firstname, d.age FROM %s a"
+ " JOIN %s d on a.age > d.age WHERE (d.firstname = 'Sandoval' OR d.firstname ="
+ " 'Hewitt') AND a.firstname = 'Fulton'",
TEST_INDEX_PEOPLE,
TEST_INDEX_PEOPLE2,
TEST_INDEX_ACCOUNT);

JSONObject result = executeQuery(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,96 @@

package org.opensearch.sql.legacy.executor;

import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME;
import static org.opensearch.search.sort.SortOrder.ASC;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.sql.legacy.domain.Select;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.pit.PointInTimeHandler;

/** Executor for search requests with pagination. */
public abstract class ElasticHitsExecutor {
protected static final Logger LOG = LogManager.getLogger();
protected PointInTimeHandler pit;
protected Client client;

/**
* Executes search request
*
* @throws IOException If an input or output exception occurred
* @throws SqlParseException If parsing exception occurred
*/
protected abstract void run() throws IOException, SqlParseException;

/**
* Get search hits after execution
*
* @return Search hits
*/
protected abstract SearchHits getHits();

/**
* Get response for search request with pit/scroll
*
* @param request search request
* @param select sql select
* @param size fetch size
* @param previousResponse response for previous request
* @param pit point in time
* @return search response for subsequent request
*/
public SearchResponse getResponseWithHits(
SearchRequestBuilder request,
Select select,
int size,
SearchResponse previousResponse,
PointInTimeHandler pit) {
// Set Size
request.setSize(size);
SearchResponse responseWithHits;

/** Created by Eliran on 21/8/2016. */
public interface ElasticHitsExecutor {
void run() throws IOException, SqlParseException;
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
// Set sort field for search_after
boolean ordered = select.isOrderdSelect();
if (!ordered) {
request.addSort(DOC_FIELD_NAME, ASC);
}
// Set PIT
request.setPointInTime(new PointInTimeBuilder(pit.getPitId()));
// from and size is alternate method to paginate result.
// If select has from clause, search after is not required.
if (previousResponse != null && select.getFrom().isEmpty()) {
request.searchAfter(previousResponse.getHits().getSortFields());
}
responseWithHits = request.get();
} else {
// Set scroll
TimeValue keepAlive = LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE);
if (previousResponse != null) {
responseWithHits =
client
.prepareSearchScroll(previousResponse.getScrollId())
.setScroll(keepAlive)
.execute()
.actionGet();
} else {
request.setScroll(keepAlive);
responseWithHits = request.get();
}
}

SearchHits getHits();
return responseWithHits;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,33 @@

package org.opensearch.sql.legacy.executor.join;

import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.stream.Stream;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.TotalHits.Relation;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.sql.legacy.domain.Field;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.executor.ElasticHitsExecutor;
import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl;
import org.opensearch.sql.legacy.query.SqlElasticRequestBuilder;
import org.opensearch.sql.legacy.query.join.HashJoinElasticRequestBuilder;
import org.opensearch.sql.legacy.query.join.JoinRequestBuilder;
Expand All @@ -41,23 +40,25 @@
import org.opensearch.sql.legacy.query.planner.HashJoinQueryPlanRequestBuilder;

/** Created by Eliran on 15/9/2015. */
public abstract class ElasticJoinExecutor implements ElasticHitsExecutor {
private static final Logger LOG = LogManager.getLogger();
public abstract class ElasticJoinExecutor extends ElasticHitsExecutor {

protected List<SearchHit> results; // Keep list to avoid copy to new array in SearchHits
protected MetaSearchResult metaResults;
protected final int MAX_RESULTS_ON_ONE_FETCH = 10000;
private Set<String> aliasesOnReturn;
private boolean allFieldsReturn;
protected String[] indices;

protected ElasticJoinExecutor(JoinRequestBuilder requestBuilder) {
protected ElasticJoinExecutor(Client client, JoinRequestBuilder requestBuilder) {
metaResults = new MetaSearchResult();
aliasesOnReturn = new HashSet<>();
List<Field> firstTableReturnedField = requestBuilder.getFirstTable().getReturnedFields();
List<Field> secondTableReturnedField = requestBuilder.getSecondTable().getReturnedFields();
allFieldsReturn =
(firstTableReturnedField == null || firstTableReturnedField.size() == 0)
&& (secondTableReturnedField == null || secondTableReturnedField.size() == 0);
indices = getIndices(requestBuilder);
this.client = client;
}

public void sendResponse(RestChannel channel) throws IOException {
Expand Down Expand Up @@ -85,10 +86,22 @@ public void sendResponse(RestChannel channel) throws IOException {
}

public void run() throws IOException, SqlParseException {
long timeBefore = System.currentTimeMillis();
results = innerRun();
long joinTimeInMilli = System.currentTimeMillis() - timeBefore;
this.metaResults.setTookImMilli(joinTimeInMilli);
try {
long timeBefore = System.currentTimeMillis();
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
pit = new PointInTimeHandlerImpl(client, indices);
pit.create();
}
results = innerRun();
long joinTimeInMilli = System.currentTimeMillis() - timeBefore;
this.metaResults.setTookImMilli(joinTimeInMilli);
} catch (Exception e) {
LOG.error("Failed during join query run.", e);
} finally {
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
pit.delete();
}
}
}

protected abstract List<SearchHit> innerRun() throws IOException, SqlParseException;
Expand All @@ -103,7 +116,7 @@ public SearchHits getHits() {
public static ElasticJoinExecutor createJoinExecutor(
Client client, SqlElasticRequestBuilder requestBuilder) {
if (requestBuilder instanceof HashJoinQueryPlanRequestBuilder) {
return new QueryPlanElasticExecutor((HashJoinQueryPlanRequestBuilder) requestBuilder);
return new QueryPlanElasticExecutor(client, (HashJoinQueryPlanRequestBuilder) requestBuilder);
} else if (requestBuilder instanceof HashJoinElasticRequestBuilder) {
HashJoinElasticRequestBuilder hashJoin = (HashJoinElasticRequestBuilder) requestBuilder;
return new HashJoinElasticExecutor(client, hashJoin);
Expand Down Expand Up @@ -256,23 +269,22 @@ protected void updateMetaSearchResults(SearchResponse searchResponse) {
this.metaResults.updateTimeOut(searchResponse.isTimedOut());
}

protected SearchResponse scrollOneTimeWithMax(
Client client, TableInJoinRequestBuilder tableRequest) {
SearchRequestBuilder scrollRequest =
tableRequest
.getRequestBuilder()
.setScroll(new TimeValue(60000))
.setSize(MAX_RESULTS_ON_ONE_FETCH);
boolean ordered = tableRequest.getOriginalSelect().isOrderdSelect();
if (!ordered) {
scrollRequest.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC);
}
SearchResponse responseWithHits = scrollRequest.get();
// on ordered select - not using SCAN , elastic returns hits on first scroll
// es5.0 elastic always return docs on scan
// if(!ordered)
// responseWithHits = client.prepareSearchScroll(responseWithHits.getScrollId())
// .setScroll(new TimeValue(600000)).get();
return responseWithHits;
public SearchResponse getResponseWithHits(
TableInJoinRequestBuilder tableRequest, int size, SearchResponse previousResponse) {

return getResponseWithHits(
tableRequest.getRequestBuilder(),
tableRequest.getOriginalSelect(),
size,
previousResponse,
pit);
}

public String[] getIndices(JoinRequestBuilder joinRequestBuilder) {
return Stream.concat(
Stream.of(joinRequestBuilder.getFirstTable().getOriginalSelect().getIndexArr()),
Stream.of(joinRequestBuilder.getSecondTable().getOriginalSelect().getIndexArr()))
.distinct()
.toArray(String[]::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
Expand All @@ -36,16 +35,13 @@
/** Created by Eliran on 22/8/2015. */
public class HashJoinElasticExecutor extends ElasticJoinExecutor {
private HashJoinElasticRequestBuilder requestBuilder;

private Client client;
private boolean useQueryTermsFilterOptimization = false;
private final int MAX_RESULTS_FOR_FIRST_TABLE = 100000;
HashJoinComparisonStructure hashJoinComparisonStructure;
private Set<String> alreadyMatched;

public HashJoinElasticExecutor(Client client, HashJoinElasticRequestBuilder requestBuilder) {
super(requestBuilder);
this.client = client;
super(client, requestBuilder);
this.requestBuilder = requestBuilder;
this.useQueryTermsFilterOptimization = requestBuilder.isUseTermFiltersOptimization();
this.hashJoinComparisonStructure =
Expand All @@ -54,7 +50,6 @@ public HashJoinElasticExecutor(Client client, HashJoinElasticRequestBuilder requ
}

public List<SearchHit> innerRun() throws IOException, SqlParseException {

Map<String, Map<String, List<Object>>> optimizationTermsFilterStructure =
initOptimizationStructure();

Expand Down Expand Up @@ -124,16 +119,12 @@ private List<SearchHit> createCombinedResults(TableInJoinRequestBuilder secondTa
Integer hintLimit = secondTableRequest.getHintLimit();
SearchResponse searchResponse;
boolean finishedScrolling;

if (hintLimit != null && hintLimit < MAX_RESULTS_ON_ONE_FETCH) {
searchResponse = secondTableRequest.getRequestBuilder().setSize(hintLimit).get();
searchResponse = getResponseWithHits(secondTableRequest, hintLimit, null);
finishedScrolling = true;
} else {
searchResponse =
secondTableRequest
.getRequestBuilder()
.setScroll(new TimeValue(60000))
.setSize(MAX_RESULTS_ON_ONE_FETCH)
.get();
searchResponse = getResponseWithHits(secondTableRequest, MAX_RESULTS_ON_ONE_FETCH, null);
// es5.0 no need to scroll again!
// searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
// .setScroll(new TimeValue(600000)).get();
Expand Down Expand Up @@ -214,11 +205,7 @@ private List<SearchHit> createCombinedResults(TableInJoinRequestBuilder secondTa
if (secondTableHits.length > 0
&& (hintLimit == null || fetchedSoFarFromSecondTable >= hintLimit)) {
searchResponse =
client
.prepareSearchScroll(searchResponse.getScrollId())
.setScroll(new TimeValue(600000))
.execute()
.actionGet();
getResponseWithHits(secondTableRequest, MAX_RESULTS_ON_ONE_FETCH, searchResponse);
} else {
break;
}
Expand Down Expand Up @@ -292,12 +279,13 @@ private List<SearchHit> fetchAllHits(TableInJoinRequestBuilder tableInJoinReques

private List<SearchHit> scrollTillLimit(
TableInJoinRequestBuilder tableInJoinRequest, Integer hintLimit) {
SearchResponse scrollResp = scrollOneTimeWithMax(client, tableInJoinRequest);
SearchResponse response =
getResponseWithHits(tableInJoinRequest, MAX_RESULTS_ON_ONE_FETCH, null);

updateMetaSearchResults(scrollResp);
updateMetaSearchResults(response);
List<SearchHit> hitsWithScan = new ArrayList<>();
int curentNumOfResults = 0;
SearchHit[] hits = scrollResp.getHits().getHits();
SearchHit[] hits = response.getHits().getHits();

if (hintLimit == null) {
hintLimit = MAX_RESULTS_FOR_FIRST_TABLE;
Expand All @@ -311,13 +299,8 @@ private List<SearchHit> scrollTillLimit(
System.out.println("too many results for first table, stoping at:" + curentNumOfResults);
break;
}
scrollResp =
client
.prepareSearchScroll(scrollResp.getScrollId())
.setScroll(new TimeValue(600000))
.execute()
.actionGet();
hits = scrollResp.getHits().getHits();
response = getResponseWithHits(tableInJoinRequest, MAX_RESULTS_FOR_FIRST_TABLE, response);
hits = response.getHits().getHits();
}
return hitsWithScan;
}
Expand Down
Loading

0 comments on commit 0b095f9

Please sign in to comment.