Skip to content

Commit

Permalink
fix(task-framework): ignore location validation and update task statu…
Browse files Browse the repository at this point in the history
…s after refresh (#2961)

* ignore location validation and update task status

* bugfix

* bugfix

* bugfix
  • Loading branch information
guowl3 authored Jul 12, 2024
1 parent ebd6c94 commit 33ac643
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ public DLMJobStore(ConnectionConfig metaDBConfig) {
}

public void destroy() {
dataSource.close();
if (dataSource != null) {
dataSource.close();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,11 @@ public List<DlmTableUnit> findByScheduleTaskId(Long scheduleTaskId) {
}

public TaskStatus getTaskStatus(Long scheduleTaskId) {
Set<TaskStatus> collect = findByScheduleTaskId(scheduleTaskId).stream().map(DlmTableUnit::getStatus).collect(
return getTaskStatus(findByScheduleTaskId(scheduleTaskId));
}

public TaskStatus getTaskStatus(List<DlmTableUnit> dlmTableUnits) {
Set<TaskStatus> collect = dlmTableUnits.stream().map(DlmTableUnit::getStatus).collect(
Collectors.toSet());
if (collect.contains(TaskStatus.FAILED)) {
return TaskStatus.FAILED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ public RateLimitConfiguration getByOrderIdOrElseDefaultConfig(Long orderId) {
}
}

public Optional<RateLimitConfiguration> findByScheduleId(Long scheduleId) {
return limiterConfigRepository.findById(scheduleId).map(mapper::entityToModel);
}

public List<RateLimitConfiguration> findByOrderIds(Collection<Long> orderIds) {
return limiterConfigRepository.findByOrderIdIn(orderIds).stream().map(mapper::entityToModel)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,8 @@ public long calculatePartSize(long fileLength) {
* 也就是杭州的client只允许操作杭州的bucket,不允许跨域操作
*/
private void validateBucket() {
if (objectStorageConfiguration.getCloudProvider() == CloudProvider.NONE) {
if (objectStorageConfiguration.getCloudProvider() == CloudProvider.NONE
|| objectStorageConfiguration.getCloudProvider() == CloudProvider.HUAWEI_CLOUD) {
return;
}
String bucketName = getBucketName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private ScheduleTaskParameters detailParameters(Schedule schedule) {
Collectors.toSet())).stream().collect(Collectors.toMap(Database::getId, o -> o));
parameters.setSourceDatabase(id2Database.get(parameters.getSourceDatabaseId()));
parameters.setTargetDatabase(id2Database.get(parameters.getTargetDataBaseId()));
parameters.setRateLimit(limiterService.getByOrderIdOrElseDefaultConfig(schedule.getId()));
limiterService.findByScheduleId(schedule.getId()).ifPresent(parameters::setRateLimit);
return parameters;
}
case DATA_DELETE: {
Expand All @@ -241,7 +241,7 @@ private ScheduleTaskParameters detailParameters(Schedule schedule) {
.stream().collect(Collectors.toMap(Database::getId, o -> o));
parameters.setDatabase(id2Database.get(parameters.getDatabaseId()));
parameters.setTargetDatabase(id2Database.get(parameters.getTargetDatabaseId()));
parameters.setRateLimit(limiterService.getByOrderIdOrElseDefaultConfig(schedule.getId()));
limiterService.findByScheduleId(schedule.getId()).ifPresent(parameters::setRateLimit);
return parameters;
}
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.core.shared.constant.TaskStatus;
import com.oceanbase.odc.service.dlm.DLMService;
import com.oceanbase.odc.service.dlm.model.DlmTableUnit;
import com.oceanbase.odc.service.schedule.ScheduleTaskService;
import com.oceanbase.odc.service.task.executor.task.TaskResult;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -41,6 +43,9 @@ public class DLMResultProcessor implements ResultProcessor {
@Autowired
private DLMService dlmService;

@Autowired
private ScheduleTaskService taskService;

@Override
public void process(TaskResult result) {
log.info("Start refresh result,result={}", result.getResultJson());
Expand All @@ -55,6 +60,9 @@ public void process(TaskResult result) {
log.info("Create or update dlm tableUnits success,jobIdentity={},scheduleTaskId={}",
result.getJobIdentity(),
dlmTableUnits.get(0).getScheduleTaskId());
TaskStatus taskStatus = dlmService.getTaskStatus(dlmTableUnits);
taskService.updateStatusById(dlmTableUnits.get(0).getScheduleTaskId(), taskStatus);
log.info("Update schedule task status to {} success", taskStatus);
} catch (Exception e) {
log.warn("Refresh result failed.", e);
}
Expand Down

0 comments on commit 33ac643

Please sign in to comment.