Skip to content

Commit

Permalink
[Enhancement] (Multi Ref Base Table Part2) MVPCTRefreshPlanBuilder su…
Browse files Browse the repository at this point in the history
…pports multi ref base tables in mv refresh (backport #48284) (#48444)

Signed-off-by: shuming.li <[email protected]>
Co-authored-by: shuming.li <[email protected]>
  • Loading branch information
mergify[bot] and LiShuMing authored Jul 16, 2024
1 parent d5ea15a commit 73c86ee
Show file tree
Hide file tree
Showing 22 changed files with 916 additions and 290 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -637,21 +637,6 @@ public void setQueryOutputIndices(List<Integer> queryOutputIndices) {
this.queryOutputIndices = queryOutputIndices;
}

/**
* @param materializedView : materialized view to check
* @return : return the column slot ref which materialized view's partition column comes from.
* <p>
* NOTE: Only support one column for Materialized View's partition column for now.
*/
public static SlotRef getRefBaseTablePartitionSlotRef(MaterializedView materializedView) {
List<SlotRef> slotRefs = Lists.newArrayList();
Expr partitionExpr = materializedView.getPartitionExpr();
partitionExpr.collect(SlotRef.class, slotRefs);
// if partitionExpr is FunctionCallExpr, get first SlotRef
Preconditions.checkState(slotRefs.size() == 1);
return slotRefs.get(0);
}

/**
* Return the partition column of the materialized view.
* NOTE: Only one column is supported for now, support more columns in the future.
Expand Down
17 changes: 17 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/profile/Tracers.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,21 @@ public static void toRuntimeProfile(RuntimeProfile profile) {
Tracers tracers = THREAD_LOCAL.get();
tracers.allTracer[1].toRuntimeProfile(profile);
}

public static String getTrace(Mode mode) {
switch (mode) {
case TIMER:
return Tracers.printScopeTimer();
case VARS:
return Tracers.printVars();
case TIMING:
return Tracers.printTiming();
case LOGS:
return Tracers.printLogs();
case REASON:
return Tracers.printReasons();
default:
return "";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@

package com.starrocks.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.collect.Range;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.TableProperty;
Expand Down Expand Up @@ -45,13 +43,6 @@ public class MvTaskRunContext extends TaskRunContext {
// multi original partition names.
private Map<Table, Map<String, Set<String>>> externalRefBaseTableMVPartitionMap;

// The Table which materialized view' partition column comes from is called `RefBaseTable`:
// - Materialized View's to-refresh partitions is synced from its `refBaseTable`.
private Table refBaseTable;
// The `RefBaseTable`'s partition column which materialized view's partition column derives from
// is called `refBaseTablePartitionColumn`.
private Column refBaseTablePartitionColumn;

private String nextPartitionStart = null;
private String nextPartitionEnd = null;
private ExecPlan execPlan = null;
Expand Down Expand Up @@ -153,22 +144,4 @@ public int getPartitionTTLNumber() {
public void setPartitionTTLNumber(int partitionTTLNumber) {
this.partitionTTLNumber = partitionTTLNumber;
}

public Table getRefBaseTable() {
return refBaseTable;
}

public void setRefBaseTable(Table refBaseTable) {
Preconditions.checkNotNull(refBaseTable);
this.refBaseTable = refBaseTable;
}

public Column getRefBaseTablePartitionColumn() {
return refBaseTablePartitionColumn;
}

public void setRefBaseTablePartitionColumn(Column refBaseTablePartitionColumn) {
Preconditions.checkNotNull(refBaseTablePartitionColumn);
this.refBaseTablePartitionColumn = refBaseTablePartitionColumn;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import com.starrocks.analysis.SlotRef;
import com.starrocks.catalog.BaseTableInfo;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Database;
Expand Down Expand Up @@ -183,8 +182,11 @@ public TaskRun getNextTaskRun() {
public void processTaskRun(TaskRunContext context) {
// register tracers
Tracers.register(context.getCtx());
QueryDebugOptions queryDebugOptions = context.getCtx().getSessionVariable().getQueryDebugOptions();
// init to collect the base timer for refresh profile
Tracers.init(Tracers.Mode.TIMER, Tracers.Module.BASE, true, false);
Tracers.Mode mvRefreshTraceMode = queryDebugOptions.getMvRefreshTraceMode();
Tracers.Module mvRefreshTraceModule = queryDebugOptions.getMvRefreshTraceModule();
Tracers.init(mvRefreshTraceMode, mvRefreshTraceModule, true, false);

IMaterializedViewMetricsEntity mvEntity = null;
ConnectContext connectContext = context.getCtx();
Expand Down Expand Up @@ -221,7 +223,7 @@ public void processTaskRun(TaskRunContext context) {
runtimeProfile = new RuntimeProfile();
Tracers.toRuntimeProfile(runtimeProfile);
}

LOG.info("Refresh mv {} trace logs: {}", materializedView.getName(), Tracers.getTrace(mvRefreshTraceMode));
Tracers.close();
postProcess();
}
Expand Down Expand Up @@ -1043,48 +1045,13 @@ private boolean syncPartitions() throws AnalysisException {
// collect base table snapshot infos
snapshotBaseTables = collectBaseTableSnapshotInfos(materializedView);

// prepare ref base table and partition column
// NOTE: This should be after @refreshExternalTable since the base table may be changed and repaired again.
PartitionInfo partitionInfo = materializedView.getPartitionInfo();

if (!partitionInfo.isUnPartitioned()) {
Pair<Table, Column> partitionTableAndColumn = getRefBaseTableAndPartitionColumn(snapshotBaseTables);
Table refBaseTable = partitionTableAndColumn.first;
if (!snapshotBaseTables.containsKey(refBaseTable.getId())) {
throw new DmlException(String.format("The ref base table %s of materialized view %s is not existed in snapshot.",
refBaseTable.getName(), materializedView.getName()));
}
mvContext.setRefBaseTable(snapshotBaseTables.get(refBaseTable.getId()).getBaseTable());
mvContext.setRefBaseTablePartitionColumn(partitionTableAndColumn.second);
}

// do sync partitions (add or drop partitions) for materialized view
boolean result = mvRefreshPartitioner.syncAddOrDropPartitions();
LOG.info("finish sync partitions. mv:{}, cost(ms): {}", materializedView.getName(),
stopwatch.elapsed(TimeUnit.MILLISECONDS));
return result;
}

/**
* TODO: merge it with {@code MaterializedView#getDirectTableAndPartitionColumn} but `getDirectTableAndPartitionColumn`
* only use the existed base table info but {@code getRefBaseTableAndPartitionColumn} only uses the table name to keep
* mv's partition and ref base bases' relation.
* </p>
* return the ref base table and column that materialized view's partition column
* derives from if it exists, otherwise return null.
*/
private Pair<Table, Column> getRefBaseTableAndPartitionColumn(Map<Long, TableSnapshotInfo> tableSnapshotInfos) {
SlotRef slotRef = MaterializedView.getRefBaseTablePartitionSlotRef(materializedView);
for (TableSnapshotInfo snapshotInfo : tableSnapshotInfos.values()) {
BaseTableInfo baseTableInfo = snapshotInfo.getBaseTableInfo();
if (slotRef.getTblNameWithoutAnalyzed().getTbl().equals(baseTableInfo.getTableName())) {
Table table = snapshotInfo.getBaseTable();
return Pair.create(table, table.getColumn(slotRef.getColumnName()));
}
}
return Pair.create(null, null);
}

/**
* If it's tentative, don't really consider the staleness of MV, but all potential partitions
*/
Expand Down Expand Up @@ -1231,14 +1198,12 @@ private boolean checkBaseTablePartitionHasChanged(TableSnapshotInfo snapshotInfo
if (!(mvPartitionInfo.isRangePartition())) {
return false;
}
Pair<Table, Column> partitionTableAndColumn = materializedView.getRefBaseTablePartitionColumn();
Column partitionColumn = partitionTableAndColumn.second;
// TODO: need to consider(non ref-base table's change)
Map<Table, Column> partitionTableAndColumn = materializedView.getRefBaseTablePartitionColumns();
// For Non-partition based base table, it's not necessary to check the partition changed.
if (!snapshotTable.equals(partitionTableAndColumn.first)
|| !snapshotTable.containColumn(partitionColumn.getName())) {
if (!partitionTableAndColumn.containsKey(snapshotTable)) {
return false;
}
Column partitionColumn = partitionTableAndColumn.get(snapshotTable);
Map<String, Range<PartitionKey>> snapshotPartitionMap = PartitionUtil.getPartitionKeyRange(
snapshotTable, partitionColumn, MaterializedView.getPartitionExpr(materializedView));
Map<String, Range<PartitionKey>> currentPartitionMap = PartitionUtil.getPartitionKeyRange(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.analysis.BoolLiteral;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.IsNullPredicate;
import com.starrocks.analysis.LiteralExpr;
Expand Down Expand Up @@ -131,25 +132,31 @@ public boolean syncAddOrDropPartitions() {
}

@Override
public Expr generatePartitionPredicate(Table table, Set<String> refBaseTablePartitionNames,
public Expr generatePartitionPredicate(Table refBaseTable, Set<String> refBaseTablePartitionNames,
Expr mvPartitionSlotRef) throws AnalysisException {
Map<Table, Map<String, PListCell>> basePartitionMaps = mvContext.getRefBaseTableListPartitionMap();
if (basePartitionMaps.isEmpty()) {
return null;
}
Table refBaseTable = mvContext.getRefBaseTable();
Map<String, PListCell> baseListPartitionMap = basePartitionMaps.get(refBaseTable);
if (baseListPartitionMap == null || baseListPartitionMap.isEmpty()) {
if (baseListPartitionMap == null) {
LOG.warn("Generate incremental partition predicate failed, " +
"basePartitionMaps:{} contains no refBaseTable:{}", basePartitionMaps, refBaseTable);
return null;
}
if (baseListPartitionMap.isEmpty()) {
return new BoolLiteral(true);
}

List<Expr> sourceTablePartitionList = Lists.newArrayList();
List<Column> partitionCols = refBaseTable.getPartitionColumns();
Map<Table, Column> partitionTableAndColumn = mv.getRefBaseTablePartitionColumns();
if (partitionTableAndColumn == null || !partitionTableAndColumn.containsKey(refBaseTable)) {
LOG.warn("Generate incremental partition failed, partitionTableAndColumn {} contains no ref table {}",
partitionTableAndColumn, refBaseTable);
return null;
}
Column refPartitionColumn = partitionTableAndColumn.get(table);
Column refPartitionColumn = partitionTableAndColumn.get(refBaseTable);
int refIndex = ListPartitionDiffer.getRefBaseTableIdx(refBaseTable, refPartitionColumn);
Type partitionType = partitionCols.get(refIndex).getType();

Expand Down
Loading

0 comments on commit 73c86ee

Please sign in to comment.