Skip to content

Commit

Permalink
Add pit in parent class's run()
Browse files Browse the repository at this point in the history
Signed-off-by: Rupal Mahajan <[email protected]>
  • Loading branch information
rupal-bq committed Jun 19, 2024
1 parent 8e1c101 commit 617d1db
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.TotalHits;
Expand Down Expand Up @@ -55,16 +56,20 @@ public abstract class ElasticJoinExecutor implements ElasticHitsExecutor {
protected final int MAX_RESULTS_ON_ONE_FETCH = 10000;
private Set<String> aliasesOnReturn;
private boolean allFieldsReturn;
protected Client client;
protected String[] indices;
protected PointInTimeHandler pit;

protected ElasticJoinExecutor(JoinRequestBuilder requestBuilder) {
protected ElasticJoinExecutor(Client client, JoinRequestBuilder requestBuilder) {
metaResults = new MetaSearchResult();
aliasesOnReturn = new HashSet<>();
List<Field> firstTableReturnedField = requestBuilder.getFirstTable().getReturnedFields();
List<Field> secondTableReturnedField = requestBuilder.getSecondTable().getReturnedFields();
allFieldsReturn =
(firstTableReturnedField == null || firstTableReturnedField.size() == 0)
&& (secondTableReturnedField == null || secondTableReturnedField.size() == 0);
indices = getIndices(requestBuilder);
this.client = client;
}

public void sendResponse(RestChannel channel) throws IOException {
Expand Down Expand Up @@ -92,10 +97,22 @@ public void sendResponse(RestChannel channel) throws IOException {
}

public void run() throws IOException, SqlParseException {
long timeBefore = System.currentTimeMillis();
results = innerRun();
long joinTimeInMilli = System.currentTimeMillis() - timeBefore;
this.metaResults.setTookImMilli(joinTimeInMilli);
try {
long timeBefore = System.currentTimeMillis();
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
pit = new PointInTimeHandler(client, indices);
pit.create();
}
results = innerRun();
long joinTimeInMilli = System.currentTimeMillis() - timeBefore;
this.metaResults.setTookImMilli(joinTimeInMilli);
} catch (Exception e) {
LOG.error("Failed during join query run.", e);
} finally {
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
pit.delete();
}
}
}

protected abstract List<SearchHit> innerRun() throws IOException, SqlParseException;
Expand All @@ -110,7 +127,7 @@ public SearchHits getHits() {
public static ElasticJoinExecutor createJoinExecutor(
Client client, SqlElasticRequestBuilder requestBuilder) {
if (requestBuilder instanceof HashJoinQueryPlanRequestBuilder) {
return new QueryPlanElasticExecutor((HashJoinQueryPlanRequestBuilder) requestBuilder);
return new QueryPlanElasticExecutor(client, (HashJoinQueryPlanRequestBuilder) requestBuilder);
} else if (requestBuilder instanceof HashJoinElasticRequestBuilder) {
HashJoinElasticRequestBuilder hashJoin = (HashJoinElasticRequestBuilder) requestBuilder;
return new HashJoinElasticExecutor(client, hashJoin);
Expand Down Expand Up @@ -264,10 +281,7 @@ protected void updateMetaSearchResults(SearchResponse searchResponse) {
}

public SearchResponse getResponseWithHits(
Client client,
TableInJoinRequestBuilder tableRequest,
int size,
SearchResponse previousResponse) {
TableInJoinRequestBuilder tableRequest, int size, SearchResponse previousResponse) {
// Set Size
SearchRequestBuilder request = tableRequest.getRequestBuilder().setSize(size);
SearchResponse responseWithHits;
Expand Down Expand Up @@ -302,11 +316,11 @@ public SearchResponse getResponseWithHits(
return responseWithHits;
}

public void createPointInTimeHandler(Client client, JoinRequestBuilder requestBuilder) {
String[] indices =
org.opensearch.common.util.ArrayUtils.concat(
requestBuilder.getFirstTable().getOriginalSelect().getIndexArr(),
requestBuilder.getSecondTable().getOriginalSelect().getIndexArr());
pit = new PointInTimeHandler(client, indices);
public String[] getIndices(JoinRequestBuilder joinRequestBuilder) {
return Stream.concat(
Stream.of(joinRequestBuilder.getFirstTable().getOriginalSelect().getIndexArr()),
Stream.of(joinRequestBuilder.getSecondTable().getOriginalSelect().getIndexArr()))
.distinct()
.toArray(String[]::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,13 @@
/** Created by Eliran on 22/8/2015. */
public class HashJoinElasticExecutor extends ElasticJoinExecutor {
private HashJoinElasticRequestBuilder requestBuilder;

private Client client;
private boolean useQueryTermsFilterOptimization = false;
private final int MAX_RESULTS_FOR_FIRST_TABLE = 100000;
HashJoinComparisonStructure hashJoinComparisonStructure;
private Set<String> alreadyMatched;

public HashJoinElasticExecutor(Client client, HashJoinElasticRequestBuilder requestBuilder) {
super(requestBuilder);
this.client = client;
super(client, requestBuilder);
this.requestBuilder = requestBuilder;
this.useQueryTermsFilterOptimization = requestBuilder.isUseTermFiltersOptimization();
this.hashJoinComparisonStructure =
Expand All @@ -53,9 +50,6 @@ public HashJoinElasticExecutor(Client client, HashJoinElasticRequestBuilder requ
}

public List<SearchHit> innerRun() throws IOException, SqlParseException {
createPointInTimeHandler(client, requestBuilder);
pit.create();

Map<String, Map<String, List<Object>>> optimizationTermsFilterStructure =
initOptimizationStructure();

Expand Down Expand Up @@ -97,7 +91,6 @@ public int compare(SearchHit o1, SearchHit o2) {
}
});
}
pit.delete();
return combinedResult;
}

Expand Down Expand Up @@ -128,11 +121,10 @@ private List<SearchHit> createCombinedResults(TableInJoinRequestBuilder secondTa
boolean finishedScrolling;

if (hintLimit != null && hintLimit < MAX_RESULTS_ON_ONE_FETCH) {
searchResponse = getResponseWithHits(client, secondTableRequest, hintLimit, null);
searchResponse = getResponseWithHits(secondTableRequest, hintLimit, null);
finishedScrolling = true;
} else {
searchResponse =
getResponseWithHits(client, secondTableRequest, MAX_RESULTS_ON_ONE_FETCH, null);
searchResponse = getResponseWithHits(secondTableRequest, MAX_RESULTS_ON_ONE_FETCH, null);
// es5.0 no need to scroll again!
// searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
// .setScroll(new TimeValue(600000)).get();
Expand Down Expand Up @@ -213,8 +205,7 @@ private List<SearchHit> createCombinedResults(TableInJoinRequestBuilder secondTa
if (secondTableHits.length > 0
&& (hintLimit == null || fetchedSoFarFromSecondTable >= hintLimit)) {
searchResponse =
getResponseWithHits(
client, secondTableRequest, MAX_RESULTS_ON_ONE_FETCH, searchResponse);
getResponseWithHits(secondTableRequest, MAX_RESULTS_ON_ONE_FETCH, searchResponse);
} else {
break;
}
Expand Down Expand Up @@ -289,7 +280,7 @@ private List<SearchHit> fetchAllHits(TableInJoinRequestBuilder tableInJoinReques
private List<SearchHit> scrollTillLimit(
TableInJoinRequestBuilder tableInJoinRequest, Integer hintLimit) {
SearchResponse response =
getResponseWithHits(client, tableInJoinRequest, MAX_RESULTS_ON_ONE_FETCH, null);
getResponseWithHits(tableInJoinRequest, MAX_RESULTS_ON_ONE_FETCH, null);

updateMetaSearchResults(response);
List<SearchHit> hitsWithScan = new ArrayList<>();
Expand All @@ -308,8 +299,7 @@ private List<SearchHit> scrollTillLimit(
System.out.println("too many results for first table, stoping at:" + curentNumOfResults);
break;
}
response =
getResponseWithHits(client, tableInJoinRequest, MAX_RESULTS_FOR_FIRST_TABLE, response);
response = getResponseWithHits(tableInJoinRequest, MAX_RESULTS_FOR_FIRST_TABLE, response);
hits = response.getHits().getHits();
}
return hitsWithScan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,14 @@ public class NestedLoopsElasticExecutor extends ElasticJoinExecutor {
private static final Logger LOG = LogManager.getLogger();

private final NestedLoopsElasticRequestBuilder nestedLoopsRequest;
private final Client client;

public NestedLoopsElasticExecutor(Client client, NestedLoopsElasticRequestBuilder nestedLoops) {
super(nestedLoops);
this.client = client;
super(client, nestedLoops);
this.nestedLoopsRequest = nestedLoops;
}

@Override
protected List<SearchHit> innerRun() throws SqlParseException {
createPointInTimeHandler(client, nestedLoopsRequest);
pit.create();
List<SearchHit> combinedResults = new ArrayList<>();
int totalLimit = nestedLoopsRequest.getTotalLimit();
int multiSearchMaxSize = nestedLoopsRequest.getMultiSearchMaxSize();
Expand Down Expand Up @@ -116,11 +112,10 @@ protected List<SearchHit> innerRun() throws SqlParseException {
if (hintLimit != null && hintLimit < MAX_RESULTS_ON_ONE_FETCH) {
firstTableResponse =
getResponseWithHits(
client, nestedLoopsRequest.getFirstTable(), hintLimit, firstTableResponse);
nestedLoopsRequest.getFirstTable(), hintLimit, firstTableResponse);
} else {
firstTableResponse =
getResponseWithHits(
client,
nestedLoopsRequest.getFirstTable(),
MAX_RESULTS_ON_ONE_FETCH,
firstTableResponse);
Expand All @@ -130,7 +125,6 @@ protected List<SearchHit> innerRun() throws SqlParseException {
}
}
}
pit.delete();
return combinedResults;
}

Expand Down Expand Up @@ -301,7 +295,7 @@ private FetchWithScrollResponse firstFetch(TableInJoinRequestBuilder tableReques
needScrollForFirstTable = false;
} else {
// scroll request with max.
responseWithHits = getResponseWithHits(client, tableRequest, MAX_RESULTS_ON_ONE_FETCH, null);
responseWithHits = getResponseWithHits(tableRequest, MAX_RESULTS_ON_ONE_FETCH, null);
if (responseWithHits.getHits().getTotalHits() != null
&& responseWithHits.getHits().getTotalHits().value < MAX_RESULTS_ON_ONE_FETCH) {
needScrollForFirstTable = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.legacy.executor.join;

import java.util.List;
import org.opensearch.client.Client;
import org.opensearch.search.SearchHit;
import org.opensearch.sql.legacy.query.planner.HashJoinQueryPlanRequestBuilder;
import org.opensearch.sql.legacy.query.planner.core.QueryPlanner;
Expand All @@ -19,8 +20,8 @@ class QueryPlanElasticExecutor extends ElasticJoinExecutor {

private final QueryPlanner queryPlanner;

QueryPlanElasticExecutor(HashJoinQueryPlanRequestBuilder request) {
super(request);
QueryPlanElasticExecutor(Client client, HashJoinQueryPlanRequestBuilder request) {
super(client, request);
this.queryPlanner = request.plan();
}

Expand Down

0 comments on commit 617d1db

Please sign in to comment.