Skip to content

Commit

Permalink
Change query range response structure
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vamsimanohar committed Jul 17, 2023
1 parent 8c8e08c commit 77b6bb4
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 142 deletions.
10 changes: 10 additions & 0 deletions integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ public void testSortPushDownExplain() throws Exception {
);
}

@Test
public void explainQueryRange() throws Exception {
String expected = loadFromFile("expectedOutput/ppl/explain_query_range.json");
assertJsonEquals(
expected,
explainQueryToString("source = my_prometheus"
+ ".query_range('prometheus_http_requests_total',1689281439,1689291439,14)")
);
}

String loadFromFile(String filename) throws Exception {
URI uri = Resources.getResource(filename).toURI();
return new String(Files.readAllBytes(Paths.get(uri)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"root": {
"name": "QueryRangeFunctionTableScanOperator",
"description": {
"request": "query_range(prometheus_http_requests_total, 1689281439, 1689291439, 14)"
},
"children": []
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,25 @@

package org.opensearch.sql.prometheus.functions.response;

import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import org.jetbrains.annotations.NotNull;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.sql.data.model.ExprCollectionValue;
import org.opensearch.sql.data.model.ExprDoubleValue;
import org.opensearch.sql.data.model.ExprStringValue;
import org.opensearch.sql.data.model.ExprTimestampValue;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants;

/**
* Default implementation of QueryRangeFunctionResponseHandle.
Expand All @@ -40,63 +41,61 @@ public class DefaultQueryRangeFunctionResponseHandle implements QueryRangeFuncti
*/
public DefaultQueryRangeFunctionResponseHandle(JSONObject responseObject) {
this.responseObject = responseObject;
constructIteratorAndSchema();
constructSchema();
constructIterator();
}

private void constructIteratorAndSchema() {
private void constructIterator() {
List<ExprValue> result = new ArrayList<>();
List<ExecutionEngine.Schema.Column> columnList = new ArrayList<>();
if ("matrix".equals(responseObject.getString("resultType"))) {
JSONArray itemArray = responseObject.getJSONArray("result");
for (int i = 0; i < itemArray.length(); i++) {
LinkedHashMap<String, ExprValue> linkedHashMap = new LinkedHashMap<>();
JSONObject item = itemArray.getJSONObject(i);
JSONObject metric = item.getJSONObject("metric");
JSONArray values = item.getJSONArray("values");
if (i == 0) {
columnList = getColumnList(metric);
}
for (int j = 0; j < values.length(); j++) {
LinkedHashMap<String, ExprValue> linkedHashMap =
extractRow(metric, values.getJSONArray(j), columnList);
result.add(new ExprTupleValue(linkedHashMap));
}
linkedHashMap.put(LABELS, extractLabels(item.getJSONObject("metric")));
extractTimestampAndValues(item.getJSONArray("values"), linkedHashMap);
result.add(new ExprTupleValue(linkedHashMap));
}
} else {
throw new RuntimeException(String.format("Unexpected Result Type: %s during Prometheus "
+ "Response Parsing. 'matrix' resultType is expected",
responseObject.getString("resultType")));
}
this.schema = new ExecutionEngine.Schema(columnList);
this.responseIterator = result.iterator();
}

@NotNull
private static LinkedHashMap<String, ExprValue> extractRow(JSONObject metric,
JSONArray values, List<ExecutionEngine.Schema.Column> columnList) {
LinkedHashMap<String, ExprValue> linkedHashMap = new LinkedHashMap<>();
for (ExecutionEngine.Schema.Column column : columnList) {
if (PrometheusFieldConstants.TIMESTAMP.equals(column.getName())) {
linkedHashMap.put(PrometheusFieldConstants.TIMESTAMP,
new ExprTimestampValue(Instant.ofEpochMilli((long) (values.getDouble(0) * 1000))));
} else if (column.getName().equals(VALUE)) {
linkedHashMap.put(VALUE, new ExprDoubleValue(values.getDouble(1)));
} else {
linkedHashMap.put(column.getName(),
new ExprStringValue(metric.getString(column.getName())));
}
private static void extractTimestampAndValues(JSONArray values,
LinkedHashMap<String, ExprValue> linkedHashMap) {
List<ExprValue> timestampList = new ArrayList<>();
List<ExprValue> valueList = new ArrayList<>();
for (int j = 0; j < values.length(); j++) {
JSONArray value = values.getJSONArray(j);
timestampList.add(new ExprTimestampValue(
Instant.ofEpochMilli((long) (value.getDouble(0) * 1000))));
valueList.add(new ExprDoubleValue(value.getDouble(1)));
}
return linkedHashMap;
linkedHashMap.put(TIMESTAMP,
new ExprCollectionValue(timestampList));
linkedHashMap.put(VALUE, new ExprCollectionValue(valueList));
}

private void constructSchema() {
this.schema = new ExecutionEngine.Schema(getColumnList());
}

private List<ExecutionEngine.Schema.Column> getColumnList(JSONObject metric) {
private ExprValue extractLabels(JSONObject metric) {
LinkedHashMap<String, ExprValue> labelsMap = new LinkedHashMap<>();
metric.keySet().forEach(key
-> labelsMap.put(key, new ExprStringValue(metric.getString(key))));
return new ExprTupleValue(labelsMap);
}


private List<ExecutionEngine.Schema.Column> getColumnList() {
List<ExecutionEngine.Schema.Column> columnList = new ArrayList<>();
columnList.add(new ExecutionEngine.Schema.Column(PrometheusFieldConstants.TIMESTAMP,
PrometheusFieldConstants.TIMESTAMP, ExprCoreType.TIMESTAMP));
columnList.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.DOUBLE));
for (String key : metric.keySet()) {
columnList.add(new ExecutionEngine.Schema.Column(key, key, ExprCoreType.STRING));
}
columnList.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.ARRAY));
columnList.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.ARRAY));
columnList.add(new ExecutionEngine.Schema.Column(LABELS, LABELS, ExprCoreType.STRUCT));
return columnList;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ public class PrometheusResponse implements Iterable<ExprValue> {

private final PrometheusResponseFieldNames prometheusResponseFieldNames;

private final Boolean isQueryRangeFunctionScan;

/**
* Constructor.
*
Expand All @@ -46,11 +44,9 @@ public class PrometheusResponse implements Iterable<ExprValue> {
* and timestamp fieldName.
*/
public PrometheusResponse(JSONObject responseObject,
PrometheusResponseFieldNames prometheusResponseFieldNames,
Boolean isQueryRangeFunctionScan) {
PrometheusResponseFieldNames prometheusResponseFieldNames) {
this.responseObject = responseObject;
this.prometheusResponseFieldNames = prometheusResponseFieldNames;
this.isQueryRangeFunctionScan = isQueryRangeFunctionScan;
}

@NonNull
Expand All @@ -70,24 +66,7 @@ public Iterator<ExprValue> iterator() {
new ExprTimestampValue(Instant.ofEpochMilli((long) (val.getDouble(0) * 1000))));
linkedHashMap.put(prometheusResponseFieldNames.getValueFieldName(), getValue(val, 1,
prometheusResponseFieldNames.getValueType()));
// Concept:
// {\"instance\":\"localhost:9090\",\"__name__\":\"up\",\"job\":\"prometheus\"}"
// This is the label string in the prometheus response.
// Q: how do we map this to columns in a table.
// For queries like source = prometheus.metric_name | ....
// we can get the labels list in prior as we know which metric we are working on.
// In case of commands like source = prometheus.query_range('promQL');
// Any arbitrary command can be written and we don't know the labels
// in the prometheus response in prior.
// So for PPL like commands...output structure is @value, @timestamp
// and each label is treated as a separate column where as in case of query_range
// function irrespective of promQL, the output structure is
// @value, @timestamp, @labels [jsonfied string of all the labels for a data point]
if (isQueryRangeFunctionScan) {
linkedHashMap.put(LABELS, new ExprStringValue(metric.toString()));
} else {
insertLabels(linkedHashMap, metric);
}
insertLabels(linkedHashMap, metric);
result.add(new ExprTupleValue(linkedHashMap));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ public class PrometheusMetricScan extends TableScanOperator {

private Iterator<ExprValue> iterator;

@Setter
@Getter
private Boolean isQueryRangeFunctionScan = Boolean.FALSE;

@Setter
private PrometheusResponseFieldNames prometheusResponseFieldNames;

Expand All @@ -69,8 +65,7 @@ public void open() {
JSONObject responseObject = prometheusClient.queryRange(
request.getPromQl(),
request.getStartTime(), request.getEndTime(), request.getStep());
return new PrometheusResponse(responseObject, prometheusResponseFieldNames,
isQueryRangeFunctionScan).iterator();
return new PrometheusResponse(responseObject, prometheusResponseFieldNames).iterator();
} catch (IOException e) {
LOG.error(e.getMessage());
throw new RuntimeException("Error fetching data from prometheus server. " + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ public Map<String, ExprType> getFieldTypes() {
public PhysicalPlan implement(LogicalPlan plan) {
PrometheusMetricScan metricScan =
new PrometheusMetricScan(prometheusClient);
if (prometheusQueryRequest != null) {
metricScan.setRequest(prometheusQueryRequest);
metricScan.setIsQueryRangeFunctionScan(Boolean.TRUE);
}
return plan.accept(new PrometheusDefaultImplementor(), metricScan);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
import static org.opensearch.sql.prometheus.constants.TestConstants.QUERY;
import static org.opensearch.sql.prometheus.constants.TestConstants.STARTTIME;
import static org.opensearch.sql.prometheus.constants.TestConstants.STEP;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE;
import static org.opensearch.sql.prometheus.utils.TestUtils.getJson;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import lombok.SneakyThrows;
import org.json.JSONObject;
Expand All @@ -30,17 +32,19 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.data.model.ExprCollectionValue;
import org.opensearch.sql.data.model.ExprDoubleValue;
import org.opensearch.sql.data.model.ExprStringValue;
import org.opensearch.sql.data.model.ExprTimestampValue;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.prometheus.client.PrometheusClient;
import org.opensearch.sql.prometheus.request.PrometheusQueryRequest;

@ExtendWith(MockitoExtension.class)
public class QueryRangeFunctionTableScanOperatorTest {
class QueryRangeFunctionTableScanOperatorTest {
@Mock
private PrometheusClient prometheusClient;

Expand All @@ -61,22 +65,32 @@ void testQueryResponseIterator() {
.thenReturn(new JSONObject(getJson("query_range_result.json")));
queryRangeFunctionTableScanOperator.open();
Assertions.assertTrue(queryRangeFunctionTableScanOperator.hasNext());
ExprTupleValue firstRow = new ExprTupleValue(new LinkedHashMap<>() {{
put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L)));
put(VALUE, new ExprDoubleValue(1));
LinkedHashMap<String, ExprValue> labelsMap = new LinkedHashMap<>() {{
put("instance", new ExprStringValue("localhost:9090"));
put("__name__", new ExprStringValue("up"));
put("job", new ExprStringValue("prometheus"));
}};
ExprTupleValue firstRow = new ExprTupleValue(new LinkedHashMap<>() {{
put(LABELS, new ExprTupleValue(labelsMap));
put(TIMESTAMP, new ExprCollectionValue(Collections
.singletonList(new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L)))));
put(VALUE, new ExprCollectionValue(Collections.singletonList(new ExprDoubleValue(1))));
}
});

assertEquals(firstRow, queryRangeFunctionTableScanOperator.next());
Assertions.assertTrue(queryRangeFunctionTableScanOperator.hasNext());
ExprTupleValue secondRow = new ExprTupleValue(new LinkedHashMap<>() {{
put("@timestamp", new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L)));
put("@value", new ExprDoubleValue(0));

LinkedHashMap<String, ExprValue> labelsMap2 = new LinkedHashMap<>() {{
put("instance", new ExprStringValue("localhost:9091"));
put("__name__", new ExprStringValue("up"));
put("job", new ExprStringValue("node"));
}};
ExprTupleValue secondRow = new ExprTupleValue(new LinkedHashMap<>() {{
put(LABELS, new ExprTupleValue(labelsMap2));
put(TIMESTAMP, new ExprCollectionValue(Collections
.singletonList(new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L)))));
put(VALUE, new ExprCollectionValue(Collections.singletonList(new ExprDoubleValue(0))));
}
});
assertEquals(secondRow, queryRangeFunctionTableScanOperator.next());
Expand Down Expand Up @@ -120,11 +134,9 @@ void testQuerySchema() {
.thenReturn(new JSONObject(getJson("query_range_result.json")));
queryRangeFunctionTableScanOperator.open();
ArrayList<ExecutionEngine.Schema.Column> columns = new ArrayList<>();
columns.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.TIMESTAMP));
columns.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.DOUBLE));
columns.add(new ExecutionEngine.Schema.Column("instance", "instance", ExprCoreType.STRING));
columns.add(new ExecutionEngine.Schema.Column("__name__", "__name__", ExprCoreType.STRING));
columns.add(new ExecutionEngine.Schema.Column("job", "job", ExprCoreType.STRING));
columns.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.ARRAY));
columns.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.ARRAY));
columns.add(new ExecutionEngine.Schema.Column(LABELS, LABELS, ExprCoreType.STRUCT));
ExecutionEngine.Schema expectedSchema = new ExecutionEngine.Schema(columns);
assertEquals(expectedSchema, queryRangeFunctionTableScanOperator.schema());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,39 +209,6 @@ void testQueryResponseIteratorWithGivenPrometheusResponseWithBackQuotedFieldName
Assertions.assertFalse(prometheusMetricScan.hasNext());
}

@Test
@SneakyThrows
void testQueryResponseIteratorForQueryRangeFunction() {
PrometheusMetricScan prometheusMetricScan = new PrometheusMetricScan(prometheusClient);
prometheusMetricScan.setIsQueryRangeFunctionScan(Boolean.TRUE);
prometheusMetricScan.getRequest().setPromQl(QUERY);
prometheusMetricScan.getRequest().setStartTime(STARTTIME);
prometheusMetricScan.getRequest().setEndTime(ENDTIME);
prometheusMetricScan.getRequest().setStep(STEP);

when(prometheusClient.queryRange(any(), any(), any(), any()))
.thenReturn(new JSONObject(getJson("query_range_result.json")));
prometheusMetricScan.open();
Assertions.assertTrue(prometheusMetricScan.hasNext());
ExprTupleValue firstRow = new ExprTupleValue(new LinkedHashMap<>() {{
put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L)));
put(VALUE, new ExprLongValue(1));
put(LABELS, new ExprStringValue(
"{\"instance\":\"localhost:9090\",\"__name__\":\"up\",\"job\":\"prometheus\"}"));
}
});
assertEquals(firstRow, prometheusMetricScan.next());
Assertions.assertTrue(prometheusMetricScan.hasNext());
ExprTupleValue secondRow = new ExprTupleValue(new LinkedHashMap<>() {{
put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L)));
put(VALUE, new ExprLongValue(0));
put(LABELS, new ExprStringValue(
"{\"instance\":\"localhost:9091\",\"__name__\":\"up\",\"job\":\"node\"}"));
}
});
assertEquals(secondRow, prometheusMetricScan.next());
Assertions.assertFalse(prometheusMetricScan.hasNext());
}

@Test
@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,32 +111,6 @@ void testGetFieldTypesFromPrometheusQueryRequest() {
assertNull(prometheusMetricTable.getMetricName());
}

@Test
void testImplementWithQueryRangeFunction() {
PrometheusQueryRequest prometheusQueryRequest = new PrometheusQueryRequest();
prometheusQueryRequest.setPromQl("test");
prometheusQueryRequest.setStep("15m");
PrometheusMetricTable prometheusMetricTable =
new PrometheusMetricTable(client, prometheusQueryRequest);
List<NamedExpression> finalProjectList = new ArrayList<>();
finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING)));
finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP)));
PhysicalPlan plan = prometheusMetricTable.implement(
project(relation("query_range", prometheusMetricTable),
finalProjectList, null));


assertTrue(plan instanceof ProjectOperator);
List<NamedExpression> projectList = ((ProjectOperator) plan).getProjectList();
List<String> outputFields
= projectList.stream().map(NamedExpression::getName).collect(Collectors.toList());
assertEquals(List.of(VALUE, TIMESTAMP), outputFields);
assertTrue(((ProjectOperator) plan).getInput() instanceof PrometheusMetricScan);
PrometheusMetricScan prometheusMetricScan =
(PrometheusMetricScan) ((ProjectOperator) plan).getInput();
assertEquals(prometheusQueryRequest, prometheusMetricScan.getRequest());
}

@Test
void testImplementWithBasicMetricQuery() {
PrometheusMetricTable prometheusMetricTable =
Expand Down

0 comments on commit 77b6bb4

Please sign in to comment.