Skip to content

Commit

Permalink
[BugFix] Move MaterializedViewMgr into GlobalStateMgr (#48564)
Browse files Browse the repository at this point in the history
Signed-off-by: gengjun-git <[email protected]>
  • Loading branch information
gengjun-git authored Jul 29, 2024
1 parent e4e6903 commit 8ba3138
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import com.starrocks.persist.AlterViewInfo;
import com.starrocks.persist.SwapTableOperationLog;
import com.starrocks.qe.ConnectContext;
import com.starrocks.scheduler.mv.MaterializedViewMgr;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.LocalMetastore;
import com.starrocks.sql.analyzer.SemanticException;
Expand Down Expand Up @@ -169,9 +168,9 @@ public Void visitAlterMaterializedViewStatement(AlterMaterializedViewStmt stmt,
+ "Do not allow to do ALTER ops");
}

MaterializedViewMgr.getInstance().stopMaintainMV(materializedView);
GlobalStateMgr.getCurrentState().getMaterializedViewMgr().stopMaintainMV(materializedView);
visit(stmt.getAlterTableClause());
MaterializedViewMgr.getInstance().rebuildMaintainMV(materializedView);
GlobalStateMgr.getCurrentState().getMaterializedViewMgr().rebuildMaintainMV(materializedView);
return null;
} finally {
locker.unLockDatabase(db, LockType.WRITE);
Expand Down
5 changes: 2 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import com.starrocks.scheduler.Task;
import com.starrocks.scheduler.mv.MVEpoch;
import com.starrocks.scheduler.mv.MVMaintenanceJob;
import com.starrocks.scheduler.mv.MaterializedViewMgr;
import com.starrocks.scheduler.persist.ArchiveTaskRunsLog;
import com.starrocks.scheduler.persist.DropTaskRunsLog;
import com.starrocks.scheduler.persist.DropTasksLog;
Expand Down Expand Up @@ -1107,12 +1106,12 @@ public void loadJournal(GlobalStateMgr globalStateMgr, JournalEntity journal)
}
case OperationType.OP_MV_JOB_STATE: {
MVMaintenanceJob job = (MVMaintenanceJob) journal.getData();
MaterializedViewMgr.getInstance().replay(job);
GlobalStateMgr.getCurrentState().getMaterializedViewMgr().replay(job);
break;
}
case OperationType.OP_MV_EPOCH_UPDATE: {
MVEpoch epoch = (MVEpoch) journal.getData();
MaterializedViewMgr.getInstance().replayEpoch(epoch);
GlobalStateMgr.getCurrentState().getMaterializedViewMgr().replayEpoch(epoch);
break;
}
case OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_COLUMNS: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.starrocks.scheduler.mv;

import com.starrocks.common.util.FrontendDaemon;
import com.starrocks.server.GlobalStateMgr;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -50,7 +51,7 @@ protected void runAfterCatalogReady() {
}

private void runImpl() {
List<MVMaintenanceJob> jobs = MaterializedViewMgr.getInstance().getRunnableJobs();
List<MVMaintenanceJob> jobs = GlobalStateMgr.getCurrentState().getMaterializedViewMgr().getRunnableJobs();
if (CollectionUtils.isEmpty(jobs)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,9 @@
*/
public class MaterializedViewMgr {
private static final Logger LOG = LogManager.getLogger(MaterializedViewMgr.class);
private static final MaterializedViewMgr INSTANCE = new MaterializedViewMgr();

private final Map<MvId, MVMaintenanceJob> jobMap = new ConcurrentHashMap<>();

private MaterializedViewMgr() {
}

public static MaterializedViewMgr getInstance() {
return INSTANCE;
}

public MaterializedView createSinkTable(CreateMaterializedViewStatement stmt, PartitionInfo partitionInfo,
long mvId, long dbId)
throws DdlException {
Expand Down
14 changes: 12 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ public class GlobalStateMgr {
private final RoutineLoadMgr routineLoadMgr;
private final StreamLoadMgr streamLoadMgr;
private final ExportMgr exportMgr;
private final MaterializedViewMgr materializedViewMgr;

private final ConsistencyChecker consistencyChecker;
private final BackupHandler backupHandler;
Expand Down Expand Up @@ -614,6 +615,7 @@ private GlobalStateMgr(boolean isCkptGlobalState, NodeMgr nodeMgr) {
this.streamLoadMgr = new StreamLoadMgr();
this.routineLoadMgr = new RoutineLoadMgr();
this.exportMgr = new ExportMgr();
this.materializedViewMgr = new MaterializedViewMgr();

this.consistencyChecker = new ConsistencyChecker();
this.lock = new QueryableReentrantLock(true);
Expand Down Expand Up @@ -1433,6 +1435,8 @@ private void transferToNonLeader(FrontendNodeType newType) {
feType = newType;
}

// The manager that loads meta from image must be a member of GlobalStateMgr and cannot be SINGLETON,
// since Checkpoint uses a separate memory.
public void loadImage(String imageDir) throws IOException {
Storage storage = new Storage(imageDir);
File curFile = storage.getCurrentImageFile();
Expand Down Expand Up @@ -1471,7 +1475,7 @@ public void loadImage(String imageDir) throws IOException {
.put(SRMetaBlockID.INSERT_OVERWRITE_JOB_MGR, insertOverwriteJobMgr::load)
.put(SRMetaBlockID.COMPACTION_MGR, compactionMgr::load)
.put(SRMetaBlockID.STREAM_LOAD_MGR, streamLoadMgr::load)
.put(SRMetaBlockID.MATERIALIZED_VIEW_MGR, MaterializedViewMgr.getInstance()::load)
.put(SRMetaBlockID.MATERIALIZED_VIEW_MGR, materializedViewMgr::load)
.put(SRMetaBlockID.GLOBAL_FUNCTION_MGR, globalFunctionMgr::load)
.put(SRMetaBlockID.STORAGE_VOLUME_MGR, storageVolumeMgr::load)
.put(SRMetaBlockID.DICTIONARY_MGR, dictionaryMgr::load)
Expand Down Expand Up @@ -1606,6 +1610,8 @@ public void saveImage() throws IOException {
}
}

// The manager that saves meta to image must be a member of GlobalStateMgr and cannot be SINGLETON,
// since Checkpoint uses a separate memory.
public void saveImage(File curFile, long replayedJournalId) throws IOException {
if (!curFile.exists()) {
if (!curFile.createNewFile()) {
Expand Down Expand Up @@ -1644,7 +1650,7 @@ public void saveImage(File curFile, long replayedJournalId) throws IOException {
insertOverwriteJobMgr.save(dos);
compactionMgr.save(dos);
streamLoadMgr.save(dos);
MaterializedViewMgr.getInstance().save(dos);
materializedViewMgr.save(dos);
globalFunctionMgr.save(dos);
storageVolumeMgr.save(dos);
dictionaryMgr.save(dos);
Expand Down Expand Up @@ -2078,6 +2084,10 @@ public ExportMgr getExportMgr() {
return this.exportMgr;
}

public MaterializedViewMgr getMaterializedViewMgr() {
return this.materializedViewMgr;
}

public SmallFileMgr getSmallFileMgr() {
return this.smallFileMgr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@
import com.starrocks.scheduler.TaskBuilder;
import com.starrocks.scheduler.TaskManager;
import com.starrocks.scheduler.TaskRun;
import com.starrocks.scheduler.mv.MaterializedViewMgr;
import com.starrocks.sql.analyzer.AnalyzerUtils;
import com.starrocks.sql.analyzer.Authorizer;
import com.starrocks.sql.ast.AddPartitionClause;
Expand Down Expand Up @@ -3170,8 +3169,8 @@ public void createMaterializedView(CreateMaterializedViewStatement stmt)
MaterializedView materializedView;
if (RunMode.isSharedNothingMode()) {
if (refreshSchemeDesc.getType().equals(MaterializedView.RefreshType.INCREMENTAL)) {
materializedView =
MaterializedViewMgr.getInstance().createSinkTable(stmt, partitionInfo, mvId, db.getId());
materializedView = GlobalStateMgr.getCurrentState().getMaterializedViewMgr()
.createSinkTable(stmt, partitionInfo, mvId, db.getId());
materializedView.setMaintenancePlan(stmt.getMaintenancePlan());
} else {
materializedView =
Expand Down Expand Up @@ -3270,7 +3269,7 @@ public void createMaterializedView(CreateMaterializedViewStatement stmt)
materializedView.setPartitionExprMaps(partitionExprMaps);
}

MaterializedViewMgr.getInstance().prepareMaintenanceWork(stmt, materializedView);
GlobalStateMgr.getCurrentState().getMaterializedViewMgr().prepareMaintenanceWork(stmt, materializedView);

String storageVolumeId = "";
if (materializedView.isCloudNativeMaterializedView()) {
Expand Down Expand Up @@ -3341,7 +3340,7 @@ private void createTaskForMaterializedView(String dbName, MaterializedView mater
MaterializedView.RefreshMoment refreshMoment = materializedView.getRefreshScheme().getMoment();

if (refreshType.equals(MaterializedView.RefreshType.INCREMENTAL)) {
MaterializedViewMgr.getInstance().startMaintainMV(materializedView);
GlobalStateMgr.getCurrentState().getMaterializedViewMgr().startMaintainMV(materializedView);
return;
}

Expand Down Expand Up @@ -3411,7 +3410,7 @@ private String executeRefreshMvTask(String dbName, MaterializedView materialized
materializedView.getName(), refreshType, executeOption);

if (refreshType.equals(MaterializedView.RefreshType.INCREMENTAL)) {
MaterializedViewMgr.getInstance().onTxnPublish(materializedView);
GlobalStateMgr.getCurrentState().getMaterializedViewMgr().onTxnPublish(materializedView);
} else if (refreshType != MaterializedView.RefreshType.SYNC) {
TaskManager taskManager = GlobalStateMgr.getCurrentState().getTaskManager();
final String mvTaskName = TaskBuilder.getMvTaskName(materializedView.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@
import com.starrocks.scheduler.Constants;
import com.starrocks.scheduler.Task;
import com.starrocks.scheduler.TaskManager;
import com.starrocks.scheduler.mv.MaterializedViewMgr;
import com.starrocks.scheduler.persist.TaskRunStatus;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.MetadataMgr;
Expand Down Expand Up @@ -2596,7 +2595,7 @@ public TMVReportEpochResponse mvReport(TMVMaintenanceTasks request) throws TExce
if (!request.getTask_type().equals(MVTaskType.REPORT_EPOCH)) {
throw new TException("Only support report_epoch task");
}
MaterializedViewMgr.getInstance().onReportEpoch(request);
GlobalStateMgr.getCurrentState().getMaterializedViewMgr().onReportEpoch(request);
return new TMVReportEpochResponse();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@
import com.starrocks.planner.stream.StreamJoinNode;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.SessionVariable;
import com.starrocks.scheduler.mv.MaterializedViewMgr;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.LocalMetastore;
import com.starrocks.server.RunMode;
Expand Down Expand Up @@ -269,8 +268,8 @@ public static ExecPlan createPhysicalPlanForMV(ConnectContext connectContext,
PartitionInfo partitionInfo = LocalMetastore.buildPartitionInfo(createStmt);
long mvId = GlobalStateMgr.getCurrentState().getNextId();
long dbId = GlobalStateMgr.getCurrentState().getDb(createStmt.getTableName().getDb()).getId();
MaterializedView view =
MaterializedViewMgr.getInstance().createSinkTable(createStmt, partitionInfo, mvId, dbId);
MaterializedView view = GlobalStateMgr.getCurrentState().getMaterializedViewMgr()
.createSinkTable(createStmt, partitionInfo, mvId, dbId);
TupleDescriptor tupleDesc = buildTupleDesc(execPlan, view);
view.setMaintenancePlan(execPlan);
List<Long> fakePartitionIds = Arrays.asList(1L, 2L, 3L);
Expand Down

0 comments on commit 8ba3138

Please sign in to comment.