Skip to content

Commit

Permalink
optimize scheduler one task cost (#50328)
Browse files Browse the repository at this point in the history
Signed-off-by: hoffermei <[email protected]>
  • Loading branch information
hoffermei committed Nov 20, 2024
1 parent 981c85f commit c43fc75
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -584,15 +584,13 @@ private void sortRoutineLoadJob(List<RoutineLoadJob> routineLoadJobList) {
}
}

public boolean checkTaskInJob(UUID taskId) {
public boolean checkTaskInJob(long jobId, UUID taskId) {
readLock();
try {
for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
if (routineLoadJob.containsTask(taskId)) {
return true;
}
if (!idToRoutineLoadJob.containsKey(jobId)) {
return false;
}
return false;
return idToRoutineLoadJob.get(jobId).containsTask(taskId);
} finally {
readUnlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ private void submitToSchedule(RoutineLoadTaskInfo routineLoadTaskInfo) {
void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exception {
routineLoadTaskInfo.setLastScheduledTime(System.currentTimeMillis());
// check if task has been abandoned
if (!routineLoadManager.checkTaskInJob(routineLoadTaskInfo.getId())) {
if (!routineLoadManager.checkTaskInJob(routineLoadTaskInfo.getJobId(), routineLoadTaskInfo.getId())) {
// task has been abandoned while renew task has been added in queue
// or database has been deleted
LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1049,4 +1049,23 @@ public void testJsonFormatImage() throws Exception {
Assert.assertNotNull(restartedRoutineLoadManager.getJob(1L));
Assert.assertNotNull(restartedRoutineLoadManager.getJob(2L));
}


@Test
public void testCheckTaskInJob() throws Exception {

KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "job1", 1L, 1L, null, "topic1");
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(1, 100L);
partitionIdToOffset.put(2, 200L);
KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(new UUID(1, 1), kafkaRoutineLoadJob, 20000,
System.currentTimeMillis(), partitionIdToOffset, Config.routine_load_task_timeout_second * 1000);
kafkaRoutineLoadJob.routineLoadTaskInfoList.add(routineLoadTaskInfo);
RoutineLoadMgr routineLoadMgr = new RoutineLoadMgr();
routineLoadMgr.addRoutineLoadJob(kafkaRoutineLoadJob, "db");
boolean taskExist = routineLoadMgr.checkTaskInJob(kafkaRoutineLoadJob.getId(), routineLoadTaskInfo.getId());
Assert.assertTrue(taskExist);
boolean taskNotExist = routineLoadMgr.checkTaskInJob(-1L, routineLoadTaskInfo.getId());
Assert.assertFalse(taskNotExist);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1
routineLoadManager.getClusterIdleSlotNum();
minTimes = 0;
result = 1;
routineLoadManager.checkTaskInJob((UUID) any);
routineLoadManager.checkTaskInJob(anyLong, (UUID) any);
minTimes = 0;
result = true;

Expand Down Expand Up @@ -157,7 +157,7 @@ public void testSchedulerOneTaskTxnNotFound() {
routineLoadManager.getClusterIdleSlotNum();
minTimes = 0;
result = 1;
routineLoadManager.checkTaskInJob((UUID) any);
routineLoadManager.checkTaskInJob(anyLong, (UUID) any);
minTimes = 0;
result = true;

Expand Down Expand Up @@ -239,7 +239,7 @@ public void testSchedulerOneTaskDbNotFound() {
routineLoadManager.getClusterIdleSlotNum();
minTimes = 0;
result = 1;
routineLoadManager.checkTaskInJob((UUID) any);
routineLoadManager.checkTaskInJob(anyLong, (UUID) any);
minTimes = 0;
result = true;

Expand Down

0 comments on commit c43fc75

Please sign in to comment.