From d02920289cf58222134968a668da2a53254323e4 Mon Sep 17 00:00:00 2001 From: Hendrik Saly Date: Sat, 31 Aug 2024 15:22:44 +0200 Subject: [PATCH] Prototype for PPL took info Signed-off-by: Hendrik Saly --- .../transport/model/AsyncQueryResult.java | 4 ++-- .../sql/executor/ExecutionEngine.java | 1 + .../org/opensearch/sql/storage/TookAware.java | 10 ++++++++++ .../sql/executor/DefaultExecutionEngine.java | 2 +- .../org/opensearch/sql/ppl/StandaloneIT.java | 3 ++- .../sql/legacy/plugin/RestSQLQueryAction.java | 6 +++++- .../executor/OpenSearchExecutionEngine.java | 14 +++++++++++++- .../request/OpenSearchQueryRequest.java | 18 ++++++++++++------ .../response/OpenSearchResponse.java | 13 ++++++++++++- .../storage/scan/OpenSearchIndexScan.java | 11 ++++++++++- .../transport/TransportPPLQueryAction.java | 6 +++++- .../sql/protocol/response/QueryResult.java | 10 ++++++++-- .../format/SimpleJsonResponseFormatter.java | 7 ++++++- .../format/VisualizationResponseFormatter.java | 5 ++++- 14 files changed, 91 insertions(+), 19 deletions(-) create mode 100644 core/src/main/java/org/opensearch/sql/storage/TookAware.java diff --git a/async-query/src/main/java/org/opensearch/sql/spark/transport/model/AsyncQueryResult.java b/async-query/src/main/java/org/opensearch/sql/spark/transport/model/AsyncQueryResult.java index 712cebf7e1..e66d47f81d 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/transport/model/AsyncQueryResult.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/transport/model/AsyncQueryResult.java @@ -24,7 +24,7 @@ public AsyncQueryResult( Collection exprValues, Cursor cursor, String error) { - super(schema, exprValues, cursor); + super(schema, exprValues, cursor, null); this.status = status; this.error = error; } @@ -34,7 +34,7 @@ public AsyncQueryResult( ExecutionEngine.Schema schema, Collection exprValues, String error) { - super(schema, exprValues); + super(schema, exprValues, null); this.status = status; this.error = error; } diff --git a/core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java index 43b8ccb62e..94aa066902 100644 --- a/core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java +++ b/core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java @@ -48,6 +48,7 @@ class QueryResponse { private final Schema schema; private final List results; private final Cursor cursor; + private final Long took; } @Data diff --git a/core/src/main/java/org/opensearch/sql/storage/TookAware.java b/core/src/main/java/org/opensearch/sql/storage/TookAware.java new file mode 100644 index 0000000000..bc14bdd5d4 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/storage/TookAware.java @@ -0,0 +1,10 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.storage; + +public interface TookAware { + Long getTook(); +} diff --git a/core/src/testFixtures/java/org/opensearch/sql/executor/DefaultExecutionEngine.java b/core/src/testFixtures/java/org/opensearch/sql/executor/DefaultExecutionEngine.java index 61219d4637..2c67f6bef8 100644 --- a/core/src/testFixtures/java/org/opensearch/sql/executor/DefaultExecutionEngine.java +++ b/core/src/testFixtures/java/org/opensearch/sql/executor/DefaultExecutionEngine.java @@ -32,7 +32,7 @@ public void execute( result.add(plan.next()); } QueryResponse response = - new QueryResponse(new Schema(new ArrayList<>()), new ArrayList<>(), Cursor.None); + new QueryResponse(new Schema(new ArrayList<>()), new ArrayList<>(), Cursor.None, 0L); listener.onResponse(response); } catch (Exception e) { listener.onFailure(e); diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java index f81e1b6615..8ebca2f7b5 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java @@ -134,7 +134,8 @@ private String executeByStandaloneQueryEngine(String query) { @Override public void onResponse(QueryResponse response) { - QueryResult result = new QueryResult(response.getSchema(), response.getResults()); + QueryResult result = + new QueryResult(response.getSchema(), response.getResults(), response.getTook()); String json = new SimpleJsonResponseFormatter(PRETTY).format(result); actual.set(json); } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java index 4440219f1b..a6377230c1 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java @@ -183,7 +183,11 @@ public void onResponse(QueryResponse response) { channel, OK, formatter.format( - new QueryResult(response.getSchema(), response.getResults(), response.getCursor())), + new QueryResult( + response.getSchema(), + response.getResults(), + response.getCursor(), + response.getTook())), formatter.contentType()); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index 21046956d0..aebc3efbb8 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -18,7 +18,9 @@ import org.opensearch.sql.opensearch.client.OpenSearchClient; import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector; import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.opensearch.sql.planner.physical.ProjectOperator; import org.opensearch.sql.storage.TableScanOperator; +import org.opensearch.sql.storage.TookAware; /** OpenSearch execution engine implementation. */ @RequiredArgsConstructor @@ -52,9 +54,19 @@ public void execute( result.add(plan.next()); } + Long took = null; + + if (physicalPlan instanceof ProjectOperator) { + PhysicalPlan input = ((ProjectOperator) physicalPlan).getInput(); + + if (input instanceof TookAware) { + took = ((TookAware) input).getTook(); + } + } + QueryResponse response = new QueryResponse( - physicalPlan.schema(), result, planSerializer.convertToCursor(plan)); + physicalPlan.schema(), result, planSerializer.convertToCursor(plan), took); listener.onResponse(response); } catch (Exception e) { listener.onFailure(e); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java index 6447a3ff65..53f793dff8 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java @@ -15,6 +15,7 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchScrollRequest; +import org.opensearch.common.SetOnce; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.search.SearchHits; import org.opensearch.search.builder.SearchSourceBuilder; @@ -48,6 +49,8 @@ public class OpenSearchQueryRequest implements OpenSearchRequest { /** Indicate the search already done. */ private boolean searchDone = false; + private SetOnce took = new SetOnce<>(); + /** Constructor of OpenSearchQueryRequest. */ public OpenSearchQueryRequest( String indexName, int size, OpenSearchExprValueFactory factory, List includes) { @@ -83,14 +86,17 @@ public OpenSearchResponse search( Function searchAction, Function scrollAction) { if (searchDone) { - return new OpenSearchResponse(SearchHits.empty(), exprValueFactory, includes); + return new OpenSearchResponse(SearchHits.empty(), exprValueFactory, includes, took.get()); } else { searchDone = true; - return new OpenSearchResponse( - searchAction.apply( - new SearchRequest().indices(indexName.getIndexNames()).source(sourceBuilder)), - exprValueFactory, - includes); + OpenSearchResponse openSearchResponse = + new OpenSearchResponse( + searchAction.apply( + new SearchRequest().indices(indexName.getIndexNames()).source(sourceBuilder)), + exprValueFactory, + includes); + took.set(Long.valueOf(openSearchResponse.getTook())); + return openSearchResponse; } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java index e43777a740..78b97e4d33 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java @@ -48,6 +48,8 @@ public class OpenSearchResponse implements Iterable { /** List of requested include fields. */ private final List includes; + private final long took; + /** OpenSearchExprValueFactory used to build ExprValue from search result. */ @EqualsAndHashCode.Exclude private final OpenSearchExprValueFactory exprValueFactory; @@ -60,15 +62,20 @@ public OpenSearchResponse( this.aggregations = searchResponse.getAggregations(); this.exprValueFactory = exprValueFactory; this.includes = includes; + this.took = searchResponse.getTook().getMillis(); } /** Constructor of OpenSearchResponse with SearchHits. */ public OpenSearchResponse( - SearchHits hits, OpenSearchExprValueFactory exprValueFactory, List includes) { + SearchHits hits, + OpenSearchExprValueFactory exprValueFactory, + List includes, + long took) { this.hits = hits; this.aggregations = null; this.exprValueFactory = exprValueFactory; this.includes = includes; + this.took = took; } /** @@ -85,6 +92,10 @@ public boolean isAggregationResponse() { return aggregations != null; } + public long getTook() { + return took; + } + /** * Make response iterable without need to return internal data structure explicitly. * diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java index b1e4ccc463..a83d116d7a 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java @@ -24,11 +24,12 @@ import org.opensearch.sql.opensearch.storage.OpenSearchStorageEngine; import org.opensearch.sql.planner.SerializablePlan; import org.opensearch.sql.storage.TableScanOperator; +import org.opensearch.sql.storage.TookAware; /** OpenSearch index scan operator. */ @EqualsAndHashCode(onlyExplicitlyIncluded = true, callSuper = false) @ToString(onlyExplicitlyIncluded = true) -public class OpenSearchIndexScan extends TableScanOperator implements SerializablePlan { +public class OpenSearchIndexScan extends TableScanOperator implements SerializablePlan, TookAware { /** OpenSearch client. */ private OpenSearchClient client; @@ -45,6 +46,8 @@ public class OpenSearchIndexScan extends TableScanOperator implements Serializab /** Search response for current batch. */ private Iterator iterator; + private Long took; + /** Creates index scan based on a provided OpenSearchRequestBuilder. */ public OpenSearchIndexScan( OpenSearchClient client, int maxResponseSize, OpenSearchRequest request) { @@ -79,6 +82,7 @@ public ExprValue next() { private void fetchNextBatch() { OpenSearchResponse response = client.search(request); + took = Long.valueOf(response.getTook()); if (!response.isEmpty()) { iterator = response.iterator(); } @@ -149,4 +153,9 @@ public void writeExternal(ObjectOutput out) throws IOException { out.writeInt(maxResponseSize); } + + @Override + public Long getTook() { + return took; + } } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java index 76283ac63a..5703f0bcbc 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java @@ -160,7 +160,11 @@ private ResponseListener createListener( public void onResponse(ExecutionEngine.QueryResponse response) { String responseContent = formatter.format( - new QueryResult(response.getSchema(), response.getResults(), response.getCursor())); + new QueryResult( + response.getSchema(), + response.getResults(), + response.getCursor(), + response.getTook())); listener.onResponse(new TransportPPLQueryResponse(responseContent)); } diff --git a/protocol/src/main/java/org/opensearch/sql/protocol/response/QueryResult.java b/protocol/src/main/java/org/opensearch/sql/protocol/response/QueryResult.java index 03be0875cf..cd2466180c 100644 --- a/protocol/src/main/java/org/opensearch/sql/protocol/response/QueryResult.java +++ b/protocol/src/main/java/org/opensearch/sql/protocol/response/QueryResult.java @@ -32,8 +32,10 @@ public class QueryResult implements Iterable { @Getter private final Cursor cursor; - public QueryResult(ExecutionEngine.Schema schema, Collection exprValues) { - this(schema, exprValues, Cursor.None); + private final Long took; + + public QueryResult(ExecutionEngine.Schema schema, Collection exprValues, Long took) { + this(schema, exprValues, Cursor.None, took); } /** @@ -45,6 +47,10 @@ public int size() { return exprValues.size(); } + public Long took() { + return took; + } + /** * Parse column name from results. * diff --git a/protocol/src/main/java/org/opensearch/sql/protocol/response/format/SimpleJsonResponseFormatter.java b/protocol/src/main/java/org/opensearch/sql/protocol/response/format/SimpleJsonResponseFormatter.java index c00174dc9f..3dfe4ec36c 100644 --- a/protocol/src/main/java/org/opensearch/sql/protocol/response/format/SimpleJsonResponseFormatter.java +++ b/protocol/src/main/java/org/opensearch/sql/protocol/response/format/SimpleJsonResponseFormatter.java @@ -28,7 +28,8 @@ * ["Smith"] * ], * "total": 2, - * "size": 2 + * "size": 2, + * "took": 47 * } * */ @@ -47,6 +48,9 @@ public Object buildJsonObject(QueryResult response) { response.columnNameTypes().forEach((name, type) -> json.column(new Column(name, type))); json.datarows(fetchDataRows(response)); + + json.took(response.took()); + return json.build(); } @@ -70,6 +74,7 @@ public static class JsonResponse { private long total; private long size; + private Long took; } @RequiredArgsConstructor diff --git a/protocol/src/main/java/org/opensearch/sql/protocol/response/format/VisualizationResponseFormatter.java b/protocol/src/main/java/org/opensearch/sql/protocol/response/format/VisualizationResponseFormatter.java index d5d220dd8d..61abb0a93c 100644 --- a/protocol/src/main/java/org/opensearch/sql/protocol/response/format/VisualizationResponseFormatter.java +++ b/protocol/src/main/java/org/opensearch/sql/protocol/response/format/VisualizationResponseFormatter.java @@ -49,7 +49,8 @@ * ] * }, * "size": 2, - * "status": 200 + * "status": 200, + * "took": 47 * } * */ @@ -65,6 +66,7 @@ protected Object buildJsonObject(QueryResult response) { .metadata(constructMetadata(response)) .size(response.size()) .status(200) + .took(response.took()) .build(); } @@ -144,6 +146,7 @@ public static class VisualizationResponse { private final Metadata metadata; private final long size; private final int status; + private final Long took; } @RequiredArgsConstructor