diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp
index 9a6222b888b19..68ffdffd76709 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -117,6 +117,7 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
     // copy one
    std::map<int32_t, int64_t> cmt_offset = ctx->kafka_info->cmt_offset;
+    std::map<int32_t, int64_t> cmt_offset_timestamp;
 
     //improve performance
     Status (KafkaConsumerPipe::*append_data)(const char* data, size_t size, char row_delimiter);
@@ -164,6 +165,8 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
             return result_st;
         }
 
+        ctx->kafka_info->cmt_offset_timestamp = cmt_offset_timestamp;
+
         if (left_bytes == ctx->max_batch_size) {
             // nothing to be consumed, we have to cancel it, because
             // we do not allow finishing stream load pipe without data.
@@ -209,6 +212,10 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
                 // but the standard usage is to record the last offset + 1.
                 if (msg->offset() > 0) {
                     cmt_offset[msg->partition()] = msg->offset() - 1;
+                    auto timestamp = msg->timestamp();
+                    if (timestamp.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {
+                        cmt_offset_timestamp[msg->partition()] = msg->timestamp().timestamp;
+                    }
                 }
             } else {
                 Status st = Status::OK();
@@ -218,6 +225,11 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
                     received_rows++;
                     left_bytes -= msg->len();
                     cmt_offset[msg->partition()] = msg->offset();
+
+                    auto timestamp = msg->timestamp();
+                    if (timestamp.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {
+                        cmt_offset_timestamp[msg->partition()] = msg->timestamp().timestamp;
+                    }
                     VLOG(3) << "consume partition[" << msg->partition() << " - " << msg->offset() << "]";
                 } else {
                     // failed to append this msg, we must stop
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index ca8fb6352153a..c704cde6fee69 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -89,8 +89,10 @@ class KafkaLoadInfo {
     // partition -> begin offset, inclusive.
     std::map<int32_t, int64_t> begin_offset;
-    // partiton -> commit offset, inclusive.
+    // partition -> commit offset, inclusive.
     std::map<int32_t, int64_t> cmt_offset;
+    // partition -> commit offset timestamp.
+    std::map<int32_t, int64_t> cmt_offset_timestamp;
 
     //custom kafka property key -> value
     std::map<std::string, std::string> properties;
 };
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index bb3b6e0527b72..5580b1c46fb44 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -411,6 +411,7 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt
         TKafkaRLTaskProgress kafka_progress;
         kafka_progress.partitionCmtOffset = ctx->kafka_info->cmt_offset;
+        kafka_progress.partitionCmtOffsetTimestamp = ctx->kafka_info->cmt_offset_timestamp;
 
         rl_attach.kafkaRLTaskProgress = kafka_progress;
         rl_attach.__isset.kafkaRLTaskProgress = true;
diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java
index dc860a7cbba59..98ed46a19ff37 100644
--- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java
+++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java
@@ -2547,4 +2547,7 @@ public class Config extends ConfigBase {
     public static int port_connectivity_check_timeout_ms = 10000;
     @ConfField(mutable = true)
     public static boolean allow_system_reserved_names = false;
+
+    @ConfField(mutable = true)
+    public static long routine_load_unstable_threshold_second = 3600;
 }
diff --git a/fe/fe-core/src/main/java/com/starrocks/common/InternalErrorCode.java b/fe/fe-core/src/main/java/com/starrocks/common/InternalErrorCode.java
index 88d141fe62011..6e5a2bbde3442 100644
--- a/fe/fe-core/src/main/java/com/starrocks/common/InternalErrorCode.java
+++ b/fe/fe-core/src/main/java/com/starrocks/common/InternalErrorCode.java
@@ -35,7 +35,8 @@ public enum InternalErrorCode {
     MANUAL_STOP_ERR(101),
     TOO_MANY_FAILURE_ROWS_ERR(102),
     CREATE_TASKS_ERR(103),
-    TASKS_ABORT_ERR(104);
+    TASKS_ABORT_ERR(104),
+    SLOW_RUNNING_ERR(105);
 
     private long errCode;
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/com/starrocks/load/routineload/KafkaProgress.java
index 48bf83efefef1..464493bd4f415 100644
--- a/fe/fe-core/src/main/java/com/starrocks/load/routineload/KafkaProgress.java
+++ b/fe/fe-core/src/main/java/com/starrocks/load/routineload/KafkaProgress.java
@@ -43,9 +43,7 @@
 import com.starrocks.common.DdlException;
 import com.starrocks.common.Pair;
 import com.starrocks.common.UserException;
-import com.starrocks.common.util.DebugUtil;
 import com.starrocks.common.util.KafkaUtil;
-import com.starrocks.thrift.TKafkaRLTaskProgress;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -80,9 +78,11 @@ public KafkaProgress() {
         super(LoadDataSourceType.KAFKA);
     }
 
-    public KafkaProgress(TKafkaRLTaskProgress tKafkaRLTaskProgress) {
+    public KafkaProgress(Map<Integer, Long> partitionOffsets) {
         super(LoadDataSourceType.KAFKA);
-        this.partitionIdToOffset = tKafkaRLTaskProgress.getPartitionCmtOffset();
+        if (partitionOffsets != null) {
+            this.partitionIdToOffset = partitionOffsets;
+        }
     }
 
     public Map<Integer, Long> getPartitionIdToOffset(List<Integer> partitionIds) {
@@ -198,13 +198,11 @@ public String toJsonString() {
     }
 
     @Override
-    public void update(RLTaskTxnCommitAttachment attachment) {
-        KafkaProgress newProgress = (KafkaProgress) attachment.getProgress();
+    public void update(RoutineLoadProgress progress) {
+        KafkaProgress newProgress = (KafkaProgress) progress;
         // + 1 to point to the next msg offset to be consumed
         newProgress.partitionIdToOffset.entrySet().stream()
                 .forEach(entity ->
                        this.partitionIdToOffset.put(entity.getKey(), entity.getValue() + 1));
-        LOG.debug("update kafka progress: {}, task: {}, job: {}",
-                newProgress.toJsonString(), DebugUtil.printId(attachment.getTaskId()), attachment.getJobId());
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/com/starrocks/load/routineload/KafkaRoutineLoadJob.java
index 854bcb027e1bc..61d10ce298882 100644
--- a/fe/fe-core/src/main/java/com/starrocks/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/com/starrocks/load/routineload/KafkaRoutineLoadJob.java
@@ -126,6 +126,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     public KafkaRoutineLoadJob() {
         // for serialization, id is dummy
         super(-1, LoadDataSourceType.KAFKA);
+        this.progress = new KafkaProgress();
+        this.timestampProgress = new KafkaProgress();
     }
 
     public KafkaRoutineLoadJob(Long id, String name,
@@ -134,6 +136,7 @@ public KafkaRoutineLoadJob(Long id, String name,
         this.brokerList = brokerList;
         this.topic = topic;
         this.progress = new KafkaProgress();
+        this.timestampProgress = new KafkaProgress();
     }
 
     public String getConfluentSchemaRegistryUrl() {
@@ -315,13 +318,14 @@ protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttac
     @Override
     protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
         super.updateProgress(attachment);
-        this.progress.update(attachment);
+        this.progress.update(attachment.getProgress());
+        this.timestampProgress.update(attachment.getTimestampProgress());
     }
 
     @Override
     protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
         super.replayUpdateProgress(attachment);
-        this.progress.update(attachment);
+        this.progress.update(attachment.getProgress());
     }
 
     @Override
@@ -777,4 +781,25 @@ public void modifyDataSourceProperties(RoutineLoadDataSourceProperties dataSourc
         LOG.info("modify the data source properties of kafka routine load job: {}, datasource properties: {}",
                 this.id, dataSourceProperties);
     }
+
+    // update substate according to the lag.
+    @Override
+    public void updateSubstate() throws UserException {
+        KafkaProgress progress = (KafkaProgress) getTimestampProgress();
+        Map<Integer, Long> partitionTimestamps = progress.getPartitionIdToOffset();
+        long now = System.currentTimeMillis();
+
+        for (Map.Entry<Integer, Long> entry : partitionTimestamps.entrySet()) {
+            int partition = entry.getKey();
+            long lag = (now - entry.getValue().longValue()) / 1000;
+            if (lag > Config.routine_load_unstable_threshold_second) {
+                updateSubstate(JobSubstate.UNSTABLE, new ErrorReason(InternalErrorCode.SLOW_RUNNING_ERR,
+                        String.format("The lag [%d] of partition [%d] exceeds " +
+                                "Config.routine_load_unstable_threshold_second [%d]",
+                        lag, partition, Config.routine_load_unstable_threshold_second)));
+                return;
+            }
+        }
+        updateSubstate(JobSubstate.STABLE, null);
+    }
 }
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/routineload/PulsarProgress.java b/fe/fe-core/src/main/java/com/starrocks/load/routineload/PulsarProgress.java
index bebe8eaae6a34..511eecbdd97df 100644
--- a/fe/fe-core/src/main/java/com/starrocks/load/routineload/PulsarProgress.java
+++ b/fe/fe-core/src/main/java/com/starrocks/load/routineload/PulsarProgress.java
@@ -21,7 +21,6 @@
 import com.google.gson.annotations.SerializedName;
 import com.starrocks.common.Pair;
 import com.starrocks.common.io.Text;
-import com.starrocks.common.util.DebugUtil;
 import com.starrocks.thrift.TPulsarRLTaskProgress;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -126,8 +125,8 @@ public String toJsonString() {
     }
 
     @Override
-    public void update(RLTaskTxnCommitAttachment attachment) {
-        PulsarProgress newProgress = (PulsarProgress) attachment.getProgress();
+    public void update(RoutineLoadProgress progress) {
+        PulsarProgress newProgress = (PulsarProgress) progress;
         for (Map.Entry<String, Long> entry : newProgress.partitionToBacklogNum.entrySet()) {
             String partition = entry.getKey();
             Long backlogNum = entry.getValue();
@@ -136,8 +135,6 @@ public void update(RLTaskTxnCommitAttachment attachment) {
             // Remove initial position if exists
             partitionToInitialPosition.remove(partition);
         }
-        LOG.debug("update pulsar progress: {}, task: {}, job: {}",
-                newProgress.toJsonString(), DebugUtil.printId(attachment.getTaskId()), attachment.getJobId());
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/routineload/PulsarRoutineLoadJob.java b/fe/fe-core/src/main/java/com/starrocks/load/routineload/PulsarRoutineLoadJob.java
index b802de4e37b3e..e19773dc92044 100644
--- a/fe/fe-core/src/main/java/com/starrocks/load/routineload/PulsarRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/com/starrocks/load/routineload/PulsarRoutineLoadJob.java
@@ -282,13 +282,14 @@ public String dataSourcePropertiesToSql() {
     @Override
     protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
         super.updateProgress(attachment);
-        this.progress.update(attachment);
+        this.progress.update(attachment.getProgress());
+        this.timestampProgress.update(attachment.getTimestampProgress());
     }
 
     @Override
     protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
         super.replayUpdateProgress(attachment);
-        this.progress.update(attachment);
+        this.progress.update(attachment.getProgress());
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RLTaskTxnCommitAttachment.java b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RLTaskTxnCommitAttachment.java
index 6c101a33c16d7..28a57e7e7976b 100644
--- a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RLTaskTxnCommitAttachment.java
+++ b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RLTaskTxnCommitAttachment.java
@@ -62,6 +62,8 @@ public class RLTaskTxnCommitAttachment extends TxnCommitAttachment {
     private long taskExecutionTimeMs;
     @SerializedName("progress")
     private RoutineLoadProgress progress;
+    @SerializedName("timestampProgress")
+    private RoutineLoadProgress timestampProgress;
     private String errorLogUrl;
     private long loadedBytes;
 
@@ -82,10 +84,14 @@ public RLTaskTxnCommitAttachment(TRLTaskTxnCommitAttachment rlTaskTxnCommitAttac
         switch (rlTaskTxnCommitAttachment.getLoadSourceType()) {
             case KAFKA:
-                this.progress = new KafkaProgress(rlTaskTxnCommitAttachment.getKafkaRLTaskProgress());
+                this.progress = new KafkaProgress(rlTaskTxnCommitAttachment.getKafkaRLTaskProgress()
+                        .getPartitionCmtOffset());
+                this.timestampProgress = new KafkaProgress(rlTaskTxnCommitAttachment.getKafkaRLTaskProgress()
+                        .getPartitionCmtOffsetTimestamp());
                 break;
             case PULSAR:
                 this.progress = new PulsarProgress(rlTaskTxnCommitAttachment.getPulsarRLTaskProgress());
+                this.timestampProgress = new PulsarProgress();
                 break;
             default:
                 break;
@@ -136,6 +142,10 @@ public RoutineLoadProgress getProgress() {
         return progress;
     }
 
+    public RoutineLoadProgress getTimestampProgress() {
+        return timestampProgress;
+    }
+
     public String getErrorLogUrl() {
         return errorLogUrl;
     }
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java
index 98331c455d260..4128f106d08a6 100644
--- a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java
@@ -168,6 +168,11 @@ public boolean isFinalState() {
         }
     }
 
+    public enum JobSubstate {
+        STABLE,
+        UNSTABLE
+    }
+
     @SerializedName("i")
     protected long id;
     @SerializedName("n")
@@ -188,6 +193,7 @@ public boolean isFinalState() {
     protected int desireTaskConcurrentNum; // optional
     @SerializedName("s")
     protected JobState state = JobState.NEED_SCHEDULE;
+    protected JobSubstate substate = JobSubstate.STABLE;
     @SerializedName("da")
     protected LoadDataSourceType dataSourceType;
     protected double maxFilterRatio = 1;
@@ -242,6 +248,9 @@ public boolean isFinalState() {
     @SerializedName("p")
     protected RoutineLoadProgress progress;
 
+    @SerializedName("tp")
+    protected RoutineLoadProgress timestampProgress;
+
     protected long firstResumeTimestamp; // the first resume time
     protected long autoResumeCount;
     protected boolean autoResumeLock = false; //it can't auto resume iff true
@@ -250,6 +259,8 @@ public boolean isFinalState() {
     protected ErrorReason pauseReason;
     protected ErrorReason cancelReason;
 
+    protected ErrorReason stateChangedReason;
+
     @SerializedName("c")
     protected long createTimestamp = System.currentTimeMillis();
     @SerializedName("pa")
@@ -615,6 +626,10 @@ public RoutineLoadProgress getProgress() {
         return progress;
     }
 
+    public RoutineLoadProgress getTimestampProgress() {
+        return timestampProgress;
+    }
+
     public double getMaxFilterRatio() {
         return maxFilterRatio;
     }
@@ -1456,7 +1471,11 @@ public List<String> getShowInfo() {
         row.add(TimeUtils.longToTimeString(endTimestamp));
         row.add(db == null ? String.valueOf(dbId) : db.getFullName());
         row.add(tbl == null ? String.valueOf(tableId) : tbl.getName());
-        row.add(getState().name());
+        if (state == JobState.RUNNING) {
+            row.add(substate == JobSubstate.STABLE ? state.name() : substate.name());
+        } else {
+            row.add(state.name());
+        }
         row.add(dataSourceType.name());
         row.add(String.valueOf(getSizeOfRoutineLoadTaskInfoList()));
         row.add(jobPropertiesToJsonString());
@@ -1464,6 +1483,7 @@ public List<String> getShowInfo() {
         row.add(customPropertiesJsonToString());
         row.add(getStatistic());
         row.add(getProgress().toJsonString());
+        row.add(getTimestampProgress().toJsonString());
         switch (state) {
             case PAUSED:
                 row.add(pauseReason == null ? "" : pauseReason.toString());
@@ -1471,6 +1491,13 @@ public List<String> getShowInfo() {
             case CANCELLED:
                 row.add(cancelReason == null ? "" : cancelReason.toString());
                 break;
+            case RUNNING:
+                if (substate == JobSubstate.UNSTABLE) {
+                    row.add(stateChangedReason == null ? "" : stateChangedReason.toString());
+                } else {
+                    row.add("");
+                }
+                break;
             default:
                 row.add("");
         }
@@ -1716,6 +1743,7 @@ public void readFields(DataInput in) throws IOException {
         switch (dataSourceType) {
             case KAFKA: {
                 progress = new KafkaProgress();
+                timestampProgress = new KafkaProgress();
                 progress.readFields(in);
                 break;
             }
@@ -1872,4 +1900,26 @@ private void modifyCommonJobProperties(Map<String, String> jobProperties) {
     public void gsonPostProcess() throws IOException {
         setRoutineLoadDesc(CreateRoutineLoadStmt.getLoadDesc(origStmt, sessionVariables));
     }
+
+    protected void updateSubstate(JobSubstate substate, ErrorReason reason) throws UserException {
+        writeLock();
+        LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                .add("current_job_substate", this.substate)
+                .add("desire_job_substate", substate)
+                .add("msg", reason)
+                .build());
+        try {
+            this.substate = substate;
+            this.stateChangedReason = reason;
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public void updateSubstateStable() throws UserException {
+        updateSubstate(JobSubstate.STABLE, null);
+    }
+
+    public void updateSubstate() throws UserException {
+    }
 }
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadProgress.java b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadProgress.java
index 4160b016219de..1701ffd006399 100644
--- a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadProgress.java
+++ b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadProgress.java
@@ -56,7 +56,7 @@ public RoutineLoadProgress(LoadDataSourceType loadDataSourceType) {
         this.loadDataSourceType = loadDataSourceType;
     }
 
-    abstract void update(RLTaskTxnCommitAttachment attachment);
+    abstract void update(RoutineLoadProgress progress);
 
     abstract String toJsonString();
 
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadTaskScheduler.java
index ce3766f9b15ff..31bc4364233b2 100644
--- a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadTaskScheduler.java
@@ -189,9 +189,14 @@ private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exc
                 msg = String.format("there is no new data in kafka/pulsar, wait for %d seconds to schedule again",
                         routineLoadTaskInfo.getTaskScheduleIntervalMs() / 1000);
             }
+            // The job keeps up with the source.
+            routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).updateSubstateStable();
             delayPutToQueue(routineLoadTaskInfo, msg);
             return;
         }
+        // Update the job substate if the job is too slow.
+        routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).updateSubstate();
+
     } catch (RoutineLoadPauseException e) {
         String msg = "fe abort task with reason: check task ready to execute failed, " + e.getMessage();
         routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).updateState(
diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/ShowRoutineLoadStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/ShowRoutineLoadStmt.java
index ecea811ad5c20..fb4a11b40e3a3 100644
--- a/fe/fe-core/src/main/java/com/starrocks/sql/ast/ShowRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/ShowRoutineLoadStmt.java
@@ -80,6 +80,7 @@ public class ShowRoutineLoadStmt extends ShowStmt {
                     .add("CustomProperties")
                     .add("Statistic")
                     .add("Progress")
+                    .add("TimestampProgress")
                     .add("ReasonOfStateChanged")
                     .add("ErrorLogUrls")
                     .add("OtherMsg")
diff --git a/fe/fe-core/src/test/java/com/starrocks/analysis/ShowRoutineLoadStmtTest.java b/fe/fe-core/src/test/java/com/starrocks/analysis/ShowRoutineLoadStmtTest.java
index 5f0c07b428f40..26cb3e5a5060a 100644
--- a/fe/fe-core/src/test/java/com/starrocks/analysis/ShowRoutineLoadStmtTest.java
+++ b/fe/fe-core/src/test/java/com/starrocks/analysis/ShowRoutineLoadStmtTest.java
@@ -46,7 +46,7 @@ public void testNormal() throws Exception {
         Assert.assertEquals("label", stmt.getName());
         Assert.assertEquals("testDb", stmt.getDbFullName());
         Assert.assertFalse(stmt.isIncludeHistory());
-        Assert.assertEquals(18, stmt.getMetaData().getColumnCount());
+        Assert.assertEquals(19, stmt.getMetaData().getColumnCount());
         Assert.assertEquals("Id", stmt.getMetaData().getColumn(0).getName());
     }
 
diff --git a/fe/fe-core/src/test/java/com/starrocks/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/com/starrocks/load/routineload/RoutineLoadJobTest.java
index e6eee111932b0..ebf1ba98314f2 100644
--- a/fe/fe-core/src/test/java/com/starrocks/load/routineload/RoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/com/starrocks/load/routineload/RoutineLoadJobTest.java
@@ -114,10 +114,10 @@ public void testAfterAborted(@Injectable TransactionState transactionState,
         RLTaskTxnCommitAttachment attachment = new RLTaskTxnCommitAttachment();
         TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress();
         tKafkaRLTaskProgress.partitionCmtOffset = Maps.newHashMap();
-        KafkaProgress kafkaProgress = new KafkaProgress(tKafkaRLTaskProgress);
+        KafkaProgress kafkaProgress = new KafkaProgress(tKafkaRLTaskProgress.getPartitionCmtOffset());
         Deencapsulation.setField(attachment, "progress", kafkaProgress);
 
-        KafkaProgress currentProgress = new KafkaProgress(tKafkaRLTaskProgress);
+        KafkaProgress currentProgress = new KafkaProgress(tKafkaRLTaskProgress.getPartitionCmtOffset());
 
         new Expectations() {
             {
@@ -158,17 +158,74 @@ void writeUnlock() {
     }
 
     @Test
-    public void testGetShowInfo(@Mocked KafkaProgress kafkaProgress) {
-        RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
-        Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED);
-        ErrorReason errorReason = new ErrorReason(InternalErrorCode.INTERNAL_ERR,
-                TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString());
-        Deencapsulation.setField(routineLoadJob, "pauseReason", errorReason);
-        Deencapsulation.setField(routineLoadJob, "progress", kafkaProgress);
+    public void testGetShowInfo() throws UserException {
+        {
+            // PAUSED state
+            KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
+            Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED);
+            ErrorReason errorReason = new ErrorReason(InternalErrorCode.INTERNAL_ERR,
+                    TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString());
+            Deencapsulation.setField(routineLoadJob, "pauseReason", errorReason);
+
+            List<String> showInfo = routineLoadJob.getShowInfo();
+            Assert.assertEquals(true, showInfo.stream().filter(entity -> !Strings.isNullOrEmpty(entity))
+                    .anyMatch(entity -> entity.equals(errorReason.toString())));
+        }
+
+        {
+            // Progress
+            KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
+
+            Map<Integer, Long> partitionOffsets = Maps.newHashMap();
+            partitionOffsets.put(Integer.valueOf(0), Long.valueOf(1234));
+            KafkaProgress kafkaProgress = new KafkaProgress(partitionOffsets);
+            Deencapsulation.setField(routineLoadJob, "progress", kafkaProgress);
 
-        List<String> showInfo = routineLoadJob.getShowInfo();
-        Assert.assertEquals(true, showInfo.stream().filter(entity -> !Strings.isNullOrEmpty(entity))
-                .anyMatch(entity -> entity.equals(errorReason.toString())));
+            Map<Integer, Long> partitionOffsetTimestamps = Maps.newHashMap();
+            partitionOffsetTimestamps.put(Integer.valueOf(0), Long.valueOf(1701411708410L));
+            KafkaProgress kafkaTimestampProgress = new KafkaProgress(partitionOffsetTimestamps);
+            Deencapsulation.setField(routineLoadJob, "timestampProgress", kafkaTimestampProgress);
+
+            List<String> showInfo = routineLoadJob.getShowInfo();
+            // The displayed value is the actual value - 1
+            Assert.assertEquals("{\"0\":\"1233\"}", showInfo.get(14));
+            Assert.assertEquals("{\"0\":\"1701411708409\"}", showInfo.get(15));
+        }
+
+        {
+            // UNSTABLE substate
+            KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
+
+            Map<Integer, Long> partitionOffsetTimestamps = Maps.newHashMap();
+            partitionOffsetTimestamps.put(Integer.valueOf(0), Long.valueOf(1701411708410L));
+            KafkaProgress kafkaTimestampProgress = new KafkaProgress(partitionOffsetTimestamps);
+            Deencapsulation.setField(routineLoadJob, "timestampProgress", kafkaTimestampProgress);
+
+            routineLoadJob.updateState(RoutineLoadJob.JobState.RUNNING, null, false);
+            // The job is set unstable because its progress lags too far behind.
+            routineLoadJob.updateSubstate();
+
+            List<String> showInfo = routineLoadJob.getShowInfo();
+            Assert.assertEquals("UNSTABLE", showInfo.get(7));
+            // The lag [xxx] of partition [0] exceeds Config.routine_load_unstable_threshold_second [3600]
+            Assert.assertTrue(showInfo.get(16).contains(
+                    "partition [0] exceeds Config.routine_load_unstable_threshold_second [3600]"));
+
+            partitionOffsetTimestamps.put(Integer.valueOf(0), Long.valueOf(System.currentTimeMillis()));
+            kafkaTimestampProgress = new KafkaProgress(partitionOffsetTimestamps);
+            Deencapsulation.setField(routineLoadJob, "timestampProgress", kafkaTimestampProgress);
+            // The job is set back to stable because the progress has caught up.
+            routineLoadJob.updateSubstate();
+            showInfo = routineLoadJob.getShowInfo();
+            Assert.assertEquals("RUNNING", showInfo.get(7));
+            Assert.assertEquals("", showInfo.get(16));
+
+            // The job is set stable.
+            routineLoadJob.updateSubstateStable();
+            showInfo = routineLoadJob.getShowInfo();
+            Assert.assertEquals("RUNNING", showInfo.get(7));
+            Assert.assertEquals("", showInfo.get(16));
+        }
     }
 
     @Test
diff --git a/fe/fe-core/src/test/java/com/starrocks/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/com/starrocks/transaction/GlobalTransactionMgrTest.java
index 5715fbc810719..72fc08f20ae52 100644
--- a/fe/fe-core/src/test/java/com/starrocks/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/com/starrocks/transaction/GlobalTransactionMgrTest.java
@@ -367,11 +367,17 @@ LoadJobSourceType.ROUTINE_LOAD_TASK, new TxnCoordinator(TxnSourceType.BE, "be1")
         Map<Long, TransactionState> idToTransactionState = Maps.newHashMap();
         idToTransactionState.put(1L, transactionState);
         Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
+
         Map<Integer, Long> oldKafkaProgressMap = Maps.newHashMap();
         oldKafkaProgressMap.put(1, 0L);
-        KafkaProgress oldkafkaProgress = new KafkaProgress();
-        Deencapsulation.setField(oldkafkaProgress, "partitionIdToOffset", oldKafkaProgressMap);
+        KafkaProgress oldkafkaProgress = new KafkaProgress(oldKafkaProgressMap);
         Deencapsulation.setField(routineLoadJob, "progress", oldkafkaProgress);
+
+        Map<Integer, Long> kafkaTimestampProgressMap = Maps.newHashMap();
+        kafkaTimestampProgressMap.put(1, 1701411701409L);
+        KafkaProgress oldKafkaTimestampProgress = new KafkaProgress(kafkaTimestampProgressMap);
+        Deencapsulation.setField(routineLoadJob, "timestampProgress", oldKafkaTimestampProgress);
+
         Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
 
         TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = new TRLTaskTxnCommitAttachment();
@@ -380,11 +386,20 @@ LoadJobSourceType.ROUTINE_LOAD_TASK, new TxnCoordinator(TxnSourceType.BE, "be1")
         rlTaskTxnCommitAttachment.setFilteredRows(1);
         rlTaskTxnCommitAttachment.setJobId(Deencapsulation.getField(routineLoadJob, "id"));
         rlTaskTxnCommitAttachment.setLoadSourceType(TLoadSourceType.KAFKA);
+
         TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress();
+
         Map<Integer, Long> kafkaProgress = Maps.newHashMap();
         kafkaProgress.put(1, 100L); // start from 0, so rows number is 101, and consumed offset is 100
         tKafkaRLTaskProgress.setPartitionCmtOffset(kafkaProgress);
         rlTaskTxnCommitAttachment.setKafkaRLTaskProgress(tKafkaRLTaskProgress);
+
+        Map<Integer, Long> kafkaTimestampProgress = Maps.newHashMap();
+        kafkaTimestampProgress.put(1, 1701411701509L); // commit timestamp (ms) of the last consumed message
+        tKafkaRLTaskProgress.setPartitionCmtOffsetTimestamp(kafkaTimestampProgress);
+
+        rlTaskTxnCommitAttachment.setKafkaRLTaskProgress(tKafkaRLTaskProgress);
+
         TxnCommitAttachment txnCommitAttachment = new RLTaskTxnCommitAttachment(rlTaskTxnCommitAttachment);
 
         RoutineLoadMgr routineLoadManager = new RoutineLoadMgr();
@@ -441,11 +456,17 @@ LoadJobSourceType.ROUTINE_LOAD_TASK, new TxnCoordinator(TxnSourceType.BE, "be1")
         Map<Long, TransactionState> idToTransactionState = Maps.newHashMap();
         idToTransactionState.put(1L, transactionState);
         Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
+
         Map<Integer, Long> oldKafkaProgressMap = Maps.newHashMap();
         oldKafkaProgressMap.put(1, 0L);
-        KafkaProgress oldkafkaProgress = new KafkaProgress();
-        Deencapsulation.setField(oldkafkaProgress, "partitionIdToOffset", oldKafkaProgressMap);
+        KafkaProgress oldkafkaProgress = new KafkaProgress(oldKafkaProgressMap);
         Deencapsulation.setField(routineLoadJob, "progress", oldkafkaProgress);
+
+        Map<Integer, Long> oldKafkaTimestampProgressMap = Maps.newHashMap();
+        oldKafkaTimestampProgressMap.put(1, 1701411701409L);
+        KafkaProgress oldKafkaTimestampProgress = new KafkaProgress(oldKafkaTimestampProgressMap);
+        Deencapsulation.setField(routineLoadJob, "timestampProgress", oldKafkaTimestampProgress);
+
         Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
 
         TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = new TRLTaskTxnCommitAttachment();
@@ -454,11 +475,20 @@ LoadJobSourceType.ROUTINE_LOAD_TASK, new TxnCoordinator(TxnSourceType.BE, "be1")
         rlTaskTxnCommitAttachment.setFilteredRows(11);
         rlTaskTxnCommitAttachment.setJobId(Deencapsulation.getField(routineLoadJob, "id"));
         rlTaskTxnCommitAttachment.setLoadSourceType(TLoadSourceType.KAFKA);
+
         TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress();
+
         Map<Integer, Long> kafkaProgress = Maps.newHashMap();
-        kafkaProgress.put(1, 110L); // start from 0, so rows number is 111, consumed offset is 110
+        kafkaProgress.put(1, 110L); // start from 0, so rows number is 111, and consumed offset is 110
         tKafkaRLTaskProgress.setPartitionCmtOffset(kafkaProgress);
         rlTaskTxnCommitAttachment.setKafkaRLTaskProgress(tKafkaRLTaskProgress);
+
+        Map<Integer, Long> kafkaTimestampProgress = Maps.newHashMap();
+        kafkaTimestampProgress.put(1, 1701411701609L); // commit timestamp (ms) of the last consumed message
+        tKafkaRLTaskProgress.setPartitionCmtOffsetTimestamp(kafkaTimestampProgress);
+
+        rlTaskTxnCommitAttachment.setKafkaRLTaskProgress(tKafkaRLTaskProgress);
+
         TxnCommitAttachment txnCommitAttachment = new RLTaskTxnCommitAttachment(rlTaskTxnCommitAttachment);
 
         RoutineLoadMgr routineLoadManager = new RoutineLoadMgr();
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index c574c89049fb7..941ca05df63a3 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -763,6 +763,7 @@ struct TStreamLoadPutResult {
 
 struct TKafkaRLTaskProgress {
     1: required map<i32, i64> partitionCmtOffset
+    2: optional map<i32, i64> partitionCmtOffsetTimestamp
 }
 
 struct TPulsarRLTaskProgress {
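
For reviewers, a hypothetical console sketch of how the change surfaces to users; the job name and values below are illustrative, not taken from a real run, and the exact rendering of ReasonOfStateChanged depends on ErrorReason.toString(). With the default routine_load_unstable_threshold_second = 3600, a RUNNING job whose newest per-partition commit timestamp (milliseconds, reported in the new TimestampProgress column) trails the wall clock by more than an hour is displayed as UNSTABLE, carrying error code SLOW_RUNNING_ERR (105):

    SHOW ROUTINE LOAD FOR example_db.example_job\G
                   State: UNSTABLE
                Progress: {"0":"1233"}
       TimestampProgress: {"0":"1701411708409"}
    ReasonOfStateChanged: The lag [7200] of partition [0] exceeds Config.routine_load_unstable_threshold_second [3600]

Once consumption catches up and the lag falls back under the threshold, updateSubstate() flips the job back to STABLE, so SHOW ROUTINE LOAD reports RUNNING with an empty ReasonOfStateChanged, matching the assertions in RoutineLoadJobTest.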