Skip to content

Commit

Permalink
≈[Feature] Unify load profile through analyze profile & support sessi…
Browse files Browse the repository at this point in the history
…on/table profile collect granularity

Signed-off-by: meegoo <[email protected]>
  • Loading branch information
meegoo committed Aug 7, 2024
1 parent f07abad commit be0d1ed
Show file tree
Hide file tree
Showing 26 changed files with 218 additions and 41 deletions.
4 changes: 2 additions & 2 deletions docs/en/administration/management/FE_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -4513,9 +4513,9 @@ ADMIN SET FRONTEND CONFIG ("key" = "value");
-->

<!--
##### stream_load_profile_collect_second
##### load_profile_collect_threshold_second
- Default: 10
- Default: 0
- Type: Long
- Unit: Seconds
- Is mutable: Yes
Expand Down
4 changes: 2 additions & 2 deletions docs/zh/administration/management/FE_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -4519,9 +4519,9 @@ Compaction Score 代表了一个表分区是否值得进行 Compaction 的评分
-->

<!--
##### stream_load_profile_collect_second
##### stream_load_profile_collect_threshold_second

- 默认值:10
- 默认值:0
- 类型:Long
- 单位:Seconds
- 是否动态:是
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATED_STORAGE) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_BUCKET_SIZE) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_MUTABLE_BUCKET_NUM) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_WRITE_QUORUM) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_ENABLE) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_TTL) ||
Expand Down Expand Up @@ -515,6 +516,9 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_MUTABLE_BUCKET_NUM)) {
schemaChangeHandler.updateTableMeta(db, tableName, properties,
TTabletMetaType.MUTABLE_BUCKET_NUM);
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE)) {
schemaChangeHandler.updateTableMeta(db, tableName, properties,
TTabletMetaType.ENABLE_LOAD_PROFILE);
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_PRIMARY_INDEX_CACHE_EXPIRE_SEC)) {
schemaChangeHandler.updateTableMeta(db, tableName, properties,
TTabletMetaType.PRIMARY_INDEX_CACHE_EXPIRE_SEC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2198,6 +2198,11 @@ public void updateTableMeta(Database db, String tableName, Map<String, String> p
if (mutableBucketNum == olapTable.getMutableBucketNum()) {
return;
}
} else if (metaType == TTabletMetaType.ENABLE_LOAD_PROFILE) {
boolean enableLoadProfile = Boolean.parseBoolean(properties.get(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE));
if (enableLoadProfile == olapTable.enableLoadProfile()) {
return;
}
} else if (metaType == TTabletMetaType.PRIMARY_INDEX_CACHE_EXPIRE_SEC) {
int primaryIndexCacheExpireSec = Integer.parseInt(properties.get(
PropertyAnalyzer.PROPERTIES_PRIMARY_INDEX_CACHE_EXPIRE_SEC));
Expand Down
33 changes: 33 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ public enum OlapTableState {

protected Map<Long, Long> doubleWritePartitions = new HashMap<>();

private long lastCollectProfileTime = 0;

public OlapTable() {
this(TableType.OLAP);
}
Expand Down Expand Up @@ -2497,6 +2499,23 @@ public void setMutableBucketNum(long bucketNum) {
tableProperty.buildMutableBucketNum();
}

public Boolean enableLoadProfile() {
if (tableProperty != null) {
return tableProperty.enableLoadProfile();
}
return false;
}

public void setEnableLoadProfile(boolean enableLoadProfile) {
if (tableProperty == null) {
tableProperty = new TableProperty(new HashMap<>());
}
tableProperty
.modifyTableProperties(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE,
Boolean.valueOf(enableLoadProfile).toString());
tableProperty.buildEnableLoadProfile();
}

public TWriteQuorumType writeQuorum() {
if (tableProperty != null) {
return tableProperty.writeQuorum();
Expand Down Expand Up @@ -3155,6 +3174,12 @@ public Map<String, String> getProperties() {
properties.put(PropertyAnalyzer.PROPERTIES_MUTABLE_BUCKET_NUM, mutableBucketNum.toString());
}

// enable load profile
Boolean enableLoadProfile = enableLoadProfile();
if (enableLoadProfile) {
properties.put(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE, "true");
}

// locations
Multimap<String, String> locationsMap = getLocation();
if (locationsMap != null) {
Expand Down Expand Up @@ -3431,4 +3456,12 @@ public void unlockCreatePartition(String partitionName) {
}
}

public long getLastCollectProfileTime() {
return lastCollectProfileTime;
}

public void updateLastCollectProfileTime() {
this.lastCollectProfileTime = System.currentTimeMillis();
}

}
17 changes: 17 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/TableProperty.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ public static String valueList() {
// the default mutable bucket number
private long mutableBucketNum = 0;

private boolean enableLoadProfile = false;

// 1. This table has been deleted. if hasDelete is false, the BE segment must don't have deleteConditions.
// If hasDelete is true, the BE segment maybe have deleteConditions because compaction.
// 2. Before checkpoint, we relay delete job journal log to persist.
Expand Down Expand Up @@ -358,6 +360,9 @@ public TableProperty buildProperty(short opCode) {
case OperationType.OP_MODIFY_MUTABLE_BUCKET_NUM:
buildMutableBucketNum();
break;
case OperationType.OP_MODIFY_ENABLE_LOAD_PROFILE:
buildEnableLoadProfile();
break;
case OperationType.OP_MODIFY_BINLOG_CONFIG:
buildBinlogConfig();
break;
Expand Down Expand Up @@ -654,6 +659,13 @@ public TableProperty buildMutableBucketNum() {
return this;
}

public TableProperty buildEnableLoadProfile() {
if (properties.get(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE) != null) {
enableLoadProfile = Boolean.parseBoolean(properties.get(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE));
}
return this;
}

public TableProperty buildEnablePersistentIndex() {
enablePersistentIndex = Boolean.parseBoolean(
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ENABLE_PERSISTENT_INDEX, "false"));
Expand Down Expand Up @@ -910,6 +922,10 @@ public long getMutableBucketNum() {
return mutableBucketNum;
}

public boolean enableLoadProfile() {
return enableLoadProfile;
}

public String getStorageVolume() {
return storageVolume;
}
Expand Down Expand Up @@ -1026,6 +1042,7 @@ public void gsonPostProcess() throws IOException {
buildPartitionLiveNumber();
buildReplicatedStorage();
buildBucketSize();
buildEnableLoadProfile();
buildBinlogConfig();
buildBinlogAvailableVersion();
buildDataCachePartitionDuration();
Expand Down
10 changes: 8 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2833,11 +2833,17 @@ public class Config extends ConfigBase {
public static int routine_load_scheduler_interval_millisecond = 10000;

/**
* Only when the stream load time exceeds this value,
* Only when the stream/routine load time exceeds this value,
* the profile will be put into the profileManager
*/
@ConfField(mutable = true, aliases = {"stream_load_profile_collect_second"})
public static long stream_load_profile_collect_threshold_second = 0;

/**
* The interval of collecting load profile through table granularity
*/
@ConfField(mutable = true)
public static long stream_load_profile_collect_second = 10; //10s
public static long load_profile_collect_interval_second = 0;

/**
* If set to <= 0, means that no limitation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ public class ProfileManager implements MemoryTrackable {
public static final String DEFAULT_DB = "Default Db";
public static final String VARIABLES = "Variables";
public static final String PROFILE_COLLECT_TIME = "Collect Profile Time";
public static final String LOAD_TYPE = "Load Type";

public static final String LOAD_TYPE_STREAM_LOAD = "STREAM_LOAD";
public static final String LOAD_TYPE_ROUTINE_LOAD = "ROUTINE_LOAD";

public static final ArrayList<String> PROFILE_HEADERS = new ArrayList<>(
Arrays.asList(QUERY_ID, USER, DEFAULT_DB, SQL_STATEMENT, QUERY_TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ public class PropertyAnalyzer {

public static final String PROPERTIES_MUTABLE_BUCKET_NUM = "mutable_bucket_num";

public static final String PROPERTIES_ENABLE_LOAD_PROFILE = "enable_load_profile";

public static final String PROPERTIES_PRIMARY_INDEX_CACHE_EXPIRE_SEC = "primary_index_cache_expire_sec";

public static final String PROPERTIES_TABLET_TYPE = "tablet_type";
Expand Down Expand Up @@ -446,6 +448,14 @@ public static long analyzeMutableBucketNum(Map<String, String> properties) throw
}
}

public static boolean analyzeEnableLoadProfile(Map<String, String> properties) {
boolean enableLoadProfile = false;
if (properties != null && properties.containsKey(PROPERTIES_ENABLE_LOAD_PROFILE)) {
enableLoadProfile = Boolean.parseBoolean(properties.get(PROPERTIES_ENABLE_LOAD_PROFILE));
}
return enableLoadProfile;
}

public static int analyzeAutoRefreshPartitionsLimit(Map<String, String> properties, MaterializedView mv) {
if (mv.getRefreshScheme().getType() == MaterializedView.RefreshType.MANUAL) {
throw new SemanticException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class RuntimeProfileParser {
Pattern.compile("^- (.*?)$");

public static RuntimeProfile parseFrom(String content) {
LOG.debug("Parse runtime profile from content: {}", content);
LOG.info("Parse runtime profile from content: {}", content);
BufferedReader bufferedReader = new BufferedReader(new StringReader(content));
// (profile, profileIndent, counterStack(name, counter, counterIndent))
LinkedList<ProfileTuple> profileStack = Lists.newLinkedList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,7 @@ public void readFields(DataInput in) throws IOException {
case OperationType.OP_MODIFY_REPLICATED_STORAGE:
case OperationType.OP_MODIFY_BUCKET_SIZE:
case OperationType.OP_MODIFY_MUTABLE_BUCKET_NUM:
case OperationType.OP_MODIFY_ENABLE_LOAD_PROFILE:
case OperationType.OP_MODIFY_BINLOG_CONFIG:
case OperationType.OP_MODIFY_BINLOG_AVAILABLE_VERSION:
case OperationType.OP_MODIFY_ENABLE_PERSISTENT_INDEX:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,8 @@
import com.starrocks.persist.gson.GsonPostProcessable;
import com.starrocks.persist.gson.GsonPreProcessable;
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.DefaultCoordinator;
import com.starrocks.qe.QeProcessorImpl;
import com.starrocks.qe.SessionVariable;
import com.starrocks.qe.scheduler.Coordinator;
import com.starrocks.rpc.ThriftConnectionPool;
import com.starrocks.rpc.ThriftRPCRequestExecutor;
Expand Down Expand Up @@ -840,7 +838,7 @@ private void unprotectedWaitCoordFinish() throws UserException {
}

if (coord.isEnableLoadProfile()) {
collectProfile();
collectProfile(false);
}

this.trackingUrl = coord.getTrackingUrl();
Expand Down Expand Up @@ -1019,7 +1017,7 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw

// sync stream load collect profile, here we collect profile only when be has reported
if (isSyncStreamLoad() && coord != null && coord.isProfileAlreadyReported()) {
collectProfile();
collectProfile(true);
}

writeLock();
Expand All @@ -1037,31 +1035,28 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw
}
}

public void collectProfile() {
long currentTimestamp = System.currentTimeMillis();
long totalTimeMs = currentTimestamp - createTimeMs;

// For the usage scenarios of flink cdc or routine load,
// the frequency of stream load maybe very high, resulting in many profiles,
// but we may only care about the long-duration stream load profile.
if (totalTimeMs < Config.stream_load_profile_collect_second * 1000) {
LOG.info(String.format("Load %s, totalTimeMs %d < Config.stream_load_profile_collect_second %d)",
label, totalTimeMs, Config.stream_load_profile_collect_second));
return;
}

public RuntimeProfile buildTopLevelProfile(boolean isAborted) {
RuntimeProfile profile = new RuntimeProfile("Load");
RuntimeProfile summaryProfile = new RuntimeProfile("Summary");
summaryProfile.addInfoString(ProfileManager.QUERY_ID, DebugUtil.printId(loadId));
summaryProfile.addInfoString(ProfileManager.START_TIME,
TimeUtils.longToTimeString(createTimeMs));

summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(System.currentTimeMillis()));
long currentTimestamp = System.currentTimeMillis();
long totalTimeMs = currentTimestamp - createTimeMs;
summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(currentTimestamp));
summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs));

summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Load");
summaryProfile.addInfoString(ProfileManager.LOAD_TYPE, getStringByType());
summaryProfile.addInfoString(ProfileManager.QUERY_STATE, isAborted ? "Aborted" : "Finished");
summaryProfile.addInfoString("StarRocks Version",
String.format("%s-%s", Version.STARROCKS_VERSION, Version.STARROCKS_COMMIT_HASH));
//summaryProfile.addInfoString(ProfileManager.USER, context.getQualifiedUser());
summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, getStmt());
//summaryProfile.addInfoString("Timeout", DebugUtil.getPrettyStringMs(timeoutS * 1000));
//summaryProfile.addInfoString("Strict Mode", String.valueOf(strictMode));
//summaryProfile.addInfoString("Partial Update", String.valueOf(partialUpdate));
summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, dbName);

Map<String, String> loadCounters = coord.getLoadCounters();
Expand All @@ -1071,19 +1066,20 @@ public void collectProfile() {
summaryProfile.addInfoString("NumRowsAbnormal", loadCounters.get(LoadEtlTask.DPP_ABNORMAL_ALL));
summaryProfile.addInfoString("numRowsUnselected", loadCounters.get(LoadJob.UNSELECTED_ROWS));
}
ConnectContext session = ConnectContext.get();
if (session != null) {
SessionVariable variables = session.getSessionVariable();
if (variables != null) {
summaryProfile.addInfoString("NonDefaultSessionVariables", variables.getNonDefaultVariablesJson());
}
}

profile.addChild(summaryProfile);

return profile;
}


public void collectProfile(boolean isAborted) {
RuntimeProfile profile = buildTopLevelProfile(isAborted);

if (coord.getQueryProfile() != null) {
if (!isSyncStreamLoad()) {
coord.collectProfileSync();
profile.addChild(coord.buildQueryProfile(session == null || session.needMergeProfile()));
profile.addChild(coord.buildQueryProfile(true));
} else {
profile.addChild(coord.getQueryProfile());
}
Expand Down Expand Up @@ -1140,6 +1136,10 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String
return;
}

if (isSyncStreamLoad() && coord.isProfileAlreadyReported()) {
collectProfile(true);
}

writeLock();
try {
if (isFinalState()) {
Expand Down Expand Up @@ -1351,6 +1351,10 @@ public boolean isRoutineLoadTask() {
return type == Type.ROUTINE_LOAD;
}

public String getStmt() {
return "";
}

// for sync stream load
public void setCoordinator(Coordinator coord) {
this.coord = coord;
Expand All @@ -1359,9 +1363,9 @@ public void setCoordinator(Coordinator coord) {
public String getStringByType() {
switch (this.type) {
case ROUTINE_LOAD:
return "ROUTINE_LOAD";
return ProfileManager.LOAD_TYPE_ROUTINE_LOAD;
case STREAM_LOAD:
return "STREAM_LOAD";
return ProfileManager.LOAD_TYPE_STREAM_LOAD;
case PARALLEL_STREAM_LOAD:
return "PARALLEL_STREAM_LOAD";
default:
Expand Down
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,7 @@ public void loadJournal(GlobalStateMgr globalStateMgr, JournalEntity journal)
case OperationType.OP_MODIFY_REPLICATED_STORAGE:
case OperationType.OP_MODIFY_BUCKET_SIZE:
case OperationType.OP_MODIFY_MUTABLE_BUCKET_NUM:
case OperationType.OP_MODIFY_ENABLE_LOAD_PROFILE:
case OperationType.OP_MODIFY_BINLOG_AVAILABLE_VERSION:
case OperationType.OP_MODIFY_BINLOG_CONFIG:
case OperationType.OP_MODIFY_ENABLE_PERSISTENT_INDEX:
Expand Down Expand Up @@ -1723,6 +1724,10 @@ public void logModifyMutableBucketNum(ModifyTablePropertyOperationLog info) {
logEdit(OperationType.OP_MODIFY_MUTABLE_BUCKET_NUM, info);
}

public void logModifyEnableLoadProfile(ModifyTablePropertyOperationLog info) {
logEdit(OperationType.OP_MODIFY_ENABLE_LOAD_PROFILE, info);
}

public void logReplaceTempPartition(ReplacePartitionOperationLog info) {
logEdit(OperationType.OP_REPLACE_TEMP_PARTITION, info);
}
Expand Down
Loading

0 comments on commit be0d1ed

Please sign in to comment.