Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] add partition scan infomation in audit log #51853

Open
wants to merge 38 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public static PQueryStatistics toProtobuf(TAuditStatistics tb) {
pb.cpuCostNs = tb.getCpu_cost_ns();
pb.memCostBytes = tb.getMem_cost_bytes();
pb.spillBytes = tb.getSpill_bytes();
pb.scanPartitions = tb.getScan_partitions();
if (tb.isSetStats_items()) {
pb.statsItems = Lists.newArrayList();
for (TAuditStatisticsItem tItem : tb.getStats_items()) {
Expand Down Expand Up @@ -135,6 +136,9 @@ public static TAuditStatistics toThrift(PQueryStatistics pb) {
if (pb.returnedRows != null) {
tb.setReturned_rows(pb.returnedRows);
}
if (pb.scanPartitions != null) {
tb.setScan_partitions(pb.scanPartitions);
}
if (pb.cpuCostNs != null) {
tb.setCpu_cost_ns(pb.cpuCostNs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public HttpResultSender(HttpConnectContext context) {
}

// for select
public RowBatch sendQueryResult(Coordinator coord, ExecPlan execPlan, String sql) throws Exception {
public RowBatch sendQueryResult(Coordinator coord, ExecPlan execPlan, String sql,
String scanPartitionsInfo) throws Exception {
RowBatch batch;
ChannelHandlerContext nettyChannel = context.getNettyChannel();
// if some data already sent to client, when exception occurs,we just close the channel
Expand All @@ -89,6 +90,7 @@ public RowBatch sendQueryResult(Coordinator coord, ExecPlan execPlan, String sql
}
if (batch.isEos()) {
if (!context.isOnlyOutputResultRaw()) {
batch.getQueryStatistics().scanPartitions = scanPartitionsInfo;
ByteBuf statisticData = JsonSerializer.getStatistic(batch.getQueryStatistics());
nettyChannel.writeAndFlush(statisticData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class JsonSerializer {
private static final String STATISTICS_OBJ_NAME = "statistics";
private static final String STATISTICS_SCAN_ROWS = "scanRows";
private static final String STATISTICS_SCAN_BYTES = "scanBytes";
private static final String STATISTICS_SCAN_PARTITIONS = "scanPartitions";
private static final String STATISTICS_RETURN_ROWS = "returnRows";

public static ByteBuf getShowResult(ShowResultSet showResultSet) throws IOException {
Expand Down Expand Up @@ -80,6 +81,7 @@ public static ByteBuf getShowResult(ShowResultSet showResultSet) throws IOExcept
jsonWriter.name(STATISTICS_OBJ_NAME).beginObject();
jsonWriter.name(STATISTICS_SCAN_ROWS).value(0);
jsonWriter.name(STATISTICS_SCAN_BYTES).value(0);
jsonWriter.name(STATISTICS_SCAN_PARTITIONS).value("");
jsonWriter.name(STATISTICS_RETURN_ROWS).value(showResultSet.getResultRows().size());
jsonWriter.endObject();

Expand Down Expand Up @@ -127,16 +129,19 @@ public static ByteBuf getStatistic(PQueryStatistics queryStatistics) throws IOEx
long scanRows = 0;
long scanBytes = 0;
long returnRows = 0;
String scanPartitions = "";
if (null != queryStatistics && null != queryStatistics.statsItems) {
for (QueryStatisticsItemPB item : queryStatistics.statsItems) {
scanRows += item.scanRows;
scanBytes += item.scanBytes;
}
returnRows = queryStatistics.returnedRows;
scanPartitions = queryStatistics.scanPartitions;
}

jsonWriter.name(STATISTICS_SCAN_ROWS).value(scanRows);
jsonWriter.name(STATISTICS_SCAN_BYTES).value(scanBytes);
jsonWriter.name(STATISTICS_SCAN_PARTITIONS).value(scanPartitions);
jsonWriter.name(STATISTICS_RETURN_ROWS).value(returnRows);
jsonWriter.endObject();
jsonWriter.endObject();
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/planner/PlanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,10 @@ private String getNodeVerboseExplain(String prefix) {
return getNodeExplainString(prefix, TExplainLevel.VERBOSE);
}

public List<String> getSelectedPartitionNames() {
return Lists.newArrayList();
}

// Convert this plan node, including all children, to its Thrift representation.
public TPlan treeToThrift() {
TPlan result = new TPlan();
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ public String getTableName() {
return desc.getTable().getName();
}

public long getTableId() {
return desc.getTable().getId();
}

public boolean isLocalNativeTable() {
return false;
}
Expand Down
7 changes: 7 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/plugin/AuditEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public enum EventType {
public long scanBytes = -1;
@AuditField(value = "ScanRows")
public long scanRows = -1;
@AuditField(value = "ScanPartitions")
public String scanPartitions = "";
@AuditField(value = "ReturnRows")
public long returnRows = -1;
@AuditField(value = "CpuCostNs", ignore_zero = true)
Expand Down Expand Up @@ -219,6 +221,11 @@ public AuditEventBuilder setScanRows(long scanRows) {
return this;
}

public AuditEventBuilder setScanPartitions(String scanPartitions) {
auditEvent.scanPartitions = scanPartitions;
return this;
}

public AuditEventBuilder setReturnRows(long returnRows) {
auditEvent.returnRows = returnRows;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ public void auditAfterExec(String origStmt, StatementBase parsedStmt, PQueryStat
ctx.getAuditEventBuilder().setMemCostBytes(statistics.memCostBytes == null ? -1 : statistics.memCostBytes);
ctx.getAuditEventBuilder().setSpilledBytes(statistics.spillBytes == null ? -1 : statistics.spillBytes);
ctx.getAuditEventBuilder().setReturnRows(statistics.returnedRows == null ? 0 : statistics.returnedRows);
ctx.getAuditEventBuilder().setScanPartitions(statistics.scanPartitions == null ? "" : statistics.scanPartitions);
}

if (ctx.getState().isQuery()) {
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/QueryDetail.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public enum QueryMemState {
private String profile;
private String resourceGroupName;
private long scanRows = -1;
private String scanPartitions = "";
private long scanBytes = -1;
private long returnRows = -1;
private long cpuCostNs = -1;
Expand Down Expand Up @@ -139,6 +140,7 @@ public QueryDetail copy() {
queryDetail.explain = this.explain;
queryDetail.profile = this.profile;
queryDetail.scanRows = this.scanRows;
queryDetail.scanPartitions = this.scanPartitions;
queryDetail.scanBytes = this.scanBytes;
queryDetail.returnRows = this.returnRows;
queryDetail.cpuCostNs = this.cpuCostNs;
Expand Down Expand Up @@ -279,6 +281,14 @@ public void setScanRows(long scanRows) {
this.scanRows = scanRows;
}

public String getScanPartitions() {
return scanPartitions;
}

public void setScanPartitions(String scanPartitions) {
this.scanPartitions = scanPartitions;
}

public long getScanBytes() {
return scanBytes;
}
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String MAX_EXECUTION_TIME = "max_execution_time";
public static final String IS_REPORT_SUCCESS = "is_report_success";
public static final String ENABLE_PROFILE = "enable_profile";
public static final String ENABLE_SCAN_PARTITIONS_AUDIT = "enable_scan_partitions_audit";

public static final String ENABLE_LOAD_PROFILE = "enable_load_profile";
public static final String PROFILING = "profiling";
Expand Down Expand Up @@ -939,6 +940,9 @@ public static MaterializedViewRewriteMode parse(String str) {
@VariableMgr.VarAttr(name = ENABLE_PROFILE, alias = IS_REPORT_SUCCESS)
private boolean enableProfile = false;

@VariableMgr.VarAttr(name = ENABLE_SCAN_PARTITIONS_AUDIT)
private boolean enableScanPartitionsAudit = false;

@VariableMgr.VarAttr(name = ENABLE_METADATA_PROFILE)
private boolean enableMetadataProfile = false;

Expand Down Expand Up @@ -2606,6 +2610,14 @@ public void setEnableProfile(boolean enableProfile) {
this.enableProfile = enableProfile;
}

public boolean isEnableScanPartitionsAudit() {
return enableScanPartitionsAudit;
}

public void setEnableScanPartitionsAudit(boolean enableScanPartitionsAudit) {
this.enableScanPartitionsAudit = enableScanPartitionsAudit;
}

public boolean isEnableLoadProfile() {
return enableLoadProfile;
}
Expand Down
25 changes: 24 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,7 @@ private void handleQueryStmt(ExecPlan execPlan) throws Exception {
}
boolean executeInFe = !isExplainAnalyze && !isSchedulerExplain && !isOutfileQuery
&& canExecuteInFe(context, execPlan.getPhysicalPlan());
boolean isEnableScanPartitionsAudit = context.getSessionVariable().isEnableScanPartitionsAudit();

if (isExplainAnalyze) {
context.getSessionVariable().setEnableProfile(true);
Expand Down Expand Up @@ -1198,9 +1199,26 @@ private void handleQueryStmt(ExecPlan execPlan) throws Exception {
coord.setTopProfileSupplier(this::buildTopLevelProfile);
coord.setExecPlan(execPlan);

String scanPartitionsInfo = "";
if (isEnableScanPartitionsAudit) {
List<String> scanPartitionsList = Lists.newArrayList();
Map<String, String> scanPartitionsMap = Maps.newHashMap();
for (ScanNode sn : scanNodes) {
Database db = MetaUtils.getDatabaseByTableId(sn.getTableId());
if (db.getOriginName() != "") {
scanPartitionsMap.put("catalogName", InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME);
scanPartitionsMap.put("databaseName", db.getOriginName());
scanPartitionsMap.put("tableName", sn.getTableName());
scanPartitionsMap.put("partitionIds", sn.getSelectedPartitionNames().toString());
scanPartitionsList.add(GSON.toJson(scanPartitionsMap));
}
}
scanPartitionsInfo = scanPartitionsList.toString().replace("\"", "");
}

RowBatch batch;
if (context instanceof HttpConnectContext) {
batch = httpResultSender.sendQueryResult(coord, execPlan, parsedStmt.getOrigStmt().getOrigStmt());
batch = httpResultSender.sendQueryResult(coord, execPlan, parsedStmt.getOrigStmt().getOrigStmt(), scanPartitionsInfo);
} else {
boolean needSendResult = !isPlanAdvisorAnalyze && !isExplainAnalyze
&& !context.getSessionVariable().isEnableExecutionOnly();
Expand Down Expand Up @@ -1273,6 +1291,7 @@ private void handleQueryStmt(ExecPlan execPlan) throws Exception {
TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(tableId);
entity.counterScanFinishedTotal.increase(1L);
}
statisticsForAuditLog.scanPartitions = scanPartitionsInfo;
}

private void analyzePlanWithExecStats(ExecPlan execPlan) {
Expand Down Expand Up @@ -2007,6 +2026,9 @@ public PQueryStatistics getQueryStatisticsForAuditLog() {
if (statisticsForAuditLog.scanRows == null) {
statisticsForAuditLog.scanRows = 0L;
}
if (statisticsForAuditLog.scanPartitions == null) {
statisticsForAuditLog.scanPartitions = "";
}
if (statisticsForAuditLog.cpuCostNs == null) {
statisticsForAuditLog.cpuCostNs = 0L;
}
Expand Down Expand Up @@ -2748,6 +2770,7 @@ public void addFinishedQueryDetail() {
if (statistics != null) {
queryDetail.setScanBytes(statistics.scanBytes);
queryDetail.setScanRows(statistics.scanRows);
queryDetail.setScanPartitions(statistics.scanPartitions == null ? "" : statistics.scanPartitions);
queryDetail.setCpuCostNs(statistics.cpuCostNs == null ? -1 : statistics.cpuCostNs);
queryDetail.setMemCostBytes(statistics.memCostBytes == null ? -1 : statistics.memCostBytes);
queryDetail.setSpillBytes(statistics.spillBytes == null ? -1 : statistics.spillBytes);
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/sql/common/MetaUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ public static void checkNotSupportCatalog(String catalogName, String operation)
}
}

public static Database getDatabaseByTableId(long tableId) {
List<Long> dbIds = GlobalStateMgr.getCurrentState().getLocalMetastore().getDbIds();
for (long dbId : dbIds) {
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
Table table = db.getTable(tableId);
if (table != null) {
return db;
}
}
return new Database();
}

// get table by tableName, unlike getTable, this interface is session aware,
// which means if there is a temporary table with the same name,
// use temporary table first, otherwise, treat it as a permanent table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void test1ExecuteSqlSuccess() throws Exception {
"{\"name\":\"Comment\",\"type\":\"varchar(30)\"}]," +
"\"data\":[{\"Catalog\":\"default_catalog\",\"Type\":\"Internal\"," +
"\"Comment\":\"An internal catalog contains this cluster's self-managed tables.\"}]," +
"\"statistics\":{\"scanRows\":0,\"scanBytes\":0,\"returnRows\":1}}";
"\"statistics\":{\"scanRows\":0,\"scanBytes\":0,\"scanPartitions\":\"\",\"returnRows\":1}}";
Assert.assertEquals(respStr, expected);

body = RequestBody.create(JSON,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public void testQueryDetailQueue() {
"testDb", "select * from table1 limit 1",
"root", "", "default_catalog");
startQueryDetail.setScanRows(100);
startQueryDetail.setScanPartitions("[{catalogName:default_catalog,databaseName:testDb," +
"tableName:table1,partitionIds:[table1]}]");
startQueryDetail.setScanBytes(10001);
startQueryDetail.setReturnRows(1);
startQueryDetail.setCpuCostNs(1002);
Expand All @@ -73,6 +75,8 @@ public void testQueryDetailQueue() {
+ "\"sql\":\"select * from table1 limit 1\","
+ "\"user\":\"root\","
+ "\"scanRows\":100,"
+ "\"scanPartitions\":\"[{catalogName:default_catalog,databaseName:testDb," +
"tableName:table1,partitionIds:[table1]}]\","
+ "\"scanBytes\":10001,"
+ "\"returnRows\":1,"
+ "\"cpuCostNs\":1002,"
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ message PQueryStatistics {
optional int64 cpu_cost_ns = 4;
optional int64 mem_cost_bytes = 5;
optional int64 spill_bytes = 6;
optional string scan_partitions = 7;
repeated QueryStatisticsItemPB stats_items = 10;
repeated NodeExecStatsItemPB node_exec_stats_items = 11;
}
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ struct TAuditStatistics {
7: optional i64 mem_cost_bytes
8: optional i64 spill_bytes
9: optional list<TAuditStatisticsItem> stats_items
10: optional string scan_partitions
}

struct TReportAuditStatisticsParams {
Expand Down
Loading
Loading