-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add pit for join queries #2703
Add pit for join queries #2703
Changes from 27 commits
e225977
d69feda
b75d782
a336e3b
afd24d8
8f840ae
070b40f
58c96f5
3dad30d
1404cd5
fd09367
fcb584a
12f5abb
08a6a29
030f4b5
39f727a
be8d986
7bb89d8
60bd8fe
17d1d0e
3af87c4
bee4863
0857a6b
8e1c101
617d1db
841db6b
3185eda
2070f68
0bf9792
e13c50f
1366663
1408ccf
dd10d77
b487abf
f79d89e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,13 +5,17 @@ | |
|
||
package org.opensearch.sql.legacy.executor.join; | ||
|
||
import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE; | ||
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; | ||
|
||
import java.io.IOException; | ||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.stream.Stream; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.apache.lucene.search.TotalHits; | ||
|
@@ -28,11 +32,14 @@ | |
import org.opensearch.rest.RestChannel; | ||
import org.opensearch.search.SearchHit; | ||
import org.opensearch.search.SearchHits; | ||
import org.opensearch.search.builder.PointInTimeBuilder; | ||
import org.opensearch.search.sort.FieldSortBuilder; | ||
import org.opensearch.search.sort.SortOrder; | ||
import org.opensearch.sql.legacy.domain.Field; | ||
import org.opensearch.sql.legacy.esdomain.LocalClusterState; | ||
import org.opensearch.sql.legacy.exception.SqlParseException; | ||
import org.opensearch.sql.legacy.executor.ElasticHitsExecutor; | ||
import org.opensearch.sql.legacy.pit.PointInTimeHandler; | ||
import org.opensearch.sql.legacy.query.SqlElasticRequestBuilder; | ||
import org.opensearch.sql.legacy.query.join.HashJoinElasticRequestBuilder; | ||
import org.opensearch.sql.legacy.query.join.JoinRequestBuilder; | ||
|
@@ -49,15 +56,20 @@ public abstract class ElasticJoinExecutor implements ElasticHitsExecutor { | |
protected final int MAX_RESULTS_ON_ONE_FETCH = 10000; | ||
private Set<String> aliasesOnReturn; | ||
private boolean allFieldsReturn; | ||
protected Client client; | ||
protected String[] indices; | ||
protected PointInTimeHandler pit; | ||
|
||
protected ElasticJoinExecutor(JoinRequestBuilder requestBuilder) { | ||
protected ElasticJoinExecutor(Client client, JoinRequestBuilder requestBuilder) { | ||
metaResults = new MetaSearchResult(); | ||
aliasesOnReturn = new HashSet<>(); | ||
List<Field> firstTableReturnedField = requestBuilder.getFirstTable().getReturnedFields(); | ||
List<Field> secondTableReturnedField = requestBuilder.getSecondTable().getReturnedFields(); | ||
allFieldsReturn = | ||
(firstTableReturnedField == null || firstTableReturnedField.size() == 0) | ||
&& (secondTableReturnedField == null || secondTableReturnedField.size() == 0); | ||
indices = getIndices(requestBuilder); | ||
this.client = client; | ||
} | ||
|
||
public void sendResponse(RestChannel channel) throws IOException { | ||
|
@@ -85,10 +97,22 @@ public void sendResponse(RestChannel channel) throws IOException { | |
} | ||
|
||
public void run() throws IOException, SqlParseException { | ||
long timeBefore = System.currentTimeMillis(); | ||
results = innerRun(); | ||
long joinTimeInMilli = System.currentTimeMillis() - timeBefore; | ||
this.metaResults.setTookImMilli(joinTimeInMilli); | ||
try { | ||
long timeBefore = System.currentTimeMillis(); | ||
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { | ||
pit = new PointInTimeHandler(client, indices); | ||
pit.create(); | ||
} | ||
results = innerRun(); | ||
long joinTimeInMilli = System.currentTimeMillis() - timeBefore; | ||
this.metaResults.setTookImMilli(joinTimeInMilli); | ||
} catch (Exception e) { | ||
LOG.error("Failed during join query run.", e); | ||
} finally { | ||
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { | ||
pit.delete(); | ||
} | ||
} | ||
} | ||
|
||
protected abstract List<SearchHit> innerRun() throws IOException, SqlParseException; | ||
|
@@ -103,7 +127,7 @@ public SearchHits getHits() { | |
public static ElasticJoinExecutor createJoinExecutor( | ||
Client client, SqlElasticRequestBuilder requestBuilder) { | ||
if (requestBuilder instanceof HashJoinQueryPlanRequestBuilder) { | ||
return new QueryPlanElasticExecutor((HashJoinQueryPlanRequestBuilder) requestBuilder); | ||
return new QueryPlanElasticExecutor(client, (HashJoinQueryPlanRequestBuilder) requestBuilder); | ||
} else if (requestBuilder instanceof HashJoinElasticRequestBuilder) { | ||
HashJoinElasticRequestBuilder hashJoin = (HashJoinElasticRequestBuilder) requestBuilder; | ||
return new HashJoinElasticExecutor(client, hashJoin); | ||
|
@@ -256,23 +280,47 @@ protected void updateMetaSearchResults(SearchResponse searchResponse) { | |
this.metaResults.updateTimeOut(searchResponse.isTimedOut()); | ||
} | ||
|
||
protected SearchResponse scrollOneTimeWithMax( | ||
Client client, TableInJoinRequestBuilder tableRequest) { | ||
SearchRequestBuilder scrollRequest = | ||
tableRequest | ||
.getRequestBuilder() | ||
.setScroll(new TimeValue(60000)) | ||
.setSize(MAX_RESULTS_ON_ONE_FETCH); | ||
boolean ordered = tableRequest.getOriginalSelect().isOrderdSelect(); | ||
if (!ordered) { | ||
scrollRequest.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC); | ||
public SearchResponse getResponseWithHits( | ||
dai-chen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
TableInJoinRequestBuilder tableRequest, int size, SearchResponse previousResponse) { | ||
// Set Size | ||
SearchRequestBuilder request = tableRequest.getRequestBuilder().setSize(size); | ||
SearchResponse responseWithHits; | ||
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { | ||
// Set sort field for search_after | ||
boolean ordered = tableRequest.getOriginalSelect().isOrderdSelect(); | ||
if (!ordered) { | ||
request.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC); | ||
} | ||
// Set PIT | ||
request.setPointInTime(new PointInTimeBuilder(pit.getPitId())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what if pit request didn't complete by the time code execution reaches this line? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As commented in PointInTimeHandler, we are actually listening to the response in listener instead of waiting. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tried printing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. Strange if it is working..I am good but I am still not sure...can you add logs somewhere. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added logs in pit handler impl There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I actually mean thread id logs to comment or description. I am fine anyways. |
||
if (previousResponse != null) { | ||
request.searchAfter(previousResponse.getHits().getSortFields()); | ||
} | ||
responseWithHits = request.get(); | ||
} else { | ||
// Set scroll | ||
TimeValue keepAlive = LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE); | ||
if (previousResponse != null) { | ||
responseWithHits = | ||
client | ||
.prepareSearchScroll(previousResponse.getScrollId()) | ||
.setScroll(keepAlive) | ||
.execute() | ||
.actionGet(); | ||
} else { | ||
request.setScroll(keepAlive); | ||
responseWithHits = request.get(); | ||
} | ||
} | ||
SearchResponse responseWithHits = scrollRequest.get(); | ||
// on ordered select - not using SCAN , elastic returns hits on first scroll | ||
// es5.0 elastic always return docs on scan | ||
// if(!ordered) | ||
// responseWithHits = client.prepareSearchScroll(responseWithHits.getScrollId()) | ||
// .setScroll(new TimeValue(600000)).get(); | ||
|
||
return responseWithHits; | ||
} | ||
|
||
public String[] getIndices(JoinRequestBuilder joinRequestBuilder) { | ||
return Stream.concat( | ||
Stream.of(joinRequestBuilder.getFirstTable().getOriginalSelect().getIndexArr()), | ||
Stream.of(joinRequestBuilder.getSecondTable().getOriginalSelect().getIndexArr())) | ||
.distinct() | ||
.toArray(String[]::new); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we're not switching to PIT completely but support both PIT and Scroll?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Current plan is to add for all pagination then remove scroll after performance comparison.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we will raise another PR for this after performance test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes I was planning to add search_after for all queries (join, multi query, pagination) then a separate PR to remove scroll after performance test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the PIT turns out be bad? We will leave setting to customer? What will be the default setting?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Current default setting is true for using PIT with search after. After benchmark, we can decide if this needs to be changed. We can remove this while removing scroll.