[Feature] Adds routine_load_unstable_threshold_second (backport #36222) (branch-3.1) (#37008)

Signed-off-by: ricky <[email protected]>
Co-authored-by: wyb <[email protected]>
rickif and wyb authored Dec 15, 2023
1 parent 0c972a0 commit c3af3db
Showing 18 changed files with 234 additions and 40 deletions.
12 changes: 12 additions & 0 deletions be/src/runtime/routine_load/data_consumer_group.cpp
@@ -117,6 +117,7 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {

// copy one
std::map<int32_t, int64_t> cmt_offset = ctx->kafka_info->cmt_offset;
std::map<int32_t, int64_t> cmt_offset_timestamp;

//improve performance
Status (KafkaConsumerPipe::*append_data)(const char* data, size_t size, char row_delimiter);
@@ -164,6 +165,8 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
return result_st;
}

ctx->kafka_info->cmt_offset_timestamp = cmt_offset_timestamp;

if (left_bytes == ctx->max_batch_size) {
// nothing to be consumed, we have to cancel it, because
// we do not allow finishing stream load pipe without data.
@@ -209,6 +212,10 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
// but the standard usage is to record the last offset + 1.
if (msg->offset() > 0) {
cmt_offset[msg->partition()] = msg->offset() - 1;
auto timestamp = msg->timestamp();
if (timestamp.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {
cmt_offset_timestamp[msg->partition()] = msg->timestamp().timestamp;
}
}
} else {
Status st = Status::OK();
@@ -218,6 +225,11 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
received_rows++;
left_bytes -= msg->len();
cmt_offset[msg->partition()] = msg->offset();

auto timestamp = msg->timestamp();
if (timestamp.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {
cmt_offset_timestamp[msg->partition()] = msg->timestamp().timestamp;
}
VLOG(3) << "consume partition[" << msg->partition() << " - " << msg->offset() << "]";
} else {
// failed to append this msg, we must stop
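With this change the BE records, per partition, the timestamp of the last consumed Kafka message alongside the committed offset; messages whose timestamp type is MSG_TIMESTAMP_NOT_AVAILABLE are skipped, so such partitions simply report no timestamp. The map is copied into ctx->kafka_info->cmt_offset_timestamp and travels to the FE in the transaction commit attachment (see stream_load_executor.cpp below).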
4 changes: 3 additions & 1 deletion be/src/runtime/stream_load/stream_load_context.h
@@ -89,8 +89,10 @@ class KafkaLoadInfo {

// partition -> begin offset, inclusive.
std::map<int32_t, int64_t> begin_offset;
- // partiton -> commit offset, inclusive.
+ // partition -> commit offset, inclusive.
std::map<int32_t, int64_t> cmt_offset;
+ // partition -> commit offset timestamp, inclusive.
+ std::map<int32_t, int64_t> cmt_offset_timestamp;
//custom kafka property key -> value
std::map<std::string, std::string> properties;
};
1 change: 1 addition & 0 deletions be/src/runtime/stream_load/stream_load_executor.cpp
@@ -411,6 +411,7 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt

TKafkaRLTaskProgress kafka_progress;
kafka_progress.partitionCmtOffset = ctx->kafka_info->cmt_offset;
kafka_progress.partitionCmtOffsetTimestamp = ctx->kafka_info->cmt_offset_timestamp;

rl_attach.kafkaRLTaskProgress = kafka_progress;
rl_attach.__isset.kafkaRLTaskProgress = true;
3 changes: 3 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
@@ -2547,4 +2547,7 @@ public class Config extends ConfigBase {
public static int port_connectivity_check_timeout_ms = 10000;
@ConfField(mutable = true)
public static boolean allow_system_reserved_names = false;

@ConfField(mutable = true)
public static long routine_load_unstable_threshold_second = 3600;
}
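Because the new config is declared @ConfField(mutable = true), it should be adjustable without an FE restart, presumably via fe.conf (routine_load_unstable_threshold_second = 3600) or at runtime with something like ADMIN SET FRONTEND CONFIG ("routine_load_unstable_threshold_second" = "7200");. A routine load job whose newest committed message timestamp lags the current time by more than this many seconds is flagged UNSTABLE (see KafkaRoutineLoadJob.updateSubstate below).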
fe/fe-core/src/main/java/com/starrocks/common/InternalErrorCode.java
@@ -35,7 +35,8 @@ public enum InternalErrorCode {
MANUAL_STOP_ERR(101),
TOO_MANY_FAILURE_ROWS_ERR(102),
CREATE_TASKS_ERR(103),
- TASKS_ABORT_ERR(104);
+ TASKS_ABORT_ERR(104),
+ SLOW_RUNNING_ERR(105);

private long errCode;

fe/fe-core/src/main/java/com/starrocks/load/routineload/KafkaProgress.java
@@ -43,9 +43,7 @@
import com.starrocks.common.DdlException;
import com.starrocks.common.Pair;
import com.starrocks.common.UserException;
- import com.starrocks.common.util.DebugUtil;
import com.starrocks.common.util.KafkaUtil;
- import com.starrocks.thrift.TKafkaRLTaskProgress;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -80,9 +78,11 @@ public KafkaProgress() {
super(LoadDataSourceType.KAFKA);
}

- public KafkaProgress(TKafkaRLTaskProgress tKafkaRLTaskProgress) {
+ public KafkaProgress(Map<Integer, Long> partitionOffsets) {
super(LoadDataSourceType.KAFKA);
- this.partitionIdToOffset = tKafkaRLTaskProgress.getPartitionCmtOffset();
+ if (partitionOffsets != null) {
+ this.partitionIdToOffset = partitionOffsets;
+ }
}

public Map<Integer, Long> getPartitionIdToOffset(List<Integer> partitionIds) {
@@ -198,13 +198,11 @@ public String toJsonString() {
}

@Override
- public void update(RLTaskTxnCommitAttachment attachment) {
- KafkaProgress newProgress = (KafkaProgress) attachment.getProgress();
+ public void update(RoutineLoadProgress progress) {
+ KafkaProgress newProgress = (KafkaProgress) progress;
// + 1 to point to the next msg offset to be consumed
newProgress.partitionIdToOffset.entrySet().stream()
.forEach(entity -> this.partitionIdToOffset.put(entity.getKey(), entity.getValue() + 1));
LOG.debug("update kafka progress: {}, task: {}, job: {}",
newProgress.toJsonString(), DebugUtil.printId(attachment.getTaskId()), attachment.getJobId());
}

@Override
fe/fe-core/src/main/java/com/starrocks/load/routineload/KafkaRoutineLoadJob.java
@@ -126,6 +126,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
public KafkaRoutineLoadJob() {
// for serialization, id is dummy
super(-1, LoadDataSourceType.KAFKA);
this.progress = new KafkaProgress();
this.timestampProgress = new KafkaProgress();
}

public KafkaRoutineLoadJob(Long id, String name,
@@ -134,6 +136,7 @@ public KafkaRoutineLoadJob(Long id, String name,
this.brokerList = brokerList;
this.topic = topic;
this.progress = new KafkaProgress();
this.timestampProgress = new KafkaProgress();
}

public String getConfluentSchemaRegistryUrl() {
@@ -315,13 +318,14 @@ protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttac
@Override
protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
super.updateProgress(attachment);
- this.progress.update(attachment);
+ this.progress.update(attachment.getProgress());
+ this.timestampProgress.update(attachment.getTimestampProgress());
}

@Override
protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
super.replayUpdateProgress(attachment);
- this.progress.update(attachment);
+ this.progress.update(attachment.getProgress());
}

@Override
@@ -777,4 +781,25 @@ public void modifyDataSourceProperties(RoutineLoadDataSourceProperties dataSourc
LOG.info("modify the data source properties of kafka routine load job: {}, datasource properties: {}",
this.id, dataSourceProperties);
}

// update substate according to the lag.
@Override
public void updateSubstate() throws UserException {
KafkaProgress progress = (KafkaProgress) getTimestampProgress();
Map<Integer, Long> partitionTimestamps = progress.getPartitionIdToOffset();
long now = System.currentTimeMillis();

for (Map.Entry<Integer, Long> entry : partitionTimestamps.entrySet()) {
int partition = entry.getKey();
long lag = (now - entry.getValue().longValue()) / 1000;
if (lag > Config.routine_load_unstable_threshold_second) {
updateSubstate(JobSubstate.UNSTABLE, new ErrorReason(InternalErrorCode.SLOW_RUNNING_ERR,
String.format("The lag [%d] of partition [%d] exceeds " +
"Config.routine_load_unstable_threshold_second [%d]",
lag, partition, Config.routine_load_unstable_threshold_second)));
return;
}
}
updateSubstate(JobSubstate.STABLE, null);
}
}
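The updateSubstate() override above divides the gap between System.currentTimeMillis() and the newest committed message timestamp by 1000, so the values recorded by the BE (librdkafka's msg->timestamp().timestamp) are treated as milliseconds since the Unix epoch. Below is a minimal, self-contained sketch of the same per-partition lag rule; the class and method names are illustrative, not part of the patch:

import java.util.HashMap;
import java.util.Map;

// Illustrative restatement of KafkaRoutineLoadJob.updateSubstate(): a job is
// UNSTABLE as soon as any partition's newest committed message timestamp lags
// "now" by more than the configured threshold, and STABLE otherwise.
public class LagCheckSketch {
    // stands in for Config.routine_load_unstable_threshold_second
    static final long UNSTABLE_THRESHOLD_SECOND = 3600;

    static boolean isUnstable(Map<Integer, Long> partitionToTimestampMs, long nowMs) {
        for (Map.Entry<Integer, Long> e : partitionToTimestampMs.entrySet()) {
            long lagSeconds = (nowMs - e.getValue()) / 1000; // ms -> s
            if (lagSeconds > UNSTABLE_THRESHOLD_SECOND) {
                return true; // one lagging partition flips the whole job
            }
        }
        return false; // every partition is within the threshold
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Map<Integer, Long> ts = new HashMap<>();
        ts.put(0, now - 10_000L);     // 10 s behind: fine
        ts.put(1, now - 7_200_000L);  // 2 h behind: exceeds 3600 s
        System.out.println(isUnstable(ts, now)); // prints "true"
    }
}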
fe/fe-core/src/main/java/com/starrocks/load/routineload/PulsarProgress.java
@@ -21,7 +21,6 @@
import com.google.gson.annotations.SerializedName;
import com.starrocks.common.Pair;
import com.starrocks.common.io.Text;
- import com.starrocks.common.util.DebugUtil;
import com.starrocks.thrift.TPulsarRLTaskProgress;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -126,8 +125,8 @@ public String toJsonString() {
}

@Override
- public void update(RLTaskTxnCommitAttachment attachment) {
- PulsarProgress newProgress = (PulsarProgress) attachment.getProgress();
+ public void update(RoutineLoadProgress progress) {
+ PulsarProgress newProgress = (PulsarProgress) progress;
for (Map.Entry<String, Long> entry : newProgress.partitionToBacklogNum.entrySet()) {
String partition = entry.getKey();
Long backlogNum = entry.getValue();
@@ -136,8 +135,6 @@ public void update(RLTaskTxnCommitAttachment attachment) {
// Remove initial position if exists
partitionToInitialPosition.remove(partition);
}
LOG.debug("update pulsar progress: {}, task: {}, job: {}",
newProgress.toJsonString(), DebugUtil.printId(attachment.getTaskId()), attachment.getJobId());
}

@Override
fe/fe-core/src/main/java/com/starrocks/load/routineload/PulsarRoutineLoadJob.java
@@ -282,13 +282,14 @@ public String dataSourcePropertiesToSql() {
@Override
protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
super.updateProgress(attachment);
- this.progress.update(attachment);
+ this.progress.update(attachment.getProgress());
+ this.timestampProgress.update(attachment.getTimestampProgress());
}

@Override
protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
super.replayUpdateProgress(attachment);
- this.progress.update(attachment);
+ this.progress.update(attachment.getProgress());
}

@Override
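Note that the lag-based check effectively applies only to Kafka jobs: PulsarRoutineLoadJob keeps the empty base updateSubstate(), and its timestamp progress is constructed empty (see RLTaskTxnCommitAttachment below), so PulsarProgress.update() has nothing to merge.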
fe/fe-core/src/main/java/com/starrocks/load/routineload/RLTaskTxnCommitAttachment.java
@@ -62,6 +62,8 @@ public class RLTaskTxnCommitAttachment extends TxnCommitAttachment {
private long taskExecutionTimeMs;
@SerializedName("progress")
private RoutineLoadProgress progress;
@SerializedName("timestampProgress")
private RoutineLoadProgress timestampProgress;
private String errorLogUrl;
private long loadedBytes;

@@ -82,10 +84,14 @@ public RLTaskTxnCommitAttachment(TRLTaskTxnCommitAttachment rlTaskTxnCommitAttac

switch (rlTaskTxnCommitAttachment.getLoadSourceType()) {
case KAFKA:
- this.progress = new KafkaProgress(rlTaskTxnCommitAttachment.getKafkaRLTaskProgress());
+ this.progress = new KafkaProgress(rlTaskTxnCommitAttachment.getKafkaRLTaskProgress()
+ .getPartitionCmtOffset());
+ this.timestampProgress = new KafkaProgress(rlTaskTxnCommitAttachment.getKafkaRLTaskProgress().
+ getPartitionCmtOffsetTimestamp());
break;
case PULSAR:
this.progress = new PulsarProgress(rlTaskTxnCommitAttachment.getPulsarRLTaskProgress());
+ this.timestampProgress = new PulsarProgress();
break;
default:
break;
@@ -136,6 +142,10 @@ public RoutineLoadProgress getProgress() {
return progress;
}

public RoutineLoadProgress getTimestampProgress() {
return timestampProgress;
}

public String getErrorLogUrl() {
return errorLogUrl;
}
fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java
@@ -168,6 +168,11 @@ public boolean isFinalState() {
}
}

public enum JobSubstate {
STABLE,
UNSTABLE
}

@SerializedName("i")
protected long id;
@SerializedName("n")
@@ -188,6 +193,7 @@ public boolean isFinalState() {
protected int desireTaskConcurrentNum; // optional
@SerializedName("s")
protected JobState state = JobState.NEED_SCHEDULE;
protected JobSubstate substate = JobSubstate.STABLE;
@SerializedName("da")
protected LoadDataSourceType dataSourceType;
protected double maxFilterRatio = 1;
@@ -242,6 +248,9 @@ public boolean isFinalState() {
@SerializedName("p")
protected RoutineLoadProgress progress;

@SerializedName("tp")
protected RoutineLoadProgress timestampProgress;

protected long firstResumeTimestamp; // the first resume time
protected long autoResumeCount;
protected boolean autoResumeLock = false; //it can't auto resume iff true
@@ -250,6 +259,8 @@ public boolean isFinalState() {
protected ErrorReason pauseReason;
protected ErrorReason cancelReason;

protected ErrorReason stateChangedReason;

@SerializedName("c")
protected long createTimestamp = System.currentTimeMillis();
@SerializedName("pa")
@@ -615,6 +626,10 @@ public RoutineLoadProgress getProgress() {
return progress;
}

public RoutineLoadProgress getTimestampProgress() {
return timestampProgress;
}

public double getMaxFilterRatio() {
return maxFilterRatio;
}
@@ -1456,21 +1471,33 @@ public List<String> getShowInfo() {
row.add(TimeUtils.longToTimeString(endTimestamp));
row.add(db == null ? String.valueOf(dbId) : db.getFullName());
row.add(tbl == null ? String.valueOf(tableId) : tbl.getName());
- row.add(getState().name());
+ if (state == JobState.RUNNING) {
+ row.add(substate == JobSubstate.STABLE ? state.name() : substate.name());
+ } else {
+ row.add(state.name());
+ }
row.add(dataSourceType.name());
row.add(String.valueOf(getSizeOfRoutineLoadTaskInfoList()));
row.add(jobPropertiesToJsonString());
row.add(dataSourcePropertiesJsonToString());
row.add(customPropertiesJsonToString());
row.add(getStatistic());
row.add(getProgress().toJsonString());
+ row.add(getTimestampProgress().toJsonString());
switch (state) {
case PAUSED:
row.add(pauseReason == null ? "" : pauseReason.toString());
break;
case CANCELLED:
row.add(cancelReason == null ? "" : cancelReason.toString());
break;
+ case RUNNING:
+ if (substate == JobSubstate.UNSTABLE) {
+ row.add(stateChangedReason == null ? "" : stateChangedReason.toString());
+ } else {
+ row.add("");
+ }
+ break;
default:
row.add("");
}
@@ -1716,6 +1743,7 @@ public void readFields(DataInput in) throws IOException {
switch (dataSourceType) {
case KAFKA: {
progress = new KafkaProgress();
timestampProgress = new KafkaProgress();
progress.readFields(in);
break;
}
@@ -1872,4 +1900,26 @@ private void modifyCommonJobProperties(Map<String, String> jobProperties) {
public void gsonPostProcess() throws IOException {
setRoutineLoadDesc(CreateRoutineLoadStmt.getLoadDesc(origStmt, sessionVariables));
}

protected void updateSubstate(JobSubstate substate, ErrorReason reason) throws UserException {
writeLock();
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("current_job_substate", this.substate)
.add("desire_job_substate", substate)
.add("msg", reason)
.build());
try {
this.substate = substate;
this.stateChangedReason = reason;
} finally {
writeUnlock();
}
}

public void updateSubstateStable() throws UserException {
updateSubstate(JobSubstate.STABLE, null);
}

public void updateSubstate() throws UserException {
}
}
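With getShowInfo() now substituting the substate for RUNNING jobs and appending the timestamp progress, SHOW ROUTINE LOAD should report UNSTABLE in the state column once the lag check trips, together with a reason built from the format string above, e.g. "The lag [7200] of partition [1] exceeds Config.routine_load_unstable_threshold_second [3600]" (values illustrative). The display reverts to RUNNING once consumption catches up and updateSubstate() flips the job back to STABLE.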
fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadProgress.java
@@ -56,7 +56,7 @@ public RoutineLoadProgress(LoadDataSourceType loadDataSourceType) {
this.loadDataSourceType = loadDataSourceType;
}

- abstract void update(RLTaskTxnCommitAttachment attachment);
+ abstract void update(RoutineLoadProgress progress);

abstract String toJsonString();

[6 of the 18 changed files in this commit are not shown in this view.]