diff --git a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadMgr.java b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadMgr.java index cbd5ead4923aa..7bda6f4d354f0 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadMgr.java @@ -509,15 +509,13 @@ private void sortRoutineLoadJob(List routineLoadJobList) { } } - public boolean checkTaskInJob(UUID taskId) { + public boolean checkTaskInJob(long jobId, UUID taskId) { readLock(); try { - for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) { - if (routineLoadJob.containsTask(taskId)) { - return true; - } + if (!idToRoutineLoadJob.containsKey(jobId)) { + return false; } - return false; + return idToRoutineLoadJob.get(jobId).containsTask(taskId); } finally { readUnlock(); } diff --git a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadTaskScheduler.java index 76a9cb034755f..0e5e42d5bc9a4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadTaskScheduler.java @@ -175,7 +175,7 @@ private void submitToSchedule(RoutineLoadTaskInfo routineLoadTaskInfo) { void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exception { routineLoadTaskInfo.setLastScheduledTime(System.currentTimeMillis()); // check if task has been abandoned - if (!routineLoadManager.checkTaskInJob(routineLoadTaskInfo.getId())) { + if (!routineLoadManager.checkTaskInJob(routineLoadTaskInfo.getJobId(), routineLoadTaskInfo.getId())) { // task has been abandoned while renew task has been added in queue // or database has been deleted LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId()) diff --git a/fe/fe-core/src/test/java/com/starrocks/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/routineload/RoutineLoadManagerTest.java index a08c98a72aad9..482411600cfed 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/routineload/RoutineLoadManagerTest.java @@ -887,4 +887,23 @@ public void testJsonFormatImage() throws Exception { Assert.assertNotNull(restartedRoutineLoadManager.getJob(1L)); Assert.assertNotNull(restartedRoutineLoadManager.getJob(2L)); } + + + @Test + public void testCheckTaskInJob() throws Exception { + + KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "job1", 1L, 1L, null, "topic1"); + Map partitionIdToOffset = Maps.newHashMap(); + partitionIdToOffset.put(1, 100L); + partitionIdToOffset.put(2, 200L); + KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(new UUID(1, 1), kafkaRoutineLoadJob, 20000, + System.currentTimeMillis(), partitionIdToOffset, Config.routine_load_task_timeout_second * 1000); + kafkaRoutineLoadJob.routineLoadTaskInfoList.add(routineLoadTaskInfo); + RoutineLoadMgr routineLoadMgr = new RoutineLoadMgr(); + routineLoadMgr.addRoutineLoadJob(kafkaRoutineLoadJob, "db"); + boolean taskExist = routineLoadMgr.checkTaskInJob(kafkaRoutineLoadJob.getId(), routineLoadTaskInfo.getId()); + Assert.assertTrue(taskExist); + boolean taskNotExist = routineLoadMgr.checkTaskInJob(-1L, routineLoadTaskInfo.getId()); + Assert.assertFalse(taskNotExist); + } } diff --git a/fe/fe-core/src/test/java/com/starrocks/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/routineload/RoutineLoadTaskSchedulerTest.java index 0c887a00d9323..2b4dec76c88c3 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/routineload/RoutineLoadTaskSchedulerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/routineload/RoutineLoadTaskSchedulerTest.java @@ -97,7 +97,7 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1 routineLoadManager.getClusterIdleSlotNum(); minTimes = 0; result = 1; - routineLoadManager.checkTaskInJob((UUID) any); + routineLoadManager.checkTaskInJob(anyLong, (UUID) any); minTimes = 0; result = true; @@ -157,7 +157,7 @@ public void testSchedulerOneTaskTxnNotFound() { routineLoadManager.getClusterIdleSlotNum(); minTimes = 0; result = 1; - routineLoadManager.checkTaskInJob((UUID) any); + routineLoadManager.checkTaskInJob(anyLong, (UUID) any); minTimes = 0; result = true; @@ -239,7 +239,7 @@ public void testSchedulerOneTaskDbNotFound() { routineLoadManager.getClusterIdleSlotNum(); minTimes = 0; result = 1; - routineLoadManager.checkTaskInJob((UUID) any); + routineLoadManager.checkTaskInJob(anyLong, (UUID) any); minTimes = 0; result = true;