From 607e0b06795a40ef9461a1b930a75a290fbb7e8e Mon Sep 17 00:00:00 2001 From: "shuming.li" Date: Tue, 16 Jul 2024 19:27:30 +0800 Subject: [PATCH 1/2] [Enhancement] (Multi Ref Base Table Part2) MVPCTRefreshPlanBuilder supports multi ref base tables in mv refresh (#48284) Signed-off-by: shuming.li (cherry picked from commit e22a42af917c3f2aebafff99e47ff72e1e4aa13c) --- .../starrocks/catalog/MaterializedView.java | 15 - .../com/starrocks/common/profile/Tracers.java | 17 + .../starrocks/scheduler/MvTaskRunContext.java | 27 -- .../PartitionBasedMvRefreshProcessor.java | 51 +-- .../mv/MVPCTRefreshListPartitioner.java | 15 +- .../scheduler/mv/MVPCTRefreshPlanBuilder.java | 346 +++++++++++++----- .../mv/MVPCTRefreshRangePartitioner.java | 10 +- .../sql/common/QueryDebugOptions.java | 16 + .../OptExpressionDuplicator.java | 18 +- .../PCTRefreshListPartitionOlapTest.java | 22 +- ...titionBasedMvRefreshProcessorHiveTest.java | 114 +++--- ...nBasedMvRefreshProcessorOlapPart2Test.java | 5 - ...titionBasedMvRefreshProcessorOlapTest.java | 2 +- .../materialization/MvRewriteTest.java | 2 - .../R/test_mv_refresh_with_multi_union2 | 47 ++- .../R/test_mv_refresh_with_multi_union4 | 272 ++++++++++++++ .../R/test_mv_refresh_with_time_slice | 10 + .../T/test_mv_refresh_with_multi_union4 | 143 ++++++++ .../T/test_mv_refresh_with_time_slice | 25 ++ .../R/test_transparent_mv_hive | 18 +- .../R/test_transparent_mv_iceberg_part2 | 16 +- test/test_sql_cases.py | 14 + 22 files changed, 916 insertions(+), 289 deletions(-) create mode 100644 test/sql/test_materialized_view_refresh/R/test_mv_refresh_with_multi_union4 create mode 100644 test/sql/test_materialized_view_refresh/T/test_mv_refresh_with_multi_union4 diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/MaterializedView.java b/fe/fe-core/src/main/java/com/starrocks/catalog/MaterializedView.java index 43660d937a321..860463c90f6bd 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/MaterializedView.java +++ 
b/fe/fe-core/src/main/java/com/starrocks/catalog/MaterializedView.java @@ -637,21 +637,6 @@ public void setQueryOutputIndices(List queryOutputIndices) { this.queryOutputIndices = queryOutputIndices; } - /** - * @param materializedView : materialized view to check - * @return : return the column slot ref which materialized view's partition column comes from. - *

- * NOTE: Only support one column for Materialized View's partition column for now. - */ - public static SlotRef getRefBaseTablePartitionSlotRef(MaterializedView materializedView) { - List slotRefs = Lists.newArrayList(); - Expr partitionExpr = materializedView.getPartitionExpr(); - partitionExpr.collect(SlotRef.class, slotRefs); - // if partitionExpr is FunctionCallExpr, get first SlotRef - Preconditions.checkState(slotRefs.size() == 1); - return slotRefs.get(0); - } - /** * Return the partition column of the materialized view. * NOTE: Only one column is supported for now, support more columns in the future. diff --git a/fe/fe-core/src/main/java/com/starrocks/common/profile/Tracers.java b/fe/fe-core/src/main/java/com/starrocks/common/profile/Tracers.java index d76aa49a7dcf4..04433096a874e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/profile/Tracers.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/profile/Tracers.java @@ -254,4 +254,21 @@ public static void toRuntimeProfile(RuntimeProfile profile) { Tracers tracers = THREAD_LOCAL.get(); tracers.allTracer[1].toRuntimeProfile(profile); } + + public static String getTrace(Mode mode) { + switch (mode) { + case TIMER: + return Tracers.printScopeTimer(); + case VARS: + return Tracers.printVars(); + case TIMING: + return Tracers.printTiming(); + case LOGS: + return Tracers.printLogs(); + case REASON: + return Tracers.printReasons(); + default: + return ""; + } + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/MvTaskRunContext.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/MvTaskRunContext.java index fed51082bb002..7bfbfa01d9258 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/MvTaskRunContext.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/MvTaskRunContext.java @@ -14,9 +14,7 @@ package com.starrocks.scheduler; -import com.google.common.base.Preconditions; import com.google.common.collect.Range; -import com.starrocks.catalog.Column; import 
com.starrocks.catalog.PartitionKey; import com.starrocks.catalog.Table; import com.starrocks.catalog.TableProperty; @@ -45,13 +43,6 @@ public class MvTaskRunContext extends TaskRunContext { // multi original partition names. private Map>> externalRefBaseTableMVPartitionMap; - // The Table which materialized view' partition column comes from is called `RefBaseTable`: - // - Materialized View's to-refresh partitions is synced from its `refBaseTable`. - private Table refBaseTable; - // The `RefBaseTable`'s partition column which materialized view's partition column derives from - // is called `refBaseTablePartitionColumn`. - private Column refBaseTablePartitionColumn; - private String nextPartitionStart = null; private String nextPartitionEnd = null; private ExecPlan execPlan = null; @@ -153,22 +144,4 @@ public int getPartitionTTLNumber() { public void setPartitionTTLNumber(int partitionTTLNumber) { this.partitionTTLNumber = partitionTTLNumber; } - - public Table getRefBaseTable() { - return refBaseTable; - } - - public void setRefBaseTable(Table refBaseTable) { - Preconditions.checkNotNull(refBaseTable); - this.refBaseTable = refBaseTable; - } - - public Column getRefBaseTablePartitionColumn() { - return refBaseTablePartitionColumn; - } - - public void setRefBaseTablePartitionColumn(Column refBaseTablePartitionColumn) { - Preconditions.checkNotNull(refBaseTablePartitionColumn); - this.refBaseTablePartitionColumn = refBaseTablePartitionColumn; - } } diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java index 3c6866e8192e6..62c9cee77082a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java @@ -23,7 +23,6 @@ import com.google.common.collect.Range; import com.google.common.collect.Sets; import 
com.google.common.util.concurrent.Uninterruptibles; -import com.starrocks.analysis.SlotRef; import com.starrocks.catalog.BaseTableInfo; import com.starrocks.catalog.Column; import com.starrocks.catalog.Database; @@ -183,8 +182,11 @@ public TaskRun getNextTaskRun() { public void processTaskRun(TaskRunContext context) { // register tracers Tracers.register(context.getCtx()); + QueryDebugOptions queryDebugOptions = context.getCtx().getSessionVariable().getQueryDebugOptions(); // init to collect the base timer for refresh profile - Tracers.init(Tracers.Mode.TIMER, Tracers.Module.BASE, true, false); + Tracers.Mode mvRefreshTraceMode = queryDebugOptions.getMvRefreshTraceMode(); + Tracers.Module mvRefreshTraceModule = queryDebugOptions.getMvRefreshTraceModule(); + Tracers.init(mvRefreshTraceMode, mvRefreshTraceModule, true, false); IMaterializedViewMetricsEntity mvEntity = null; ConnectContext connectContext = context.getCtx(); @@ -221,7 +223,7 @@ public void processTaskRun(TaskRunContext context) { runtimeProfile = new RuntimeProfile(); Tracers.toRuntimeProfile(runtimeProfile); } - + LOG.info("Refresh mv {} trace logs: {}", materializedView.getName(), Tracers.getTrace(mvRefreshTraceMode)); Tracers.close(); postProcess(); } @@ -1043,21 +1045,6 @@ private boolean syncPartitions() throws AnalysisException { // collect base table snapshot infos snapshotBaseTables = collectBaseTableSnapshotInfos(materializedView); - // prepare ref base table and partition column - // NOTE: This should be after @refreshExternalTable since the base table may be changed and repaired again. 
- PartitionInfo partitionInfo = materializedView.getPartitionInfo(); - - if (!partitionInfo.isUnPartitioned()) { - Pair partitionTableAndColumn = getRefBaseTableAndPartitionColumn(snapshotBaseTables); - Table refBaseTable = partitionTableAndColumn.first; - if (!snapshotBaseTables.containsKey(refBaseTable.getId())) { - throw new DmlException(String.format("The ref base table %s of materialized view %s is not existed in snapshot.", - refBaseTable.getName(), materializedView.getName())); - } - mvContext.setRefBaseTable(snapshotBaseTables.get(refBaseTable.getId()).getBaseTable()); - mvContext.setRefBaseTablePartitionColumn(partitionTableAndColumn.second); - } - // do sync partitions (add or drop partitions) for materialized view boolean result = mvRefreshPartitioner.syncAddOrDropPartitions(); LOG.info("finish sync partitions. mv:{}, cost(ms): {}", materializedView.getName(), @@ -1065,26 +1052,6 @@ private boolean syncPartitions() throws AnalysisException { return result; } - /** - * TODO: merge it with {@code MaterializedView#getDirectTableAndPartitionColumn} but `getDirectTableAndPartitionColumn` - * only use the existed base table info but {@code getRefBaseTableAndPartitionColumn} only uses the table name to keep - * mv's partition and ref base bases' relation. - *

- * return the ref base table and column that materialized view's partition column - * derives from if it exists, otherwise return null. - */ - private Pair getRefBaseTableAndPartitionColumn(Map tableSnapshotInfos) { - SlotRef slotRef = MaterializedView.getRefBaseTablePartitionSlotRef(materializedView); - for (TableSnapshotInfo snapshotInfo : tableSnapshotInfos.values()) { - BaseTableInfo baseTableInfo = snapshotInfo.getBaseTableInfo(); - if (slotRef.getTblNameWithoutAnalyzed().getTbl().equals(baseTableInfo.getTableName())) { - Table table = snapshotInfo.getBaseTable(); - return Pair.create(table, table.getColumn(slotRef.getColumnName())); - } - } - return Pair.create(null, null); - } - /** * If it's tentative, don't really consider the staleness of MV, but all potential partitions */ @@ -1231,14 +1198,12 @@ private boolean checkBaseTablePartitionHasChanged(TableSnapshotInfo snapshotInfo if (!(mvPartitionInfo.isRangePartition())) { return false; } - Pair partitionTableAndColumn = materializedView.getRefBaseTablePartitionColumn(); - Column partitionColumn = partitionTableAndColumn.second; - // TODO: need to consider(non ref-base table's change) + Map partitionTableAndColumn = materializedView.getRefBaseTablePartitionColumns(); // For Non-partition based base table, it's not necessary to check the partition changed. 
- if (!snapshotTable.equals(partitionTableAndColumn.first) - || !snapshotTable.containColumn(partitionColumn.getName())) { + if (!partitionTableAndColumn.containsKey(snapshotTable)) { return false; } + Column partitionColumn = partitionTableAndColumn.get(snapshotTable); Map> snapshotPartitionMap = PartitionUtil.getPartitionKeyRange( snapshotTable, partitionColumn, MaterializedView.getPartitionExpr(materializedView)); Map> currentPartitionMap = PartitionUtil.getPartitionKeyRange( diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshListPartitioner.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshListPartitioner.java index c5592891629f8..7aaf1a000ec7c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshListPartitioner.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshListPartitioner.java @@ -17,6 +17,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.starrocks.analysis.BoolLiteral; import com.starrocks.analysis.Expr; import com.starrocks.analysis.IsNullPredicate; import com.starrocks.analysis.LiteralExpr; @@ -131,25 +132,31 @@ public boolean syncAddOrDropPartitions() { } @Override - public Expr generatePartitionPredicate(Table table, Set refBaseTablePartitionNames, + public Expr generatePartitionPredicate(Table refBaseTable, Set refBaseTablePartitionNames, Expr mvPartitionSlotRef) throws AnalysisException { Map> basePartitionMaps = mvContext.getRefBaseTableListPartitionMap(); if (basePartitionMaps.isEmpty()) { return null; } - Table refBaseTable = mvContext.getRefBaseTable(); Map baseListPartitionMap = basePartitionMaps.get(refBaseTable); - if (baseListPartitionMap == null || baseListPartitionMap.isEmpty()) { + if (baseListPartitionMap == null) { + LOG.warn("Generate incremental partition predicate failed, " + + "basePartitionMaps:{} contains no refBaseTable:{}", basePartitionMaps, refBaseTable); return null; } + if 
(baseListPartitionMap.isEmpty()) { + return new BoolLiteral(true); + } List sourceTablePartitionList = Lists.newArrayList(); List partitionCols = refBaseTable.getPartitionColumns(); Map partitionTableAndColumn = mv.getRefBaseTablePartitionColumns(); if (partitionTableAndColumn == null || !partitionTableAndColumn.containsKey(refBaseTable)) { + LOG.warn("Generate incremental partition failed, partitionTableAndColumn {} contains no ref table {}", + partitionTableAndColumn, refBaseTable); return null; } - Column refPartitionColumn = partitionTableAndColumn.get(table); + Column refPartitionColumn = partitionTableAndColumn.get(refBaseTable); int refIndex = ListPartitionDiffer.getRefBaseTableIdx(refBaseTable, refPartitionColumn); Type partitionType = partitionCols.get(refIndex).getType(); diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshPlanBuilder.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshPlanBuilder.java index 7592b22e5a76c..d9e250c2d49ee 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshPlanBuilder.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshPlanBuilder.java @@ -14,28 +14,37 @@ package com.starrocks.scheduler.mv; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; +import com.starrocks.analysis.BoolLiteral; import com.starrocks.analysis.Expr; import com.starrocks.analysis.FunctionCallExpr; import com.starrocks.analysis.SlotRef; import com.starrocks.catalog.Column; +import com.starrocks.catalog.ExpressionRangePartitionInfo; import com.starrocks.catalog.FunctionSet; +import com.starrocks.catalog.ListPartitionInfo; import com.starrocks.catalog.MaterializedView; +import com.starrocks.catalog.PartitionInfo; import com.starrocks.catalog.Table; import com.starrocks.common.AnalysisException; -import 
com.starrocks.common.Pair; import com.starrocks.qe.ConnectContext; import com.starrocks.scheduler.MvTaskRunContext; import com.starrocks.sql.analyzer.Analyzer; import com.starrocks.sql.analyzer.AnalyzerUtils; +import com.starrocks.sql.analyzer.QueryAnalyzer; import com.starrocks.sql.analyzer.Scope; import com.starrocks.sql.ast.InsertStmt; import com.starrocks.sql.ast.PartitionNames; import com.starrocks.sql.ast.QueryRelation; import com.starrocks.sql.ast.QueryStatement; +import com.starrocks.sql.ast.SelectList; +import com.starrocks.sql.ast.SelectListItem; import com.starrocks.sql.ast.SelectRelation; import com.starrocks.sql.ast.TableRelation; +import org.apache.commons.collections4.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -44,12 +53,14 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public class MVPCTRefreshPlanBuilder { private static final Logger LOG = LogManager.getLogger(MVPCTRefreshPlanBuilder.class); private final MaterializedView mv; private final MvTaskRunContext mvContext; private final MVPCTRefreshPartitioner mvRefreshPartitioner; + private final boolean isEnableInsertStrict; public MVPCTRefreshPlanBuilder(MaterializedView mv, MvTaskRunContext mvContext, @@ -57,15 +68,22 @@ public MVPCTRefreshPlanBuilder(MaterializedView mv, this.mv = mv; this.mvContext = mvContext; this.mvRefreshPartitioner = mvRefreshPartitioner; + this.isEnableInsertStrict = mvContext.getCtx().getSessionVariable().getEnableInsertStrict(); } public InsertStmt analyzeAndBuildInsertPlan(InsertStmt insertStmt, Map> refTableRefreshPartitions, ConnectContext ctx) throws AnalysisException { - // analyze the insert stmt Analyzer.analyze(insertStmt, ctx); + InsertStmt newInsertStmt = buildInsertPlan(insertStmt, refTableRefreshPartitions); + return newInsertStmt; + } + + private InsertStmt buildInsertPlan(InsertStmt insertStmt, + Map> refTableRefreshPartitions) throws 
AnalysisException { // if the refTableRefreshPartitions is empty(not partitioned mv), no need to generate partition predicate if (refTableRefreshPartitions.isEmpty()) { + LOG.info("There is no ref table partitions to refresh, skip to generate partition predicates"); return insertStmt; } @@ -78,131 +96,295 @@ public InsertStmt analyzeAndBuildInsertPlan(InsertStmt insertStmt, QueryRelation queryRelation = queryStatement.getQueryRelation(); List extraPartitionPredicates = Lists.newArrayList(); Multimap tableRelations = AnalyzerUtils.collectAllTableRelation(queryStatement); - for (String tblName : tableRelations.keys()) { + Map mvRefBaseTablePartitionSlotRefs = mv.getRefBaseTablePartitionSlots(); + if (CollectionUtils.sizeIsEmpty(mvRefBaseTablePartitionSlotRefs)) { + throw new AnalysisException(String.format("MV refresh cannot generate partition predicates " + + "because of mv %s contains no ref base table's partitions", mv.getName())); + } + + Set uniqueTableNames = tableRelations.keySet().stream().collect(Collectors.toSet()); + int numOfPushDownIntoTables = 0; + boolean hasGenerateNonPushDownPredicates = false; + for (String tblName : uniqueTableNames) { // skip to generate partition predicate for non-ref base tables - if (!refTableRefreshPartitions.containsKey(tblName) || !tableRelations.containsKey(tblName)) { + if (!refTableRefreshPartitions.containsKey(tblName)) { + LOG.warn("Skip to generate partition predicate to refresh because it's not ref " + + "base table, table: {}, mv:{}, refTableRefreshPartitions:{}", tblName, mv.getName(), + refTableRefreshPartitions); continue; } // set partition names for ref base table Set tablePartitionNames = refTableRefreshPartitions.get(tblName); Collection relations = tableRelations.get(tblName); TableRelation tableRelation = relations.iterator().next(); - - // if there are multiple table relations, don't push down partition predicate into table relation - boolean isPushDownBelowTable = (relations.size() == 1); Table table = 
tableRelation.getTable(); if (table == null) { - LOG.warn("Optimize materialized view {} refresh task, generate table relation {} failed: " + - "table is null", mv.getName(), tableRelation.getName()); - continue; - } - // external table doesn't support query with partitionNames - if (isPushDownBelowTable && !table.isExternalTableWithFileSystem()) { - LOG.info("Optimize materialized view {} refresh task, generate table relation {} target partition names:{} ", - mv.getName(), tableRelation.getName(), Joiner.on(",").join(tablePartitionNames)); - tableRelation.setPartitionNames( - new PartitionNames(false, new ArrayList<>(tablePartitionNames))); + throw new AnalysisException(String.format("Optimize materialized view %s refresh task, generate table relation " + + "%s failed: table is null", mv.getName(), tableRelation.getName())); } - - Pair refBaseTableAndCol = mv.getRefBaseTablePartitionColumn(); - if (refBaseTableAndCol == null || !refBaseTableAndCol.first.equals(table)) { + // skip it if the table is not a ref base table. + if (!mvRefBaseTablePartitionSlotRefs.containsKey(table)) { + LOG.warn("Skip to generate partition predicate because it's not mv direct ref base table:{}, mv:{}, " + + "refBaseTableAndCol: {}", table, mv.getName(), mvRefBaseTablePartitionSlotRefs); continue; } - // generate partition predicate for the select relation, so can generate partition predicates - // for non-ref base tables. - // eg: - // mv: create mv mv1 partition by t1.dt - // as select * from t1 join t2 on t1.dt = t2.dt. 
- // ref-base-table : t1.dt - // non-ref-base-table : t2.dt - // so add partition predicates for select relation when refresh partitions incrementally(eg: dt=20230810): - // (select * from t1 join t2 on t1.dt = t2.dt) where t1.dt=20230810 - Expr partitionPredicate = generatePartitionPredicate(table, tablePartitionNames, queryStatement); - if (partitionPredicate == null) { - continue; + SlotRef refTablePartitionSlotRef = mvRefBaseTablePartitionSlotRefs.get(table); + if (refTablePartitionSlotRef == null) { + throw new AnalysisException(String.format("Generate partition predicate failed: " + + "cannot find partition slot ref of table %s from query relation", table.getName())); } - // try to push down into table relation - List slots = Lists.newArrayList(); - partitionPredicate.collect(SlotRef.class, slots); - Scope tableRelationScope = tableRelation.getScope(); - if (isPushDownBelowTable && canResolveSlotsInTheScope(slots, tableRelationScope)) { - LOG.info("Optimize materialized view {} refresh task, generate table relation {} " + - "partition predicate:{} ", - mv.getName(), tableRelation.getName(), partitionPredicate.toSql()); - tableRelation.setPartitionPredicate(partitionPredicate); + + // If there are multiple table relations, don't push down partition predicate into table relation + // If `enable_mv_refresh_query_rewrite` is enabled, table relation should not set partition names + // since it will deduce `hasTableHints` to true and causes rewrite failed. 
+ boolean isPushDownBelowTable = (relations.size() == 1); + if (isPushDownBelowTable) { + boolean ret = pushDownPartitionPredicates(table, tableRelation, refTablePartitionSlotRef, + tablePartitionNames); + if (ret) { + numOfPushDownIntoTables += 1; + } else { + LOG.warn("Generate push down partition predicate failed, table:{}", table); + } + } else { + LOG.warn("Ref base table contains self join, cannot push down partition predicates, table:{}", + table.getName()); + // For non-push-down predicates, it only needs to be generated only once since we can only use mv's partition + // info ref column to generate incremental partition predicates. + // eg: + // mv: create mv xx as select dt as dt1, a from t1 join t2 on t1.dt = t2.dt; + // non-push-down predicate: where dt1 in ('2024-07-15') + if (hasGenerateNonPushDownPredicates) { + continue; + } + // Use the mv's partition info ref column to generate incremental partition predicates rather than ref base + // table's slot ref since ref base table's partition column may be aliased in the query relation. 
+ String mvPartitionInfoRefColName = getMVPartitionInfoRefColumnName(); + // if it hasn't pushed down into table, add it into the query relation's predicate + Expr mvPartitionOutputExpr = getPartitionOutputExpr(queryStatement, mvPartitionInfoRefColName); + if (mvPartitionOutputExpr == null) { + LOG.warn("Fail to generate partition predicates for self-join table because output expr is null, " + + "table: {}, refTablePartitionSlotRef:{}", table.getName(), refTablePartitionSlotRef); + continue; + } + Expr partitionPredicate = generatePartitionPredicate(table, tablePartitionNames, mvPartitionOutputExpr); + if (partitionPredicate == null) { + LOG.warn("Fail to generate partition predicates for self-join table, " + + "table: {}, refTablePartitionSlotRef:{}", table.getName(), refTablePartitionSlotRef); + continue; + } + hasGenerateNonPushDownPredicates = true; + extraPartitionPredicates.add(partitionPredicate); } - extraPartitionPredicates.add(partitionPredicate); } if (extraPartitionPredicates.isEmpty()) { + doIfNoPushDownPredicates(numOfPushDownIntoTables, refTableRefreshPartitions); + LOG.info("Generate partition extra predicates empty, mv:{}, numOfPushDownIntoTables:{}", + mv.getName(), numOfPushDownIntoTables); return insertStmt; } - if (queryRelation instanceof SelectRelation) { SelectRelation selectRelation = (SelectRelation) queryRelation; extraPartitionPredicates.add(selectRelation.getWhereClause()); Expr finalPredicate = Expr.compoundAnd(extraPartitionPredicates); selectRelation.setWhereClause(finalPredicate); LOG.info("Optimize materialized view {} refresh task, generate insert stmt final " + - "predicate(select relation):{} ", mv.getName(), finalPredicate.toSql()); + "predicate(select relation):{} ", mv.getName(), finalPredicate.toSql()); + } else { + // support to generate partition predicate for other query relation types + LOG.warn("MV Refresh cannot push down partition predicate since " + + "the query relation is not select relation, mv:{}", mv.getName()); 
+ List items = queryRelation.getOutputExpression().stream() + .map(x -> new SelectListItem(x, null)).collect(Collectors.toList()); + SelectList selectList = new SelectList(items, false); + SelectRelation selectRelation = new SelectRelation(selectList, queryRelation, + Expr.compoundAnd(extraPartitionPredicates), null, null); + selectRelation.setWhereClause(Expr.compoundAnd(extraPartitionPredicates)); + QueryStatement newQueryStatement = new QueryStatement(selectRelation); + insertStmt.setQueryStatement(newQueryStatement); + new QueryAnalyzer(mvContext.getCtx()).analyze(newQueryStatement); } return insertStmt; } - /** - * Check whether to push down predicate expr with the slot refs into the scope. - * - * @param slots : slot refs that are contained in the predicate expr - * @param scope : scope that try to push down into. - * @return - */ - private boolean canResolveSlotsInTheScope(List slots, Scope scope) { - return slots.stream().allMatch(s -> scope.tryResolveField(s).isPresent()); + private boolean pushDownPartitionPredicates(Table table, + TableRelation tableRelation, + SlotRef refBaseTablePartitionSlot, + Set tablePartitionNames) throws AnalysisException { + if (pushDownByPartitionNames(table, tableRelation, tablePartitionNames)) { + return true; + } + if (pushDownByPredicate(table, tableRelation, refBaseTablePartitionSlot, tablePartitionNames)) { + return true; + } + return false; + } + + private boolean pushDownByPartitionNames(Table table, + TableRelation tableRelation, + Set tablePartitionNames) { + if (table.isExternalTableWithFileSystem()) { + return false; + } + // external table doesn't support query with partitionNames + LOG.info("Optimize materialized view {} refresh task, push down partition names into table " + + "relation {}, filtered partition names:{} ", + mv.getName(), tableRelation.getName(), Joiner.on(",").join(tablePartitionNames)); + tableRelation.setPartitionNames( + new PartitionNames(false, new ArrayList<>(tablePartitionNames))); + return 
true; + } + + private boolean pushDownByPredicate(Table table, + TableRelation tableRelation, + SlotRef refBaseTablePartitionSlot, + Set tablePartitionNames) throws AnalysisException { + // generate partition predicate for the select relation, so can generate partition predicates + // for non-ref base tables. + // eg: + // mv: create mv mv1 partition by t1.dt + // as select * from t1 join t2 on t1.dt = t2.dt. + // ref-base-table : t1.dt + // non-ref-base-table : t2.dt + // so add partition predicates for select relation when refresh partitions incrementally(eg: dt=20230810): + // (select * from t1 join t2 on t1.dt = t2.dt) where t1.dt=20230810 + SlotRef cloned = (SlotRef) refBaseTablePartitionSlot.clone(); + cloned.setTblName(null); + Expr partitionPredicate = generatePartitionPredicate(table, + tablePartitionNames, cloned); + if (partitionPredicate == null) { + LOG.warn("Generate partition predicate failed, table:{}, tablePartitionNames:{}, outputMRefVPartitionExpr:{}", + table, tablePartitionNames, cloned); + return false; + } + // try to push down into table relation + final List slots = ImmutableList.of(cloned); + Scope tableRelationScope = tableRelation.getScope(); + if (!canResolveSlotsInTheScope(slots, tableRelationScope)) { + throw new AnalysisException(String.format("Cannot generate partition predicate " + + "because cannot find partition slot ref in ref table's scope, refBaseTable:%s, " + + "refBaseTablePartitionSlot:%s, tablePartitionNames:%s", + table, cloned, tablePartitionNames)); + } + LOG.info("Optimize materialized view {} refresh task, push down partition predicate into table " + + "relation {}, partition predicate:{} ", + mv.getName(), tableRelation.getName(), partitionPredicate.toSql()); + tableRelation.setPartitionPredicate(partitionPredicate); + return true; } /** - * Generate partition predicates to refresh the materialized view so can be refreshed by the incremental partitions. 
- * - * @param tablePartitionNames : the need pruned partition tables of the ref base table - * @param queryStatement : the materialized view's defined query statement - * @return - * @throws AnalysisException + * This is only used to self-joins table for now and to be compatible with before. */ - private Expr generatePartitionPredicate(Table table, Set tablePartitionNames, - QueryStatement queryStatement) - throws AnalysisException { - SlotRef partitionSlot = MaterializedView.getRefBaseTablePartitionSlotRef(mv); + @Deprecated + private Expr getPartitionOutputExpr(QueryStatement queryStatement, String mvPartitionInfoRefColName) { + if (mvPartitionInfoRefColName == null) { + LOG.warn("Generate partition predicate failed: " + + "mv partition info ref column is null, mv:{}", mv.getName()); + return null; + } + Expr outputPartitionSlot = findPartitionOutputExpr(queryStatement, mvPartitionInfoRefColName); + if (outputPartitionSlot == null) { + LOG.warn("Generate partition predicate failed: " + + "cannot find partition slot ref {} from query relation", mvPartitionInfoRefColName); + return null; + } + return outputPartitionSlot; + } + + private Expr findPartitionOutputExpr(QueryStatement queryStatement, String mvPartitionInfoRefColName) { List columnOutputNames = queryStatement.getQueryRelation().getColumnOutputNames(); List outputExpressions = queryStatement.getQueryRelation().getOutputExpression(); - Expr outputPartitionSlot = null; for (int i = 0; i < outputExpressions.size(); ++i) { - if (columnOutputNames.get(i).equalsIgnoreCase(partitionSlot.getColumnName())) { - outputPartitionSlot = outputExpressions.get(i); - break; - } else if (outputExpressions.get(i) instanceof FunctionCallExpr) { - FunctionCallExpr functionCallExpr = (FunctionCallExpr) outputExpressions.get(i); + Expr expr = outputExpressions.get(i); + if (columnOutputNames.get(i).equalsIgnoreCase(mvPartitionInfoRefColName)) { + return expr; + } else if (expr instanceof FunctionCallExpr) { + FunctionCallExpr 
functionCallExpr = (FunctionCallExpr) expr; if (functionCallExpr.getFnName().getFunction().equalsIgnoreCase(FunctionSet.STR2DATE) && functionCallExpr.getChild(0) instanceof SlotRef) { SlotRef slot = functionCallExpr.getChild(0).cast(); - if (slot.getColumnName().equalsIgnoreCase(partitionSlot.getColumnName())) { - outputPartitionSlot = slot; - break; + if (slot.getColumnName().equalsIgnoreCase(mvPartitionInfoRefColName)) { + return slot; } } } else { // alias name. - SlotRef slotRef = outputExpressions.get(i).unwrapSlotRef(); - if (slotRef != null && slotRef.getColumnName().equals(partitionSlot.getColumnName())) { - outputPartitionSlot = outputExpressions.get(i); - break; + SlotRef slotRef = expr.unwrapSlotRef(); + if (slotRef != null && slotRef.getColumnName().equals(mvPartitionInfoRefColName)) { + return outputExpressions.get(i); } } } + return null; + } - if (outputPartitionSlot == null) { - LOG.warn("Generate partition predicate failed: " + - "cannot find partition slot ref {} from query relation", partitionSlot); - return null; + /** + * Get the partition column name of the mv's partition info. + * eg: + * table1: partition by dt + * mv: create mv as select dt as dt1, key1 from table1; + * then mv partition info ref column name is dt1 rather than dt. 
+ */ + private String getMVPartitionInfoRefColumnName() { + PartitionInfo partitionInfo = mv.getPartitionInfo(); + if (partitionInfo.isExprRangePartitioned()) { + ExpressionRangePartitionInfo expressionRangePartitionInfo = (ExpressionRangePartitionInfo) partitionInfo; + List exprs = expressionRangePartitionInfo.getPartitionExprs(mv.getIdToColumn()); + Preconditions.checkState(exprs.size() == 1); + List slotRefs = Lists.newArrayList(); + exprs.get(0).collect(SlotRef.class, slotRefs); + // if partitionExpr is FunctionCallExpr, get first SlotRef + Preconditions.checkState(slotRefs.size() == 1); + return slotRefs.get(0).getColumnName(); + } else if (partitionInfo.isListPartition()) { + ListPartitionInfo listPartitionInfo = (ListPartitionInfo) partitionInfo; + List partitionColumns = listPartitionInfo.getPartitionColumns(mv.getIdToColumn()); + Preconditions.checkState(partitionColumns.size() == 1); + return partitionColumns.get(0).getName(); + } + return null; + } + + /** + * Generate partition predicates to refresh the materialized view so can be refreshed by the incremental partitions. 
+ * + * @param tablePartitionNames : the partition names of the ref base table that need to be pruned + * @return + * @throws AnalysisException + */ + private Expr generatePartitionPredicate(Table table, Set tablePartitionNames, + Expr mvPartitionOutputExpr) + throws AnalysisException { + if (tablePartitionNames.isEmpty()) { + return new BoolLiteral(true); } - return mvRefreshPartitioner.generatePartitionPredicate(table, tablePartitionNames, outputPartitionSlot); + return mvRefreshPartitioner.generatePartitionPredicate(table, tablePartitionNames, mvPartitionOutputExpr); + } + + private void doIfNoPushDownPredicates(int numOfPushDownIntoTables, + Map> refTableRefreshPartitions) throws AnalysisException { + int refBaseTableSize = refTableRefreshPartitions.size(); + if (numOfPushDownIntoTables == refBaseTableSize) { + return; + } + LOG.warn("Cannot generate partition predicate for mv refresh {} and there " + + "are no predicate push down tables, refBaseTableSize:{}, numOfPushDownIntoTables:{}", mv.getName(), + refBaseTableSize, numOfPushDownIntoTables); + if (isEnableInsertStrict) { + throw new AnalysisException(String.format("Cannot generate partition predicate for mv refresh %s", + mv.getName())); + } + } + + /** + * Check whether to push down predicate expr with the slot refs into the scope. + * + * @param slots : slot refs that are contained in the predicate expr + * @param scope : the scope to try to push the predicate down into. 
+ * @return + */ + private boolean canResolveSlotsInTheScope(List slots, Scope scope) { + return slots.stream().allMatch(s -> scope.tryResolveField(s).isPresent()); } -} +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshRangePartitioner.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshRangePartitioner.java index 014fc811e68e6..e7e788fd2c0c5 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshRangePartitioner.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshRangePartitioner.java @@ -35,7 +35,6 @@ import com.starrocks.catalog.TableProperty; import com.starrocks.common.AnalysisException; import com.starrocks.common.Config; -import com.starrocks.common.Pair; import com.starrocks.common.util.RangeUtils; import com.starrocks.connector.PartitionUtil; import com.starrocks.scheduler.MvTaskRunContext; @@ -147,8 +146,13 @@ public Expr generatePartitionPredicate(Table table, Set refBaseTablePart // for nested mv, the base table may be another mv, which is partition by str2date(dt, '%Y%m%d') // here we should convert date into '%Y%m%d' format Expr partitionExpr = mv.getPartitionExpr(); - Pair partitionTableAndColumn = mv.getRefBaseTablePartitionColumn(); - boolean isConvertToDate = PartitionUtil.isConvertToDate(partitionExpr, partitionTableAndColumn.second); + Map partitionTableAndColumn = mv.getRefBaseTablePartitionColumns(); + if (!partitionTableAndColumn.containsKey(table)) { + LOG.warn("Cannot generate mv refresh partition predicate because cannot decide the partition column of table {}," + + "partitionTableAndColumn:{}", table.getName(), partitionTableAndColumn); + return null; + } + boolean isConvertToDate = PartitionUtil.isConvertToDate(partitionExpr, partitionTableAndColumn.get(table)); if (isConvertToDate && partitionExpr instanceof FunctionCallExpr && !sourceTablePartitionRange.isEmpty() && 
MvUtils.isDateRange(sourceTablePartitionRange.get(0))) { Optional functionCallExprOpt = getStr2DateExpr(partitionExpr); diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/common/QueryDebugOptions.java b/fe/fe-core/src/main/java/com/starrocks/sql/common/QueryDebugOptions.java index e6e67aaf7a03d..58b3dcaa5cbd4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/common/QueryDebugOptions.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/common/QueryDebugOptions.java @@ -16,7 +16,9 @@ import com.google.gson.annotations.SerializedName; import com.starrocks.common.FeConstants; +import com.starrocks.common.profile.Tracers; import com.starrocks.persist.gson.GsonUtils; +import org.apache.logging.log4j.util.Strings; public class QueryDebugOptions { private static QueryDebugOptions INSTANCE = new QueryDebugOptions(); @@ -31,6 +33,12 @@ public class QueryDebugOptions { @SerializedName(value = "enableQueryTraceLog") private boolean enableQueryTraceLog = false; + @SerializedName(value = "mvRefreshTraceMode") + private String mvRefreshTraceMode; + + @SerializedName(value = "mvRefreshTraceModule") + private String mvRefreshTraceModule; + public QueryDebugOptions() { // To make unit test more stable, add retry times for refreshing materialized views. if (FeConstants.runningUnitTest) { @@ -62,6 +70,14 @@ public void setEnableQueryTraceLog(boolean enableQueryTraceLog) { this.enableQueryTraceLog = enableQueryTraceLog; } + public Tracers.Mode getMvRefreshTraceMode() { + return Strings.isEmpty(mvRefreshTraceMode) ? Tracers.Mode.TIMER : Tracers.Mode.valueOf(mvRefreshTraceMode); + } + + public Tracers.Module getMvRefreshTraceModule() { + return Strings.isEmpty(mvRefreshTraceModule) ? 
Tracers.Module.BASE : Tracers.Module.valueOf(mvRefreshTraceModule); + } + public static QueryDebugOptions getInstance() { return INSTANCE; } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/OptExpressionDuplicator.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/OptExpressionDuplicator.java index efb248884adb7..98679742dfa3b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/OptExpressionDuplicator.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/OptExpressionDuplicator.java @@ -77,18 +77,15 @@ public class OptExpressionDuplicator { // old ColumnRefOperator -> new ColumnRefOperator private final Map columnMapping; private final ReplaceColumnRefRewriter rewriter; - private final Table partitionByTable; - private final Column partitionColumn; private final boolean partialPartitionRewrite; private final OptimizerContext optimizerContext; + private final Map mvRefBaseTableColumns; public OptExpressionDuplicator(MaterializationContext materializationContext) { this.columnRefFactory = materializationContext.getQueryRefFactory(); this.columnMapping = Maps.newHashMap(); this.rewriter = new ReplaceColumnRefRewriter(columnMapping); - Pair partitionInfo = materializationContext.getMv().getRefBaseTablePartitionColumn(); - this.partitionByTable = partitionInfo == null ? null : partitionInfo.first; - this.partitionColumn = partitionInfo == null ? 
null : partitionInfo.second; + this.mvRefBaseTableColumns = materializationContext.getMv().getRefBaseTablePartitionColumns(); this.partialPartitionRewrite = !materializationContext.getMvUpdateInfo().getMvToRefreshPartitionNames().isEmpty(); this.optimizerContext = materializationContext.getOptimizerContext(); } @@ -97,8 +94,7 @@ public OptExpressionDuplicator(ColumnRefFactory columnRefFactory, OptimizerConte this.columnRefFactory = columnRefFactory; this.columnMapping = Maps.newHashMap(); this.rewriter = new ReplaceColumnRefRewriter(columnMapping); - this.partitionByTable = null; - this.partitionColumn = null; + this.mvRefBaseTableColumns = null; this.partialPartitionRewrite = false; this.optimizerContext = optimizerContext; } @@ -261,13 +257,15 @@ public OptExpression visitLogicalTableScan(OptExpression optExpression, Void con if (partialPartitionRewrite && optExpression.getOp() instanceof LogicalOlapScanOperator - && partitionByTable != null) { + && mvRefBaseTableColumns != null) { // maybe partition column is not in the output columns, should add it LogicalOlapScanOperator olapScan = (LogicalOlapScanOperator) optExpression.getOp(); OlapTable table = (OlapTable) olapScan.getTable(); - if (table.getId() == partitionByTable.getId()) { - if (!columnRefOperatorColumnMap.containsValue(partitionColumn)) { + if (mvRefBaseTableColumns.containsKey(table)) { + Column partitionColumn = mvRefBaseTableColumns.get(table); + if (!columnRefOperatorColumnMap.containsValue(partitionColumn) && + newColumnMetaToColRefMap.containsKey(partitionColumn)) { ColumnRefOperator partitionColumnRef = newColumnMetaToColRefMap.get(partitionColumn); columnRefColumnMapBuilder.put(partitionColumnRef, partitionColumn); } diff --git a/fe/fe-core/src/test/java/com/starrocks/scheduler/PCTRefreshListPartitionOlapTest.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/PCTRefreshListPartitionOlapTest.java index b2658dd8f5a87..ecd9c08d8815c 100644 --- 
a/fe/fe-core/src/test/java/com/starrocks/scheduler/PCTRefreshListPartitionOlapTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/PCTRefreshListPartitionOlapTest.java @@ -288,7 +288,6 @@ public void testRefreshSingleColumnMVWithSingleValues() { String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t2\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 4: province = 'beijing'\n" + " partitions=1/2"); Collection partitions = materializedView.getPartitions(); @@ -307,7 +306,6 @@ public void testRefreshSingleColumnMVWithSingleValues() { String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t2\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 4: province = 'hangzhou'\n" + " partitions=1/3"); Collection partitions = materializedView.getPartitions(); Assert.assertEquals(3, partitions.size()); @@ -344,7 +342,6 @@ public void testRefreshSingleColumnWithMultiValues() { String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t1\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 4: province IN ('chongqing', 'beijing')\n" + " partitions=1/2"); Collection partitions = materializedView.getPartitions(); @@ -363,7 +360,6 @@ public void testRefreshSingleColumnWithMultiValues() { String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t1\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 4: province = 'hangzhou'\n" + " partitions=1/3"); Collection partitions = materializedView.getPartitions(); Assert.assertEquals(3, partitions.size()); @@ -402,7 +398,6 @@ public void testRefreshMultiColumnsMV1() { PlanTestBase.assertContains(plan, " 0:OlapScanNode\n" + " TABLE: t3\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 4: province = 'beijing'\n" + " partitions=2/4"); Collection partitions = materializedView.getPartitions(); @@ -422,7 +417,6 @@ public void testRefreshMultiColumnsMV1() { String 
plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t3\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 4: province = 'beijing'\n" + " partitions=2/4"); Collection partitions = materializedView.getPartitions(); @@ -440,7 +434,6 @@ public void testRefreshMultiColumnsMV1() { String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t3\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 4: province = 'hangzhou'\n" + " partitions=1/5"); Collection partitions = materializedView.getPartitions(); Assert.assertEquals(3, partitions.size()); @@ -479,7 +472,6 @@ public void testRefreshMultiColumnsMV2() { String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t3\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 3: dt IN ('2024-01-01', '2024-01-02')\n" + " partitions=4/4"); Collection partitions = materializedView.getPartitions(); @@ -499,7 +491,6 @@ public void testRefreshMultiColumnsMV2() { String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t3\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 3: dt IN ('2024-01-01', '2024-01-02')\n" + " partitions=4/4"); Collection partitions = materializedView.getPartitions(); @@ -517,7 +508,6 @@ public void testRefreshMultiColumnsMV2() { String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t3\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 3: dt = '2024-01-01'\n" + " partitions=3/5"); Collection partitions = materializedView.getPartitions(); Assert.assertEquals(2, partitions.size()); @@ -555,7 +545,6 @@ public void testRefreshSingleColumnMVWithPartitionExpr() { String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t4\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 4: province = 'beijing'\n" + " partitions=1/2"); Collection partitions = 
materializedView.getPartitions(); @@ -574,7 +563,6 @@ public void testRefreshSingleColumnMVWithPartitionExpr() { String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t4\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 4: province = 'hangzhou'\n" + " partitions=1/3"); Collection partitions = materializedView.getPartitions(); Assert.assertEquals(2, partitions.size()); @@ -614,7 +602,6 @@ public void testRefreshMultiColumnsMVWithPartitionExpr() { String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t5\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 4: province = 'beijing'\n" + " partitions=1/2"); Collection partitions = materializedView.getPartitions(); @@ -636,7 +623,6 @@ public void testRefreshMultiColumnsMVWithPartitionExpr() { String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t5\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 4: province = 'beijing'\n" + " partitions=2/3"); Collection partitions = materializedView.getPartitions(); @@ -655,7 +641,6 @@ public void testRefreshMultiColumnsMVWithPartitionExpr() { String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t5\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 4: province = 'hangzhou'\n" + " partitions=1/4"); Collection partitions = materializedView.getPartitions(); Assert.assertEquals(2, partitions.size()); @@ -892,11 +877,9 @@ public void testRefreshJoinWithMultiColumns1() { String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t1\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 4: province = 'beijing'\n" + - " partitions=1/3"); + " partitions=2/3"); PlanTestBase.assertContains(plan, " TABLE: t5\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 8: province = 'beijing'\n" + " partitions=1/2"); Collection partitions = materializedView.getPartitions(); 
Assert.assertEquals(3, partitions.size()); @@ -936,7 +919,6 @@ public void testRefreshMVWithMultiNulllalbeColumns() { String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t6\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 4: province = 'beijing'\n" + " partitions=1/2"); Collection partitions = materializedView.getPartitions(); @@ -958,7 +940,6 @@ public void testRefreshMVWithMultiNulllalbeColumns() { String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t6\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 4: province = 'beijing'\n" + " partitions=2/3"); Collection partitions = materializedView.getPartitions(); @@ -977,7 +958,6 @@ public void testRefreshMVWithMultiNulllalbeColumns() { String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: t6\n" + " PREAGGREGATION: ON\n" + - " PREDICATES: 4: province = 'hangzhou'\n" + " partitions=1/4"); Collection partitions = materializedView.getPartitions(); Assert.assertEquals(2, partitions.size()); diff --git a/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorHiveTest.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorHiveTest.java index 6fba125d3d998..fece5058f0bfe 100644 --- a/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorHiveTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorHiveTest.java @@ -769,9 +769,9 @@ public void testRefreshPartitionWithMulParColumnsHiveTable2() throws Exception { MvTaskRunContext mvContext = processor.getMvContext(); ExecPlan execPlan = mvContext.getExecPlan(); String plan = execPlan.getExplainString(TExplainLevel.NORMAL); - Assert.assertTrue( - plan.contains("PARTITION PREDICATES: 5: par_date >= '2020-01-01', 5: par_date < '2020-01-03'")); - Assert.assertTrue(plan.contains("partitions=3/7")); + 
PlanTestBase.assertContains(plan, "PARTITION PREDICATES: 5: par_date >= '2020-01-01', " + + "5: par_date < '2020-01-03'"); + PlanTestBase.assertContains(plan, "partitions=3/7"); } @Test @@ -805,12 +805,15 @@ public void testHivePartitionPruneNonRefBaseTable1() throws Exception { MvTaskRunContext mvContext = processor.getMvContext(); ExecPlan execPlan = mvContext.getExecPlan(); String plan = execPlan.getExplainString(TExplainLevel.NORMAL); - Assert.assertTrue(plan.contains("TABLE: part_tbl1\n" + - " PARTITION PREDICATES: 4: par_date >= '2020-01-01', 4: par_date < '2020-01-05'\n" + - " partitions=4/4")); - Assert.assertTrue(plan.contains("TABLE: part_tbl2\n" + - " PARTITION PREDICATES: 8: par_date >= '2020-01-01', 8: par_date < '2020-01-05'\n" + - " partitions=4/4")); + System.out.println(plan); + PlanTestBase.assertContains(plan, "TABLE: part_tbl1\n" + + " PARTITION PREDICATES: 4: par_date IS NOT NULL, 4: par_date >= '2020-01-01', " + + "4: par_date < '2020-01-05'\n" + + " partitions=4/4"); + PlanTestBase.assertContains(plan, "TABLE: part_tbl2\n" + + " PARTITION PREDICATES: 8: par_date IS NOT NULL, 8: par_date >= '2020-01-01', 8: par_date < " + + "'2020-01-05' \n" + + " partitions=4/4"); } // run 2 @@ -827,10 +830,12 @@ public void testHivePartitionPruneNonRefBaseTable1() throws Exception { ExecPlan execPlan = mvContext.getExecPlan(); String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, "TABLE: part_tbl1\n" + - " PARTITION PREDICATES: 4: par_date >= '2020-01-05', 4: par_date < '2020-01-06'\n" + + " PARTITION PREDICATES: 4: par_date IS NOT NULL, 4: par_date >= '2020-01-05', 4: par_date < " + + "'2020-01-06'\n" + " partitions=1/5"); PlanTestBase.assertContains(plan, " TABLE: part_tbl2\n" + - " PARTITION PREDICATES: 8: par_date >= '2020-01-05', 8: par_date < '2020-01-06'\n" + + " PARTITION PREDICATES: 8: par_date >= '2020-01-05', 8: par_date < '2020-01-06', " + + "8: par_date IS NOT NULL\n" + " partitions=0/4"); } @@ -848,10 
+853,12 @@ public void testHivePartitionPruneNonRefBaseTable1() throws Exception { ExecPlan execPlan = mvContext.getExecPlan(); String plan = execPlan.getExplainString(TExplainLevel.NORMAL); PlanTestBase.assertContains(plan, " TABLE: part_tbl1\n" + - " PARTITION PREDICATES: 4: par_date >= '2020-01-05', 4: par_date < '2020-01-06'\n" + + " PARTITION PREDICATES: 4: par_date IS NOT NULL, 4: par_date >= '2020-01-05', 4: par_date < " + + "'2020-01-06'\n" + " partitions=1/5"); PlanTestBase.assertContains(plan, " TABLE: part_tbl2\n" + - " PARTITION PREDICATES: 8: par_date >= '2020-01-05', 8: par_date < '2020-01-06'\n" + + " PARTITION PREDICATES: 8: par_date IS NOT NULL, 8: par_date >= '2020-01-05', 8: par_date < " + + "'2020-01-06'\n" + " partitions=1/5"); } @@ -886,12 +893,14 @@ public void testHivePartitionPruneNonRefBaseTable1() throws Exception { MvTaskRunContext mvContext = processor.getMvContext(); ExecPlan execPlan = mvContext.getExecPlan(); String plan = execPlan.getExplainString(TExplainLevel.NORMAL); - Assert.assertTrue(plan.contains("TABLE: part_tbl1\n" + - " PARTITION PREDICATES: 4: par_date >= '2020-01-01', 4: par_date < '2020-01-05'\n" + - " partitions=4/4")); - Assert.assertTrue(plan.contains("TABLE: part_tbl2\n" + - " PARTITION PREDICATES: 8: par_date >= '2020-01-01', 8: par_date < '2020-01-05'\n" + - " partitions=4/4")); + PlanTestBase.assertContains(plan, "TABLE: part_tbl1\n" + + " PARTITION PREDICATES: 4: par_date IS NOT NULL, 4: par_date >= '2020-01-01', 4: par_date < " + + "'2020-01-05'\n" + + " partitions=4/4"); + PlanTestBase.assertContains(plan, "TABLE: part_tbl2\n" + + " PARTITION PREDICATES: 8: par_date IS NOT NULL, 8: par_date >= '2020-01-01', 8: par_date < " + + "'2020-01-05'\n" + + " partitions=4/4"); } starRocksAssert.dropMaterializedView("hive_partition_prune_non_ref_tables2"); @@ -929,12 +938,13 @@ public void testHivePartitionPruneNonRefBaseTable2() throws Exception { MvTaskRunContext mvContext = processor.getMvContext(); ExecPlan execPlan 
= mvContext.getExecPlan(); String plan = execPlan.getExplainString(TExplainLevel.NORMAL); - Assert.assertTrue(plan.contains("TABLE: t1_par\n" + - " PARTITION PREDICATES: 10: par_date >= '2020-01-01', 10: par_date < '2020-01-05'\n" + - " partitions=6/6")); - Assert.assertTrue(plan.contains("TABLE: t2_par\n" + + PlanTestBase.assertContains(plan, "TABLE: t1_par\n" + + " PARTITION PREDICATES: 9: par_col IS NOT NULL, 10: par_date >= '2020-01-01', " + + "10: par_date < '2020-01-05'\n" + + " partitions=6/6"); + PlanTestBase.assertContains(plan, "TABLE: t2_par\n" + " PARTITION PREDICATES: 4: par_col IS NOT NULL\n" + - " partitions=6/6")); + " partitions=6/6"); } // run 2 @@ -951,13 +961,14 @@ public void testHivePartitionPruneNonRefBaseTable2() throws Exception { MvTaskRunContext mvContext = processor.getMvContext(); ExecPlan execPlan = mvContext.getExecPlan(); String plan = execPlan.getExplainString(TExplainLevel.NORMAL); - Assert.assertTrue(plan.contains("TABLE: t1_par\n" + - " PARTITION PREDICATES: 10: par_date >= '2020-01-05', 10: par_date < '2020-01-06'\n" + - " partitions=1/7")); + PlanTestBase.assertContains(plan, "TABLE: t1_par\n" + + " PARTITION PREDICATES: 9: par_col IS NOT NULL, 10: par_date >= '2020-01-05', " + + "10: par_date < '2020-01-06'\n" + + " partitions=1/7"); // TODO: multi-column partitions cannot prune partitions. - Assert.assertTrue(plan.contains("TABLE: t2_par\n" + + PlanTestBase.assertContains(plan, "TABLE: t2_par\n" + " PARTITION PREDICATES: 4: par_col IS NOT NULL\n" + - " partitions=6/6")); + " partitions=6/6"); } // run 3 @@ -975,13 +986,14 @@ public void testHivePartitionPruneNonRefBaseTable2() throws Exception { ExecPlan execPlan = mvContext.getExecPlan(); String plan = execPlan.getExplainString(TExplainLevel.NORMAL); // TODO: non-ref base table's update will refresh all the materialized views' partitions. 
- Assert.assertTrue(plan.contains("TABLE: t1_par\n" + - " PARTITION PREDICATES: 10: par_date >= '2020-01-01', 10: par_date < '2020-01-06'\n" + - " partitions=7/7")); + PlanTestBase.assertContains(plan, "TABLE: t1_par\n" + + " PARTITION PREDICATES: 9: par_col IS NOT NULL, 10: par_date >= '2020-01-01', 10: par_date < " + + "'2020-01-06'\n" + + " partitions=7/7"); // TODO: multi-column partitions cannot prune partitions. - Assert.assertTrue(plan.contains("TABLE: t2_par\n" + + PlanTestBase.assertContains(plan, "TABLE: t2_par\n" + " PARTITION PREDICATES: 4: par_col IS NOT NULL\n" + - " partitions=7/7")); + " partitions=7/7"); } // run 4 @@ -1058,11 +1070,12 @@ public void testHivePartitionPruneNonRefBaseTable3() throws Exception { MvTaskRunContext mvContext = processor.getMvContext(); ExecPlan execPlan = mvContext.getExecPlan(); String plan = execPlan.getExplainString(TExplainLevel.NORMAL); - Assert.assertTrue(plan.contains("partitions=5/5\n" + - " rollup: test_partition_prune_tbl1")); - Assert.assertTrue(plan.contains("PREDICATES: 4: k1 >= '2020-10-01', 4: k1 < '2020-12-15'\n" + - " partitions=1/1\n" + - " rollup: test_partition_prune_tbl2")); + PlanTestBase.assertContains(plan, "partitions=5/5\n" + + " rollup: test_partition_prune_tbl1"); + PlanTestBase.assertContains(plan, " TABLE: test_partition_prune_tbl2\n" + + " PREAGGREGATION: ON\n" + + " PREDICATES: 4: k1 IS NOT NULL\n" + + " partitions=1/1"); } // run 2 @@ -1077,11 +1090,13 @@ public void testHivePartitionPruneNonRefBaseTable3() throws Exception { MvTaskRunContext mvContext = processor.getMvContext(); ExecPlan execPlan = mvContext.getExecPlan(); String plan = execPlan.getExplainString(TExplainLevel.NORMAL); - Assert.assertTrue(plan.contains("partitions=5/5\n" + - " rollup: test_partition_prune_tbl1")); - Assert.assertTrue(plan.contains("PREDICATES: 4: k1 >= '2020-10-01', 4: k1 < '2020-12-15'\n" + - " partitions=1/1\n" + - " rollup: test_partition_prune_tbl2")); + PlanTestBase.assertContains(plan, " TABLE: 
test_partition_prune_tbl1\n" + + " PREAGGREGATION: ON\n" + + " PREDICATES: 1: k1 IS NOT NULL"); + PlanTestBase.assertContains(plan, " TABLE: test_partition_prune_tbl2\n" + + " PREAGGREGATION: ON\n" + + " PREDICATES: 4: k1 IS NOT NULL\n" + + " partitions=1/1"); } // run 3 @@ -1096,11 +1111,12 @@ public void testHivePartitionPruneNonRefBaseTable3() throws Exception { MvTaskRunContext mvContext = processor.getMvContext(); ExecPlan execPlan = mvContext.getExecPlan(); String plan = execPlan.getExplainString(TExplainLevel.NORMAL); - Assert.assertTrue(plan.contains("partitions=5/5\n" + - " rollup: test_partition_prune_tbl1")); - Assert.assertTrue(plan.contains("PREDICATES: 4: k1 >= '2020-10-01', 4: k1 < '2020-12-15'\n" + - " partitions=1/1\n" + - " rollup: test_partition_prune_tbl2")); + PlanTestBase.assertContains(plan, "partitions=5/5\n" + + " rollup: test_partition_prune_tbl1"); + PlanTestBase.assertContains(plan, " TABLE: test_partition_prune_tbl2\n" + + " PREAGGREGATION: ON\n" + + " PREDICATES: 4: k1 IS NOT NULL\n" + + " partitions=1/1"); } starRocksAssert.dropMaterializedView("partition_prune_mv1"); diff --git a/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorOlapPart2Test.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorOlapPart2Test.java index 9c5f704005847..86db2482fd2a2 100644 --- a/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorOlapPart2Test.java +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorOlapPart2Test.java @@ -60,11 +60,6 @@ public void after() throws Exception { cleanupEphemeralMVs(starRocksAssert, startCaseTime); } - private static void initAndExecuteTaskRun(TaskRun taskRun) throws Exception { - taskRun.initStatus(UUIDUtil.genUUID().toString(), System.currentTimeMillis()); - taskRun.executeTaskRun(); - } - @Test public void testMVRefreshWithTheSameTables1() { starRocksAssert.withMTables(List.of( diff --git 
a/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorOlapTest.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorOlapTest.java index a27688bdad248..23f430a9a4b1e 100644 --- a/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorOlapTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorOlapTest.java @@ -284,7 +284,7 @@ public void testUnionAllMvWithPartition() { ExecPlan execPlan = mvContext.getExecPlan(); String plan = execPlan.getExplainString(TExplainLevel.NORMAL); // TODO(fixme): for self join, forbid pushing down filter, but there are some cases to optimize. - PlanTestBase.assertContains(plan, "partitions=5/5"); + PlanTestBase.assertContains(plan, "partitions=1/5"); } catch (Exception e) { e.printStackTrace(); Assert.fail(e.getMessage()); diff --git a/fe/fe-core/src/test/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvRewriteTest.java b/fe/fe-core/src/test/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvRewriteTest.java index 58fdbb85c2058..20132fc74f767 100644 --- a/fe/fe-core/src/test/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvRewriteTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvRewriteTest.java @@ -230,7 +230,6 @@ public void testViewBasedMv() throws Exception { " SELECT emps_par.deptno as deptno1, depts.deptno as deptno2, emps_par.empid, emps_par.name" + " from emps_par join depts" + " on emps_par.deptno = depts.deptno"); - createAndRefreshMv("create materialized view join_mv_2" + " distributed by hash(deptno2)" + " partition by deptno1" + @@ -242,7 +241,6 @@ public void testViewBasedMv() throws Exception { " partition by deptno1" + " as " + " SELECT deptno1, deptno2, empid, name from view1"); - { String query = "SELECT deptno1, deptno2, empid, name from view1"; String plan = 
getFragmentPlan(query); diff --git a/test/sql/test_materialized_view_refresh/R/test_mv_refresh_with_multi_union2 b/test/sql/test_materialized_view_refresh/R/test_mv_refresh_with_multi_union2 index 1941b10f6e4ef..3427d5895d489 100644 --- a/test/sql/test_materialized_view_refresh/R/test_mv_refresh_with_multi_union2 +++ b/test/sql/test_materialized_view_refresh/R/test_mv_refresh_with_multi_union2 @@ -332,23 +332,23 @@ select count(1) from test_mv3; -- !result select count(1) from test_mv4; -- result: -6 +10 -- !result select count(1) from test_mv5; -- result: -6 +10 -- !result select count(1) from test_mv6; -- result: -64 +72 -- !result select count(1) from test_mv7; -- result: -64 +72 -- !result select count(1) from test_mv8; -- result: -6 +10 -- !result select dt from test_mv1 group by dt order by 1; -- result: @@ -392,6 +392,10 @@ select dt from test_mv4 group by dt order by 1; 2024-03-12 2024-03-13 2024-03-14 +2024-03-15 +2024-03-16 +2024-03-17 +2024-03-18 2024-04-10 -- !result select dt from test_mv5 group by dt order by 1; @@ -401,6 +405,10 @@ select dt from test_mv5 group by dt order by 1; 2024-03-12 2024-03-13 2024-03-14 +2024-03-15 +2024-03-16 +2024-03-17 +2024-03-18 2024-04-10 -- !result select dt from test_mv6 group by dt order by 1; @@ -436,6 +444,10 @@ select dt from test_mv8 group by dt order by 1; 2024-03-12 2024-03-13 2024-03-14 +2024-03-15 +2024-03-16 +2024-03-17 +2024-03-18 2024-04-10 -- !result INSERT INTO u1 (id,dt) VALUES (1,'2024-03-10'); @@ -478,23 +490,23 @@ select count(1) from test_mv3; -- !result select count(1) from test_mv4; -- result: -6 +11 -- !result select count(1) from test_mv5; -- result: -6 +11 -- !result select count(1) from test_mv6; -- result: -72 +82 -- !result select count(1) from test_mv7; -- result: -72 +82 -- !result select count(1) from test_mv8; -- result: -6 +11 -- !result select dt from test_mv1 group by dt order by 1; -- result: @@ -539,6 +551,11 @@ select dt from test_mv4 group by dt order by 1; 2024-03-12 2024-03-13 
2024-03-14 +2024-03-15 +2024-03-16 +2024-03-17 +2024-03-18 +2024-03-19 2024-04-10 -- !result select dt from test_mv5 group by dt order by 1; @@ -548,6 +565,11 @@ select dt from test_mv5 group by dt order by 1; 2024-03-12 2024-03-13 2024-03-14 +2024-03-15 +2024-03-16 +2024-03-17 +2024-03-18 +2024-03-19 2024-04-10 -- !result select dt from test_mv6 group by dt order by 1; @@ -585,6 +607,11 @@ select dt from test_mv8 group by dt order by 1; 2024-03-12 2024-03-13 2024-03-14 +2024-03-15 +2024-03-16 +2024-03-17 +2024-03-18 +2024-03-19 2024-04-10 -- !result drop materialized view test_mv1; diff --git a/test/sql/test_materialized_view_refresh/R/test_mv_refresh_with_multi_union4 b/test/sql/test_materialized_view_refresh/R/test_mv_refresh_with_multi_union4 new file mode 100644 index 0000000000000..3d070806406a6 --- /dev/null +++ b/test/sql/test_materialized_view_refresh/R/test_mv_refresh_with_multi_union4 @@ -0,0 +1,272 @@ +-- name: test_mv_refresh_with_multi_union4 +create database db_${uuid0}; +-- result: +-- !result +use db_${uuid0}; +-- result: +-- !result +CREATE TABLE `u1` ( + `id` int(11) NOT NULL, + `dt` date NOT NULL +) ENGINE=OLAP +PRIMARY KEY(`id`, `dt`) +PARTITION BY RANGE(`dt`) +( + PARTITION p1 VALUES [("2024-03-10"), ("2024-03-11")), + PARTITION p2 VALUES [("2024-03-11"), ("2024-03-12")), + PARTITION p3 VALUES [("2024-03-12"), ("2024-03-13")), + PARTITION p4 VALUES [("2024-03-13"), ("2024-03-14")), + PARTITION p5 VALUES [("2024-03-14"), ("2024-03-15")), + PARTITION p6 VALUES [("2024-04-01"), ("2024-04-02")), + PARTITION p7 VALUES [("2024-04-10"), ("2024-04-11")) +) +DISTRIBUTED BY HASH(`id`) +PROPERTIES ( +"replication_num" = "1" +); +-- result: +-- !result +CREATE TABLE `u2` ( + `id` int(11) NOT NULL, + `dt` date NOT NULL +) ENGINE=OLAP +PRIMARY KEY(`id`, `dt`) +PARTITION BY RANGE(`dt`) +( + PARTITION p1 VALUES [("2024-04-10"), ("2024-04-11")), + PARTITION p2 VALUES [("2024-04-11"), ("2024-04-12")), + PARTITION p3 VALUES [("2024-04-12"), ("2024-04-13")), + 
PARTITION p4 VALUES [("2024-04-13"), ("2024-04-14")), + PARTITION p5 VALUES [("2024-04-14"), ("2024-04-15")) +) +DISTRIBUTED BY HASH(`id`) +PROPERTIES ( +"replication_num" = "1" +); +-- result: +-- !result +INSERT INTO u1 (id,dt) VALUES + (1,'2024-03-10'), + (2,'2024-03-11'), + (4,'2024-03-12'), + (7,'2024-03-13'), + (8,'2024-03-14'); +-- result: +-- !result +INSERT INTO u2 (id,dt) VALUES + (1,'2024-04-10'), + (2,'2024-04-11'), + (4,'2024-04-12'), + (7,'2024-04-13'); +-- result: +-- !result + +CREATE MATERIALIZED VIEW IF NOT EXISTS `test_mv1` +PARTITION BY date_trunc('day', `dt`) +DISTRIBUTED BY HASH(`dt`) +REFRESH ASYNC +AS + select dt from u1 + union all + select dt from u2; +-- result: +-- !result +CREATE MATERIALIZED VIEW IF NOT EXISTS `test_mv2` +PARTITION BY dt +DISTRIBUTED BY HASH(`dt`) +REFRESH ASYNC +AS +select dt, sum(s_id) as s_id +from +( + select dt, sum(id) as s_id from u1 group by dt + union all + select dt, sum(id) as s_id from u2 group by dt +) t group by dt; +-- result: +-- !result +CREATE MATERIALIZED VIEW IF NOT EXISTS `test_mv3` +PARTITION BY date_trunc('day', dt) +DISTRIBUTED BY HASH(`dt`) +REFRESH ASYNC +AS +select dt, sum(s_id) as s_id +from +( + select dt, sum(id) as s_id from u1 group by dt + union all + select dt, sum(id) as s_id from u2 group by dt +) t group by dt; +-- result: +-- !result +CREATE MATERIALIZED VIEW IF NOT EXISTS `test_mv4` +PARTITION BY dt +DISTRIBUTED BY HASH(`dt`) +REFRESH ASYNC +AS +select dt, sum(s_id) as s_id +from +( + select date_trunc('day', dt) as dt, sum(id) as s_id from u1 group by date_trunc('day', dt) + union all + select date_trunc('day', dt) as dt, sum(id) as s_id from u2 group by date_trunc('day', dt) +) t group by dt; +-- result: +-- !result +refresh materialized view test_mv1 with sync mode; +refresh materialized view test_mv2 with sync mode; +refresh materialized view test_mv3 with sync mode; +refresh materialized view test_mv4 with sync mode; +select count(1) from test_mv1; +-- result: +9 +-- !result 
+select count(1) from test_mv2; +-- result: +9 +-- !result +select count(1) from test_mv3; +-- result: +9 +-- !result +select count(1) from test_mv4; +-- result: +9 +-- !result +select dt from test_mv1 group by dt order by 1; +-- result: +2024-03-10 +2024-03-11 +2024-03-12 +2024-03-13 +2024-03-14 +2024-04-10 +2024-04-11 +2024-04-12 +2024-04-13 +-- !result +select dt from test_mv2 group by dt order by 1; +-- result: +2024-03-10 +2024-03-11 +2024-03-12 +2024-03-13 +2024-03-14 +2024-04-10 +2024-04-11 +2024-04-12 +2024-04-13 +-- !result +select dt from test_mv3 group by dt order by 1; +-- result: +2024-03-10 +2024-03-11 +2024-03-12 +2024-03-13 +2024-03-14 +2024-04-10 +2024-04-11 +2024-04-12 +2024-04-13 +-- !result +select dt from test_mv4 group by dt order by 1; +-- result: +2024-03-10 +2024-03-11 +2024-03-12 +2024-03-13 +2024-03-14 +2024-04-10 +2024-04-11 +2024-04-12 +2024-04-13 +-- !result +INSERT INTO u1 (id,dt) VALUES (1,'2024-03-10'); +-- result: +-- !result +INSERT INTO u2 (id,dt) VALUES (1,'2024-04-10'); +-- result: +-- !result +refresh materialized view test_mv1 with sync mode; +refresh materialized view test_mv2 with sync mode; +refresh materialized view test_mv3 with sync mode; +refresh materialized view test_mv4 with sync mode; +select count(1) from test_mv1; +-- result: +9 +-- !result +select count(1) from test_mv2; +-- result: +9 +-- !result +select count(1) from test_mv3; +-- result: +9 +-- !result +select count(1) from test_mv4; +-- result: +9 +-- !result +select dt from test_mv1 group by dt order by 1; +-- result: +2024-03-10 +2024-03-11 +2024-03-12 +2024-03-13 +2024-03-14 +2024-04-10 +2024-04-11 +2024-04-12 +2024-04-13 +-- !result +select dt from test_mv2 group by dt order by 1; +-- result: +2024-03-10 +2024-03-11 +2024-03-12 +2024-03-13 +2024-03-14 +2024-04-10 +2024-04-11 +2024-04-12 +2024-04-13 +-- !result +select dt from test_mv3 group by dt order by 1; +-- result: +2024-03-10 +2024-03-11 +2024-03-12 +2024-03-13 +2024-03-14 +2024-04-10 +2024-04-11 
+2024-04-12 +2024-04-13 +-- !result +select dt from test_mv4 group by dt order by 1; +-- result: +2024-03-10 +2024-03-11 +2024-03-12 +2024-03-13 +2024-03-14 +2024-04-10 +2024-04-11 +2024-04-12 +2024-04-13 +-- !result +drop materialized view test_mv1; +-- result: +-- !result +drop materialized view test_mv2; +-- result: +-- !result +drop materialized view test_mv3; +-- result: +-- !result +drop materialized view test_mv4; +-- result: +-- !result +drop database db_${uuid0} force; +-- result: +-- !result \ No newline at end of file diff --git a/test/sql/test_materialized_view_refresh/R/test_mv_refresh_with_time_slice b/test/sql/test_materialized_view_refresh/R/test_mv_refresh_with_time_slice index ae9dede378403..eae25eca72466 100644 --- a/test/sql/test_materialized_view_refresh/R/test_mv_refresh_with_time_slice +++ b/test/sql/test_materialized_view_refresh/R/test_mv_refresh_with_time_slice @@ -21,11 +21,21 @@ INSERT INTO t1 VALUES -- !result CREATE MATERIALIZED VIEW mv1 PARTITION BY date_trunc("month", dt1) REFRESH DEFERRED MANUAL +PROPERTIES ( + "replication_num" = "1", + "session.enable_insert_strict" = "false", + "session.query_debug_options" = "{'mvRefreshTraceMode':'LOGS', 'mvRefreshTraceModule':'OPTIMIZER'}" +) AS SELECT time_slice(dt, interval 5 day) as dt1, sum(id) FROM t1 GROUP BY dt1; -- result: -- !result CREATE MATERIALIZED VIEW mv2 PARTITION BY date_trunc("month", dt1) REFRESH DEFERRED MANUAL +PROPERTIES ( + "replication_num" = "1", + "session.enable_insert_strict" = "false", + "session.query_debug_options" = "{'mvRefreshTraceMode':'LOGS', 'mvRefreshTraceModule':'OPTIMIZER'}" +) AS SELECT dt as dt1, sum(id) FROM t1 GROUP BY dt1; -- result: -- !result diff --git a/test/sql/test_materialized_view_refresh/T/test_mv_refresh_with_multi_union4 b/test/sql/test_materialized_view_refresh/T/test_mv_refresh_with_multi_union4 new file mode 100644 index 0000000000000..93bcbd4721edd --- /dev/null +++ 
b/test/sql/test_materialized_view_refresh/T/test_mv_refresh_with_multi_union4 @@ -0,0 +1,143 @@ +-- name: test_mv_refresh_with_multi_union4 + +create database db_${uuid0}; +use db_${uuid0}; + +CREATE TABLE `u1` ( + `id` int(11) NOT NULL, + `dt` date NOT NULL +) ENGINE=OLAP +PRIMARY KEY(`id`, `dt`) +PARTITION BY RANGE(`dt`) +( + PARTITION p1 VALUES [("2024-03-10"), ("2024-03-11")), + PARTITION p2 VALUES [("2024-03-11"), ("2024-03-12")), + PARTITION p3 VALUES [("2024-03-12"), ("2024-03-13")), + PARTITION p4 VALUES [("2024-03-13"), ("2024-03-14")), + PARTITION p5 VALUES [("2024-03-14"), ("2024-03-15")), + PARTITION p6 VALUES [("2024-04-01"), ("2024-04-02")), + PARTITION p7 VALUES [("2024-04-10"), ("2024-04-11")) +) +DISTRIBUTED BY HASH(`id`) +PROPERTIES ( +"replication_num" = "1" +); + +CREATE TABLE `u2` ( + `id` int(11) NOT NULL, + `dt` date NOT NULL +) ENGINE=OLAP +PRIMARY KEY(`id`, `dt`) +PARTITION BY RANGE(`dt`) +( + PARTITION p1 VALUES [("2024-04-10"), ("2024-04-11")), + PARTITION p2 VALUES [("2024-04-11"), ("2024-04-12")), + PARTITION p3 VALUES [("2024-04-12"), ("2024-04-13")), + PARTITION p4 VALUES [("2024-04-13"), ("2024-04-14")), + PARTITION p5 VALUES [("2024-04-14"), ("2024-04-15")) +) +DISTRIBUTED BY HASH(`id`) +PROPERTIES ( +"replication_num" = "1" +); + +INSERT INTO u1 (id,dt) VALUES + (1,'2024-03-10'), + (2,'2024-03-11'), + (4,'2024-03-12'), + (7,'2024-03-13'), + (8,'2024-03-14'); +INSERT INTO u2 (id,dt) VALUES + (1,'2024-04-10'), + (2,'2024-04-11'), + (4,'2024-04-12'), + (7,'2024-04-13'); + + +CREATE MATERIALIZED VIEW IF NOT EXISTS `test_mv1` +PARTITION BY date_trunc('day', `dt`) +DISTRIBUTED BY HASH(`dt`) +REFRESH ASYNC +AS + select dt from u1 + union all + select dt from u2; + +CREATE MATERIALIZED VIEW IF NOT EXISTS `test_mv2` +PARTITION BY dt +DISTRIBUTED BY HASH(`dt`) +REFRESH ASYNC +AS +select dt, sum(s_id) as s_id +from +( + select dt, sum(id) as s_id from u1 group by dt + union all + select dt, sum(id) as s_id from u2 group by dt +) t group by 
dt; + + +CREATE MATERIALIZED VIEW IF NOT EXISTS `test_mv3` +PARTITION BY date_trunc('day', dt) +DISTRIBUTED BY HASH(`dt`) +REFRESH ASYNC +AS +select dt, sum(s_id) as s_id +from +( + select dt, sum(id) as s_id from u1 group by dt + union all + select dt, sum(id) as s_id from u2 group by dt +) t group by dt; + +CREATE MATERIALIZED VIEW IF NOT EXISTS `test_mv4` +PARTITION BY dt +DISTRIBUTED BY HASH(`dt`) +REFRESH ASYNC +AS +select dt, sum(s_id) as s_id +from +( + select date_trunc('day', dt) as dt, sum(id) as s_id from u1 group by date_trunc('day', dt) + union all + select date_trunc('day', dt) as dt, sum(id) as s_id from u2 group by date_trunc('day', dt) +) t group by dt; + +refresh materialized view test_mv1 with sync mode; +refresh materialized view test_mv2 with sync mode; +refresh materialized view test_mv3 with sync mode; +refresh materialized view test_mv4 with sync mode; + +select count(1) from test_mv1; +select count(1) from test_mv2; +select count(1) from test_mv3; +select count(1) from test_mv4; + +select dt from test_mv1 group by dt order by 1; +select dt from test_mv2 group by dt order by 1; +select dt from test_mv3 group by dt order by 1; +select dt from test_mv4 group by dt order by 1; + +INSERT INTO u1 (id,dt) VALUES (1,'2024-03-10'); +INSERT INTO u2 (id,dt) VALUES (1,'2024-04-10'); + +refresh materialized view test_mv1 with sync mode; +refresh materialized view test_mv2 with sync mode; +refresh materialized view test_mv3 with sync mode; +refresh materialized view test_mv4 with sync mode; + +select count(1) from test_mv1; +select count(1) from test_mv2; +select count(1) from test_mv3; +select count(1) from test_mv4; + +select dt from test_mv1 group by dt order by 1; +select dt from test_mv2 group by dt order by 1; +select dt from test_mv3 group by dt order by 1; +select dt from test_mv4 group by dt order by 1; + +drop materialized view test_mv1; +drop materialized view test_mv2; +drop materialized view test_mv3; +drop materialized view test_mv4; +drop 
database db_${uuid0} force; \ No newline at end of file diff --git a/test/sql/test_materialized_view_refresh/T/test_mv_refresh_with_time_slice b/test/sql/test_materialized_view_refresh/T/test_mv_refresh_with_time_slice index 3a6dedb9d1386..c2d713b31dc98 100644 --- a/test/sql/test_materialized_view_refresh/T/test_mv_refresh_with_time_slice +++ b/test/sql/test_materialized_view_refresh/T/test_mv_refresh_with_time_slice @@ -1,4 +1,19 @@ -- name: test_mv_refresh_with_time_slice + +-- TODO(fixme): time_slice may generate intersecting partitions even if the selected partitions are incremental. +-- mysql> SELECT time_slice(dt, interval 5 day) as dt1, sum(id) FROM t1 where dt > '2020-07-01' GROUP BY dt1; +-- +---------------------+---------+ +-- | dt1 | sum(id) | +-- +---------------------+---------+ +-- | 2020-07-20 00:00:00 | 7 | +-- | 2020-06-30 00:00:00 | 3 | +-- | 2020-07-15 00:00:00 | 8 | +-- | 2020-07-05 00:00:00 | 12 | +-- | 2020-07-10 00:00:00 | 9 | +-- | 2020-07-25 00:00:00 | 9 | +-- +---------------------+---------+ +-- 6 rows in set (0.05 sec) + CREATE TABLE `t1` ( `id` int(11) NOT NULL, `dt` date NOT NULL @@ -18,10 +33,20 @@ INSERT INTO t1 VALUES CREATE MATERIALIZED VIEW mv1 PARTITION BY date_trunc("month", dt1) REFRESH DEFERRED MANUAL +PROPERTIES ( + "replication_num" = "1", + "session.enable_insert_strict" = "false", + "session.query_debug_options" = "{'mvRefreshTraceMode':'LOGS', 'mvRefreshTraceModule':'OPTIMIZER'}" +) AS SELECT time_slice(dt, interval 5 day) as dt1, sum(id) FROM t1 GROUP BY dt1; CREATE MATERIALIZED VIEW mv2 PARTITION BY date_trunc("month", dt1) REFRESH DEFERRED MANUAL +PROPERTIES ( + "replication_num" = "1", + "session.enable_insert_strict" = "false", + "session.query_debug_options" = "{'mvRefreshTraceMode':'LOGS', 'mvRefreshTraceModule':'OPTIMIZER'}" +) AS SELECT dt as dt1, sum(id) FROM t1 GROUP BY dt1; REFRESH MATERIALIZED VIEW mv1 WITH SYNC MODE; diff --git a/test/sql/test_transparent_mv/R/test_transparent_mv_hive
b/test/sql/test_transparent_mv/R/test_transparent_mv_hive index ada2c6f00d384..b2946fe74f1ae 100644 --- a/test/sql/test_transparent_mv/R/test_transparent_mv_hive +++ b/test/sql/test_transparent_mv/R/test_transparent_mv_hive @@ -1083,8 +1083,8 @@ SELECT dt, num FROM test_mv1 where dt='2020-06-15' order by 1, 2 limit 3; SELECT dt, num FROM test_mv1 where dt!='2020-06-15' order by 1, 2 limit 3; -- result: 2020-06-16 3 +2020-06-16 3 2020-06-18 5 -2020-06-18 8 -- !result SELECT dt, num FROM test_mv1 where dt>='2020-06-15' order by 1, 2 limit 3; -- result: @@ -1107,37 +1107,37 @@ SELECT dt,sum(num) FROM test_mv1 where dt='2020-06-15' GROUP BY dt order by 1, 2 -- !result SELECT dt,sum(num) FROM test_mv1 where dt !='2020-06-15' GROUP BY dt order by 1, 2 limit 3; -- result: -2020-06-16 3 +2020-06-16 6 2020-06-18 13 2020-06-21 21 -- !result SELECT dt,sum(num) FROM test_mv1 where dt>='2020-06-15' GROUP BY dt order by 1, 2 limit 3; -- result: 2020-06-15 15 -2020-06-16 3 +2020-06-16 6 2020-06-18 13 -- !result SELECT dt,sum(num) FROM test_mv1 where dt>'2020-06-15' GROUP BY dt order by 1, 2 limit 3; -- result: -2020-06-16 3 +2020-06-16 6 2020-06-18 13 2020-06-21 21 -- !result SELECT dt,sum(num) FROM test_mv1 where dt>'2020-06-15' and dt < '2020-07-22' GROUP BY dt order by 1, 2 limit 3; -- result: -2020-06-16 3 +2020-06-16 6 2020-06-18 13 2020-06-21 21 -- !result SELECT dt,sum(num) FROM test_mv1 where dt>'2020-06-15' and dt <= '2020-07-22' GROUP BY dt order by 1, 2 limit 3; -- result: -2020-06-16 3 +2020-06-16 6 2020-06-18 13 2020-06-21 21 -- !result SELECT dt,sum(num) FROM test_mv1 where (dt>'2020-06-15' and dt <= '2020-06-22') or dt>'2020-07-01' GROUP BY dt order by 1, 2 limit 3; -- result: -2020-06-16 3 +2020-06-16 6 2020-06-18 13 2020-06-21 21 -- !result @@ -1152,7 +1152,7 @@ SELECT dt,sum(num) FROM test_mv1 where date_trunc('day', dt) ='2020-06-15' GROUP SELECT dt,sum(num) FROM test_mv1 where date_trunc('month', dt) ='2020-06-01' GROUP BY dt order by 1, 2 limit 3; -- result: 
2020-06-15 15 -2020-06-16 3 +2020-06-16 6 2020-06-18 13 -- !result SELECT dt,sum(num) FROM test_mv1 where date_trunc('month', dt) ='2020-07-01' GROUP BY dt order by 1, 2 limit 3; @@ -1164,7 +1164,7 @@ SELECT dt,sum(num) FROM test_mv1 where date_trunc('month', dt) ='2020-07-01' GRO SELECT dt,sum(num) FROM test_mv1 GROUP BY dt order by 1, 2 limit 3; -- result: 2020-06-15 15 -2020-06-16 3 +2020-06-16 6 2020-06-18 13 -- !result drop materialized view default_catalog.db_${uuid0}.test_mv1; diff --git a/test/sql/test_transparent_mv/R/test_transparent_mv_iceberg_part2 b/test/sql/test_transparent_mv/R/test_transparent_mv_iceberg_part2 index 501fe7b41da5b..7dcdf7ba35417 100644 --- a/test/sql/test_transparent_mv/R/test_transparent_mv_iceberg_part2 +++ b/test/sql/test_transparent_mv/R/test_transparent_mv_iceberg_part2 @@ -498,37 +498,37 @@ SELECT dt,sum(num) FROM test_mv1 where dt='2020-06-15' GROUP BY dt order by 1, 2 -- !result SELECT dt,sum(num) FROM test_mv1 where dt !='2020-06-15' GROUP BY dt order by 1, 2 limit 3; -- result: -2020-06-16 3 +2020-06-16 6 2020-06-18 13 2020-06-21 21 -- !result SELECT dt,sum(num) FROM test_mv1 where dt>='2020-06-15' GROUP BY dt order by 1, 2 limit 3; -- result: 2020-06-15 9 -2020-06-16 3 +2020-06-16 6 2020-06-18 13 -- !result SELECT dt,sum(num) FROM test_mv1 where dt>'2020-06-15' GROUP BY dt order by 1, 2 limit 3; -- result: -2020-06-16 3 +2020-06-16 6 2020-06-18 13 2020-06-21 21 -- !result SELECT dt,sum(num) FROM test_mv1 where dt>'2020-06-15' and dt < '2020-07-22' GROUP BY dt order by 1, 2 limit 3; -- result: -2020-06-16 3 +2020-06-16 6 2020-06-18 13 2020-06-21 21 -- !result SELECT dt,sum(num) FROM test_mv1 where dt>'2020-06-15' and dt <= '2020-07-22' GROUP BY dt order by 1, 2 limit 3; -- result: -2020-06-16 3 +2020-06-16 6 2020-06-18 13 2020-06-21 21 -- !result SELECT dt,sum(num) FROM test_mv1 where (dt>'2020-06-15' and dt <= '2020-06-22') or dt>'2020-07-01' GROUP BY dt order by 1, 2 limit 3; -- result: -2020-06-16 3 +2020-06-16 6 
2020-06-18 13 2020-06-21 21 -- !result @@ -543,7 +543,7 @@ SELECT dt,sum(num) FROM test_mv1 where date_trunc('day', dt) ='2020-06-15' GROUP SELECT dt,sum(num) FROM test_mv1 where date_trunc('month', dt) ='2020-06-01' GROUP BY dt order by 1, 2 limit 3; -- result: 2020-06-15 9 -2020-06-16 3 +2020-06-16 6 2020-06-18 13 -- !result SELECT dt,sum(num) FROM test_mv1 where date_trunc('month', dt) ='2020-07-01' GROUP BY dt order by 1, 2 limit 3; @@ -555,7 +555,7 @@ SELECT dt,sum(num) FROM test_mv1 where date_trunc('month', dt) ='2020-07-01' GRO SELECT dt,sum(num) FROM test_mv1 GROUP BY dt order by 1, 2 limit 3; -- result: 2020-06-15 9 -2020-06-16 3 +2020-06-16 6 2020-06-18 13 -- !result drop materialized view default_catalog.db_${uuid0}.test_mv1; diff --git a/test/test_sql_cases.py b/test/test_sql_cases.py index 5182507b46ee5..71f93fa6e1650 100644 --- a/test/test_sql_cases.py +++ b/test/test_sql_cases.py @@ -84,6 +84,20 @@ def setUp(self, *args, **kwargs): """set up""" super().setUp() self.connect_starrocks() + self._init_global_configs() + + def _init_global_configs(self): + '''' + Configs that are not ready for production but can be used for testing.
+ ''' + default_configs = [ + "'enable_mv_refresh_insert_strict' = 'true'", + ] + + for config in default_configs: + sql = "ADMIN SET FRONTEND CONFIG (%s)" % config + print(sql) + self.execute_sql(sql) def tearDown(self): """tear down""" From c8d6dd5ab3775a8fbfa1c0879f4bcccf11d6ea00 Mon Sep 17 00:00:00 2001 From: "shuming.li" Date: Tue, 16 Jul 2024 20:32:54 +0800 Subject: [PATCH 2/2] fix bugs Signed-off-by: shuming.li --- .../scheduler/PartitionBasedMvRefreshProcessorHiveTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorHiveTest.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorHiveTest.java index fece5058f0bfe..31167fdf07dd6 100644 --- a/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorHiveTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorHiveTest.java @@ -834,9 +834,8 @@ public void testHivePartitionPruneNonRefBaseTable1() throws Exception { "'2020-01-06'\n" + " partitions=1/5"); PlanTestBase.assertContains(plan, " TABLE: part_tbl2\n" + - " PARTITION PREDICATES: 8: par_date >= '2020-01-05', 8: par_date < '2020-01-06', " + - "8: par_date IS NOT NULL\n" + - " partitions=0/4"); + " PARTITION PREDICATES: 8: par_date IS NOT NULL\n" + + " partitions=4/4"); } // run 3