Skip to content

Commit

Permalink
Merge pull request #3313 from wangyu096/issue_3312
Browse files Browse the repository at this point in the history
perf: 对执行对象任务写入 db 进行分批处理 #3312
  • Loading branch information
jsonwan authored Nov 28, 2024
2 parents 5e8b4f5 + a9ad191 commit 83a0e35
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-JOB蓝鲸智云作业平台 is licensed under the MIT License.
*
* License for BK-JOB蓝鲸智云作业平台:
* --------------------------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.common.util;

import org.apache.commons.collections4.CollectionUtils;

import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

/**
* 分批处理工具类
*/
public class BatchUtil {

/**
* 分批执行工具
*
* @param targets 执行目标列表
* @param batchSize 每批次大小
* @param execution 执行函数
* @param <E> 执行目标
*/
public static <E> void executeBatch(Collection<E> targets,
int batchSize,
Consumer<Collection<E>> execution) {
if (CollectionUtils.isEmpty(targets)) {
return;
}
if (targets.size() <= batchSize) {
execution.accept(targets);
} else {
List<List<E>> targetBatches = CollectionUtil.partitionCollection(targets, batchSize);
targetBatches.forEach(execution);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<cleanHistoryOnStart>${CLEAN_HISTORY_ON_START}</cleanHistoryOnStart>
</rollingPolicy>
<encoder>
<pattern>${LOG_PATTERN}</pattern>
<pattern>${FILE_LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.tencent.bk.job.common.mysql.dynamic.ds.DbOperationEnum;
import com.tencent.bk.job.common.mysql.dynamic.ds.MySQLOperation;
import com.tencent.bk.job.common.mysql.jooq.JooqDataTypeUtil;
import com.tencent.bk.job.common.util.CollectionUtil;
import com.tencent.bk.job.common.util.BatchUtil;
import com.tencent.bk.job.execute.common.constants.RunStatusEnum;
import com.tencent.bk.job.execute.dao.TaskInstanceDAO;
import com.tencent.bk.job.execute.dao.common.DSLContextProviderFactory;
Expand Down Expand Up @@ -527,11 +527,7 @@ public List<Long> listTaskInstanceId(Long appId, Long fromTime, Long toTime, int
public void saveTaskInstanceHosts(long appId,
long taskInstanceId,
Collection<HostDTO> hosts) {
if (CollectionUtils.isEmpty(hosts)) {
return;
}
List<List<HostDTO>> hostBatches = CollectionUtil.partitionCollection(hosts, 2000);
hostBatches.forEach(batchHosts -> {
BatchUtil.executeBatch(hosts, 2000, batchHosts -> {
BatchBindStep batchInsert = dsl().batch(
dsl().insertInto(
TASK_INSTANCE_HOST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ protected final List<ResultGroupDTO> groupTasks(List<ExecuteObjectTask> tasks) {
*
* @param tasks 需要被保存的任务
*/
protected boolean isSaveTasksUsingExecuteObjectMode(Collection<ExecuteObjectTask> tasks) {
protected boolean isExecuteObjectSupported(Collection<ExecuteObjectTask> tasks) {
// 根据执行对象任务模型中的 executeObjectId 参数判断是否支持执行对象
ExecuteObjectTask anyTask = tasks.stream().findAny().orElse(null);
return Objects.requireNonNull(anyTask).getExecuteObjectId() != null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.tencent.bk.job.execute.service.impl;

import com.tencent.bk.job.common.constant.Order;
import com.tencent.bk.job.common.util.BatchUtil;
import com.tencent.bk.job.execute.dao.FileAgentTaskDAO;
import com.tencent.bk.job.execute.dao.FileExecuteObjectTaskDAO;
import com.tencent.bk.job.execute.dao.common.IdGen;
Expand Down Expand Up @@ -51,7 +52,19 @@ public void batchSaveTasks(Collection<ExecuteObjectTask> tasks) {
}
tasks.forEach(task -> task.setId(idGen.genGseFileExecuteObjTaskId()));

if (isSaveTasksUsingExecuteObjectMode(tasks)) {
boolean executeObjectSupported = isExecuteObjectSupported(tasks);

// 任务分批,避免大事务造成 db 主从延迟
BatchUtil.executeBatch(
tasks,
2000,
batchTasks -> executeSaveTasks(executeObjectSupported, batchTasks)
);
}

private void executeSaveTasks(boolean executeObjectSupported,
Collection<ExecuteObjectTask> tasks) {
if (executeObjectSupported) {
fileExecuteObjectTaskDAO.batchSaveTasks(tasks);
} else {
fileAgentTaskDAO.batchSaveAgentTasks(tasks);
Expand All @@ -64,7 +77,18 @@ public void batchUpdateTasks(Collection<ExecuteObjectTask> tasks) {
return;
}

if (isSaveTasksUsingExecuteObjectMode(tasks)) {
boolean executeObjectSupported = isExecuteObjectSupported(tasks);
// 任务分批,避免大事务造成 db 主从延迟
BatchUtil.executeBatch(
tasks,
2000,
batchTasks -> executeUpdateTasks(executeObjectSupported, batchTasks)
);
}

private void executeUpdateTasks(boolean executeObjectSupported,
Collection<ExecuteObjectTask> tasks) {
if (executeObjectSupported) {
fileExecuteObjectTaskDAO.batchUpdateTasks(tasks);
} else {
fileAgentTaskDAO.batchUpdateAgentTasks(tasks);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.tencent.bk.job.execute.service.impl;

import com.tencent.bk.job.common.constant.Order;
import com.tencent.bk.job.common.util.BatchUtil;
import com.tencent.bk.job.execute.dao.ScriptAgentTaskDAO;
import com.tencent.bk.job.execute.dao.ScriptExecuteObjectTaskDAO;
import com.tencent.bk.job.execute.dao.common.IdGen;
Expand Down Expand Up @@ -50,7 +51,20 @@ public void batchSaveTasks(Collection<ExecuteObjectTask> tasks) {
}

tasks.forEach(task -> task.setId(idGen.genGseScriptExecuteObjTaskId()));
if (isSaveTasksUsingExecuteObjectMode(tasks)) {

boolean executeObjectSupported = isExecuteObjectSupported(tasks);

// 任务分批,避免大事务造成 db 主从延迟
BatchUtil.executeBatch(
tasks,
2000,
batchTasks -> executeSaveTasks(executeObjectSupported, batchTasks)
);
}

private void executeSaveTasks(boolean executeObjectSupported,
Collection<ExecuteObjectTask> tasks) {
if (executeObjectSupported) {
scriptExecuteObjectTaskDAO.batchSaveTasks(tasks);
} else {
scriptAgentTaskDAO.batchSaveAgentTasks(tasks);
Expand All @@ -62,7 +76,20 @@ public void batchUpdateTasks(Collection<ExecuteObjectTask> tasks) {
if (CollectionUtils.isEmpty(tasks)) {
return;
}
if (isSaveTasksUsingExecuteObjectMode(tasks)) {

boolean executeObjectSupported = isExecuteObjectSupported(tasks);

// 任务分批,避免大事务造成 db 主从延迟
BatchUtil.executeBatch(
tasks,
2000,
batchTasks -> executeUpdateTasks(executeObjectSupported, batchTasks)
);
}

private void executeUpdateTasks(boolean executeObjectSupported,
Collection<ExecuteObjectTask> tasks) {
if (executeObjectSupported) {
scriptExecuteObjectTaskDAO.batchUpdateTasks(tasks);
} else {
scriptAgentTaskDAO.batchUpdateAgentTasks(tasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -865,16 +865,16 @@ private void checkStepInstanceAtomicTasksLimit(TaskInstanceDTO taskInstance,
new Integer[]{jobExecuteConfig.getFileTasksMax()});
}
} else if (stepInstance.isScriptStep()) {
int targetServerSize = stepInstance.getTargetExecuteObjectCount();
if (targetServerSize > 10000) {
int targetExecuteObjectSize = stepInstance.getTargetExecuteObjectCount();
if (targetExecuteObjectSize > 10000) {
TASK_MONITOR_LOGGER.info("LargeTask|type:script|taskName:{}|appCode:{}|appId:{}|operator:{}"
+ "|targetServerSize:{}",
taskName, appCode, appId, operator, targetServerSize);
+ "|targetExecuteObjectSize:{}",
taskName, appCode, appId, operator, targetExecuteObjectSize);
}
if (targetServerSize > jobExecuteConfig.getScriptTaskMaxTargetServer()) {
if (targetExecuteObjectSize > jobExecuteConfig.getScriptTaskMaxTargetServer()) {
log.info("Reject large task|type:file|taskName:{}|appCode:{}|appId:{}|operator" +
":{}|targetServerSize:{}|maxAllowedSize:{}", taskName, appCode, appId, operator,
targetServerSize, jobExecuteConfig.getScriptTaskMaxTargetServer());
":{}|targetExecuteObjectSize:{}|maxAllowedSize:{}", taskName, appCode, appId, operator,
targetExecuteObjectSize, jobExecuteConfig.getScriptTaskMaxTargetServer());
throw new ResourceExhaustedException(ErrorCode.SCRIPT_TASK_TARGET_SERVER_EXCEEDS_LIMIT,
new Integer[]{jobExecuteConfig.getScriptTaskMaxTargetServer()});
}
Expand Down

0 comments on commit 83a0e35

Please sign in to comment.