Skip to content

Commit

Permalink
add unit test StarRocks#45536
Browse files Browse the repository at this point in the history
Signed-off-by: hoffermei <[email protected]>
  • Loading branch information
hoffermei committed Oct 17, 2024
1 parent 27b34b2 commit 3adc511
Show file tree
Hide file tree
Showing 24 changed files with 1,204 additions and 169 deletions.
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ public void setCurExternalCoolDownConfig(ExternalCooldownConfig externalCoolDown
if (tableProperty == null) {
tableProperty = new TableProperty(Maps.newHashMap());
}
tableProperty.modifyTableProperties(externalCoolDownConfig.getProperties());
tableProperty.modifyTableProperties(externalCoolDownConfig.getValidProperties());
tableProperty.setExternalCoolDownConfig(externalCoolDownConfig);
}

Expand Down Expand Up @@ -3367,7 +3367,7 @@ public Map<String, String> getProperties() {
properties.putAll(getUniqueProperties());

if (getCurExternalCoolDownConfig() != null) {
properties.putAll(getCurExternalCoolDownConfig().getProperties());
properties.putAll(getCurExternalCoolDownConfig().getValidProperties());
}

return properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ExternalCooldownConfig implements Writable {
@SerializedName("target")
Expand All @@ -35,34 +36,35 @@ public class ExternalCooldownConfig implements Writable {
private String schedule;

@SerializedName("waitSecond")
private long waitSecond;
private Long waitSecond;

public ExternalCooldownConfig(String target, String schedule, long waitSecond) {
public ExternalCooldownConfig(String target, String schedule, Long waitSecond) {
this.target = target;
this.schedule = schedule;
this.waitSecond = waitSecond;
}

public ExternalCooldownConfig(ExternalCooldownConfig externalCoolDownConfig) {
target = null;
schedule = null;
waitSecond = 0;
if (externalCoolDownConfig != null) {
target = externalCoolDownConfig.target;
schedule = externalCoolDownConfig.schedule;
waitSecond = externalCoolDownConfig.waitSecond;
} else {
target = null;
schedule = null;
waitSecond = null;
}
}

public ExternalCooldownConfig() {
this(null, null, 0);
this(null, null, null);
}

public boolean isReadyForAutoCooldown() {
if (target == null || target.isEmpty()) {
return false;
}
if (waitSecond <= 0) {
if (waitSecond == null || waitSecond <= 0) {
return false;
}
return schedule != null && !schedule.isEmpty();
Expand Down Expand Up @@ -92,11 +94,17 @@ public void setWaitSecond(Long waitSecond) {
this.waitSecond = waitSecond;
}

public Map<String, String> getProperties() {
public Map<String, String> getValidProperties() {
Map<String, String> properties = new HashMap<>();
properties.put(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET, target);
properties.put(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE, schedule);
properties.put(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_WAIT_SECOND, String.valueOf(waitSecond));
if (target != null && !target.isEmpty()) {
properties.put(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET, target);
}
if (schedule != null && !schedule.isEmpty()) {
properties.put(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_SCHEDULE, schedule);
}
if (waitSecond != null && waitSecond > 0) {
properties.put(PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_WAIT_SECOND, String.valueOf(waitSecond));
}
return properties;
}

Expand All @@ -122,6 +130,20 @@ public void mergeUpdateFromProperties(Map<String, String> properties) throws Ddl
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ExternalCooldownConfig that = (ExternalCooldownConfig) o;
return Objects.equals(target, that.target) &&
Objects.equals(schedule, that.schedule) &&
Objects.equals(waitSecond, that.waitSecond);
}

@Override
public String toString() {
return String.format("{ target : %s,\n " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.IcebergTable;
import com.starrocks.catalog.ListPartitionInfo;
import com.starrocks.catalog.OlapTable;
Expand All @@ -29,7 +28,6 @@
import com.starrocks.catalog.Table;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.sql.common.DmlException;
import com.starrocks.sql.common.PListCell;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
Expand All @@ -45,25 +43,23 @@

public class ExternalCooldownPartitionSelector {
private static final Logger LOG = LogManager.getLogger(ExternalCooldownPartitionSelector.class);
private final Database db;
private final OlapTable olapTable;
private org.apache.iceberg.Table targetIcebergTable;
private long externalCoolDownWaitSeconds;
private boolean tableSatisfied;
private String fullTableName;
private String tableName;
private final String partitionStart;
private final String partitionEnd;
private final boolean isForce;
private PartitionInfo partitionInfo;
private List<Partition> satisfiedPartitions;

public ExternalCooldownPartitionSelector(Database db, OlapTable olapTable) {
this(db, olapTable, null, null, false);
public ExternalCooldownPartitionSelector(OlapTable olapTable) {
this(olapTable, null, null, false);
}

public ExternalCooldownPartitionSelector(Database db, OlapTable olapTable,
public ExternalCooldownPartitionSelector(OlapTable olapTable,
String partitionStart, String partitionEnd, boolean isForce) {
this.db = db;
this.olapTable = olapTable;
this.partitionStart = partitionStart;
this.partitionEnd = partitionEnd;
Expand All @@ -73,46 +69,43 @@ public ExternalCooldownPartitionSelector(Database db, OlapTable olapTable,
}

public void init() {
fullTableName = db.getFullName() + "." + olapTable.getName();
tableName = olapTable.getName();
reloadSatisfiedPartitions();
}

public void reloadSatisfiedPartitions() {
tableSatisfied = true;
partitionInfo = olapTable.getPartitionInfo();
if (olapTable.getExternalCoolDownWaitSecond() == null) {

// check table has external cool down wait second
Long waitSeconds = olapTable.getExternalCoolDownWaitSecond();
if (waitSeconds == null || waitSeconds <= 0) {
LOG.info("table[{}] has no set `{}` property or not satisfied. ignore", tableName,
PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_WAIT_SECOND);
tableSatisfied = false;
return;
}

Table targetTable = olapTable.getExternalCoolDownTable();
if (targetTable == null) {
LOG.debug("table[{}]'s `{}` not found. ignore", fullTableName,
LOG.debug("table[{}]'s `{}` not found. ignore", tableName,
PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET);
tableSatisfied = false;
}
if (!(targetTable instanceof IcebergTable)) {
LOG.debug("table[{}]'s `{}` property is not iceberg table. ignore", fullTableName,
LOG.debug("table[{}]'s `{}` property is not iceberg table. ignore", tableName,
PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET);
tableSatisfied = false;
return;
}
targetIcebergTable = ((IcebergTable) targetTable).getNativeTable();
if (targetIcebergTable == null) {
LOG.debug("table[{}]'s `{}` property related native iceberg table not found. ignore",
fullTableName, PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET);
tableName, PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_TARGET);
tableSatisfied = false;
}

// check table has external cool down wait second
Long waitSeconds = olapTable.getExternalCoolDownWaitSecond();
if (waitSeconds == null || waitSeconds == 0) {
LOG.info("table[{}] has no set `{}` property. ignore", fullTableName,
PropertyAnalyzer.PROPERTIES_EXTERNAL_COOLDOWN_WAIT_SECOND);
tableSatisfied = false;
} else {
externalCoolDownWaitSeconds = waitSeconds;
}
externalCoolDownWaitSeconds = waitSeconds;
satisfiedPartitions = this.getSatisfiedPartitions(-1);
}

Expand All @@ -124,18 +117,19 @@ public boolean isTableSatisfied() {
* check whether partition satisfy external cool down condition
*/
private boolean isPartitionSatisfied(Partition partition) {
// force cooldown don't need check cooldown state and consistency check result,
// and initial partition could also be cooldown
if (isForce) {
return true;
}

long externalCoolDownWaitMillis = externalCoolDownWaitSeconds * 1000L;
// partition with init version has no data, doesn't need do external cool down
if (partition.getVisibleVersion() == Partition.PARTITION_INIT_VERSION) {
LOG.debug("table[{}] partition[{}] is init version. ignore", fullTableName, partition.getName());
LOG.debug("table[{}] partition[{}] is init version. ignore", tableName, partition.getName());
return false;
}

// force cooldown don't need check cooldown state and consistency check result
if (isForce) {
return true;
}

// after partition update, should wait a while to avoid unnecessary duplicate external cool down
long changedMillis = System.currentTimeMillis() - partition.getVisibleVersionTime();
if (changedMillis <= externalCoolDownWaitMillis) {
Expand All @@ -155,14 +149,14 @@ private boolean isPartitionSatisfied(Partition partition) {
if (consistencyCheckTimeMs == null || partitionCoolDownSyncedTimeMs > consistencyCheckTimeMs) {
// wait to do consistency check after external cool down
LOG.debug("table [{}] partition[{}] external cool down time newer then consistency check time",
fullTableName, partition.getName());
tableName, partition.getName());
return false;
}
Long diff = partitionInfo.getExternalCoolDownConsistencyCheckDifference(partition.getId());
if (diff == null || diff == 0) {
// has consistency check after external cool down, and check result ok
LOG.debug("table [{}] partition[{}] external cool down consistency check result ok",
fullTableName, partition.getName());
tableName, partition.getName());
return false;
}

Expand Down Expand Up @@ -252,10 +246,9 @@ private Set<String> getPartitionsInRange() throws AnalysisException {
return olapTable.getVisiblePartitionNames();
} else if (partitionInfo instanceof RangePartitionInfo) {
return getPartitionNamesByRangeWithPartitionLimit();
} else if (partitionInfo instanceof ListPartitionInfo) {
return getPartitionNamesByListWithPartitionLimit();
} else {
throw new DmlException("unsupported partition info type:" + partitionInfo.getClass().getName());
// ListPartitionInfo
return getPartitionNamesByListWithPartitionLimit();
}
}

Expand All @@ -264,8 +257,8 @@ public List<Partition> getSatisfiedPartitions(int limit) {

boolean isOlapTablePartitioned = olapTable.getPartitions().size() > 1 || olapTable.dynamicPartitionExists();
if (!isOlapTablePartitioned && targetIcebergTable.spec() != null && !targetIcebergTable.spec().fields().isEmpty()) {
LOG.warn("table: {} is a none partitioned table, cannot have partitionSpec fields",
fullTableName);
LOG.warn("source table: {} is a none partitioned table, target table shouldn't partitionSpec fields",
tableName);
return chosenPartitions;
}

Expand All @@ -289,11 +282,11 @@ public List<Partition> getSatisfiedPartitions(int limit) {
} catch (Exception e) {
isSatisfied = false;
String msg = String.format("check partition [%s-%s] satisfy external cool down condition failed",
fullTableName, partition.getName());
tableName, partition.getName());
LOG.warn(msg, e);
}
if (isSatisfied) {
LOG.info("choose partition[{}-{}] to external cool down", fullTableName, partition.getName());
LOG.info("choose partition[{}-{}] to external cool down", tableName, partition.getName());
chosenPartitions.add(partition);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ public void setLastScheduleMs(long lastScheduleMs) {
this.lastScheduleMs = lastScheduleMs;
}

public boolean trySchedule() {
long currentMs = System.currentTimeMillis();
public boolean trySchedule(long currentMs) {
String s = timeFormat.format(currentMs);
if (end.compareTo(start) < 0) {
// ex: [start=23:00, end=07:00)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ public ShowResultSet visitCancelRefreshMaterializedViewStatement(CancelRefreshMa
}

public static ExecuteOption getCooldownExecuteOption(CreateExternalCooldownStmt externalCooldownStmt) {
boolean force = externalCooldownStmt.isForceRefresh();
boolean force = externalCooldownStmt.isForce();
PartitionRangeDesc range = externalCooldownStmt.getPartitionRangeDesc();
HashMap<String, String> taskRunProperties = new HashMap<>();
taskRunProperties.put(TaskRun.PARTITION_START, range == null ? null : range.getPartitionStart());
Expand All @@ -430,7 +430,7 @@ public ShowResultSet visitCreateExternalCooldownStatement(CreateExternalCooldown
}
final String cooldownTaskName = TaskBuilder.getExternalCooldownTaskName(table.getId());
if (!taskManager.containTask(cooldownTaskName)) {
Task task = TaskBuilder.buildExternalCooldownTask(stmt, context);
Task task = TaskBuilder.buildExternalCooldownTask(stmt);
taskManager.createTask(task, false);
}
ExecuteOption executeOption = getCooldownExecuteOption(stmt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.Table;
import com.starrocks.common.DdlException;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.externalcooldown.ExternalCooldownPartitionSelector;
import com.starrocks.persist.EditLog;
import com.starrocks.persist.ModifyPartitionInfo;
Expand Down Expand Up @@ -84,8 +83,7 @@ private void prepare(TaskRunContext context) {
throw new DmlException(TABLE_PREFIX + table.getName() + "'s external table is not iceberg table");
}

partitionSelector = new ExternalCooldownPartitionSelector(
db, olapTable, partitionStart, partitionEnd, isForce);
partitionSelector = new ExternalCooldownPartitionSelector(olapTable, partitionStart, partitionEnd, isForce);
if (!partitionSelector.isTableSatisfied()) {
throw new DmlException(TABLE_PREFIX + table.getName() + " don't satisfy external cool down condition");
}
Expand Down Expand Up @@ -165,7 +163,7 @@ public void processTaskRun(TaskRunContext context) throws Exception {

if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
LOG.warn("[QueryId:{}] external cooldown task table {} partition {} failed, err: {}",
DebugUtil.printId(ctx.getQueryId()), olapTable.getName(),
ctx.getQueryId().toString(), olapTable.getName(),
partition.getName(), ctx.getState().getErrorMessage());
throw new DdlException(ctx.getState().getErrorMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public static void rebuildMVTask(String dbName,
}
}

public static Task buildExternalCooldownTask(CreateExternalCooldownStmt externalCooldownStmt, ConnectContext context) {
public static Task buildExternalCooldownTask(CreateExternalCooldownStmt externalCooldownStmt) {
TableName tableName = externalCooldownStmt.getTableName();
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(tableName.getDb());
Table table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(tableName.getDb(), tableName.getTbl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ private void runImpl() {
continue;
}
try {
job.onSchedule();
long currentMs = System.currentTimeMillis();
job.onSchedule(currentMs);
} catch (DdlException e) {
LOG.warn("[ExternalCooldownJobExecutor] execute job got exception", e);
}
Expand Down
Loading

0 comments on commit 3adc511

Please sign in to comment.