[Enhancement] merge full/sample statistics collect #52693

Merged 27 commits on Nov 28, 2024.
3 changes: 3 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
@@ -2111,6 +2111,9 @@ public class Config extends ConfigBase {
"we would use sample statistics instead of full statistics")
public static double statistic_sample_collect_ratio_threshold_of_first_load = 0.1;

// Temporary config on main (to be removed in a follow-up PR): when true, sample
// statistics are read back through the full-statistics SQL path (see StatisticExecutor).
@ConfField(mutable = true)
public static boolean statistic_use_meta_statistics = true;
Contributor: Can you add a comment to it?

Contributor Author: It's a temporary config on main; I will remove it in the next PR.
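For context, here is the branch this flag gates, condensed from the StatisticExecutor hunk later in this diff (only the control flow is shown; variable declarations are elided):

if (Config.statistic_use_meta_statistics) {
    // read sample-collected columns back through the full-statistics SQL path
    statsSql = StatisticSQLBuilder.buildQueryFullStatisticsSQL(dbId, tableId, columnNamesForStats, columnTypesForStats);
} else {
    statsSql = StatisticSQLBuilder.buildQuerySampleStatisticsSQL(dbId, tableId, columnNamesForStats);
}
columnStats.addAll(executeStatisticDQL(context, statsSql));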


/**
* default bucket size of histogram statistics
*/
2 changes: 2 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/FeConstants.java
@@ -77,6 +77,8 @@ public class FeConstants {
public static int checkpoint_interval_second = 60; // 1 minute
// set this flag true to skip some step when running FE unit test
public static boolean runningUnitTest = false;
// set this flag true to enable unit statistics mock
public static boolean enableUnitStatistics = false;
// set this flag false to skip test view in plan test
public static boolean unitTestView = true;
// Set this flag false to suppress showing local shuffle columns in verbose explain, when running FE unit tests.
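The new mock flag is meant to be flipped from FE unit tests; a minimal usage sketch under that assumption (the surrounding test scaffolding is illustrative, not part of this PR):

FeConstants.runningUnitTest = true;
FeConstants.enableUnitStatistics = true; // statistics cache loaders now short-circuit to Optional.empty()
try {
    // ... build and verify plans without querying the _statistics_ tables ...
} finally {
    FeConstants.enableUnitStatistics = false; // restore so other tests see real behavior
}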
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
@@ -466,6 +466,8 @@ public class SessionVariable implements Serializable, Writable, Cloneable {

public static final String STATISTIC_COLLECT_PARALLEL = "statistic_collect_parallel";

public static final String STATISTIC_META_COLLECT_PARALLEL = "statistic_meta_collect_parallel";

public static final String ENABLE_ANALYZE_PHASE_PRUNE_COLUMNS = "enable_analyze_phase_prune_columns";

public static final String ENABLE_SHOW_ALL_VARIABLES = "enable_show_all_variables";
@@ -1498,6 +1500,9 @@ public static MaterializedViewRewriteMode parse(String str) {
@VarAttr(name = STATISTIC_COLLECT_PARALLEL, flag = VariableMgr.INVISIBLE)
private int statisticCollectParallelism = 1;

@VarAttr(name = STATISTIC_META_COLLECT_PARALLEL, flag = VariableMgr.INVISIBLE)
private int statisticMetaCollectParallelism = 10;

@VarAttr(name = ENABLE_ANALYZE_PHASE_PRUNE_COLUMNS, flag = VariableMgr.INVISIBLE)
private boolean enableAnalyzePhasePruneColumns = false;

@@ -2570,6 +2575,14 @@ public void setStatisticCollectParallelism(int parallelism) {
this.statisticCollectParallelism = parallelism;
}

public int getStatisticMetaCollectParallelism() {
return statisticMetaCollectParallelism;
}

public void setStatisticMetaCollectParallelism(int statisticMetaCollectParallelism) {
this.statisticMetaCollectParallelism = statisticMetaCollectParallelism;
}

public boolean isEnableAnalyzePhasePruneColumns() {
return enableAnalyzePhasePruneColumns;
}
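Since statistic_meta_collect_parallel is flagged INVISIBLE like its siblings, jobs are expected to set it programmatically rather than via SET; a hedged sketch of forwarding it to a statistics connection, mirroring how statisticCollectParallelism is forwarded in executeAnalyze() below (the context objects are assumed from that snippet):

statsConnectCtx.getSessionVariable().setStatisticMetaCollectParallelism(
        context.getSessionVariable().getStatisticMetaCollectParallelism());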
@@ -1479,6 +1479,7 @@ private void executeAnalyze(AnalyzeStmt analyzeStmt, AnalyzeStatus analyzeStatus
statsConnectCtx.getSessionVariable().setStatisticCollectParallelism(
context.getSessionVariable().getStatisticCollectParallelism());
statsConnectCtx.setThreadLocalInfo();
statsConnectCtx.setStatisticsConnection(true);
Contributor: It would be set in the StatisticExecutor; is it necessary to set it here?

Contributor Author: Not all statistics SQL needs to set this variable; only collect jobs do.
try {
executeAnalyze(statsConnectCtx, analyzeStmt, analyzeStatus, db, table);
} finally {
@@ -24,6 +24,7 @@
import com.starrocks.common.AnalysisException;
import com.starrocks.common.ErrorCode;
import com.starrocks.common.ErrorReport;
import com.starrocks.common.FeConstants;
import com.starrocks.common.util.DateUtils;
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.GlobalStateMgr;
@@ -55,6 +56,9 @@ public class ColumnBasicStatsCacheLoader implements AsyncCacheLoader<ColumnStats
public @NonNull CompletableFuture<Optional<ColumnStatistic>> asyncLoad(@NonNull ColumnStatsCacheKey cacheKey,
@NonNull Executor executor) {
return CompletableFuture.supplyAsync(() -> {
if (FeConstants.enableUnitStatistics) {
return Optional.empty();
}
try {
ConnectContext connectContext = StatisticUtils.buildConnectContext();
connectContext.setThreadLocalInfo();
@@ -79,7 +83,13 @@ public class ColumnBasicStatsCacheLoader implements AsyncCacheLoader<ColumnStats
public CompletableFuture<Map<@NonNull ColumnStatsCacheKey, @NonNull Optional<ColumnStatistic>>> asyncLoadAll(
@NonNull Iterable<? extends @NonNull ColumnStatsCacheKey> keys, @NonNull Executor executor) {
return CompletableFuture.supplyAsync(() -> {

if (FeConstants.enableUnitStatistics) {
Map<ColumnStatsCacheKey, Optional<ColumnStatistic>> result = new HashMap<>();
for (ColumnStatsCacheKey key : keys) {
result.put(key, Optional.empty());
}
return result;
}
try {
long tableId = -1;
List<String> columns = new ArrayList<>();
186 changes: 186 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/statistic/HyperStatisticsCollectJob.java
@@ -0,0 +1,186 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.statistic;

import com.google.common.collect.Lists;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.TableName;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Type;
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.common.util.UUIDUtil;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.OriginStatement;
import com.starrocks.qe.QueryState;
import com.starrocks.qe.StmtExecutor;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.ast.InsertStmt;
import com.starrocks.sql.ast.QueryStatement;
import com.starrocks.sql.ast.StatementBase;
import com.starrocks.sql.ast.ValuesRelation;
import com.starrocks.statistic.base.PartitionSampler;
import com.starrocks.statistic.hyper.HyperQueryJob;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;

public class HyperStatisticsCollectJob extends StatisticsCollectJob {
Contributor: Hyper? I guess it's actually a regular sample-like collection job.

Contributor Author: I think it is hyper, because both Full and Sample always use it.

private static final Logger LOG = LogManager.getLogger(HyperStatisticsCollectJob.class);

private final List<Long> partitionIdList;

private final int batchRowsLimit;
private final List<String> sqlBuffer = Lists.newArrayList();
private final List<List<Expr>> rowsBuffer = Lists.newArrayList();

public HyperStatisticsCollectJob(Database db, Table table, List<Long> partitionIdList, List<String> columnNames,
List<Type> columnTypes, StatsConstants.AnalyzeType type,
StatsConstants.ScheduleType scheduleType, Map<String, String> properties) {
super(db, table, columnNames, columnTypes, type, scheduleType, properties);
this.partitionIdList = partitionIdList;
this.batchRowsLimit = (int) Math.max(1, Config.statistic_full_collect_buffer / 33 / 1024);
}
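Worked through with an assumed default of 20 MB for Config.statistic_full_collect_buffer (defaults may differ across versions), the integer arithmetic above gives:

// batchRowsLimit = max(1, 20971520 / 33 / 1024) = max(1, 620) = 620 buffered rows
// for a 10-column table, collect() below derives splitSize = max(1, 620 / 10) = 62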

@Override
public void collect(ConnectContext context, AnalyzeStatus analyzeStatus) throws Exception {
if (table.isTemporaryTable()) {
context.setSessionId(((OlapTable) table).getSessionId());
}
context.getSessionVariable().setEnableAnalyzePhasePruneColumns(true);
Contributor: Is it necessary to restore these variables? What if the connection is reused by other jobs?

Contributor Author: I don't think we need to; the ConnectContext for statistics is created by the job itself.
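A sketch of the create-and-discard pattern the author describes (the exact call sites vary; buildConnectContext() is the helper used elsewhere in this diff):

ConnectContext ctx = StatisticUtils.buildConnectContext(); // fresh context owned by this job
ctx.setThreadLocalInfo();
ctx.getSessionVariable().setEnableAnalyzePhasePruneColumns(true); // safe: the context is never reused
// ... run the collect job with ctx, then discard it along with the job ...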

context.getSessionVariable().setPipelineDop(context.getSessionVariable().getStatisticCollectParallelism());

int splitSize = Math.max(1, batchRowsLimit / columnNames.size());
List<HyperQueryJob> queryJobs;
if (type == StatsConstants.AnalyzeType.FULL) {
queryJobs = HyperQueryJob.createFullQueryJobs(context, db, table, columnNames, columnTypes,
partitionIdList, splitSize);
} else {
PartitionSampler sampler = PartitionSampler.create(table, partitionIdList, properties);
queryJobs = HyperQueryJob.createSampleQueryJobs(context, db, table, columnNames, columnTypes,
partitionIdList, splitSize, sampler);
}

long queryTotals = 0;
long queryFailures = 0;
long insertFailures = 0;

for (int i = 0; i < queryJobs.size(); i++) {
HyperQueryJob queryJob = queryJobs.get(i);
try {
queryJob.queryStatistics();
rowsBuffer.addAll(queryJob.getStatisticsData());
sqlBuffer.addAll(queryJob.getStatisticsValueSQL());

queryTotals += queryJob.getTotals();
queryFailures += queryJob.getFailures();
} catch (Exception e) {
LOG.warn("query statistics task failed in job: {}, {}", this, queryJob, e);
throw e;
}

if (queryFailures > Config.statistic_full_statistics_failure_tolerance_ratio * queryTotals) {
String message = String.format("query statistic job failed due to " +
"too many failed tasks: %d/%d, the last failure is %s",
queryFailures, queryTotals, queryJob.getLastFailure());
LOG.warn(message, queryJob.getLastFailure());
throw new RuntimeException(message, queryJob.getLastFailure());
}

try {
flushInsertStatisticsData(context);
} catch (Exception e) {
insertFailures++;
if (insertFailures > Config.statistic_full_statistics_failure_tolerance_ratio * queryJobs.size()) {
String message = String.format("insert statistic job failed due to " +
"too many failed tasks: %d/%d, the last failure is %s",
insertFailures, queryJobs.size(), e);
LOG.warn(message, queryJob.getLastFailure());
throw new RuntimeException(message, queryJob.getLastFailure());
} else {
LOG.warn("insert statistics task failed in job: {}, {}", this, queryJob, e);
}
} finally {
rowsBuffer.clear();
sqlBuffer.clear();
}
analyzeStatus.setProgress((i + 1) * 100L / queryJobs.size());
GlobalStateMgr.getCurrentState().getAnalyzeMgr().addAnalyzeStatus(analyzeStatus);
}
}

private void flushInsertStatisticsData(ConnectContext context) throws Exception {
if (rowsBuffer.isEmpty()) {
return;
}

int count = 0;
int maxRetryTimes = 5;
StatementBase insertStmt = createInsertStmt();
do {
LOG.debug("statistics insert sql size:" + rowsBuffer.size());
StmtExecutor executor = new StmtExecutor(context, insertStmt);

context.setExecutor(executor);
context.setQueryId(UUIDUtil.genUUID());
context.setStartTime();
executor.execute();

if (context.getState().getStateType() == QueryState.MysqlStateType.ERR) {
LOG.warn("Statistics insert fail | {} | Error Message [{}]", DebugUtil.printId(context.getQueryId()),
context.getState().getErrorMessage());
if (StringUtils.contains(context.getState().getErrorMessage(), "Too many versions")) {
Thread.sleep(Config.statistic_collect_too_many_version_sleep);
count++;
} else {
throw new DdlException(context.getState().getErrorMessage());
}
} else {
return;
}
} while (count < maxRetryTimes);

throw new DdlException(context.getState().getErrorMessage());
}
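Worked through: with maxRetryTimes = 5, a persistent "Too many versions" error is retried up to five times, sleeping Config.statistic_collect_too_many_version_sleep between attempts (assumed default 600000 ms, i.e. 10 minutes), before the last error message surfaces as a DdlException; any other error fails immediately.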

private StatementBase createInsertStmt() {
String sql = "INSERT INTO _statistics_.column_statistics values " + String.join(", ", sqlBuffer) + ";";
List<String> names = Lists.newArrayList("column_0", "column_1", "column_2", "column_3",
"column_4", "column_5", "column_6", "column_7", "column_8", "column_9",
"column_10", "column_11", "column_12");
QueryStatement qs = new QueryStatement(new ValuesRelation(rowsBuffer, names));
InsertStmt insert = new InsertStmt(new TableName("_statistics_", "column_statistics"), qs);
insert.setOrigStmt(new OriginStatement(sql, 0));
return insert;
}
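For illustration, the assembled statement looks roughly like the following; the placeholder values are hypothetical, and each real row carries the 13 values bound to column_0 through column_12 above:

// INSERT INTO _statistics_.column_statistics VALUES
//     (<row 1: 13 comma-separated values>), (<row 2: ...>), ...;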

@Override
public String toString() {
return "HyperStatisticsCollectJob{" + "type=" + type +
", scheduleType=" + scheduleType +
", db=" + db +
", table=" + table +
", partitionIdList=" + partitionIdList +
", columnNames=" + columnNames +
", properties=" + properties +
'}';
}
}
@@ -26,6 +26,7 @@
import com.starrocks.catalog.Type;
import com.starrocks.common.AuditLog;
import com.starrocks.common.Config;
import com.starrocks.common.FeConstants;
import com.starrocks.common.Pair;
import com.starrocks.common.Status;
import com.starrocks.common.util.DebugUtil;
@@ -142,9 +143,8 @@ private static Table lookupTable(Long dbId, Long tableId) {
List<TStatisticData> columnStats = Lists.newArrayList();
if (CollectionUtils.isNotEmpty(columnWithFullStats)) {
List<String> columnNamesForStats = columnWithFullStats.stream().map(ColumnStatsMeta::getColumnName)
.collect(Collectors.toList());
List<Type> columnTypesForStats = columnWithFullStats.stream()
.map(x -> StatisticUtils.getQueryStatisticsColumnType(table, x.getColumnName()))
.collect(Collectors.toList());

@@ -155,10 +155,20 @@
}
if (CollectionUtils.isNotEmpty(columnWithSampleStats)) {
List<String> columnNamesForStats = columnWithSampleStats.stream().map(ColumnStatsMeta::getColumnName)
.collect(Collectors.toList());
if (Config.statistic_use_meta_statistics) {
List<Type> columnTypesForStats = columnWithSampleStats.stream()
.map(x -> StatisticUtils.getQueryStatisticsColumnType(table, x.getColumnName()))
.collect(Collectors.toList());
String statsSql = StatisticSQLBuilder.buildQueryFullStatisticsSQL(
dbId, tableId, columnNamesForStats, columnTypesForStats);
List<TStatisticData> tStatisticData = executeStatisticDQL(context, statsSql);
columnStats.addAll(tStatisticData);
} else {
String statsSql = StatisticSQLBuilder.buildQuerySampleStatisticsSQL(dbId, tableId, columnNamesForStats);
List<TStatisticData> tStatisticData = executeStatisticDQL(context, statsSql);
columnStats.addAll(tStatisticData);
}
}
return columnStats;
}
@@ -509,6 +519,9 @@ private List<TResultBatch> executeDQL(ConnectContext context, String sql) {
if (Config.enable_print_sql) {
LOG.info("Begin to execute sql, type: Statistics collect,query id:{}, sql:{}", context.getQueryId(), sql);
}
if (FeConstants.enableUnitStatistics) {
return Collections.emptyList();
}
StatementBase parsedStmt = SqlParser.parseOneWithStarRocksDialect(sql, context.getSessionVariable());
ExecPlan execPlan = StatementPlanner.plan(parsedStmt, context, TResultSinkType.STATISTIC);
StmtExecutor executor = new StmtExecutor(context, parsedStmt);