From ddd09de20689213de45fce191cf164ec8abdea63 Mon Sep 17 00:00:00 2001 From: hoffermei Date: Tue, 8 Oct 2024 10:01:25 +0800 Subject: [PATCH] add external cooldown job manager to run cooldown task at fix rate #51649 Signed-off-by: hoffermei --- .../com/starrocks/alter/AlterJobExecutor.java | 21 +- .../starrocks/alter/SchemaChangeHandler.java | 81 ------- .../java/com/starrocks/catalog/OlapTable.java | 26 ++- .../com/starrocks/catalog/TableProperty.java | 18 +- .../common/util/PropertyAnalyzer.java | 51 ++++- ...onfig.java => ExternalCooldownConfig.java} | 38 +++- .../ExternalCooldownPartitionSelector.java | 13 +- .../ExternalCooldownSchedule.java | 147 +++++++++++++ .../com/starrocks/journal/JournalEntity.java | 6 +- .../java/com/starrocks/persist/EditLog.java | 10 + .../com/starrocks/persist/OperationType.java | 3 +- .../persist/metablock/SRMetaBlockID.java | 2 + .../com/starrocks/qe/DDLStmtExecutor.java | 2 +- .../com/starrocks/scheduler/TaskBuilder.java | 63 +++++- .../ExternalCooldownJobExecutor.java | 71 +++++++ .../ExternalCooldownMaintenanceJob.java | 200 ++++++++++++++++++ .../externalcooldown/ExternalCooldownMgr.java | 189 +++++++++++++++++ .../com/starrocks/server/GlobalStateMgr.java | 14 ++ .../com/starrocks/server/LocalMetastore.java | 79 ++++++- .../starrocks/server/OlapTableFactory.java | 4 +- .../analyzer/AlterTableClauseAnalyzer.java | 12 +- .../com/starrocks/sql/analyzer/Analyzer.java | 14 ++ .../CancelExternalCooldownAnalyzer.java | 40 ---- .../analyzer/ExternalCooldownAnalyzer.java | 55 +++-- .../CreateTableWithExternalCooldownTest.java | 144 +++++++++++++ .../common/proc/PartitionsProcDirTest.java | 25 +++ .../ExternalCooldownConfigTest.java | 36 ++++ ...ExternalCooldownPartitionSelectorTest.java | 10 +- .../ExternalCooldownScheduleTest.java | 36 ++++ .../ModifyExternalCooldownConfigTest.java | 79 +++++++ .../starrocks/persist/OperationTypeTest.java | 1 + .../qe/ExternalCooldownExecutorTest.java | 96 +++++++++ .../starrocks/server/LocalMetaStoreTest.java | 19 ++ 33 files changed, 1411 insertions(+), 194 deletions(-) rename fe/fe-core/src/main/java/com/starrocks/externalcooldown/{ExternalCoolDownConfig.java => ExternalCooldownConfig.java} (77%) create mode 100644 fe/fe-core/src/main/java/com/starrocks/externalcooldown/ExternalCooldownSchedule.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/scheduler/externalcooldown/ExternalCooldownJobExecutor.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/scheduler/externalcooldown/ExternalCooldownMaintenanceJob.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/scheduler/externalcooldown/ExternalCooldownMgr.java delete mode 100644 fe/fe-core/src/main/java/com/starrocks/sql/analyzer/CancelExternalCooldownAnalyzer.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/catalog/CreateTableWithExternalCooldownTest.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/externalcooldown/ExternalCooldownConfigTest.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/externalcooldown/ExternalCooldownScheduleTest.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/persist/ModifyExternalCooldownConfigTest.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/qe/ExternalCooldownExecutorTest.java diff --git a/fe/fe-core/src/main/java/com/starrocks/alter/AlterJobExecutor.java b/fe/fe-core/src/main/java/com/starrocks/alter/AlterJobExecutor.java index 05f0d396f1d55f..7b2e5d0185f805 100644 --- a/fe/fe-core/src/main/java/com/starrocks/alter/AlterJobExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/alter/AlterJobExecutor.java @@ -34,6 +34,7 @@ import com.starrocks.catalog.PartitionType; import com.starrocks.catalog.RangePartitionInfo; import com.starrocks.catalog.Table; +import com.starrocks.catalog.TableProperty; import com.starrocks.catalog.Type; import com.starrocks.common.AnalysisException; import com.starrocks.common.DdlException; @@ -422,6 +423,9 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause clause, } GlobalStateMgr.getCurrentState().getLocalMetastore() .modifyTableDynamicPartition(db, olapTable, properties); + } else if (TableProperty.isSamePrefixProperties(properties, + PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_PREFIX)) { + GlobalStateMgr.getCurrentState().getLocalMetastore().alterTableProperties(db, olapTable, properties); } else if (properties.containsKey("default." + PropertyAnalyzer.PROPERTIES_REPLICATION_NUM)) { Preconditions.checkNotNull(properties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM)); GlobalStateMgr.getCurrentState().getLocalMetastore() @@ -712,6 +716,12 @@ private void modifyPartitionsProperty(Database db, TTabletType tTabletType = PropertyAnalyzer.analyzeTabletType(properties); + // 5. external cooldown synced time + long coolDownSyncedTimeMs = PropertyAnalyzer.analyzeExternalCooldownSyncedTimeMs(properties); + + // 6. external cooldown consistency check time + long coolDownCheckTimeMs = PropertyAnalyzer.analyzeExternalCooldownConsistencyCheckTimeMs(properties); + // modify meta here for (String partitionName : partitionNames) { Partition partition = olapTable.getPartition(partitionName); @@ -762,8 +772,17 @@ private void modifyPartitionsProperty(Database db, if (tTabletType != partitionInfo.getTabletType(partition.getId())) { partitionInfo.setTabletType(partition.getId(), tTabletType); } + if (coolDownSyncedTimeMs != -1L && + coolDownSyncedTimeMs != partitionInfo.getExternalCoolDownSyncedTimeMs(partition.getId())) { + partitionInfo.setExternalCoolDownSyncedTimeMs(partition.getId(), coolDownSyncedTimeMs); + } + if (coolDownCheckTimeMs != -1L && + coolDownCheckTimeMs != partitionInfo.getExternalCoolDownConsistencyCheckTimeMs(partition.getId())) { + partitionInfo.setExternalCoolDownConsistencyCheckTimeMs(partition.getId(), coolDownCheckTimeMs); + } ModifyPartitionInfo info = new ModifyPartitionInfo(db.getId(), olapTable.getId(), partition.getId(), - newDataProperty, newReplicationNum, hasInMemory ? newInMemory : oldInMemory); + newDataProperty, newReplicationNum, hasInMemory ? newInMemory : oldInMemory, + coolDownSyncedTimeMs, coolDownCheckTimeMs); modifyPartitionInfos.add(info); } diff --git a/fe/fe-core/src/main/java/com/starrocks/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/com/starrocks/alter/SchemaChangeHandler.java index 373b0b83d6ea96..79947502b66a95 100644 --- a/fe/fe-core/src/main/java/com/starrocks/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/com/starrocks/alter/SchemaChangeHandler.java @@ -88,7 +88,6 @@ import com.starrocks.common.util.concurrent.MarkedCountDownLatch; import com.starrocks.common.util.concurrent.lock.LockType; import com.starrocks.common.util.concurrent.lock.Locker; -import com.starrocks.externalcooldown.ExternalCoolDownConfig; import com.starrocks.persist.TableAddOrDropColumnsInfo; import com.starrocks.qe.ConnectContext; import com.starrocks.qe.ShowResultSet; @@ -133,7 +132,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -2132,85 +2130,6 @@ public void updateTableMeta(Database db, String tableName, Map p } } - public boolean updateExternalCoolDownConfigMeta(Database db, Long tableId, Map properties) { - OlapTable olapTable; - ExternalCoolDownConfig newExternalCoolDownConfig; - boolean hasChanged = false; - boolean isModifiedSuccess = true; - Locker locker = new Locker(); - locker.lockDatabase(db, LockType.READ); - try { - olapTable = (OlapTable) db.getTable(tableId); - if (olapTable == null) { - return false; - } - if (!olapTable.containsExternalCoolDownConfig()) { - newExternalCoolDownConfig = new ExternalCoolDownConfig(); - hasChanged = true; - } else { - newExternalCoolDownConfig = new ExternalCoolDownConfig(olapTable.getCurExternalCoolDownConfig()); - } - } finally { - locker.unLockDatabase(db, LockType.READ); - } - - // judge whether the attribute has changed - // no exception will be thrown, for the analyzer has checked - if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET)) { - String externalCoolDownTarget = properties.get(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET); - if (!Objects.equals(externalCoolDownTarget, newExternalCoolDownConfig.getTarget())) { - newExternalCoolDownConfig.setTarget(externalCoolDownTarget); - hasChanged = true; - } - } - if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE)) { - String externalCoolDownSchedule = properties.get(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE); - if (!Objects.equals(externalCoolDownSchedule, newExternalCoolDownConfig.getSchedule())) { - newExternalCoolDownConfig.setSchedule(externalCoolDownSchedule); - hasChanged = true; - } - } - if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_WAIT_SECOND)) { - long externalCoolDownWaitSecond = Long.parseLong(properties.get( - PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_WAIT_SECOND)); - if (externalCoolDownWaitSecond != newExternalCoolDownConfig.getWaitSecond()) { - newExternalCoolDownConfig.setWaitSecond(externalCoolDownWaitSecond); - hasChanged = true; - } - } - if (!hasChanged) { - LOG.info("table {} external cool down config is same as the previous version, so nothing need to do", - olapTable.getName()); - return true; - } - locker.lockDatabase(db, LockType.WRITE); - try { - ExternalCoolDownConfig oldExternalCoolDownConfig = olapTable.getCurExternalCoolDownConfig(); - GlobalStateMgr.getCurrentState().getLocalMetastore().modifyExternalCoolDownMeta( - db, olapTable, newExternalCoolDownConfig); - if (oldExternalCoolDownConfig != null) { - LOG.info("update external cool down config of table {} successfully, the external cool down config after " + - "modified is : {}, previous is {}", - olapTable.getName(), - olapTable.getCurExternalCoolDownConfig().toString(), - oldExternalCoolDownConfig.toString()); - } else { - LOG.info("update external cool down config of table {} successfully, the external cool down config" - + " after modified is : {}, ", - olapTable.getName(), olapTable.getCurExternalCoolDownConfig().toString()); - } - } catch (Exception e) { - // defensive programming, it normally should not throw an exception, - // here is just to ensure that a correct result can be returned - LOG.warn("update external cool down config of table {} failed", olapTable.getName()); - isModifiedSuccess = false; - } finally { - locker.unLockDatabase(db, LockType.WRITE); - } - - return isModifiedSuccess; - } - // return true means that the modification of FEMeta is successful, // and as long as the modification of metadata is successful, // the final consistency will be achieved through the report handler diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java b/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java index cbc294440edb75..4acd92ae5b341a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java @@ -36,6 +36,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -87,7 +88,7 @@ import com.starrocks.common.util.Util; import com.starrocks.common.util.WriteQuorum; import com.starrocks.common.util.concurrent.MarkedCountDownLatch; -import com.starrocks.externalcooldown.ExternalCoolDownConfig; +import com.starrocks.externalcooldown.ExternalCooldownConfig; import com.starrocks.lake.DataCacheInfo; import com.starrocks.lake.StarOSAgent; import com.starrocks.lake.StorageInfo; @@ -486,18 +487,18 @@ public void setBinlogTxnId(long binlogTxnId) { this.binlogTxnId = binlogTxnId; } - public ExternalCoolDownConfig getCurExternalCoolDownConfig() { + public ExternalCooldownConfig getCurExternalCoolDownConfig() { if (tableProperty != null) { return tableProperty.getExternalCoolDownConfig(); } return null; } - public void setCurExternalCoolDownConfig(ExternalCoolDownConfig externalCoolDownConfig) { + public void setCurExternalCoolDownConfig(ExternalCooldownConfig externalCoolDownConfig) { if (tableProperty == null) { tableProperty = new TableProperty(Maps.newHashMap()); } - tableProperty.modifyTableProperties(externalCoolDownConfig.toProperties()); + tableProperty.modifyTableProperties(externalCoolDownConfig.getProperties()); tableProperty.setExternalCoolDownConfig(externalCoolDownConfig); } @@ -3335,6 +3336,10 @@ public Map getProperties() { // unique properties properties.putAll(getUniqueProperties()); + if (getCurExternalCoolDownConfig() != null) { + properties.putAll(getCurExternalCoolDownConfig().getProperties()); + } + return properties; } @@ -3424,11 +3429,20 @@ public String getExternalCoolDownTarget() { return null; } + public TableName getExternalCoolDownTargetTableName() { + String tableName = getExternalCoolDownTarget(); + if (Strings.isNullOrEmpty(tableName)) { + return null; + } + List pieces = Splitter.on(".").splitToList(tableName); + return new TableName(pieces.get(0), pieces.get(1), pieces.get(2)); + } + public Table getExternalCoolDownTable() { - if (tableProperty == null) { + TableName tableName = getExternalCoolDownTargetTableName(); + if (tableName == null) { return null; } - TableName tableName = TableName.fromString(tableProperty.getExternalCoolDownTarget()); Optional table = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable(tableName); return table.orElse(null); } diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/TableProperty.java b/fe/fe-core/src/main/java/com/starrocks/catalog/TableProperty.java index aad0959be68856..12ccb0f2413318 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/TableProperty.java @@ -53,7 +53,7 @@ import com.starrocks.common.util.PropertyAnalyzer; import com.starrocks.common.util.TimeUtils; import com.starrocks.common.util.WriteQuorum; -import com.starrocks.externalcooldown.ExternalCoolDownConfig; +import com.starrocks.externalcooldown.ExternalCooldownConfig; import com.starrocks.lake.StorageInfo; import com.starrocks.persist.OperationType; import com.starrocks.persist.gson.GsonPostProcessable; @@ -295,7 +295,7 @@ public static String valueList() { private boolean useFastSchemaEvolution; private PeriodDuration dataCachePartitionDuration; - private ExternalCoolDownConfig externalCoolDownConfig; + private ExternalCooldownConfig externalCoolDownConfig; private Multimap location; @@ -385,7 +385,7 @@ public TableProperty buildProperty(short opCode) { buildConstraint(); break; case OperationType.OP_MODIFY_EXTERNAL_COOLDOWN_CONFIG: - buildExternalCoolDownConfig(); + buildExternalCooldownConfig(); break; default: break; @@ -430,19 +430,19 @@ public void setBinlogConfig(BinlogConfig binlogConfig) { this.binlogConfig = binlogConfig; } - public TableProperty buildExternalCoolDownConfig() { + public TableProperty buildExternalCooldownConfig() { String externalCoolDownTarget = properties.get(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET); String externalCoolDownSchedule = properties.get(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE); long externalCoolDownWaitSecond = Long.parseLong(properties.getOrDefault( PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_WAIT_SECOND, - String.valueOf(0))); - externalCoolDownConfig = new ExternalCoolDownConfig( + String.valueOf(-1L))); + externalCoolDownConfig = new ExternalCooldownConfig( externalCoolDownTarget, externalCoolDownSchedule, externalCoolDownWaitSecond); return this; } // just modify externalCoolDownConfig, not properties - public void setExternalCoolDownConfig(ExternalCoolDownConfig externalCoolDownConfig) { + public void setExternalCoolDownConfig(ExternalCooldownConfig externalCoolDownConfig) { this.externalCoolDownConfig = externalCoolDownConfig; } @@ -998,7 +998,7 @@ public BinlogConfig getBinlogConfig() { return binlogConfig; } - public ExternalCoolDownConfig getExternalCoolDownConfig() { + public ExternalCooldownConfig getExternalCoolDownConfig() { return externalCoolDownConfig; } @@ -1107,6 +1107,6 @@ public void gsonPostProcess() throws IOException { buildStorageType(); buildMvProperties(); buildLocation(); - buildExternalCoolDownConfig(); + buildExternalCooldownConfig(); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/common/util/PropertyAnalyzer.java index 5e121a57a113b8..2a9e688db02371 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/util/PropertyAnalyzer.java @@ -72,7 +72,8 @@ import com.starrocks.common.ErrorReport; import com.starrocks.common.FeConstants; import com.starrocks.common.Pair; -import com.starrocks.externalcooldown.ExternalCoolDownConfig; +import com.starrocks.externalcooldown.ExternalCooldownConfig; +import com.starrocks.externalcooldown.ExternalCooldownSchedule; import com.starrocks.lake.DataCacheInfo; import com.starrocks.qe.ConnectContext; import com.starrocks.server.GlobalStateMgr; @@ -250,8 +251,11 @@ public class PropertyAnalyzer { */ public static final String MULTI_LOCATION_LABELS_REGEX = "\\s*" + SINGLE_LOCATION_LABEL_REGEX + "\\s*(,\\s*" + SINGLE_LOCATION_LABEL_REGEX + "){0,9}\\s*"; + // external cooldown prefix + public static final String PROPERTIES_EXTERNAL_COOLDOWN_PREFIX = "external_cooldown"; // "external_cooldown_target"="iceberg_catalog.iceberg_db.iceberg_tbl", public static final String PROPERTIES_EXTERNAL_COOLDOWN_TARGET = "external_cooldown_target"; + public static final String PROPERTIES_EXTERNAL_COOLDOWN_CONFIG = "external_cooldown_config"; // "external_cooldown_schedule"="START END EVERY INTERVAL " public static final String PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE = "external_cooldown_schedule"; @@ -1670,8 +1674,8 @@ public static DataProperty analyzeMVDataProperty(MaterializedView materializedVi return dataProperty; } - public static ExternalCoolDownConfig analyzeExternalCoolDownConfig(Map properties) throws AnalysisException { - ExternalCoolDownConfig externalCoolDownConfig = new ExternalCoolDownConfig(); + public static ExternalCooldownConfig analyzeExternalCoolDownConfig(Map properties) throws AnalysisException { + ExternalCooldownConfig externalCoolDownConfig = new ExternalCooldownConfig(); if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET)) { String target = properties.get(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET); @@ -1698,12 +1702,9 @@ public static ExternalCoolDownConfig analyzeExternalCoolDownConfig(Map properties, } return TimeUtils.parseDate(text, PrimitiveType.DATETIME).getTime(); } + + public static long analyzeExternalCooldownSyncedTimeMs(Map properties) throws AnalysisException { + long coldDownSyncedTimeMs = -1L; + if (properties != null && properties.containsKey(PROPERTIES_EXTERNAL_COOLDOWN_SYNCED_TIME)) { + String coldDownSyncedTimeMsStr = properties.get(PROPERTIES_EXTERNAL_COOLDOWN_SYNCED_TIME); + if (coldDownSyncedTimeMsStr.isEmpty()) { + coldDownSyncedTimeMs = 0L; + } else { + coldDownSyncedTimeMs = TimeUtils.timeStringToLong(coldDownSyncedTimeMsStr); + if (coldDownSyncedTimeMs == -1) { + throw new AnalysisException(PROPERTIES_EXTERNAL_COOLDOWN_SYNCED_TIME + " format error."); + } + } + } + + return coldDownSyncedTimeMs; + } + + public static long analyzeExternalCooldownConsistencyCheckTimeMs(Map properties) throws AnalysisException { + long coldDownConsistencyCheckTimeMs = -1L; + if (properties != null && properties.containsKey(PROPERTIES_EXTERNAL_COOLDOWN_CONSISTENCY_CHECK_TIME)) { + String coldDownConsistencyCheckTimeMsStr = properties.get(PROPERTIES_EXTERNAL_COOLDOWN_CONSISTENCY_CHECK_TIME); + if (coldDownConsistencyCheckTimeMsStr.isEmpty()) { + coldDownConsistencyCheckTimeMs = 0L; + } else { + coldDownConsistencyCheckTimeMs = TimeUtils.timeStringToLong(coldDownConsistencyCheckTimeMsStr); + if (coldDownConsistencyCheckTimeMs == -1) { + throw new AnalysisException(PROPERTIES_EXTERNAL_COOLDOWN_CONSISTENCY_CHECK_TIME + " format error."); + } + } + } + + return coldDownConsistencyCheckTimeMs; + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/externalcooldown/ExternalCoolDownConfig.java b/fe/fe-core/src/main/java/com/starrocks/externalcooldown/ExternalCooldownConfig.java similarity index 77% rename from fe/fe-core/src/main/java/com/starrocks/externalcooldown/ExternalCoolDownConfig.java rename to fe/fe-core/src/main/java/com/starrocks/externalcooldown/ExternalCooldownConfig.java index 40eff674f54f8f..41d30e98e4778a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/externalcooldown/ExternalCoolDownConfig.java +++ b/fe/fe-core/src/main/java/com/starrocks/externalcooldown/ExternalCooldownConfig.java @@ -26,7 +26,7 @@ import java.util.HashMap; import java.util.Map; -public class ExternalCoolDownConfig implements Writable { +public class ExternalCooldownConfig implements Writable { @SerializedName("target") private String target; @@ -36,22 +36,40 @@ public class ExternalCoolDownConfig implements Writable { @SerializedName("waitSecond") private long waitSecond; - public ExternalCoolDownConfig(String target, String schedule, long waitSecond) { + public ExternalCooldownConfig(String target, String schedule, long waitSecond) { this.target = target; this.schedule = schedule; this.waitSecond = waitSecond; } - public ExternalCoolDownConfig(ExternalCoolDownConfig externalCoolDownConfig) { - this.target = externalCoolDownConfig.target; - this.schedule = externalCoolDownConfig.schedule; - this.waitSecond = externalCoolDownConfig.waitSecond; + public ExternalCooldownConfig(ExternalCooldownConfig externalCoolDownConfig) { + target = null; + schedule = null; + waitSecond = 0; + if (externalCoolDownConfig != null) { + target = externalCoolDownConfig.target; + schedule = externalCoolDownConfig.schedule; + waitSecond = externalCoolDownConfig.waitSecond; + } } - public ExternalCoolDownConfig() { + public ExternalCooldownConfig() { this(null, null, 0); } + public boolean isReadyForAutoCooldown() { + if (target == null || target.isEmpty()) { + return false; + } + if (waitSecond <= 0) { + return false; + } + if (schedule == null || schedule.isEmpty()) { + return false; + } + return true; + } + public void buildFromProperties(Map properties) { if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET)) { target = properties.get( @@ -91,7 +109,7 @@ public void setWaitSecond(Long waitSecond) { this.waitSecond = waitSecond; } - public Map toProperties() { + public Map getProperties() { Map properties = new HashMap<>(); properties.put(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET, target); properties.put(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE, schedule); @@ -104,8 +122,8 @@ public void write(DataOutput out) throws IOException { Text.writeString(out, GsonUtils.GSON.toJson(this)); } - public static ExternalCoolDownConfig read(DataInput in) throws IOException { - return GsonUtils.GSON.fromJson(Text.readString(in), ExternalCoolDownConfig.class); + public static ExternalCooldownConfig read(DataInput in) throws IOException { + return GsonUtils.GSON.fromJson(Text.readString(in), ExternalCooldownConfig.class); } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/externalcooldown/ExternalCooldownPartitionSelector.java b/fe/fe-core/src/main/java/com/starrocks/externalcooldown/ExternalCooldownPartitionSelector.java index 4f7861adb95fba..7406732f733f62 100644 --- a/fe/fe-core/src/main/java/com/starrocks/externalcooldown/ExternalCooldownPartitionSelector.java +++ b/fe/fe-core/src/main/java/com/starrocks/externalcooldown/ExternalCooldownPartitionSelector.java @@ -74,6 +74,10 @@ public ExternalCooldownPartitionSelector(Database db, OlapTable olapTable, public void init() { fullTableName = db.getFullName() + "." + olapTable.getName(); + reloadSatisfiedPartitions(); + } + + public void reloadSatisfiedPartitions() { tableSatisfied = true; partitionInfo = olapTable.getPartitionInfo(); if (olapTable.getExternalCoolDownWaitSecond() == null) { @@ -109,7 +113,6 @@ public void init() { } else { externalCoolDownWaitSeconds = waitSeconds; } - satisfiedPartitions = this.getSatisfiedPartitions(-1); } @@ -155,7 +158,8 @@ private boolean isPartitionSatisfied(Partition partition) { fullTableName, partition.getName()); return false; } - if (partitionInfo.getExternalCoolDownConsistencyCheckDifference(partition.getId()) == 0) { + Long diff = partitionInfo.getExternalCoolDownConsistencyCheckDifference(partition.getId()); + if (diff == null || diff == 0) { // has consistency check after external cool down, and check result ok LOG.debug("table [{}] partition[{}] external cool down consistency check result ok", fullTableName, partition.getName()); @@ -167,7 +171,10 @@ private boolean isPartitionSatisfied(Partition partition) { public Partition getOneSatisfiedPartition() { if (satisfiedPartitions.isEmpty()) { - return null; + reloadSatisfiedPartitions(); + if (satisfiedPartitions.isEmpty()) { + return null; + } } return satisfiedPartitions.remove(0); } diff --git a/fe/fe-core/src/main/java/com/starrocks/externalcooldown/ExternalCooldownSchedule.java b/fe/fe-core/src/main/java/com/starrocks/externalcooldown/ExternalCooldownSchedule.java new file mode 100644 index 00000000000000..abc51e35bbab9d --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/externalcooldown/ExternalCooldownSchedule.java @@ -0,0 +1,147 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.externalcooldown; + +import java.text.SimpleDateFormat; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +public class ExternalCooldownSchedule { + private static final Pattern SCHEDULE_PATTERN = Pattern.compile( + "\\s*START\\s+(?\\d{2}:\\d{2})\\s+END\\s+(?\\d{2}:\\d{2})\\s+EVERY\\s+" + + "INTERVAL\\s+(?\\d+)\\s+(?(HOUR|MINUTE|SECOND))\\s*", + Pattern.CASE_INSENSITIVE); + private static final SimpleDateFormat TIME_FORMAT = new SimpleDateFormat("HH:mm"); + + private final String start; + private final String end; + private final long interval; + private final String unit; + private long lastScheduleMs = 0L; + + + public ExternalCooldownSchedule(String start, String end, long interval, String unit) { + this.start = start; + this.end = end; + this.interval = interval; + this.unit = unit; + } + + public static boolean validateScheduleString(String schedule) { + Matcher matcher = SCHEDULE_PATTERN.matcher(schedule); + if (!matcher.matches()) { + return false; + } + ExternalCooldownSchedule externalCooldownSchedule = fromString(schedule); + String start = externalCooldownSchedule.getStart(); + if (start.compareTo("23:59") > 0 || start.compareTo("00:00") < 0) { + return false; + } + String end = externalCooldownSchedule.getEnd(); + if (end.compareTo("23:59") > 0 || end.compareTo("00:00") < 0) { + return false; + } + return true; + } + + public long getIntervalSeconds() { + long intervalSeconds; + switch (unit.toUpperCase()) { + case "HOUR": + intervalSeconds = interval * 3600; + break; + case "MINUTE": + intervalSeconds = interval * 60; + break; + case "SECOND": + intervalSeconds = interval; + break; + default: + intervalSeconds = interval; + break; + } + return intervalSeconds; + } + + public static ExternalCooldownSchedule fromString(String schedule) { + Matcher matcher = SCHEDULE_PATTERN.matcher(schedule); + if (!matcher.matches()) { + return null; + } + String start = matcher.group("start"); + String end = matcher.group("end"); + String interval = matcher.group("interval"); + String unit = matcher.group("unit"); + long intervalValue = Long.parseLong(interval); + return new ExternalCooldownSchedule(start, end, intervalValue, unit); + } + + public String getStart() { + return start; + } + + public String getEnd() { + return end; + } + + public long getInterval() { + return interval; + } + + public String getUnit() { + return unit; + } + + @Override + public String toString() { + return String.format("START %s END %s EVERY INTERVAL %s %s", start, end, interval, unit); + } + + public long getLastScheduleMs() { + return lastScheduleMs; + } + + public void setLastScheduleMs(long lastScheduleMs) { + this.lastScheduleMs = lastScheduleMs; + } + + public boolean trySchedule() { + long currentMs = System.currentTimeMillis(); + String s = TIME_FORMAT.format(currentMs); + if (end.compareTo(start) < 0) { + // ex: [start=23:00, end=07:00) + if (!(s.compareTo(start) >= 0 || s.compareTo(end) < 0)) { + return false; + } + } else if (end.compareTo(start) > 0) { + // ex: [start=01:00, end=07:00) + if (s.compareTo(start) < 0) { + return false; + } + if (s.compareTo(end) >= 0) { + return false; + } + } else { + // never schedule if start == end + return false; + } + if ((currentMs - getLastScheduleMs()) / 1000 < getIntervalSeconds()) { + return false; + } + lastScheduleMs = currentMs; + return true; + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java b/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java index e9d7d7d4b25ce0..4181e0ac3fa6b1 100644 --- a/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java @@ -131,6 +131,7 @@ import com.starrocks.persist.gson.GsonUtils; import com.starrocks.plugin.PluginInfo; import com.starrocks.scheduler.Task; +import com.starrocks.scheduler.externalcooldown.ExternalCooldownMaintenanceJob; import com.starrocks.scheduler.mv.MVEpoch; import com.starrocks.scheduler.mv.MVMaintenanceJob; import com.starrocks.scheduler.persist.ArchiveTaskRunsLog; @@ -766,7 +767,10 @@ public void readFields(DataInput in) throws IOException { } case OperationType.OP_MODIFY_EXTERNAL_COOLDOWN_CONFIG: { data = ModifyTablePropertyOperationLog.read(in); - isRead = true; + break; + } + case OperationType.OP_EXTERNAL_COOLDOWN_JOB_STATE: { + data = GsonUtils.GSON.fromJson(Text.readString(in), ExternalCooldownMaintenanceJob.class); break; } default: { diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java b/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java index 9328da3b35b0c3..03a0ffcc688576 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java @@ -82,6 +82,7 @@ import com.starrocks.proto.EncryptionKeyPB; import com.starrocks.replication.ReplicationJob; import com.starrocks.scheduler.Task; +import com.starrocks.scheduler.externalcooldown.ExternalCooldownMaintenanceJob; import com.starrocks.scheduler.mv.MVEpoch; import com.starrocks.scheduler.mv.MVMaintenanceJob; import com.starrocks.scheduler.persist.ArchiveTaskRunsLog; @@ -1089,6 +1090,11 @@ public void loadJournal(GlobalStateMgr globalStateMgr, JournalEntity journal) GlobalStateMgr.getCurrentState().getKeyMgr().replayAddKey(keyPB); break; } + case OperationType.OP_EXTERNAL_COOLDOWN_JOB_STATE: { + ExternalCooldownMaintenanceJob job = (ExternalCooldownMaintenanceJob) journal.getData(); + GlobalStateMgr.getCurrentState().getExternalCooldownMgr().replay(job); + break; + } default: { if (Config.metadata_ignore_unknown_operation_type) { LOG.warn("UNKNOWN Operation Type {}", opCode); @@ -1931,4 +1937,8 @@ public void logCancelDisableDisk(CancelDisableDiskInfo info) { public void logRecoverPartitionVersion(PartitionVersionRecoveryInfo info) { logEdit(OperationType.OP_RECOVER_PARTITION_VERSION, info); } + + public void logExternalCooldownJobState(ExternalCooldownMaintenanceJob job) { + logEdit(OperationType.OP_EXTERNAL_COOLDOWN_JOB_STATE, job); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java b/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java index 8a7292ed4c1ad6..9102b8b3effc02 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java @@ -583,7 +583,8 @@ public class OperationType { // External cool down config @IgnorableOnReplayFailed public static final short OP_MODIFY_EXTERNAL_COOLDOWN_CONFIG = 13600; - + @IgnorableOnReplayFailed + public static final short OP_EXTERNAL_COOLDOWN_JOB_STATE = 13601; /** * NOTICE: OperationType cannot use a value exceeding 20000, and an error will be reported if it exceeds */ diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/metablock/SRMetaBlockID.java b/fe/fe-core/src/main/java/com/starrocks/persist/metablock/SRMetaBlockID.java index f4db1e9e235f91..2395b57120a11a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/metablock/SRMetaBlockID.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/metablock/SRMetaBlockID.java @@ -97,6 +97,8 @@ public int getId() { public static final SRMetaBlockID PIPE_MGR = new SRMetaBlockID(32); + public static final SRMetaBlockID EXTERNAL_COOLDOWN_MGR = new SRMetaBlockID(33); + @Override public String toString() { return String.valueOf(id); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java index e55b93f454e494..0dc342b4fd13fb 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java @@ -403,7 +403,7 @@ public ShowResultSet visitCancelRefreshMaterializedViewStatement(CancelRefreshMa return null; } - private static ExecuteOption getCooldownExecuteOption(CreateExternalCooldownStmt externalCooldownStmt) { + public static ExecuteOption getCooldownExecuteOption(CreateExternalCooldownStmt externalCooldownStmt) { boolean force = externalCooldownStmt.isForceRefresh(); PartitionRangeDesc range = externalCooldownStmt.getPartitionRangeDesc(); HashMap taskRunProperties = new HashMap<>(); diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskBuilder.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskBuilder.java index 484fd192d5074c..ceab76288f9d8c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskBuilder.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskBuilder.java @@ -16,12 +16,17 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import com.google.common.collect.Range; import com.starrocks.alter.OptimizeTask; import com.starrocks.analysis.IntLiteral; import com.starrocks.analysis.TableName; import com.starrocks.catalog.Database; import com.starrocks.catalog.MaterializedView; import com.starrocks.catalog.OlapTable; +import com.starrocks.catalog.Partition; +import com.starrocks.catalog.PartitionInfo; +import com.starrocks.catalog.PartitionKey; +import com.starrocks.catalog.RangePartitionInfo; import com.starrocks.catalog.Table; import com.starrocks.common.Config; import com.starrocks.common.DdlException; @@ -44,9 +49,11 @@ import com.starrocks.warehouse.Warehouse; import org.jetbrains.annotations.NotNull; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import static com.starrocks.scheduler.PartitionBasedCooldownProcessor.PARTITION_ID; import static com.starrocks.scheduler.TaskRun.MV_ID; // TaskBuilder is responsible for converting Stmt to Task Class @@ -296,7 +303,7 @@ public static Task buildExternalCooldownTask(CreateExternalCooldownStmt external task.setDbName(db.getOriginName()); Map taskProperties = getExternalCooldownTaskProperties(externalCooldownStmt, table); - task.setDefinition(String.format("INSERT INTO %s SELECT * FROM %s", + task.setDefinition(String.format("INSERT OVERWRITE %s SELECT * FROM %s", TableName.fromString(olapTable.getExternalCoolDownTarget()).toSql(), externalCooldownStmt.getTableName().toSql())); task.setProperties(taskProperties); @@ -305,6 +312,54 @@ public static Task buildExternalCooldownTask(CreateExternalCooldownStmt external return task; } + public static Task buildExternalCooldownTask(Database db, OlapTable olapTable, Partition partition) { + Task task = new Task(getExternalCooldownTaskName(olapTable.getId(), partition.getId()) + + "-" + System.currentTimeMillis()); + task.setSource(Constants.TaskSource.EXTERNAL_COOLDOWN); + task.setDbName(db.getOriginName()); + Map taskProperties = getExternalCooldownTaskProperties(olapTable, partition); + task.setDefinition(String.format("INSERT OVERWRITE %s SELECT * FROM %s", + olapTable.getExternalCoolDownTargetTableName().toSql(), + (new TableName(db.getOriginName(), olapTable.getName())).toSql() + )); + task.setProperties(taskProperties); + task.setExpireTime(0L); + handleSpecialTaskProperties(task); + return task; + } + + @NotNull + private static Map getExternalCooldownTaskProperties(OlapTable table, Partition partition) { + Map taskProperties = Maps.newHashMap(); + taskProperties.put(PartitionBasedCooldownProcessor.TABLE_ID, String.valueOf(table.getId())); + PartitionInfo partitionInfo = table.getPartitionInfo(); + RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo; + Range range = rangePartitionInfo.getRange(partition.getId()); + taskProperties.put(PARTITION_ID, String.valueOf(partition.getId())); + taskProperties.put(PartitionBasedCooldownProcessor.PARTITION_START, + String.valueOf(range.lowerEndpoint().getKeys().get(0).getStringValue())); + taskProperties.put(PartitionBasedCooldownProcessor.PARTITION_END, + String.valueOf(range.upperEndpoint().getKeys().get(0).getStringValue())); + taskProperties.put(PartitionBasedCooldownProcessor.TABLE_ID, String.valueOf(table.getId())); + return taskProperties; + } + + public static ExecuteOption getCooldownExecuteOption(OlapTable table, Partition partition) { + PartitionInfo partitionInfo = table.getPartitionInfo(); + RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo; + Range range = rangePartitionInfo.getRange(partition.getId()); + HashMap taskRunProperties = new HashMap<>(); + taskRunProperties.put(PARTITION_ID, String.valueOf(partition.getId())); + taskRunProperties.put(TaskRun.PARTITION_START, range.lowerEndpoint().getKeys().get(0).getStringValue()); + taskRunProperties.put(TaskRun.PARTITION_END, range.upperEndpoint().getKeys().get(0).getStringValue()); + taskRunProperties.put(TaskRun.FORCE, Boolean.toString(false)); + ExecuteOption executeOption = new ExecuteOption( + Constants.TaskRunPriority.HIGH.value(), false, taskRunProperties); + executeOption.setManual(true); + executeOption.setSync(false); + return executeOption; + } + @NotNull private static Map getExternalCooldownTaskProperties(CreateExternalCooldownStmt stmt, Table table) { Map taskProperties = Maps.newHashMap(); @@ -315,7 +370,7 @@ private static Map getExternalCooldownTaskProperties(CreateExter taskProperties.put(PartitionBasedCooldownProcessor.PARTITION_END, String.valueOf(stmt.getPartitionRangeDesc().getPartitionEnd())); } - taskProperties.put(PartitionBasedCooldownProcessor.FORCE, String.valueOf(table.getId())); + taskProperties.put(PartitionBasedCooldownProcessor.TABLE_ID, String.valueOf(table.getId())); return taskProperties; } @@ -326,4 +381,8 @@ public static String getMvTaskName(long mvId) { public static String getExternalCooldownTaskName(Long tableId) { return "external-cooldown-" + tableId; } + + public static String getExternalCooldownTaskName(Long tableId, long partitionId) { + return "external-cooldown-" + tableId + "-" + partitionId; + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/externalcooldown/ExternalCooldownJobExecutor.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/externalcooldown/ExternalCooldownJobExecutor.java new file mode 100644 index 00000000000000..01a654945c9e6a --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/externalcooldown/ExternalCooldownJobExecutor.java @@ -0,0 +1,71 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +package com.starrocks.scheduler.externalcooldown; + +import com.starrocks.common.util.FrontendDaemon; +import com.starrocks.server.GlobalStateMgr; +import org.apache.commons.collections.CollectionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.stream.Collectors; + + +public class ExternalCooldownJobExecutor extends FrontendDaemon { + private static final long EXECUTOR_INTERVAL_MILLIS = 5000; + private static final Logger LOG = LogManager.getLogger(ExternalCooldownJobExecutor.class); + + public ExternalCooldownJobExecutor() { + super("External Cooldown Job Executor", EXECUTOR_INTERVAL_MILLIS); + } + + @Override + protected void runAfterCatalogReady() { + // initialize if need + GlobalStateMgr.getCurrentState().getExternalCooldownMgr().doInitializeIfNeed(); + + try { + runImpl(); + } catch (Throwable e) { + LOG.error("Failed to run the ExternalCooldownJobExecutor ", e); + } + } + + private void runImpl() { + List jobs = GlobalStateMgr.getCurrentState().getExternalCooldownMgr().getRunnableJobs(); + if (CollectionUtils.isEmpty(jobs)) { + return; + } + + long startMillis = System.currentTimeMillis(); + for (ExternalCooldownMaintenanceJob job : jobs) { + if (!job.isRunnable()) { + LOG.warn("Job {} external cooldown config not satisfied ", job); + continue; + } + try { + job.onSchedule(); + } catch (Exception e) { + LOG.warn("[ExternalCooldownJobExecutor] execute job got exception", e); + } + } + + String jobNameList = jobs.stream().map(x -> x.getOlapTable().getName()).collect(Collectors.joining(", ")); + long duration = System.currentTimeMillis() - startMillis; + LOG.info("[ExternalCooldownJobExecutor] finish schedule batch of jobs in {}ms: {}", duration, jobNameList); + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/externalcooldown/ExternalCooldownMaintenanceJob.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/externalcooldown/ExternalCooldownMaintenanceJob.java new file mode 100644 index 00000000000000..61d7efeb7964e2 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/externalcooldown/ExternalCooldownMaintenanceJob.java @@ -0,0 +1,200 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.scheduler.externalcooldown; + +import com.google.common.base.Preconditions; +import com.google.gson.annotations.SerializedName; +import com.starrocks.catalog.Database; +import com.starrocks.catalog.OlapTable; +import com.starrocks.catalog.Partition; +import com.starrocks.catalog.Table; +import com.starrocks.common.io.Text; +import com.starrocks.common.io.Writable; +import com.starrocks.externalcooldown.ExternalCooldownConfig; +import com.starrocks.externalcooldown.ExternalCooldownPartitionSelector; +import com.starrocks.externalcooldown.ExternalCooldownSchedule; +import com.starrocks.persist.gson.GsonPostProcessable; +import com.starrocks.persist.gson.GsonPreProcessable; +import com.starrocks.persist.gson.GsonUtils; +import com.starrocks.scheduler.Constants; +import com.starrocks.scheduler.ExecuteOption; +import com.starrocks.scheduler.Task; +import com.starrocks.scheduler.TaskBuilder; +import com.starrocks.scheduler.TaskManager; +import com.starrocks.server.GlobalStateMgr; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Objects; + + +/** + * Long-running job responsible for external cooldown maintenance. + */ +public class ExternalCooldownMaintenanceJob implements Writable, GsonPreProcessable, GsonPostProcessable { + private static final Logger LOG = LogManager.getLogger(ExternalCooldownMaintenanceJob.class); + + // Persisted state + @SerializedName("jobId") + private final long jobId; + @SerializedName("dbId") + private final long dbId; + @SerializedName("tableId") + private final long tableId; + + // Runtime ephemeral state + // At most one thread could execute this job, this flag indicates is someone scheduling this job + private transient OlapTable olapTable; + private transient ExternalCooldownPartitionSelector partitionSelector; + private transient ExternalCooldownSchedule schedule; + private transient String lastRunTaskName = null; + + public ExternalCooldownMaintenanceJob(OlapTable olapTable, long dbId) { + this.jobId = olapTable.getId(); + this.tableId = olapTable.getId(); + this.olapTable = olapTable; + this.dbId = dbId; + } + + public static ExternalCooldownMaintenanceJob read(DataInput input) throws IOException { + ExternalCooldownMaintenanceJob job = GsonUtils.GSON.fromJson( + Text.readString(input), ExternalCooldownMaintenanceJob.class); + return job; + } + + public void restore() { + if (olapTable == null) { + Table table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(dbId, tableId); + Preconditions.checkState(table != null && table.isOlapTable()); + this.olapTable = (OlapTable) table; + } + if (partitionSelector == null) { + Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId); + Preconditions.checkState(db != null); + partitionSelector = new ExternalCooldownPartitionSelector(db, olapTable); + partitionSelector.init(); + } + if (schedule == null) { + schedule = ExternalCooldownSchedule.fromString(olapTable.getExternalCoolDownSchedule()); + } else { + ExternalCooldownSchedule tmpSchedule = ExternalCooldownSchedule.fromString(olapTable.getExternalCoolDownSchedule()); + if (tmpSchedule != null) { + tmpSchedule.setLastScheduleMs(schedule.getLastScheduleMs()); + schedule = tmpSchedule; + } + } + } + + public void stopJob() { + // stopTasks(); + } + + public void onSchedule() throws Exception { + if (lastRunTaskName != null) { + Task task = GlobalStateMgr.getCurrentState().getTaskManager().getTask(lastRunTaskName); + if (task != null && task.getState() != Constants.TaskState.ACTIVE) { + return; + } + } + if (schedule == null || !schedule.trySchedule()) { + LOG.debug("current time not match external cooldown schedule, skip"); + return; + } + if (!partitionSelector.hasPartitionSatisfied()) { + partitionSelector.reloadSatisfiedPartitions(); + if (!partitionSelector.hasPartitionSatisfied()) { + return; + } + } + Partition partition = partitionSelector.getOneSatisfiedPartition(); + if (partition == null) { + return; + } + LOG.info("create external cooldown task for partition {}", partition); + Database db = GlobalStateMgr.getCurrentState().getMetadataMgr().getDb(dbId); + Task task = TaskBuilder.buildExternalCooldownTask(db, olapTable, partition); + TaskManager taskManager = GlobalStateMgr.getCurrentState().getTaskManager(); + taskManager.killTask(task.getName(), false); + taskManager.createTask(task, false); + ExecuteOption executeOption = TaskBuilder.getCooldownExecuteOption(olapTable, partition); + taskManager.executeTask(task.getName(), executeOption); + lastRunTaskName = task.getName(); + } + + public boolean isRunnable() { + if (olapTable == null) { + return false; + } + ExternalCooldownConfig config = olapTable.getCurExternalCoolDownConfig(); + if (config == null) { + return false; + } + return config.isReadyForAutoCooldown(); + } + + public OlapTable getOlapTable() { + return olapTable; + } + + public long getJobId() { + return jobId; + } + + public long getTableId() { + return tableId; + } + + public long getDbId() { + return dbId; + } + + @Override + public String toString() { + return String.format("ExternalCooldownJob id=%s,dbId=%s,tableId=%d", jobId, dbId, tableId); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ExternalCooldownMaintenanceJob that = (ExternalCooldownMaintenanceJob) o; + return jobId == that.jobId && tableId == that.tableId; + } + + @Override + public int hashCode() { + return Objects.hash(jobId, tableId); + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + @Override + public void gsonPostProcess() throws IOException { + } + + @Override + public void gsonPreProcess() throws IOException { + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/externalcooldown/ExternalCooldownMgr.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/externalcooldown/ExternalCooldownMgr.java new file mode 100644 index 00000000000000..c989e7af5e63c2 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/externalcooldown/ExternalCooldownMgr.java @@ -0,0 +1,189 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.scheduler.externalcooldown; + +import com.google.common.base.Preconditions; +import com.google.gson.annotations.SerializedName; +import com.starrocks.catalog.OlapTable; +import com.starrocks.common.io.Text; +import com.starrocks.persist.ImageWriter; +import com.starrocks.persist.gson.GsonUtils; +import com.starrocks.persist.metablock.SRMetaBlockEOFException; +import com.starrocks.persist.metablock.SRMetaBlockException; +import com.starrocks.persist.metablock.SRMetaBlockID; +import com.starrocks.persist.metablock.SRMetaBlockReader; +import com.starrocks.persist.metablock.SRMetaBlockWriter; +import com.starrocks.server.GlobalStateMgr; +import org.apache.commons.collections.CollectionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * Manage lifetime of external cooldown, including create, build, refresh, destroy + */ +public class ExternalCooldownMgr { + private static final Logger LOG = LogManager.getLogger(ExternalCooldownMgr.class); + + private final Map jobMap = new ConcurrentHashMap<>(); + + private boolean initialized = false; + + /** + * Reload jobs from meta store + */ + public long reload(DataInputStream input, long checksum) throws IOException { + Preconditions.checkState(jobMap.isEmpty()); + + try { + String str = Text.readString(input); + SerializedJobs data = GsonUtils.GSON.fromJson(str, SerializedJobs.class); + if (CollectionUtils.isNotEmpty(data.jobList)) { + for (ExternalCooldownMaintenanceJob job : data.jobList) { + job.restore(); + jobMap.put(job.getTableId(), job); + } + LOG.info("reload external cooldown maintenance jobs: {}", data.jobList); + LOG.debug("reload external cooldown maintenance job details: {}", str); + } + checksum ^= data.jobList.size(); + } catch (EOFException ignored) { + } + return checksum; + } + + /** + * Replay from journal + */ + public void replay(ExternalCooldownMaintenanceJob job) throws IOException { + try { + job.restore(); + jobMap.put(job.getTableId(), job); + LOG.info("Replay external cooldown maintenance jobs: {}", job); + } catch (Exception e) { + LOG.warn("Replay external cooldown maintenance job failed: {}", job); + LOG.warn("Failed to replay external cooldown maintenance job", e); + } + } + + /** + * Store jobs in meta store + */ + public long store(DataOutputStream output, long checksum) throws IOException { + SerializedJobs data = new SerializedJobs(); + data.jobList = new ArrayList<>(jobMap.values()); + String json = GsonUtils.GSON.toJson(data); + Text.writeString(output, json); + checksum ^= data.jobList.size(); + return checksum; + } + + public void prepareMaintenanceWork(long dbId, OlapTable olapTable) { + ExternalCooldownMaintenanceJob job = jobMap.get(olapTable.getId()); + if (job != null) { + job.restore(); + return; + } + try { + // Create the job but not execute it + job = new ExternalCooldownMaintenanceJob(olapTable, dbId); + Preconditions.checkState(jobMap.putIfAbsent(job.getTableId(), job) == null, "job already existed"); + + GlobalStateMgr.getCurrentState().getEditLog().logExternalCooldownJobState(job); + LOG.info("create the maintenance job for external cooldown table: {}", olapTable.getName()); + } catch (Exception e) { + jobMap.remove(olapTable.getId()); + LOG.warn("prepare external cooldown for {} failed, ", olapTable.getName(), e); + } + + if (job != null) { + job.restore(); + } + } + + /** + * Stop the maintenance job for external cooldown after dropped + */ + public void stopMaintainExternalCooldown(OlapTable olapTable) { + ExternalCooldownMaintenanceJob job = getJob(olapTable.getId()); + if (job == null) { + LOG.warn("external cooldown job not exists {}", olapTable.getName()); + return; + } + job.stopJob(); + jobMap.remove(olapTable.getId()); + LOG.info("Remove maintenance job for external cooldown: {}", olapTable.getName()); + } + + private ExternalCooldownMaintenanceJob getJob(long tblId) { + return jobMap.get(tblId); + } + + public List getRunnableJobs() { + return this.jobMap.values().stream().filter(ExternalCooldownMaintenanceJob::isRunnable).collect(Collectors.toList()); + } + + static class SerializedJobs { + @SerializedName("jobList") + List jobList; + } + + public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockException { + int numJson = 1 + jobMap.size(); + SRMetaBlockWriter writer = imageWriter.getBlockWriter(SRMetaBlockID.EXTERNAL_COOLDOWN_MGR, numJson); + writer.writeInt(jobMap.size()); + for (ExternalCooldownMaintenanceJob job : jobMap.values()) { + writer.writeJson(job); + } + + writer.close(); + } + + public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockException, SRMetaBlockEOFException { + int numJson = reader.readInt(); + for (int i = 0; i < numJson; ++i) { + ExternalCooldownMaintenanceJob job = reader.readJson(ExternalCooldownMaintenanceJob.class); + job.restore(); + jobMap.put(job.getTableId(), job); + } + } + + public void doInitializeIfNeed() { + if (!initialized) { + reloadJobs(); + this.initialized = true; + } + } + + public void reloadJobs() { + for (ExternalCooldownMaintenanceJob job : jobMap.values()) { + try { + job.restore(); + } catch (IllegalStateException e) { + LOG.warn("restore job {} failed, will, ", job.getTableId(), e); + this.jobMap.remove(job.getTableId()); + } + } + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java index 3be54f2f263fcf..9d560f3c4f17c8 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java @@ -198,6 +198,8 @@ import com.starrocks.rpc.ThriftRPCRequestExecutor; import com.starrocks.scheduler.MVActiveChecker; import com.starrocks.scheduler.TaskManager; +import com.starrocks.scheduler.externalcooldown.ExternalCooldownJobExecutor; +import com.starrocks.scheduler.externalcooldown.ExternalCooldownMgr; import com.starrocks.scheduler.history.TableKeeper; import com.starrocks.scheduler.mv.MVJobExecutor; import com.starrocks.scheduler.mv.MaterializedViewMgr; @@ -303,6 +305,7 @@ public class GlobalStateMgr { private final StreamLoadMgr streamLoadMgr; private final ExportMgr exportMgr; private final MaterializedViewMgr materializedViewMgr; + private final ExternalCooldownMgr externalCooldownMgr; private final ConsistencyChecker consistencyChecker; private final BackupHandler backupHandler; @@ -400,6 +403,8 @@ public class GlobalStateMgr { private final MVJobExecutor mvMVJobExecutor; + private final ExternalCooldownJobExecutor externalCooldownJobExecutor; + private final SmallFileMgr smallFileMgr; private final DynamicPartitionScheduler dynamicPartitionScheduler; @@ -628,6 +633,7 @@ private GlobalStateMgr(boolean isCkptGlobalState, NodeMgr nodeMgr) { this.routineLoadMgr = new RoutineLoadMgr(); this.exportMgr = new ExportMgr(); this.materializedViewMgr = new MaterializedViewMgr(); + this.externalCooldownMgr = new ExternalCooldownMgr(); this.consistencyChecker = new ConsistencyChecker(); this.lock = new QueryableReentrantLock(true); @@ -696,6 +702,7 @@ private GlobalStateMgr(boolean isCkptGlobalState, NodeMgr nodeMgr) { this.routineLoadScheduler = new RoutineLoadScheduler(routineLoadMgr); this.routineLoadTaskScheduler = new RoutineLoadTaskScheduler(routineLoadMgr); this.mvMVJobExecutor = new MVJobExecutor(); + this.externalCooldownJobExecutor = new ExternalCooldownJobExecutor(); this.smallFileMgr = new SmallFileMgr(); @@ -1383,6 +1390,7 @@ private void startLeaderOnlyDaemonThreads() { taskManager.start(); taskCleaner.start(); mvMVJobExecutor.start(); + externalCooldownJobExecutor.start(); pipeListener.start(); pipeScheduler.start(); mvActiveChecker.start(); @@ -1524,6 +1532,7 @@ public void loadImage(String imageDir) throws IOException { .put(SRMetaBlockID.REPLICATION_MGR, replicationMgr::load) .put(SRMetaBlockID.KEY_MGR, keyMgr::load) .put(SRMetaBlockID.PIPE_MGR, pipeManager.getRepo()::load) + .put(SRMetaBlockID.EXTERNAL_COOLDOWN_MGR, externalCooldownMgr::load) .build(); Set metaMgrMustExists = new HashSet<>(loadImages.keySet()); @@ -1723,6 +1732,7 @@ public void saveImage(ImageWriter imageWriter, File curFile) throws IOException replicationMgr.save(imageWriter); keyMgr.save(imageWriter); pipeManager.getRepo().save(imageWriter); + externalCooldownMgr.save(imageWriter); } catch (SRMetaBlockException e) { LOG.error("Save meta block failed ", e); throw new IOException("Save meta block failed ", e); @@ -2145,6 +2155,10 @@ public MaterializedViewMgr getMaterializedViewMgr() { return this.materializedViewMgr; } + public ExternalCooldownMgr getExternalCooldownMgr() { + return this.externalCooldownMgr; + } + public SmallFileMgr getSmallFileMgr() { return this.smallFileMgr; } diff --git a/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java b/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java index 3c10bebbaa55af..78a79c419a1835 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java @@ -121,7 +121,7 @@ import com.starrocks.common.util.concurrent.lock.Locker; import com.starrocks.connector.ConnectorMetadata; import com.starrocks.connector.exception.StarRocksConnectorException; -import com.starrocks.externalcooldown.ExternalCoolDownConfig; +import com.starrocks.externalcooldown.ExternalCooldownConfig; import com.starrocks.lake.DataCacheInfo; import com.starrocks.lake.LakeMaterializedView; import com.starrocks.lake.LakeTable; @@ -266,6 +266,7 @@ import java.util.stream.Collectors; import javax.validation.constraints.NotNull; +import static com.starrocks.common.util.PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SYNCED_TIME; import static com.starrocks.server.GlobalStateMgr.NEXT_ID_INIT_VALUE; import static com.starrocks.server.GlobalStateMgr.isCheckpointThread; @@ -3606,6 +3607,64 @@ public void alterTableProperties(Database db, OlapTable table, Map partitions = table.getPartitions(); + long newCooldownSyncedTimeMs = 0L; + List clearPartitionIds = new ArrayList<>(); + + for (Partition partition : partitions) { + long partitionId = partition.getId(); + Long oldCoolDownSyncedTimeMs = partitionInfo.getExternalCoolDownSyncedTimeMs(partitionId); + if (oldCoolDownSyncedTimeMs == null || oldCoolDownSyncedTimeMs <= 0) { + continue; + } + clearPartitionIds.add(partitionId); + partitionInfo.setExternalCoolDownSyncedTimeMs(partitionId, newCooldownSyncedTimeMs); + GlobalStateMgr.getCurrentState().getEditLog().logModifyPartition(new ModifyPartitionInfo( + db.getId(), table.getId(), partitionId, + partitionInfo.getDataProperty(partitionId), + partitionInfo.getReplicationNum(partitionId), + partitionInfo.getIsInMemory(partitionId), + newCooldownSyncedTimeMs, -1L + )); + } + if (!clearPartitionIds.isEmpty()) { + LOG.info("clear {} for partition {} as external cooldown target changed", + PROPERTIES_EXTERNAL_COOLDOWN_SYNCED_TIME, clearPartitionIds); + } + } + } } } @@ -3654,6 +3713,20 @@ private Map validateToBeModifiedProps(Map proper String locations = PropertyAnalyzer.analyzeLocation(properties, true); results.put(PropertyAnalyzer.PROPERTIES_LABELS_LOCATION, locations); } + if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET) || + properties.containsKey(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_WAIT_SECOND) || + properties.containsKey(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE)) { + ExternalCooldownConfig config; + try { + config = PropertyAnalyzer.analyzeExternalCoolDownConfig(properties); + } catch (AnalysisException ex) { + throw new RuntimeException(ex.getMessage()); + } + if (!table.isOlapTable()) { + throw new DdlException("Cannot set external cooldown property for non olap table"); + } + results.put(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_CONFIG, config); + } if (!properties.isEmpty()) { throw new DdlException("Modify failed because unknown properties: " + properties); } @@ -3801,13 +3874,13 @@ public void modifyBinlogMeta(Database db, OlapTable table, BinlogConfig binlogCo table.setCurBinlogConfig(binlogConfig); } - public void modifyExternalCoolDownMeta(Database db, OlapTable table, ExternalCoolDownConfig externalCoolDownConfig) { + public void modifyExternalCoolDownMeta(Database db, OlapTable table, ExternalCooldownConfig externalCoolDownConfig) { Locker locker = new Locker(); Preconditions.checkArgument(locker.isDbWriteLockHeldByCurrentThread(db)); ModifyTablePropertyOperationLog log = new ModifyTablePropertyOperationLog( db.getId(), table.getId(), - externalCoolDownConfig.toProperties()); + externalCoolDownConfig.getProperties()); GlobalStateMgr.getCurrentState().getEditLog().logModifyExternalCoolDownConfig(log); table.setCurExternalCoolDownConfig(externalCoolDownConfig); diff --git a/fe/fe-core/src/main/java/com/starrocks/server/OlapTableFactory.java b/fe/fe-core/src/main/java/com/starrocks/server/OlapTableFactory.java index b587032db3aa98..4367708c1cd6e0 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/OlapTableFactory.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/OlapTableFactory.java @@ -48,7 +48,7 @@ import com.starrocks.common.util.DynamicPartitionUtil; import com.starrocks.common.util.PropertyAnalyzer; import com.starrocks.common.util.Util; -import com.starrocks.externalcooldown.ExternalCoolDownConfig; +import com.starrocks.externalcooldown.ExternalCooldownConfig; import com.starrocks.lake.DataCacheInfo; import com.starrocks.lake.LakeTable; import com.starrocks.lake.StorageInfo; @@ -394,7 +394,7 @@ public Table createTable(LocalMetastore metastore, Database db, CreateTableStmt properties.containsKey(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE) || properties.containsKey(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_WAIT_SECOND))) { try { - ExternalCoolDownConfig externalCoolDownConfig = PropertyAnalyzer.analyzeExternalCoolDownConfig(properties); + ExternalCooldownConfig externalCoolDownConfig = PropertyAnalyzer.analyzeExternalCoolDownConfig(properties); table.setCurExternalCoolDownConfig(externalCoolDownConfig); LOG.info("create table {} set external cool down config, target = {}, schedule = {}, wait second = {}", tableName, externalCoolDownConfig.getTarget(), externalCoolDownConfig.getSchedule(), diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AlterTableClauseAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AlterTableClauseAnalyzer.java index 4e64db3378e19a..90512769647079 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AlterTableClauseAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AlterTableClauseAnalyzer.java @@ -53,6 +53,7 @@ import com.starrocks.common.util.PropertyAnalyzer; import com.starrocks.common.util.TimeUtils; import com.starrocks.common.util.WriteQuorum; +import com.starrocks.externalcooldown.ExternalCooldownSchedule; import com.starrocks.qe.ConnectContext; import com.starrocks.server.GlobalStateMgr; import com.starrocks.server.RunMode; @@ -119,6 +120,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; +import static com.starrocks.common.util.PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_PREFIX; import static com.starrocks.sql.common.ErrorMsgProxy.PARSER_ERROR_MSG; public class AlterTableClauseAnalyzer implements AstVisitor { @@ -178,7 +180,8 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause clause, if (properties.size() != 1 && !(TableProperty.isSamePrefixProperties(properties, TableProperty.DYNAMIC_PARTITION_PROPERTY_PREFIX) - || TableProperty.isSamePrefixProperties(properties, TableProperty.BINLOG_PROPERTY_PREFIX))) { + || TableProperty.isSamePrefixProperties(properties, TableProperty.BINLOG_PROPERTY_PREFIX) + || TableProperty.isSamePrefixProperties(properties, PROPERTIES_EXTERNAL_COOLDOWN_PREFIX))) { ErrorReport.reportSemanticException(ErrorCode.ERR_COMMON_ERROR, "Can only set one table property at a time"); } @@ -339,13 +342,10 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause clause, } if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE)) { String schedule = properties.get(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE); - Pattern schedulePattern = Pattern.compile( - "^\\s*START\\s+\\d+:\\d+\\s+END\\s+\\d+:\\d+\\s+EVERY\\s+INTERVAL\\s+\\d+[smh]\\s*$", - Pattern.CASE_INSENSITIVE); - if (!schedulePattern.matcher(schedule).find()) { + if (!ExternalCooldownSchedule.validateScheduleString(schedule)) { ErrorReport.reportSemanticException(ErrorCode.ERR_COMMON_ERROR, "Property " + PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE + - " must be format like `START 01:00 END 07:59 EVERY INTERVAL 1m`"); + " must be format like `START 01:00 END 07:59 EVERY INTERVAL 1 MINUTE`"); } } if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_WAIT_SECOND)) { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Analyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Analyzer.java index 511fd59cfb45cb..a6a3f1d4ab1867 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Analyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Analyzer.java @@ -49,6 +49,7 @@ import com.starrocks.sql.ast.CancelAlterTableStmt; import com.starrocks.sql.ast.CancelCompactionStmt; import com.starrocks.sql.ast.CancelExportStmt; +import com.starrocks.sql.ast.CancelExternalCooldownStmt; import com.starrocks.sql.ast.CancelLoadStmt; import com.starrocks.sql.ast.CancelRefreshMaterializedViewStmt; import com.starrocks.sql.ast.ClearDataCacheRulesStmt; @@ -57,6 +58,7 @@ import com.starrocks.sql.ast.CreateDataCacheRuleStmt; import com.starrocks.sql.ast.CreateDbStmt; import com.starrocks.sql.ast.CreateDictionaryStmt; +import com.starrocks.sql.ast.CreateExternalCooldownStmt; import com.starrocks.sql.ast.CreateFileStmt; import com.starrocks.sql.ast.CreateFunctionStmt; import com.starrocks.sql.ast.CreateMaterializedViewStatement; @@ -1016,5 +1018,17 @@ public Void visitShowDictionaryStatement(ShowDictionaryStmt statement, ConnectCo DictionaryAnalyzer.analyze(statement, context); return null; } + + @Override + public Void visitCreateExternalCooldownStatement(CreateExternalCooldownStmt statement, ConnectContext context) { + ExternalCooldownAnalyzer.analyze(statement, context); + return null; + } + + @Override + public Void visitCancelExternalCooldownStatement(CancelExternalCooldownStmt statement, ConnectContext context) { + ExternalCooldownAnalyzer.analyze(statement, context); + return null; + } } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/CancelExternalCooldownAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/CancelExternalCooldownAnalyzer.java deleted file mode 100644 index 4e1cacccb528b8..00000000000000 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/CancelExternalCooldownAnalyzer.java +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2021-present StarRocks, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - - -package com.starrocks.sql.analyzer; - -import com.google.common.base.Strings; -import com.starrocks.analysis.TableName; -import com.starrocks.common.ErrorCode; -import com.starrocks.common.ErrorReport; -import com.starrocks.qe.ConnectContext; -import com.starrocks.sql.ast.CancelExternalCooldownStmt; - - -public class CancelExternalCooldownAnalyzer { - - public static void analyzeCancelExternalCooldownStmt( - CancelExternalCooldownStmt cancelExternalCooldownStmt, ConnectContext session) { - TableName tableName = cancelExternalCooldownStmt.getTableName(); - if (tableName.getDb() == null) { - String dbName = session.getDatabase(); - if (Strings.isNullOrEmpty(dbName)) { - ErrorReport.reportSemanticException(ErrorCode.ERR_NO_DB_ERROR); - } - tableName.setDb(dbName); - } - cancelExternalCooldownStmt.setTableName(tableName); - } -} diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ExternalCooldownAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ExternalCooldownAnalyzer.java index 83c7528a1ac9e2..0b6edc62aef7db 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ExternalCooldownAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ExternalCooldownAnalyzer.java @@ -20,31 +20,56 @@ import com.starrocks.common.ErrorCode; import com.starrocks.common.ErrorReport; import com.starrocks.qe.ConnectContext; +import com.starrocks.sql.ast.AstVisitor; +import com.starrocks.sql.ast.CancelExternalCooldownStmt; import com.starrocks.sql.ast.CreateExternalCooldownStmt; +import com.starrocks.sql.ast.StatementBase; import org.apache.commons.collections.MapUtils; import java.util.Map; public class ExternalCooldownAnalyzer { + public static void analyze(StatementBase stmt, ConnectContext session) { + new ExternalCooldownAnalyzer.ExternalCooldownAnalyzerVisitor().visit(stmt, session); + } - public static void analyzeCreateExternalCooldownStmt( - CreateExternalCooldownStmt createExternalCooldownStmt, ConnectContext session) { - TableName tableName = createExternalCooldownStmt.getTableName(); - if (tableName.getDb() == null) { - String dbName = session.getDatabase(); - if (Strings.isNullOrEmpty(dbName)) { - ErrorReport.reportSemanticException(ErrorCode.ERR_NO_DB_ERROR); + static class ExternalCooldownAnalyzerVisitor implements AstVisitor { + @Override + public Void visitCreateExternalCooldownStatement(CreateExternalCooldownStmt createExternalCooldownStmt, + ConnectContext context) { + TableName tableName = createExternalCooldownStmt.getTableName(); + if (tableName.getDb() == null) { + String dbName = context.getDatabase(); + if (Strings.isNullOrEmpty(dbName)) { + ErrorReport.reportSemanticException(ErrorCode.ERR_NO_DB_ERROR); + } + tableName.setDb(dbName); + } + createExternalCooldownStmt.setTableName(tableName); + createExternalCooldownStmt.setPartitionRangeDesc(createExternalCooldownStmt.getPartitionRangeDesc()); + analyzeExternalCooldownProperties(createExternalCooldownStmt.getProperties()); + return null; + } + + public static void analyzeExternalCooldownProperties(Map properties) { + if (MapUtils.isEmpty(properties)) { + return; } - tableName.setDb(dbName); } - createExternalCooldownStmt.setTableName(tableName); - createExternalCooldownStmt.setPartitionRangeDesc(createExternalCooldownStmt.getPartitionRangeDesc()); - analyzeExternalCooldownProperties(createExternalCooldownStmt.getProperties()); - } - public static void analyzeExternalCooldownProperties(Map properties) { - if (MapUtils.isEmpty(properties)) { - return; + @Override + public Void visitCancelExternalCooldownStatement( + CancelExternalCooldownStmt cancelExternalCooldownStmt, ConnectContext context) { + TableName tableName = cancelExternalCooldownStmt.getTableName(); + if (tableName.getDb() == null) { + String dbName = context.getDatabase(); + if (Strings.isNullOrEmpty(dbName)) { + ErrorReport.reportSemanticException(ErrorCode.ERR_NO_DB_ERROR); + } + tableName.setDb(dbName); + } + cancelExternalCooldownStmt.setTableName(tableName); + return null; } } } diff --git a/fe/fe-core/src/test/java/com/starrocks/catalog/CreateTableWithExternalCooldownTest.java b/fe/fe-core/src/test/java/com/starrocks/catalog/CreateTableWithExternalCooldownTest.java new file mode 100644 index 00000000000000..fbcad4ebf20d94 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/catalog/CreateTableWithExternalCooldownTest.java @@ -0,0 +1,144 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This file is based on code available under the Apache license here: +// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.starrocks.catalog; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.starrocks.common.Config; +import com.starrocks.common.FeConstants; +import com.starrocks.connector.ConnectorContext; +import com.starrocks.connector.ConnectorMgr; +import com.starrocks.connector.iceberg.IcebergMetadata; +import com.starrocks.externalcooldown.ExternalCooldownConfig; +import com.starrocks.qe.ConnectContext; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.utframe.StarRocksAssert; +import com.starrocks.utframe.UtFrameUtils; +import mockit.Mock; +import mockit.MockUp; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.PartitionSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +/** + * Test create table with location property and check whether + * the replica is placed on the right backends with matching location label. + */ +public class CreateTableWithExternalCooldownTest { + private static ConnectContext connectContext; + private static StarRocksAssert starRocksAssert; + + @Test + public void testCreateTableWithExternalCooldownConfig() throws Exception { + FeConstants.runningUnitTest = true; + Config.alter_scheduler_interval_millisecond = 100; + Config.dynamic_partition_enable = true; + Config.dynamic_partition_check_interval_seconds = 1; + Config.enable_strict_storage_medium_check = false; + Config.enable_experimental_rowstore = true; + UtFrameUtils.createMinStarRocksCluster(); + // create connect context + connectContext = UtFrameUtils.createDefaultCtx(); + starRocksAssert = new StarRocksAssert(connectContext); + starRocksAssert.withDatabase("test").useDatabase("test"); + + Config.default_replication_num = 1; + + starRocksAssert.withDatabase("test"); + starRocksAssert.useDatabase("test"); + + new MockUp() { + @Mock + public Table getTable(String dbName, String tblName) { + return new IcebergTable(1, "iceberg_tbl", "iceberg_catalog", + "iceberg_catalog", "iceberg_db", + "table1", "", Lists.newArrayList(), new BaseTable(null, ""), Maps.newHashMap()); + } + }; + new MockUp() { + @Mock + public String getTableIdentifier() { + return "iceberg_catalog.iceberg_db.iceberg_tbl"; + } + }; + new MockUp() { + @Mock + public PartitionSpec spec() { + return PartitionSpec.unpartitioned(); + } + }; + + ConnectorMgr connectorMgr = GlobalStateMgr.getCurrentState().getConnectorMgr(); + Map properties = Maps.newHashMap(); + + properties.put("type", "iceberg"); + properties.put("iceberg.catalog.type", "hive"); + properties.put("hive.metastore.uris", "thrift://127.0.0.1:9083"); + connectorMgr.createConnector(new ConnectorContext("iceberg_catalog", "iceberg", properties), false); + + starRocksAssert.withTable("CREATE TABLE test.tbl1\n" + + "(\n" + + " k1 date,\n" + + " k2 int,\n" + + " v1 int sum\n" + + ")\n" + + "PARTITION BY RANGE(k1)\n" + + "(\n" + + " PARTITION p1 values [('2024-03-01 00:00:00'),('2024-03-02 00:00:00')),\n" + + " PARTITION p2 values [('2024-03-02 00:00:00'),('2024-03-03 00:00:00')),\n" + + " PARTITION p3 values [('2024-03-03 00:00:00'),('2024-03-04 00:00:00')),\n" + + " PARTITION p4 values [('2024-03-04 00:00:00'),('2024-03-05 00:00:00')),\n" + + " PARTITION p5 values [('2024-03-05 00:00:00'),('2024-03-06 00:00:00'))\n" + + ")\n" + + "DISTRIBUTED BY HASH(k2) BUCKETS 1\n" + + "PROPERTIES(" + + "'replication_num' = '1',\n" + + "'external_cooldown_target' = 'iceberg_catalog.iceberg_db.iceberg_tbl',\n" + + "'external_cooldown_schedule' = 'START 01:00 END 07:59 EVERY INTERVAL 1 MINUTE',\n" + + "'external_cooldown_wait_second' = '60'\n" + + ");"); + Table table = starRocksAssert.getTable("test", "tbl1"); + Assert.assertTrue(table instanceof OlapTable); + OlapTable olapTable = (OlapTable) table; + ExternalCooldownConfig config = olapTable.getCurExternalCoolDownConfig(); + Assert.assertNotNull(config); + Assert.assertEquals("iceberg_catalog.iceberg_db.iceberg_tbl", config.getTarget()); + Assert.assertEquals("START 01:00 END 07:59 EVERY INTERVAL 1 MINUTE", config.getSchedule()); + Assert.assertEquals((Long) 60L, config.getWaitSecond()); + } +} \ No newline at end of file diff --git a/fe/fe-core/src/test/java/com/starrocks/common/proc/PartitionsProcDirTest.java b/fe/fe-core/src/test/java/com/starrocks/common/proc/PartitionsProcDirTest.java index 6a4480464e21f3..63c58d9ba0b910 100644 --- a/fe/fe-core/src/test/java/com/starrocks/common/proc/PartitionsProcDirTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/common/proc/PartitionsProcDirTest.java @@ -18,10 +18,12 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.starrocks.catalog.Column; +import com.starrocks.catalog.DataProperty; import com.starrocks.catalog.Database; import com.starrocks.catalog.ListPartitionInfo; import com.starrocks.catalog.MaterializedIndex; import com.starrocks.catalog.MaterializedIndex.IndexState; +import com.starrocks.catalog.OlapTable; import com.starrocks.catalog.Partition; import com.starrocks.catalog.PartitionInfo; import com.starrocks.catalog.PartitionType; @@ -42,6 +44,7 @@ public class PartitionsProcDirTest { private Database db; private LakeTable cloudNativTable; + private OlapTable olapTable; @Before public void setUp() throws DdlException, AnalysisException { @@ -59,6 +62,15 @@ public void setUp() throws DdlException, AnalysisException { cloudNativTable.addPartition(new Partition(partitionId, "p1", index, new RandomDistributionInfo(10))); db.registerTableUnlocked(cloudNativTable); + + MaterializedIndex index1 = new MaterializedIndex(1001L, IndexState.NORMAL); + Partition partition = new Partition(1028L, "p2", index1, new RandomDistributionInfo(10)); + List col1 = Lists.newArrayList(new Column("province", Type.VARCHAR)); + PartitionInfo listPartition1 = new ListPartitionInfo(PartitionType.LIST, col1); + listPartition1.addPartition(partition.getId(), DataProperty.getInferredDefaultDataProperty(), (short) 1, true); + olapTable = new OlapTable(1026L, "olap_table", col, null, listPartition1, null); + olapTable.addPartition(partition); + db.registerTableUnlocked(olapTable); } @Test @@ -74,4 +86,17 @@ public void testFetchResult() throws AnalysisException { Assert.assertEquals("NORMAL", list1.get(5)); Assert.assertEquals("province", list1.get(6)); } + + @Test + public void testFetchResult4OlapTable() throws AnalysisException { + BaseProcResult result = (BaseProcResult) new PartitionsProcDir(db, olapTable, false).fetchResult(); + List> rows = result.getRows(); + List list1 = rows.get(0); + Assert.assertEquals("1028", list1.get(0)); + Assert.assertEquals("p2", list1.get(1)); + Assert.assertEquals("1", list1.get(2)); + Assert.assertEquals("\\N", list1.get(20)); + Assert.assertEquals("\\N", list1.get(21)); + Assert.assertEquals("0", list1.get(22)); + } } diff --git a/fe/fe-core/src/test/java/com/starrocks/externalcooldown/ExternalCooldownConfigTest.java b/fe/fe-core/src/test/java/com/starrocks/externalcooldown/ExternalCooldownConfigTest.java new file mode 100644 index 00000000000000..048a3a0dab36c1 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/externalcooldown/ExternalCooldownConfigTest.java @@ -0,0 +1,36 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.externalcooldown; + +import org.junit.Assert; +import org.junit.Test; + + +public class ExternalCooldownConfigTest { + + @Test + public void testPartitionStartEnd() throws Exception { + ExternalCooldownConfig config = new ExternalCooldownConfig( + "iceberg.db1.tbl1", "START 01:00 END 07:59 EVERY INTERVAL 1 MINUTE", 3600); + + ExternalCooldownSchedule schedule = ExternalCooldownSchedule.fromString(config.getSchedule()); + Assert.assertNotNull(schedule); + Assert.assertEquals("01:00", schedule.getStart()); + Assert.assertEquals("07:59", schedule.getEnd()); + Assert.assertEquals(1L, schedule.getInterval()); + Assert.assertEquals("MINUTE", schedule.getUnit()); + Assert.assertEquals(60L, schedule.getIntervalSeconds()); + } +} \ No newline at end of file diff --git a/fe/fe-core/src/test/java/com/starrocks/externalcooldown/ExternalCooldownPartitionSelectorTest.java b/fe/fe-core/src/test/java/com/starrocks/externalcooldown/ExternalCooldownPartitionSelectorTest.java index 9d79103c0ac8f6..f37bebae2de140 100644 --- a/fe/fe-core/src/test/java/com/starrocks/externalcooldown/ExternalCooldownPartitionSelectorTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/externalcooldown/ExternalCooldownPartitionSelectorTest.java @@ -115,7 +115,7 @@ public PartitionSpec spec() { "PROPERTIES(" + "'replication_num' = '1',\n" + "'external_cooldown_target' = 'iceberg_catalog.iceberg_db.iceberg_tbl',\n" + - "'external_cooldown_schedule' = 'START 01:00 END 07:59 EVERY INTERVAL 1m'\n," + + "'external_cooldown_schedule' = 'START 01:00 END 07:59 EVERY INTERVAL 1 MINUTE'\n," + "'external_cooldown_wait_second' = '1'\n" + ");"); starRocksAssert.withTable("CREATE TABLE test.tbl2\n" + @@ -136,7 +136,7 @@ public PartitionSpec spec() { "PROPERTIES(" + "'replication_num' = '1',\n" + "'external_cooldown_target' = 'iceberg_catalog.iceberg_db.iceberg_tbl',\n" + - "'external_cooldown_schedule' = 'START 01:00 END 07:59 EVERY INTERVAL 1m'\n," + + "'external_cooldown_schedule' = 'START 01:00 END 07:59 EVERY INTERVAL 1 MINUTE'\n," + "'external_cooldown_wait_second' = '1'\n" + ");"); starRocksAssert.withTable("CREATE TABLE test.tbl3\n" + @@ -157,7 +157,7 @@ public PartitionSpec spec() { "PROPERTIES(" + "'replication_num' = '1',\n" + "'external_cooldown_target' = 'iceberg_catalog.iceberg_db.iceberg_tbl',\n" + - "'external_cooldown_schedule' = 'START 01:00 END 07:59 EVERY INTERVAL 1m'\n," + + "'external_cooldown_schedule' = 'START 01:00 END 07:59 EVERY INTERVAL 1 MINUTE'\n," + "'external_cooldown_wait_second' = '1'\n" + ");"); starRocksAssert.withTable("CREATE TABLE test.tbl4\n" + @@ -170,7 +170,7 @@ public PartitionSpec spec() { "PROPERTIES(" + "'replication_num' = '1',\n" + "'external_cooldown_target' = 'iceberg_catalog.iceberg_db.iceberg_tbl',\n" + - "'external_cooldown_schedule' = 'START 01:00 END 07:59 EVERY INTERVAL 1m'\n," + + "'external_cooldown_schedule' = 'START 01:00 END 07:59 EVERY INTERVAL 10 SECOND'\n," + "'external_cooldown_wait_second' = '1'\n" + ");"); @@ -191,7 +191,7 @@ public PartitionSpec spec() { "PROPERTIES (\n" + " 'replication_num' = '1',\n" + " 'external_cooldown_target' = 'iceberg_catalog.iceberg_db.iceberg_tbl',\n" + - " 'external_cooldown_schedule' = 'START 01:00 END 07:59 EVERY INTERVAL 1m'\n," + + " 'external_cooldown_schedule' = 'START 01:00 END 07:59 EVERY INTERVAL 1 MINUTE'\n," + " 'external_cooldown_wait_second' = '1'\n" + ")"); } diff --git a/fe/fe-core/src/test/java/com/starrocks/externalcooldown/ExternalCooldownScheduleTest.java b/fe/fe-core/src/test/java/com/starrocks/externalcooldown/ExternalCooldownScheduleTest.java new file mode 100644 index 00000000000000..542c711e2e4d73 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/externalcooldown/ExternalCooldownScheduleTest.java @@ -0,0 +1,36 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.externalcooldown; + +import org.junit.Assert; +import org.junit.Test; + + +public class ExternalCooldownScheduleTest { + + @Test + public void testPartitionStartEnd() throws Exception { + + + String scheduleStr = "START 01:00 END 07:59 EVERY INTERVAL 1 MINUTE"; + ExternalCooldownSchedule schedule = ExternalCooldownSchedule.fromString(scheduleStr); + Assert.assertNotNull(schedule); + Assert.assertEquals("01:00", schedule.getStart()); + Assert.assertEquals("07:59", schedule.getEnd()); + Assert.assertEquals(1L, schedule.getInterval()); + Assert.assertEquals("MINUTE", schedule.getUnit()); + Assert.assertEquals(60L, schedule.getIntervalSeconds()); + } +} \ No newline at end of file diff --git a/fe/fe-core/src/test/java/com/starrocks/persist/ModifyExternalCooldownConfigTest.java b/fe/fe-core/src/test/java/com/starrocks/persist/ModifyExternalCooldownConfigTest.java new file mode 100644 index 00000000000000..27fb2578cbdd77 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/persist/ModifyExternalCooldownConfigTest.java @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.starrocks.persist; + +import com.starrocks.common.io.Writable; +import com.starrocks.common.util.PropertyAnalyzer; +import com.starrocks.journal.JournalEntity; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.HashMap; + +import static com.starrocks.persist.OperationType.OP_MODIFY_EXTERNAL_COOLDOWN_CONFIG; + +public class ModifyExternalCooldownConfigTest { + private String fileName = "./ModifyExternalCooldownConfigTest"; + + @After + public void tearDown() { + File file = new File(fileName); + file.delete(); + } + + @Test + public void testNormal() throws IOException { + // 1. Write objects to file + File file = new File(fileName); + file.createNewFile(); + DataOutputStream out = new DataOutputStream(new FileOutputStream(file)); + + HashMap properties = new HashMap<>(); + properties.put(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET, "iceberg.db1.tbl1"); + properties.put(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_WAIT_SECOND, "3600"); + properties.put(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE, + "START 01:00 END 07:59 EVERY INTERVAL 1 MINUTE"); + ModifyTablePropertyOperationLog modifyTablePropertyOperationLog = + new ModifyTablePropertyOperationLog(100L, 200L, properties); + out.writeShort(OP_MODIFY_EXTERNAL_COOLDOWN_CONFIG); + modifyTablePropertyOperationLog.write(out); + out.flush(); + out.close(); + + // 2. Read objects from file + DataInputStream in = new DataInputStream(new FileInputStream(file)); + JournalEntity journalEntity = new JournalEntity(); + journalEntity.readFields(in); + Writable data = journalEntity.getData(); + Assert.assertEquals(OP_MODIFY_EXTERNAL_COOLDOWN_CONFIG, journalEntity.getOpCode()); + Assert.assertTrue(data instanceof ModifyTablePropertyOperationLog); + + ModifyTablePropertyOperationLog readModifyExternalCooldownConfigInfo = (ModifyTablePropertyOperationLog) data; + Assert.assertEquals(readModifyExternalCooldownConfigInfo.getDbId(), 100L); + Assert.assertEquals(readModifyExternalCooldownConfigInfo.getTableId(), 200L); + Assert.assertEquals(readModifyExternalCooldownConfigInfo.getProperties(), properties); + in.close(); + } +} diff --git a/fe/fe-core/src/test/java/com/starrocks/persist/OperationTypeTest.java b/fe/fe-core/src/test/java/com/starrocks/persist/OperationTypeTest.java index 189edf0809d737..f865fc1786a326 100644 --- a/fe/fe-core/src/test/java/com/starrocks/persist/OperationTypeTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/persist/OperationTypeTest.java @@ -148,6 +148,7 @@ public void testRecoverableOperations() { Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_REPLICATION_JOB)); Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_DELETE_REPLICATION_JOB)); Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_RESET_FRONTENDS)); + Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_EXTERNAL_COOLDOWN_JOB_STATE)); } @Test diff --git a/fe/fe-core/src/test/java/com/starrocks/qe/ExternalCooldownExecutorTest.java b/fe/fe-core/src/test/java/com/starrocks/qe/ExternalCooldownExecutorTest.java new file mode 100644 index 00000000000000..af0bfae370a598 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/qe/ExternalCooldownExecutorTest.java @@ -0,0 +1,96 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.qe; + +import com.starrocks.catalog.IcebergTable; +import com.starrocks.catalog.Table; +import com.starrocks.scheduler.Task; +import com.starrocks.scheduler.TaskBuilder; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.MetadataMgr; +import com.starrocks.sql.ast.StatementBase; +import com.starrocks.sql.plan.ConnectorPlanTestBase; +import com.starrocks.statistic.StatisticsMetaManager; +import com.starrocks.utframe.StarRocksAssert; +import com.starrocks.utframe.UtFrameUtils; +import mockit.Mock; +import mockit.MockUp; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class ExternalCooldownExecutorTest { + @ClassRule + public static TemporaryFolder temp = new TemporaryFolder(); + + private static ConnectContext connectContext; + private static StarRocksAssert starRocksAssert; + + @BeforeClass + public static void beforeClass() throws Exception { + ConnectorPlanTestBase.doInit(temp.newFolder().toURI().toString()); + connectContext = UtFrameUtils.createDefaultCtx(); + starRocksAssert = new StarRocksAssert(connectContext); + if (!starRocksAssert.databaseExist("_statistics_")) { + StatisticsMetaManager m = new StatisticsMetaManager(); + m.createStatisticsTablesForTest(); + } + + new MockUp() { + @Mock + public Table getTable(String catalogName, String dbName, String tblName) { + return new IcebergTable(); + } + }; + starRocksAssert.withDatabase("test").useDatabase("test") + .withTable("CREATE TABLE test.tbl1\n" + + "(\n" + + " k1 int,\n" + + " k2 datetime,\n" + + " v1 int\n" + + ")\n" + + "DUPLICATE KEY(`k1`, `k2`)\n" + + "PARTITION BY RANGE(`k2`)\n" + + "(\n" + + " PARTITION p1 VALUES [('2020-01-01 00:00:00'),('2020-01-02 00:00:00')),\n" + + " PARTITION p2 VALUES [('2020-01-02 00:00:00'),('2020-01-03 00:00:00'))\n" + + ")\n" + + "DISTRIBUTED BY HASH(k1) BUCKETS 1\n" + + "PROPERTIES(\n" + + "'external_cooldown_target'='iceberg.db1.tbl1',\n" + + "'external_cooldown_schedule'='START 01:00 END 07:59 EVERY INTERVAL 1 MINUTE',\n" + + "'external_cooldown_wait_second'='3600',\n" + + "'replication_num' = '1'\n" + + ");"); + } + + @Test + public void testCooldownTable() throws Exception { + StatementBase stmt = UtFrameUtils.parseStmtWithNewParser("cooldown table test.tbl1", connectContext); + StmtExecutor executor = new StmtExecutor(connectContext, stmt); + executor.execute(); + + Table table = starRocksAssert.getTable("test", "tbl1"); + String taskName = TaskBuilder.getExternalCooldownTaskName(table.getId()); + Task task = GlobalStateMgr.getCurrentState().getTaskManager().getTask(taskName); + Assert.assertNotNull(task); + + StatementBase stmt1 = UtFrameUtils.parseStmtWithNewParser("cancel cooldown table test.tbl1", connectContext); + StmtExecutor executor1 = new StmtExecutor(connectContext, stmt1); + executor1.execute(); + } +} diff --git a/fe/fe-core/src/test/java/com/starrocks/server/LocalMetaStoreTest.java b/fe/fe-core/src/test/java/com/starrocks/server/LocalMetaStoreTest.java index cd551d08a6e55e..02b74110e508d8 100644 --- a/fe/fe-core/src/test/java/com/starrocks/server/LocalMetaStoreTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/server/LocalMetaStoreTest.java @@ -41,6 +41,7 @@ import com.starrocks.common.util.UUIDUtil; import com.starrocks.common.util.concurrent.lock.LockType; import com.starrocks.common.util.concurrent.lock.Locker; +import com.starrocks.externalcooldown.ExternalCooldownConfig; import com.starrocks.persist.EditLog; import com.starrocks.persist.ModifyPartitionInfo; import com.starrocks.persist.PhysicalPartitionPersistInfoV2; @@ -312,4 +313,22 @@ public void testRenameColumnException() throws Exception { } } + @Test + public void testAlterTableExternalCooldownProperties() throws Exception { + Database db = connectContext.getGlobalStateMgr().getLocalMetastore().getDb("test"); + OlapTable table = (OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), "t1"); + + Map properties = Maps.newHashMap(); + properties.put(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE, + "START 01:00 END 07:59 EVERY INTERVAL 1 MINUTE"); + properties.put(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET, "iceberg.db1.tbl1"); + properties.put(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_WAIT_SECOND, "3600"); + LocalMetastore localMetastore = connectContext.getGlobalStateMgr().getLocalMetastore(); + localMetastore.alterTableProperties(db, table, properties); + ExternalCooldownConfig config = table.getCurExternalCoolDownConfig(); + Assert.assertNotNull(config); + Assert.assertEquals(config.getTarget(), "iceberg.db1.tbl1"); + Assert.assertEquals(config.getSchedule(), "START 01:00 END 07:59 EVERY INTERVAL 1 MINUTE"); + Assert.assertEquals(config.getWaitSecond(), (Long) 3600L); + } }