Skip to content

Commit

Permalink
Prototype for PPL took info
Browse files Browse the repository at this point in the history
Signed-off-by: Hendrik Saly <[email protected]>
  • Loading branch information
salyh committed Aug 31, 2024
1 parent d260e0e commit d029202
Show file tree
Hide file tree
Showing 14 changed files with 91 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public AsyncQueryResult(
Collection<ExprValue> exprValues,
Cursor cursor,
String error) {
super(schema, exprValues, cursor);
super(schema, exprValues, cursor, null);
this.status = status;
this.error = error;
}
Expand All @@ -34,7 +34,7 @@ public AsyncQueryResult(
ExecutionEngine.Schema schema,
Collection<ExprValue> exprValues,
String error) {
super(schema, exprValues);
super(schema, exprValues, null);
this.status = status;
this.error = error;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class QueryResponse {
private final Schema schema;
private final List<ExprValue> results;
private final Cursor cursor;
private final Long took;
}

@Data
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/opensearch/sql/storage/TookAware.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.storage;

public interface TookAware {
Long getTook();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void execute(
result.add(plan.next());
}
QueryResponse response =
new QueryResponse(new Schema(new ArrayList<>()), new ArrayList<>(), Cursor.None);
new QueryResponse(new Schema(new ArrayList<>()), new ArrayList<>(), Cursor.None, 0L);
listener.onResponse(response);
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ private String executeByStandaloneQueryEngine(String query) {

@Override
public void onResponse(QueryResponse response) {
QueryResult result = new QueryResult(response.getSchema(), response.getResults());
QueryResult result =
new QueryResult(response.getSchema(), response.getResults(), response.getTook());
String json = new SimpleJsonResponseFormatter(PRETTY).format(result);
actual.set(json);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,11 @@ public void onResponse(QueryResponse response) {
channel,
OK,
formatter.format(
new QueryResult(response.getSchema(), response.getResults(), response.getCursor())),
new QueryResult(
response.getSchema(),
response.getResults(),
response.getCursor(),
response.getTook())),
formatter.contentType());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.ProjectOperator;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.sql.storage.TookAware;

/** OpenSearch execution engine implementation. */
@RequiredArgsConstructor
Expand Down Expand Up @@ -52,9 +54,19 @@ public void execute(
result.add(plan.next());
}

Long took = null;

if (physicalPlan instanceof ProjectOperator) {
PhysicalPlan input = ((ProjectOperator) physicalPlan).getInput();

if (input instanceof TookAware) {
took = ((TookAware) input).getTook();
}
}

QueryResponse response =
new QueryResponse(
physicalPlan.schema(), result, planSerializer.convertToCursor(plan));
physicalPlan.schema(), result, planSerializer.convertToCursor(plan), took);
listener.onResponse(response);
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.common.SetOnce;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.SearchSourceBuilder;
Expand Down Expand Up @@ -48,6 +49,8 @@ public class OpenSearchQueryRequest implements OpenSearchRequest {
/** Indicate the search already done. */
private boolean searchDone = false;

private SetOnce<Long> took = new SetOnce<>();

/** Constructor of OpenSearchQueryRequest. */
public OpenSearchQueryRequest(
String indexName, int size, OpenSearchExprValueFactory factory, List<String> includes) {
Expand Down Expand Up @@ -83,14 +86,17 @@ public OpenSearchResponse search(
Function<SearchRequest, SearchResponse> searchAction,
Function<SearchScrollRequest, SearchResponse> scrollAction) {
if (searchDone) {
return new OpenSearchResponse(SearchHits.empty(), exprValueFactory, includes);
return new OpenSearchResponse(SearchHits.empty(), exprValueFactory, includes, took.get());
} else {
searchDone = true;
return new OpenSearchResponse(
searchAction.apply(
new SearchRequest().indices(indexName.getIndexNames()).source(sourceBuilder)),
exprValueFactory,
includes);
OpenSearchResponse openSearchResponse =
new OpenSearchResponse(
searchAction.apply(
new SearchRequest().indices(indexName.getIndexNames()).source(sourceBuilder)),
exprValueFactory,
includes);
took.set(Long.valueOf(openSearchResponse.getTook()));
return openSearchResponse;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class OpenSearchResponse implements Iterable<ExprValue> {
/** List of requested include fields. */
private final List<String> includes;

private final long took;

/** OpenSearchExprValueFactory used to build ExprValue from search result. */
@EqualsAndHashCode.Exclude private final OpenSearchExprValueFactory exprValueFactory;

Expand All @@ -60,15 +62,20 @@ public OpenSearchResponse(
this.aggregations = searchResponse.getAggregations();
this.exprValueFactory = exprValueFactory;
this.includes = includes;
this.took = searchResponse.getTook().getMillis();
}

/** Constructor of OpenSearchResponse with SearchHits. */
public OpenSearchResponse(
SearchHits hits, OpenSearchExprValueFactory exprValueFactory, List<String> includes) {
SearchHits hits,
OpenSearchExprValueFactory exprValueFactory,
List<String> includes,
long took) {
this.hits = hits;
this.aggregations = null;
this.exprValueFactory = exprValueFactory;
this.includes = includes;
this.took = took;
}

/**
Expand All @@ -85,6 +92,10 @@ public boolean isAggregationResponse() {
return aggregations != null;
}

public long getTook() {
return took;
}

/**
* Make response iterable without need to return internal data structure explicitly.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@
import org.opensearch.sql.opensearch.storage.OpenSearchStorageEngine;
import org.opensearch.sql.planner.SerializablePlan;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.sql.storage.TookAware;

/** OpenSearch index scan operator. */
@EqualsAndHashCode(onlyExplicitlyIncluded = true, callSuper = false)
@ToString(onlyExplicitlyIncluded = true)
public class OpenSearchIndexScan extends TableScanOperator implements SerializablePlan {
public class OpenSearchIndexScan extends TableScanOperator implements SerializablePlan, TookAware {

/** OpenSearch client. */
private OpenSearchClient client;
Expand All @@ -45,6 +46,8 @@ public class OpenSearchIndexScan extends TableScanOperator implements Serializab
/** Search response for current batch. */
private Iterator<ExprValue> iterator;

private Long took;

/** Creates index scan based on a provided OpenSearchRequestBuilder. */
public OpenSearchIndexScan(
OpenSearchClient client, int maxResponseSize, OpenSearchRequest request) {
Expand Down Expand Up @@ -79,6 +82,7 @@ public ExprValue next() {

private void fetchNextBatch() {
OpenSearchResponse response = client.search(request);
took = Long.valueOf(response.getTook());
if (!response.isEmpty()) {
iterator = response.iterator();
}
Expand Down Expand Up @@ -149,4 +153,9 @@ public void writeExternal(ObjectOutput out) throws IOException {

out.writeInt(maxResponseSize);
}

@Override
public Long getTook() {
return took;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,11 @@ private ResponseListener<ExecutionEngine.QueryResponse> createListener(
public void onResponse(ExecutionEngine.QueryResponse response) {
String responseContent =
formatter.format(
new QueryResult(response.getSchema(), response.getResults(), response.getCursor()));
new QueryResult(
response.getSchema(),
response.getResults(),
response.getCursor(),
response.getTook()));
listener.onResponse(new TransportPPLQueryResponse(responseContent));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ public class QueryResult implements Iterable<Object[]> {

@Getter private final Cursor cursor;

public QueryResult(ExecutionEngine.Schema schema, Collection<ExprValue> exprValues) {
this(schema, exprValues, Cursor.None);
private final Long took;

public QueryResult(ExecutionEngine.Schema schema, Collection<ExprValue> exprValues, Long took) {
this(schema, exprValues, Cursor.None, took);
}

/**
Expand All @@ -45,6 +47,10 @@ public int size() {
return exprValues.size();
}

public Long took() {
return took;
}

/**
* Parse column name from results.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
* ["Smith"]
* ],
* "total": 2,
* "size": 2
* "size": 2,
* "took": 47
* }
* </pre>
*/
Expand All @@ -47,6 +48,9 @@ public Object buildJsonObject(QueryResult response) {
response.columnNameTypes().forEach((name, type) -> json.column(new Column(name, type)));

json.datarows(fetchDataRows(response));

json.took(response.took());

return json.build();
}

Expand All @@ -70,6 +74,7 @@ public static class JsonResponse {

private long total;
private long size;
private Long took;
}

@RequiredArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
* ]
* },
* "size": 2,
* "status": 200
* "status": 200,
* "took": 47
* }
* </pre>
*/
Expand All @@ -65,6 +66,7 @@ protected Object buildJsonObject(QueryResult response) {
.metadata(constructMetadata(response))
.size(response.size())
.status(200)
.took(response.took())
.build();
}

Expand Down Expand Up @@ -144,6 +146,7 @@ public static class VisualizationResponse {
private final Metadata metadata;
private final long size;
private final int status;
private final Long took;
}

@RequiredArgsConstructor
Expand Down

0 comments on commit d029202

Please sign in to comment.