Skip to content

Commit

Permalink
Add tablesQueried metadata to BrokerResponse (#14384)
Browse files Browse the repository at this point in the history
tablesQueried is a set of tables that were queried in the request. The field is set for both single stage and multi-stage queries.
  • Loading branch information
vrajat authored Nov 5, 2024
1 parent cc5d0f1 commit 969bbf0
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats,
requestContext);
}
brokerResponse.setTablesQueried(Set.of(rawTableName));

for (ProcessingException exception : exceptions) {
brokerResponse.addException(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO

BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
brokerResponse.setResultTable(queryResults.getResultTable());
brokerResponse.setTablesQueried(tableNames);
// TODO: Add servers queried/responded stats
brokerResponse.setBrokerReduceTimeMs(queryResults.getBrokerReduceTimeMs());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.response.broker.ResultTable;
Expand Down Expand Up @@ -294,4 +295,16 @@ default long getRealtimeTotalCpuTimeNs() {
* Returns the trace info for the query execution when tracing is enabled, empty map otherwise.
*/
Map<String, String> getTraceInfo();

/**
* Set the tables queried in the request
* @param tablesQueried Set of tables queried
*/
void setTablesQueried(Set<String> tablesQueried);

/**
* Get the tables queried in the request
* @return Set of tables queried
*/
Set<String> getTablesQueried();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.ProcessingException;
Expand All @@ -51,7 +53,7 @@
"offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
"realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs",
"explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo"
"explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried"
})
@JsonIgnoreProperties(ignoreUnknown = true)
public class BrokerResponseNative implements BrokerResponse {
Expand Down Expand Up @@ -99,6 +101,7 @@ public class BrokerResponseNative implements BrokerResponse {
private long _explainPlanNumEmptyFilterSegments = 0L;
private long _explainPlanNumMatchAllFilterSegments = 0L;
private Map<String, String> _traceInfo = new HashMap<>();
private Set<String> _tablesQueried = Set.of();

public BrokerResponseNative() {
}
Expand Down Expand Up @@ -485,4 +488,15 @@ public Map<String, String> getTraceInfo() {
public void setTraceInfo(Map<String, String> traceInfo) {
_traceInfo = traceInfo;
}

@Override
public void setTablesQueried(@NotNull Set<String> tablesQueried) {
_tablesQueried = tablesQueried;
}

@Override
@NotNull
public Set<String> getTablesQueried() {
return _tablesQueried;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.ProcessingException;
Expand All @@ -45,7 +47,8 @@
"numSegmentsPrunedByLimit", "numSegmentsPrunedByValue", "brokerReduceTimeMs", "offlineThreadCpuTimeNs",
"realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
"offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
"realtimeTotalCpuTimeNs", "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo"
"realtimeTotalCpuTimeNs", "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo",
"tablesQueried"
})
public class BrokerResponseNativeV2 implements BrokerResponse {
private final StatMap<StatKey> _brokerStats = new StatMap<>(StatKey.class);
Expand Down Expand Up @@ -73,6 +76,7 @@ public class BrokerResponseNativeV2 implements BrokerResponse {
private int _numServersQueried;
private int _numServersResponded;
private long _brokerReduceTimeMs;
private Set<String> _tablesQueried = Set.of();

@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
Expand Down Expand Up @@ -384,4 +388,15 @@ public StatMap.Type getType() {
return _type;
}
}

@Override
public void setTablesQueried(@NotNull Set<String> tablesQueried) {
_tablesQueried = tablesQueried;
}

@Override
@NotNull
public Set<String> getTablesQueried() {
return _tablesQueried;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import static org.apache.pinot.common.function.scalar.StringFunctions.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;


Expand Down Expand Up @@ -1222,6 +1223,34 @@ public void testCrossDatabaseQuery()
checkQueryPlanningErrorForDBTest(result, QueryException.QUERY_PLANNING_ERROR_CODE);
}

@Test(dataProvider = "useBothQueryEngines")
public void testTablesQueriedField(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
String query = "select sum(ActualElapsedTime) from mytable;";
JsonNode jsonNode = postQuery(query);
JsonNode tablesQueried = jsonNode.get("tablesQueried");
assertNotNull(tablesQueried);
assertTrue(tablesQueried.isArray());
assertEquals(tablesQueried.size(), 1);
assertEquals(tablesQueried.get(0).asText(), "mytable");
}

@Test
public void testTablesQueriedWithJoin()
throws Exception {
// Self Join
String query = "select sum(ActualElapsedTime) from mytable WHERE ActualElapsedTime > "
+ "(select avg(ActualElapsedTime) as avg_profit from mytable)";
JsonNode jsonNode = postQuery(query);
JsonNode tablesQueried = jsonNode.get("tablesQueried");
assertNotNull(tablesQueried);
assertTrue(tablesQueried.isArray());
assertEquals(tablesQueried.size(), 1);
assertEquals(tablesQueried.get(0).asText(), "mytable");
}


private void checkQueryResultForDBTest(String column, String tableName)
throws Exception {
checkQueryResultForDBTest(column, tableName, null, null);
Expand Down

0 comments on commit 969bbf0

Please sign in to comment.