Skip to content

Commit

Permalink
add external cooldown job manager to run cooldown task at fix rate #5…
Browse files Browse the repository at this point in the history
…1649

Signed-off-by: hoffermei <[email protected]>
  • Loading branch information
hoffermei committed Oct 9, 2024
1 parent a9a8198 commit ddd09de
Show file tree
Hide file tree
Showing 33 changed files with 1,411 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.starrocks.catalog.PartitionType;
import com.starrocks.catalog.RangePartitionInfo;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.TableProperty;
import com.starrocks.catalog.Type;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.DdlException;
Expand Down Expand Up @@ -422,6 +423,9 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause clause,
}
GlobalStateMgr.getCurrentState().getLocalMetastore()
.modifyTableDynamicPartition(db, olapTable, properties);
} else if (TableProperty.isSamePrefixProperties(properties,
PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_PREFIX)) {
GlobalStateMgr.getCurrentState().getLocalMetastore().alterTableProperties(db, olapTable, properties);
} else if (properties.containsKey("default." + PropertyAnalyzer.PROPERTIES_REPLICATION_NUM)) {
Preconditions.checkNotNull(properties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM));
GlobalStateMgr.getCurrentState().getLocalMetastore()
Expand Down Expand Up @@ -712,6 +716,12 @@ private void modifyPartitionsProperty(Database db,
TTabletType tTabletType =
PropertyAnalyzer.analyzeTabletType(properties);

// 5. external cooldown synced time
long coolDownSyncedTimeMs = PropertyAnalyzer.analyzeExternalCooldownSyncedTimeMs(properties);

// 6. external cooldown consistency check time
long coolDownCheckTimeMs = PropertyAnalyzer.analyzeExternalCooldownConsistencyCheckTimeMs(properties);

// modify meta here
for (String partitionName : partitionNames) {
Partition partition = olapTable.getPartition(partitionName);
Expand Down Expand Up @@ -762,8 +772,17 @@ private void modifyPartitionsProperty(Database db,
if (tTabletType != partitionInfo.getTabletType(partition.getId())) {
partitionInfo.setTabletType(partition.getId(), tTabletType);
}
if (coolDownSyncedTimeMs != -1L &&
coolDownSyncedTimeMs != partitionInfo.getExternalCoolDownSyncedTimeMs(partition.getId())) {
partitionInfo.setExternalCoolDownSyncedTimeMs(partition.getId(), coolDownSyncedTimeMs);
}
if (coolDownCheckTimeMs != -1L &&
coolDownCheckTimeMs != partitionInfo.getExternalCoolDownConsistencyCheckTimeMs(partition.getId())) {
partitionInfo.setExternalCoolDownConsistencyCheckTimeMs(partition.getId(), coolDownCheckTimeMs);
}
ModifyPartitionInfo info = new ModifyPartitionInfo(db.getId(), olapTable.getId(), partition.getId(),
newDataProperty, newReplicationNum, hasInMemory ? newInMemory : oldInMemory);
newDataProperty, newReplicationNum, hasInMemory ? newInMemory : oldInMemory,
coolDownSyncedTimeMs, coolDownCheckTimeMs);
modifyPartitionInfos.add(info);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import com.starrocks.common.util.concurrent.MarkedCountDownLatch;
import com.starrocks.common.util.concurrent.lock.LockType;
import com.starrocks.common.util.concurrent.lock.Locker;
import com.starrocks.externalcooldown.ExternalCoolDownConfig;
import com.starrocks.persist.TableAddOrDropColumnsInfo;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.ShowResultSet;
Expand Down Expand Up @@ -133,7 +132,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -2132,85 +2130,6 @@ public void updateTableMeta(Database db, String tableName, Map<String, String> p
}
}

public boolean updateExternalCoolDownConfigMeta(Database db, Long tableId, Map<String, String> properties) {
OlapTable olapTable;
ExternalCoolDownConfig newExternalCoolDownConfig;
boolean hasChanged = false;
boolean isModifiedSuccess = true;
Locker locker = new Locker();
locker.lockDatabase(db, LockType.READ);
try {
olapTable = (OlapTable) db.getTable(tableId);
if (olapTable == null) {
return false;
}
if (!olapTable.containsExternalCoolDownConfig()) {
newExternalCoolDownConfig = new ExternalCoolDownConfig();
hasChanged = true;
} else {
newExternalCoolDownConfig = new ExternalCoolDownConfig(olapTable.getCurExternalCoolDownConfig());
}
} finally {
locker.unLockDatabase(db, LockType.READ);
}

// judge whether the attribute has changed
// no exception will be thrown, for the analyzer has checked
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET)) {
String externalCoolDownTarget = properties.get(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET);
if (!Objects.equals(externalCoolDownTarget, newExternalCoolDownConfig.getTarget())) {
newExternalCoolDownConfig.setTarget(externalCoolDownTarget);
hasChanged = true;
}
}
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE)) {
String externalCoolDownSchedule = properties.get(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE);
if (!Objects.equals(externalCoolDownSchedule, newExternalCoolDownConfig.getSchedule())) {
newExternalCoolDownConfig.setSchedule(externalCoolDownSchedule);
hasChanged = true;
}
}
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_WAIT_SECOND)) {
long externalCoolDownWaitSecond = Long.parseLong(properties.get(
PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_WAIT_SECOND));
if (externalCoolDownWaitSecond != newExternalCoolDownConfig.getWaitSecond()) {
newExternalCoolDownConfig.setWaitSecond(externalCoolDownWaitSecond);
hasChanged = true;
}
}
if (!hasChanged) {
LOG.info("table {} external cool down config is same as the previous version, so nothing need to do",
olapTable.getName());
return true;
}
locker.lockDatabase(db, LockType.WRITE);
try {
ExternalCoolDownConfig oldExternalCoolDownConfig = olapTable.getCurExternalCoolDownConfig();
GlobalStateMgr.getCurrentState().getLocalMetastore().modifyExternalCoolDownMeta(
db, olapTable, newExternalCoolDownConfig);
if (oldExternalCoolDownConfig != null) {
LOG.info("update external cool down config of table {} successfully, the external cool down config after " +
"modified is : {}, previous is {}",
olapTable.getName(),
olapTable.getCurExternalCoolDownConfig().toString(),
oldExternalCoolDownConfig.toString());
} else {
LOG.info("update external cool down config of table {} successfully, the external cool down config"
+ " after modified is : {}, ",
olapTable.getName(), olapTable.getCurExternalCoolDownConfig().toString());
}
} catch (Exception e) {
// defensive programming, it normally should not throw an exception,
// here is just to ensure that a correct result can be returned
LOG.warn("update external cool down config of table {} failed", olapTable.getName());
isModifiedSuccess = false;
} finally {
locker.unLockDatabase(db, LockType.WRITE);
}

return isModifiedSuccess;
}

// return true means that the modification of FEMeta is successful,
// and as long as the modification of metadata is successful,
// the final consistency will be achieved through the report handler
Expand Down
26 changes: 20 additions & 6 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -87,7 +88,7 @@
import com.starrocks.common.util.Util;
import com.starrocks.common.util.WriteQuorum;
import com.starrocks.common.util.concurrent.MarkedCountDownLatch;
import com.starrocks.externalcooldown.ExternalCoolDownConfig;
import com.starrocks.externalcooldown.ExternalCooldownConfig;
import com.starrocks.lake.DataCacheInfo;
import com.starrocks.lake.StarOSAgent;
import com.starrocks.lake.StorageInfo;
Expand Down Expand Up @@ -486,18 +487,18 @@ public void setBinlogTxnId(long binlogTxnId) {
this.binlogTxnId = binlogTxnId;
}

public ExternalCoolDownConfig getCurExternalCoolDownConfig() {
public ExternalCooldownConfig getCurExternalCoolDownConfig() {
if (tableProperty != null) {
return tableProperty.getExternalCoolDownConfig();
}
return null;
}

public void setCurExternalCoolDownConfig(ExternalCoolDownConfig externalCoolDownConfig) {
public void setCurExternalCoolDownConfig(ExternalCooldownConfig externalCoolDownConfig) {
if (tableProperty == null) {
tableProperty = new TableProperty(Maps.newHashMap());
}
tableProperty.modifyTableProperties(externalCoolDownConfig.toProperties());
tableProperty.modifyTableProperties(externalCoolDownConfig.getProperties());
tableProperty.setExternalCoolDownConfig(externalCoolDownConfig);
}

Expand Down Expand Up @@ -3335,6 +3336,10 @@ public Map<String, String> getProperties() {
// unique properties
properties.putAll(getUniqueProperties());

if (getCurExternalCoolDownConfig() != null) {
properties.putAll(getCurExternalCoolDownConfig().getProperties());
}

return properties;
}

Expand Down Expand Up @@ -3424,11 +3429,20 @@ public String getExternalCoolDownTarget() {
return null;
}

public TableName getExternalCoolDownTargetTableName() {
String tableName = getExternalCoolDownTarget();
if (Strings.isNullOrEmpty(tableName)) {
return null;
}
List<String> pieces = Splitter.on(".").splitToList(tableName);
return new TableName(pieces.get(0), pieces.get(1), pieces.get(2));
}

public Table getExternalCoolDownTable() {
if (tableProperty == null) {
TableName tableName = getExternalCoolDownTargetTableName();
if (tableName == null) {
return null;
}
TableName tableName = TableName.fromString(tableProperty.getExternalCoolDownTarget());
Optional<Table> table = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable(tableName);
return table.orElse(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.common.util.WriteQuorum;
import com.starrocks.externalcooldown.ExternalCoolDownConfig;
import com.starrocks.externalcooldown.ExternalCooldownConfig;
import com.starrocks.lake.StorageInfo;
import com.starrocks.persist.OperationType;
import com.starrocks.persist.gson.GsonPostProcessable;
Expand Down Expand Up @@ -295,7 +295,7 @@ public static String valueList() {
private boolean useFastSchemaEvolution;

private PeriodDuration dataCachePartitionDuration;
private ExternalCoolDownConfig externalCoolDownConfig;
private ExternalCooldownConfig externalCoolDownConfig;

private Multimap<String, String> location;

Expand Down Expand Up @@ -385,7 +385,7 @@ public TableProperty buildProperty(short opCode) {
buildConstraint();
break;
case OperationType.OP_MODIFY_EXTERNAL_COOLDOWN_CONFIG:
buildExternalCoolDownConfig();
buildExternalCooldownConfig();
break;
default:
break;
Expand Down Expand Up @@ -430,19 +430,19 @@ public void setBinlogConfig(BinlogConfig binlogConfig) {
this.binlogConfig = binlogConfig;
}

public TableProperty buildExternalCoolDownConfig() {
public TableProperty buildExternalCooldownConfig() {
String externalCoolDownTarget = properties.get(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET);
String externalCoolDownSchedule = properties.get(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE);
long externalCoolDownWaitSecond = Long.parseLong(properties.getOrDefault(
PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_WAIT_SECOND,
String.valueOf(0)));
externalCoolDownConfig = new ExternalCoolDownConfig(
String.valueOf(-1L)));
externalCoolDownConfig = new ExternalCooldownConfig(
externalCoolDownTarget, externalCoolDownSchedule, externalCoolDownWaitSecond);
return this;
}

// just modify externalCoolDownConfig, not properties
public void setExternalCoolDownConfig(ExternalCoolDownConfig externalCoolDownConfig) {
public void setExternalCoolDownConfig(ExternalCooldownConfig externalCoolDownConfig) {
this.externalCoolDownConfig = externalCoolDownConfig;
}

Expand Down Expand Up @@ -998,7 +998,7 @@ public BinlogConfig getBinlogConfig() {
return binlogConfig;
}

public ExternalCoolDownConfig getExternalCoolDownConfig() {
public ExternalCooldownConfig getExternalCoolDownConfig() {
return externalCoolDownConfig;
}

Expand Down Expand Up @@ -1107,6 +1107,6 @@ public void gsonPostProcess() throws IOException {
buildStorageType();
buildMvProperties();
buildLocation();
buildExternalCoolDownConfig();
buildExternalCooldownConfig();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@
import com.starrocks.common.ErrorReport;
import com.starrocks.common.FeConstants;
import com.starrocks.common.Pair;
import com.starrocks.externalcooldown.ExternalCoolDownConfig;
import com.starrocks.externalcooldown.ExternalCooldownConfig;
import com.starrocks.externalcooldown.ExternalCooldownSchedule;
import com.starrocks.lake.DataCacheInfo;
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.GlobalStateMgr;
Expand Down Expand Up @@ -250,8 +251,11 @@ public class PropertyAnalyzer {
*/
public static final String MULTI_LOCATION_LABELS_REGEX = "\\s*" + SINGLE_LOCATION_LABEL_REGEX +
"\\s*(,\\s*" + SINGLE_LOCATION_LABEL_REGEX + "){0,9}\\s*";
// external cooldown prefix
public static final String PROPERTIES_EXTERNAL_COOLDOWN_PREFIX = "external_cooldown";
// "external_cooldown_target"="iceberg_catalog.iceberg_db.iceberg_tbl",
public static final String PROPERTIES_EXTERNAL_COOLDOWN_TARGET = "external_cooldown_target";
public static final String PROPERTIES_EXTERNAL_COOLDOWN_CONFIG = "external_cooldown_config";

// "external_cooldown_schedule"="START <start_time> END <end_time> EVERY INTERVAL <cooldown_interval>"
public static final String PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE = "external_cooldown_schedule";
Expand Down Expand Up @@ -1670,8 +1674,8 @@ public static DataProperty analyzeMVDataProperty(MaterializedView materializedVi
return dataProperty;
}

public static ExternalCoolDownConfig analyzeExternalCoolDownConfig(Map<String, String> properties) throws AnalysisException {
ExternalCoolDownConfig externalCoolDownConfig = new ExternalCoolDownConfig();
public static ExternalCooldownConfig analyzeExternalCoolDownConfig(Map<String, String> properties) throws AnalysisException {
ExternalCooldownConfig externalCoolDownConfig = new ExternalCooldownConfig();

if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET)) {
String target = properties.get(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET);
Expand All @@ -1698,12 +1702,9 @@ public static ExternalCoolDownConfig analyzeExternalCoolDownConfig(Map<String, S
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE)) {
String schedule = properties.get(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE);
properties.remove(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE);
Pattern schedulePattern = Pattern.compile(
"^\\s*START\\s+\\d+:\\d+\\s+END\\s+\\d+:\\d+\\s+EVERY\\s+INTERVAL\\s+\\d+[smh]\\s*$",
Pattern.CASE_INSENSITIVE);
if (!schedulePattern.matcher(schedule).find()) {
if (!ExternalCooldownSchedule.validateScheduleString(schedule)) {
throw new AnalysisException("Property " + PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE +
" must be format like `START 01:00 END 07:59 EVERY INTERVAL 1m`");
" must be format like `START 23:00 END 08:00 EVERY INTERVAL 1 MINUTE`");
}
externalCoolDownConfig.setSchedule(schedule);
}
Expand Down Expand Up @@ -1753,4 +1754,38 @@ public static long analyzeDatetimeProp(Map<String, String> properties,
}
return TimeUtils.parseDate(text, PrimitiveType.DATETIME).getTime();
}

public static long analyzeExternalCooldownSyncedTimeMs(Map<String, String> properties) throws AnalysisException {
long coldDownSyncedTimeMs = -1L;
if (properties != null && properties.containsKey(PROPERTIES_EXTERNAL_COOLDOWN_SYNCED_TIME)) {
String coldDownSyncedTimeMsStr = properties.get(PROPERTIES_EXTERNAL_COOLDOWN_SYNCED_TIME);
if (coldDownSyncedTimeMsStr.isEmpty()) {
coldDownSyncedTimeMs = 0L;
} else {
coldDownSyncedTimeMs = TimeUtils.timeStringToLong(coldDownSyncedTimeMsStr);
if (coldDownSyncedTimeMs == -1) {
throw new AnalysisException(PROPERTIES_EXTERNAL_COOLDOWN_SYNCED_TIME + " format error.");
}
}
}

return coldDownSyncedTimeMs;
}

public static long analyzeExternalCooldownConsistencyCheckTimeMs(Map<String, String> properties) throws AnalysisException {
long coldDownConsistencyCheckTimeMs = -1L;
if (properties != null && properties.containsKey(PROPERTIES_EXTERNAL_COOLDOWN_CONSISTENCY_CHECK_TIME)) {
String coldDownConsistencyCheckTimeMsStr = properties.get(PROPERTIES_EXTERNAL_COOLDOWN_CONSISTENCY_CHECK_TIME);
if (coldDownConsistencyCheckTimeMsStr.isEmpty()) {
coldDownConsistencyCheckTimeMs = 0L;
} else {
coldDownConsistencyCheckTimeMs = TimeUtils.timeStringToLong(coldDownConsistencyCheckTimeMsStr);
if (coldDownConsistencyCheckTimeMs == -1) {
throw new AnalysisException(PROPERTIES_EXTERNAL_COOLDOWN_CONSISTENCY_CHECK_TIME + " format error.");
}
}
}

return coldDownConsistencyCheckTimeMs;
}
}
Loading

0 comments on commit ddd09de

Please sign in to comment.