Skip to content

Commit

Permalink
Add pit before query execution
Browse files Browse the repository at this point in the history
Signed-off-by: Rupal Mahajan <[email protected]>
  • Loading branch information
rupal-bq committed Jun 13, 2024
1 parent 7bb89d8 commit 60bd8fe
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.legacy.executor;

import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Map;
Expand All @@ -24,6 +26,7 @@
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.action.RestStatusToXContentListener;
import org.opensearch.search.SearchHits;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.executor.join.ElasticJoinExecutor;
import org.opensearch.sql.legacy.executor.join.ElasticUtils;
Expand Down Expand Up @@ -58,8 +61,16 @@ public void execute(
ActionRequest request = requestBuilder.request();

if (requestBuilder instanceof JoinRequestBuilder) {
boolean isSearchAfter =
LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER);
if (isSearchAfter) {
((JoinRequestBuilder) requestBuilder).updateRequestWithPit(client);
}
ElasticJoinExecutor executor = ElasticJoinExecutor.createJoinExecutor(client, requestBuilder);
executor.run();
if (isSearchAfter) {
((JoinRequestBuilder) requestBuilder).getPit().deletePointInTime(client);
}
executor.sendResponse(channel);
} else if (requestBuilder instanceof MultiQueryRequestBuilder) {
ElasticHitsExecutor executor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.opensearch.sql.legacy.query.ShowQueryAction;
import org.opensearch.sql.legacy.query.SqlElasticRequestBuilder;
import org.opensearch.sql.legacy.query.SqlOpenSearchRequestBuilder;
import org.opensearch.sql.legacy.query.join.JoinRequestBuilder;
import org.opensearch.sql.legacy.query.join.OpenSearchJoinQueryAction;
import org.opensearch.sql.legacy.query.multi.MultiQueryAction;
import org.opensearch.sql.legacy.query.multi.MultiQueryRequestBuilder;
Expand All @@ -45,11 +46,20 @@ public static SearchHits executeJoinSearchAction(
Client client, OpenSearchJoinQueryAction joinQueryAction)
throws IOException, SqlParseException {
SqlElasticRequestBuilder joinRequestBuilder = joinQueryAction.explain();

boolean isSearchAfter =
LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER);
if (isSearchAfter) {
JoinRequestBuilder requestBuilder = (JoinRequestBuilder) joinRequestBuilder;
requestBuilder.updateRequestWithPit(client);
}
ElasticJoinExecutor executor =
ElasticJoinExecutor.createJoinExecutor(client, joinRequestBuilder);
executor.run();
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
joinQueryAction.getPointInTimeHandler().deletePointInTime();

if (isSearchAfter) {
JoinRequestBuilder requestBuilder = (JoinRequestBuilder) joinRequestBuilder;
requestBuilder.getPit().deletePointInTime(client);
}
return executor.getHits();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
import org.opensearch.sql.legacy.esdomain.LocalClusterState;

public class PointInTimeHandler {
private Client client;
@Getter private String pitId;
private static final Logger LOG = LogManager.getLogger();

public PointInTimeHandler(Client client, String[] indices) {
this.client = client;
CreatePitRequest createPitRequest =
new CreatePitRequest(
LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE), false, indices);
Expand All @@ -42,7 +40,7 @@ public void onFailure(Exception e) {
});
}

public void deletePointInTime() {
public void deletePointInTime(Client client) {
DeletePitRequest deletePitRequest = new DeletePitRequest(pitId);
client.deletePits(
deletePitRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.alibaba.druid.sql.ast.statement.SQLJoinTableSource;
import java.io.IOException;
import lombok.Getter;
import lombok.Setter;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestBuilder;
Expand All @@ -16,6 +17,7 @@
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.sql.legacy.pit.PointInTimeHandler;
import org.opensearch.sql.legacy.query.SqlElasticRequestBuilder;

/** Created by Eliran on 15/9/2015. */
Expand All @@ -26,13 +28,23 @@ public class JoinRequestBuilder implements SqlElasticRequestBuilder {
private TableInJoinRequestBuilder secondTable;
private SQLJoinTableSource.JoinType joinType;
private int totalLimit;
@Setter private String pitId;
@Setter @Getter private PointInTimeHandler pit;

public JoinRequestBuilder() {
firstTable = new TableInJoinRequestBuilder();
secondTable = new TableInJoinRequestBuilder();
}

public void updateRequestWithPit(org.opensearch.client.Client client) {
String[] indices =
org.opensearch.common.util.ArrayUtils.concat(
firstTable.getOriginalSelect().getIndexArr(),
secondTable.getOriginalSelect().getIndexArr());
pit = new PointInTimeHandler(client, indices);
firstTable.setPitId(pit.getPitId());
secondTable.setPitId(pit.getPitId());
}

@Override
public ActionRequest request() {
if (multi == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,14 @@

package org.opensearch.sql.legacy.query.join;

import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.util.List;
import org.opensearch.client.Client;
import org.opensearch.common.util.ArrayUtils;
import org.opensearch.sql.legacy.domain.Field;
import org.opensearch.sql.legacy.domain.JoinSelect;
import org.opensearch.sql.legacy.domain.Select;
import org.opensearch.sql.legacy.domain.TableOnJoinSelect;
import org.opensearch.sql.legacy.domain.hints.Hint;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.pit.PointInTimeHandler;
import org.opensearch.sql.legacy.query.DefaultQueryAction;
import org.opensearch.sql.legacy.query.QueryAction;
import org.opensearch.sql.legacy.query.SqlElasticRequestBuilder;
Expand All @@ -28,7 +23,6 @@
public abstract class OpenSearchJoinQueryAction extends QueryAction {

protected JoinSelect joinSelect;
private PointInTimeHandler pointInTimeHandler;

public OpenSearchJoinQueryAction(Client client, JoinSelect joinSelect) {
super(client, joinSelect);
Expand All @@ -40,16 +34,9 @@ public SqlElasticRequestBuilder explain() throws SqlParseException {
JoinRequestBuilder requestBuilder = createSpecificBuilder();
fillBasicJoinRequestBuilder(requestBuilder);
fillSpecificRequestBuilder(requestBuilder);
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
updateRequestWithPitId(requestBuilder);
}
return requestBuilder;
}

public PointInTimeHandler getPointInTimeHandler() {
return pointInTimeHandler;
}

protected abstract void fillSpecificRequestBuilder(JoinRequestBuilder requestBuilder)
throws SqlParseException;

Expand Down Expand Up @@ -105,18 +92,6 @@ protected void updateRequestWithHints(JoinRequestBuilder requestBuilder) {
}
}

private void updateRequestWithPitId(JoinRequestBuilder requestBuilder) {
String[] indices =
ArrayUtils.concat(
requestBuilder.getFirstTable().getOriginalSelect().getIndexArr(),
requestBuilder.getSecondTable().getOriginalSelect().getIndexArr());
this.pointInTimeHandler = new PointInTimeHandler(client, indices);
String pitId = pointInTimeHandler.getPitId();
requestBuilder.setPitId(pitId);
requestBuilder.getFirstTable().setPitId(pitId);
requestBuilder.getSecondTable().setPitId(pitId);
}

private Config queryPlannerConfig(JoinRequestBuilder requestBuilder) {
return ((HashJoinQueryPlanRequestBuilder) requestBuilder).getConfig();
}
Expand Down

0 comments on commit 60bd8fe

Please sign in to comment.