Skip to content

Commit

Permalink
[Feature] (MV For TimeSeries Scene: Part 3) Add AggregateTimeSeriesRu…
Browse files Browse the repository at this point in the history
…le rule to support time series split in mv rewrite (#51713)

Signed-off-by: shuming.li <[email protected]>
  • Loading branch information
LiShuMing authored Oct 18, 2024
1 parent e259267 commit 2b3d80a
Show file tree
Hide file tree
Showing 25 changed files with 1,818 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,13 @@
import com.starrocks.sql.analyzer.RelationFields;
import com.starrocks.sql.analyzer.RelationId;
import com.starrocks.sql.analyzer.Scope;
import com.starrocks.sql.analyzer.SelectAnalyzer;
import com.starrocks.sql.ast.UserIdentity;
import com.starrocks.sql.common.PRangeCell;
import com.starrocks.sql.optimizer.CachingMvPlanContextBuilder;
import com.starrocks.sql.optimizer.MvRewritePreprocessor;
import com.starrocks.sql.optimizer.Utils;
import com.starrocks.sql.optimizer.rule.mv.MVUtils;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
import com.starrocks.sql.parser.SqlParser;
import com.starrocks.sql.plan.ExecPlan;
Expand Down Expand Up @@ -973,6 +975,7 @@ public void onReload() {
if (desiredActive && reloadActive) {
setActive();
}
analyzePartitionExprs();
} catch (Throwable e) {
LOG.error("reload mv failed: {}", this, e);
setInactiveAndReason("reload failed: " + e.getMessage());
Expand Down Expand Up @@ -1805,6 +1808,53 @@ public void gsonPostProcess() throws IOException {
}
}

private void analyzePartitionExprs() {
try {
Map<Table, Expr> refBaseTablePartitionExprs = getRefBaseTablePartitionExprs();
ConnectContext connectContext = new ConnectContext();
if (refBaseTablePartitionExprs != null) {
for (BaseTableInfo baseTableInfo : baseTableInfos) {
Optional<Table> refBaseTableOpt = MvUtils.getTable(baseTableInfo);
if (refBaseTableOpt.isEmpty()) {
continue;
}
Table refBaseTable = refBaseTableOpt.get();
if (!refBaseTablePartitionExprs.containsKey(refBaseTable)) {
continue;
}
Expr partitionExpr = refBaseTablePartitionExprs.get(refBaseTable);
TableName tableName = new TableName(baseTableInfo.getCatalogName(),
baseTableInfo.getDbName(), baseTableInfo.getTableName());
analyzePartitionExpr(connectContext, refBaseTable, tableName, partitionExpr);
}
}
} catch (Exception e) {
LOG.warn("Analyze partition exprs failed", e);
}
}

private void analyzePartitionExpr(ConnectContext connectContext,
Table refBaseTable,
TableName tableName,
Expr partitionExpr) {
if (partitionExpr == null) {
return;
}
if (tableName == null) {
return;
}
SelectAnalyzer.SlotRefTableNameCleaner visitor = MVUtils.buildSlotRefTableNameCleaner(
connectContext, refBaseTable, tableName);
partitionExpr.accept(visitor, null);
ExpressionAnalyzer.analyzeExpression(partitionExpr, new AnalyzeState(),
new Scope(RelationId.anonymous(),
new RelationFields(refBaseTable.getBaseSchema().stream()
.map(col -> new Field(col.getName(), col.getType(),
tableName, null))
.collect(Collectors.toList()))), connectContext);

}

/**
* Parse partition expr from sql
* @param sql serialized partition expr sql
Expand Down
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,8 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String ENABLE_MATERIALIZED_VIEW_REWRITE_PARTITION_COMPENSATE =
"enable_materialized_view_rewrite_partition_compensate";
public static final String ENABLE_MATERIALIZED_VIEW_AGG_PUSHDOWN_REWRITE = "enable_materialized_view_agg_pushdown_rewrite";
public static final String ENABLE_MATERIALIZED_VIEW_TIMESERIES_AGG_PUSHDOWN_REWRITE =
"enable_materialized_view_timeseries_agg_pushdown_rewrite";

public static final String ENABLE_MATERIALIZED_VIEW_TEXT_MATCH_REWRITE =
"enable_materialized_view_text_match_rewrite";
Expand Down Expand Up @@ -1848,6 +1850,9 @@ public long getConnectorSinkTargetMaxFileSize() {
@VarAttr(name = ENABLE_MATERIALIZED_VIEW_AGG_PUSHDOWN_REWRITE)
private boolean enableMaterializedViewPushDownRewrite = false;

@VarAttr(name = ENABLE_MATERIALIZED_VIEW_TIMESERIES_AGG_PUSHDOWN_REWRITE)
private boolean enableMaterializedViewTimeSeriesPushDownRewrite = true;

@VarAttr(name = ENABLE_FORCE_RULE_BASED_MV_REWRITE)
private boolean enableForceRuleBasedMvRewrite = true;

Expand Down Expand Up @@ -3678,6 +3683,14 @@ public void setEnableMaterializedViewPushDownRewrite(boolean enableMaterializedV
this.enableMaterializedViewPushDownRewrite = enableMaterializedViewPushDownRewrite;
}

public boolean isEnableMaterializedViewTimeSeriesPushDownRewrite() {
return enableMaterializedViewTimeSeriesPushDownRewrite;
}

public void setEnableMaterializedViewTimeSeriesPushDownRewrite(boolean enableMaterializedViewTimeSeriesPushDownRewrite) {
this.enableMaterializedViewTimeSeriesPushDownRewrite = enableMaterializedViewTimeSeriesPushDownRewrite;
}

public boolean isEnableMaterializedViewViewDeltaRewrite() {
return enableMaterializedViewViewDeltaRewrite;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,21 @@

public class ReplaceShuttle extends BaseScalarOperatorShuttle {
private Map<ScalarOperator, ScalarOperator> replaceMap;
private final boolean isCheckIntersect;

public ReplaceShuttle(Map<ScalarOperator, ScalarOperator> replaceMap) {
this(replaceMap, true);
}

public ReplaceShuttle(Map<ScalarOperator, ScalarOperator> replaceMap, boolean isCheckIntersect) {
this.replaceMap = replaceMap;
this.isCheckIntersect = isCheckIntersect;
}

public ScalarOperator rewrite(ScalarOperator scalarOperator) {
ScalarOperator result = scalarOperator.accept(this, null);
// failed to replace the scalarOperator
if (scalarOperator.getUsedColumns().isIntersect(result.getUsedColumns())) {
if (isCheckIntersect && scalarOperator.getUsedColumns().isIntersect(result.getUsedColumns())) {
return null;
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@
import com.starrocks.sql.optimizer.rule.transformation.materialization.rule.AggregateJoinPushDownRule;
import com.starrocks.sql.optimizer.rule.transformation.materialization.rule.AggregateJoinRule;
import com.starrocks.sql.optimizer.rule.transformation.materialization.rule.AggregateScanRule;
import com.starrocks.sql.optimizer.rule.transformation.materialization.rule.AggregateTimeSeriesRule;
import com.starrocks.sql.optimizer.rule.transformation.materialization.rule.OnlyJoinRule;
import com.starrocks.sql.optimizer.rule.transformation.materialization.rule.OnlyScanRule;
import com.starrocks.sql.optimizer.rule.transformation.pruner.CboTablePruneRule;
Expand Down Expand Up @@ -415,6 +416,7 @@ public class RuleSet {

REWRITE_RULES.put(RuleSetType.SINGLE_TABLE_MV_REWRITE, ImmutableList.of(
AggregateScanRule.getInstance(),
AggregateTimeSeriesRule.getInstance(),
OnlyScanRule.getInstance()
));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ public enum RuleType {
TF_MV_CBO_SINGLE_TABLE_REWRITE_RULE,
TF_MV_TRANSPARENT_REWRITE_RULE,
TF_MV_AGGREGATE_JOIN_PUSH_DOWN_RULE,
TF_MV_AGGREGATE_TIME_SERIES_SCAN_RULE,

TF_GROUP_BY_COUNT_DISTINCT_DATA_SKEW_ELIMINATE_RULE,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.List;
import java.util.Map;

import static com.starrocks.sql.optimizer.rule.transformation.materialization.common.AggregateFunctionRollupUtils.getRollupAggregateFunc;

/**
* `AggregateFunctionRewriter` will try to rewrite some agg functions to some transformations so can be
* better to be rewritten.
Expand Down Expand Up @@ -163,8 +165,7 @@ private CallOperator getRollupFunction(CallOperator aggFunc) {
if (rewritten == null || !(rewritten instanceof ColumnRefOperator)) {
return null;
}
return AggregatedMaterializedViewRewriter.getRollupAggregateFunc(aggFunc,
(ColumnRefOperator) rewritten, false);
return getRollupAggregateFunc(aggFunc, (ColumnRefOperator) rewritten, false);
}

public Map<ColumnRefOperator, CallOperator> getNewColumnRefToAggFuncMap() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ public AggRewriteInfo visitLogicalTableScan(OptExpression optExpression, AggRewr
List<Table> queryTables = MvUtils.getAllTables(optExpression);

final List<Table> mvTables = MvUtils.getAllTables(materializationContext.getMvExpression());
MatchMode matchMode = MaterializedViewRewriter.getMatchMode(queryTables, mvTables);
MatchMode matchMode = getMatchMode(queryTables, mvTables);
if (matchMode == MatchMode.NOT_MATCH && mvTables.stream().noneMatch(queryTables::contains)) {
return AggRewriteInfo.NOT_REWRITE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.catalog.Function;
import com.starrocks.catalog.FunctionSet;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.RandomDistributionInfo;
import com.starrocks.catalog.Type;
import com.starrocks.catalog.combinator.AggStateUnionCombinator;
import com.starrocks.common.Pair;
import com.starrocks.sql.optimizer.MvRewriteContext;
import com.starrocks.sql.optimizer.OptExpression;
Expand Down Expand Up @@ -55,12 +52,10 @@
import java.util.Set;
import java.util.stream.Collectors;

import static com.starrocks.sql.optimizer.operator.scalar.ScalarOperatorUtil.findArithmeticFunction;
import static com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils.addExtraPredicate;
import static com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils.deriveLogicalProperty;
import static com.starrocks.sql.optimizer.rule.transformation.materialization.common.AggregateFunctionRollupUtils.TO_REWRITE_ROLLUP_FUNCTION_MAP;
import static com.starrocks.sql.optimizer.rule.transformation.materialization.common.AggregateFunctionRollupUtils.genRollupProject;
import static com.starrocks.sql.optimizer.rule.transformation.materialization.common.AggregateFunctionRollupUtils.getRollupFunctionName;
import static com.starrocks.sql.optimizer.rule.transformation.materialization.common.AggregateFunctionRollupUtils.getRollupAggregateFunc;

/**
* SPJG materialized view rewriter, based on
Expand Down Expand Up @@ -814,43 +809,4 @@ private Map<ColumnRefOperator, CallOperator> rewriteAggregatesForUnion(

return rewrittens;
}

/**
* Return rollup aggregate of the input agg function.
* NOTE: this is only targeted for aggregate functions which are supported by function rollup not equivalent class rewrite.
* eg: count(col) -> sum(col)
*/
public static CallOperator getRollupAggregateFunc(CallOperator aggCall,
ColumnRefOperator targetColumn,
boolean isUnionRewrite) {
if (aggCall.getFunction() instanceof AggStateUnionCombinator) {
Type[] argTypes = {targetColumn.getType()};
String rollupFuncName = aggCall.getFnName();
Function rollupFn = findArithmeticFunction(argTypes, rollupFuncName);
return new CallOperator(rollupFuncName, aggCall.getFunction().getReturnType(),
Lists.newArrayList(targetColumn), rollupFn);
}

String rollupFuncName = getRollupFunctionName(aggCall, isUnionRewrite);
if (rollupFuncName == null) {
return null;
}

String aggFuncName = aggCall.getFnName();
if (TO_REWRITE_ROLLUP_FUNCTION_MAP.containsKey(aggFuncName)) {
Type[] argTypes = {targetColumn.getType()};
Function rollupFn = findArithmeticFunction(argTypes, rollupFuncName);
return new CallOperator(rollupFuncName, aggCall.getFunction().getReturnType(),
Lists.newArrayList(targetColumn), rollupFn);
} else {
// NOTE:
// 1. Change fn's type as 1th child has change, otherwise physical plan
// will still use old arg input's type.
// 2. the rollup function is the same as origin, but use the new column as argument
Function newFunc = aggCall.getFunction()
.updateArgType(new Type[] {targetColumn.getType()});
return new CallOperator(aggCall.getFnName(), aggCall.getType(), Lists.newArrayList(targetColumn),
newFunc);
}
}
}
Loading

0 comments on commit 2b3d80a

Please sign in to comment.