From f8bc6e094e149c46edc0f569d4b0fbf9c66eba5c Mon Sep 17 00:00:00 2001 From: felixncheng Date: Wed, 3 Jan 2024 14:37:55 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20Job=E4=BB=BB=E5=8A=A1=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E6=95=85=E9=9A=9C=E8=BD=AC=E7=A7=BB=20#1563=20(#1592)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../tencent/bkrepo/job/batch/base/BatchJob.kt | 105 ++++++++++++++++-- .../bkrepo/job/batch/base/FailoverJob.kt | 21 ++++ .../bkrepo/job/batch/base/MongoDbBatchJob.kt | 82 +++++++++++++- .../job/batch/base/MongodbFailoverJob.kt | 50 +++++++++ .../base/RecoverableMongodbJobContext.kt | 21 ++++ .../job/config/ScheduledTaskConfigurer.kt | 4 +- .../config/properties/BatchJobProperties.kt | 7 +- .../job/controller/user/UserJobController.kt | 7 ++ .../tencent/bkrepo/job/pojo/TJobFailover.kt | 18 +++ .../job/repository/JobSnapshotRepository.kt | 9 ++ .../bkrepo/job/service/SystemJobService.kt | 35 ++++-- .../job/batch/FileReferenceCleanupJobTest.kt | 3 + 12 files changed, 333 insertions(+), 29 deletions(-) create mode 100644 src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/FailoverJob.kt create mode 100644 src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongodbFailoverJob.kt create mode 100644 src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/RecoverableMongodbJobContext.kt create mode 100644 src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/pojo/TJobFailover.kt create mode 100644 src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/repository/JobSnapshotRepository.kt diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/BatchJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/BatchJob.kt index e7d2292271..8544a5d506 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/BatchJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/BatchJob.kt @@ -33,7 +33,9 @@ import com.tencent.bkrepo.common.service.util.SpringContextUtils import com.tencent.bkrepo.job.config.properties.BatchJobProperties import com.tencent.bkrepo.job.listener.event.TaskExecutedEvent import net.javacrumbs.shedlock.core.LockConfiguration +import net.javacrumbs.shedlock.core.LockProvider import net.javacrumbs.shedlock.core.LockingTaskExecutor +import net.javacrumbs.shedlock.core.SimpleLock import org.springframework.beans.factory.annotation.Autowired import java.time.Duration import java.time.LocalDateTime @@ -42,7 +44,7 @@ import kotlin.system.measureNanoTime /** * 抽象批处理作业Job * */ -abstract class BatchJob(open val batchJobProperties: BatchJobProperties) { +abstract class BatchJob(open val batchJobProperties: BatchJobProperties) : FailoverJob { /** * 锁名称 */ @@ -64,6 +66,12 @@ abstract class BatchJob(open val batchJobProperties: BatchJobPro @Volatile private var stop = true + /** + * Job是否正在运行 + * */ + @Volatile + private var inProcess = false + /** * 是否排他执行,如果是则会加分布式锁 * */ @@ -93,6 +101,10 @@ abstract class BatchJob(open val batchJobProperties: BatchJobPro @Autowired private lateinit var lockingTaskExecutor: LockingTaskExecutor + @Autowired + private lateinit var lockProvider: LockProvider + private var lock: SimpleLock? = null + var lastBeginTime: LocalDateTime? = null var lastEndTime: LocalDateTime? = null var lastExecuteTime: Long? = null @@ -105,9 +117,14 @@ abstract class BatchJob(open val batchJobProperties: BatchJobPro stop = false val jobContext = createJobContext() val wasExecuted = if (isExclusive) { - val task = LockingTaskExecutor.TaskWithResult { doStart(jobContext) } - val result = lockingTaskExecutor.executeWithLock(task, getLockConfiguration()) - result.wasExecuted() + var wasExecuted = false + lockProvider.lock(getLockConfiguration()).ifPresent { + lock = it + it.use { doStart(jobContext) } + lock = null + wasExecuted = true + } + wasExecuted } else { doStart(jobContext) true @@ -118,9 +135,7 @@ abstract class BatchJob(open val batchJobProperties: BatchJobPro } if (!wasExecuted) { logger.info("Job[${getJobName()}] already execution.") - return wasExecuted } - stop = true return wasExecuted } @@ -129,7 +144,11 @@ abstract class BatchJob(open val batchJobProperties: BatchJobPro * */ private fun doStart(jobContext: C) { try { + inProcess = true lastBeginTime = LocalDateTime.now() + if (isFailover()) { + recover() + } val elapseNano = measureNanoTime { doStart0(jobContext) } @@ -140,11 +159,13 @@ abstract class BatchJob(open val batchJobProperties: BatchJobPro val event = TaskExecutedEvent( name = getJobName(), context = jobContext, - time = Duration.ofNanos(elapseNano) + time = Duration.ofNanos(elapseNano), ) SpringContextUtils.publishEvent(event) } catch (e: Exception) { logger.info("Job[${getJobName()}] execution failed.", e) + } finally { + inProcess = false } } @@ -153,10 +174,43 @@ abstract class BatchJob(open val batchJobProperties: BatchJobPro /** * 停止任务 * */ - fun stop() { + fun stop(timeout: Long = DEFAULT_STOP_TIMEOUT, force: Boolean = false) { + if (stop && !inProcess) { + logger.info("Job [${getJobName()}] already stopped.") + return + } + logger.info("Stop job [${getJobName()}].") + // 尽量等待任务执行完毕 + var waitTime = 0L + while (inProcess && waitTime < timeout) { + logger.info("Job [${getJobName()}] is still running, waiting for it to terminate.") + Thread.sleep(SLEEP_TIME_INTERVAL) + waitTime += SLEEP_TIME_INTERVAL + } + if (inProcess) { + logger.info("Stop job timeout [$timeout] ms.") + } + // 只有释放锁,才需要进行故障转移 + if (inProcess && force) { + logger.info("Force stop job [${getJobName()}] and unlock.") + failover() + lock?.unlockQuietly() + } stop = true } + override fun failover() { + // NO-OP + } + + override fun isFailover(): Boolean { + return false + } + + override fun recover() { + // NO-OP + } + /** * 启用 */ @@ -172,12 +226,19 @@ abstract class BatchJob(open val batchJobProperties: BatchJobPro } /** - * 任务是否在运行 + * 任务是否应该运行 * */ - fun isRunning(): Boolean { + fun shouldRun(): Boolean { return !stop } + /** + * 任务是否正在运行 + * */ + fun inProcess(): Boolean { + return inProcess + } + /** * 获取分布式锁需要的锁配置 * */ @@ -192,7 +253,31 @@ abstract class BatchJob(open val batchJobProperties: BatchJobPro return batchJobProperties.enabled } + /** + * 使用锁,[block]运行完后,将会释放锁 + * */ + private fun SimpleLock.use(block: () -> Unit) { + try { + block() + } finally { + unlockQuietly() + } + } + + /** + * 静默释放锁 + * */ + private fun SimpleLock.unlockQuietly() { + try { + unlock() + } catch (ignore: Exception) { + // ignore + } + } + companion object { private val logger = LoggerHolder.jobLogger + private const val SLEEP_TIME_INTERVAL = 1000L + private const val DEFAULT_STOP_TIMEOUT = 30000L } } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/FailoverJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/FailoverJob.kt new file mode 100644 index 0000000000..68bf221ca1 --- /dev/null +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/FailoverJob.kt @@ -0,0 +1,21 @@ +package com.tencent.bkrepo.job.batch.base + +/** + * 支持故障转移的job + * */ +interface FailoverJob { + /** + * 故障转移 + * */ + fun failover() + + /** + * 是否发生故障转移 + * */ + fun isFailover(): Boolean + + /** + * 恢复现场 + */ + fun recover() +} diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongoDbBatchJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongoDbBatchJob.kt index 9eab5bc1aa..afc7030819 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongoDbBatchJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongoDbBatchJob.kt @@ -28,12 +28,15 @@ package com.tencent.bkrepo.job.batch.base import com.tencent.bkrepo.common.api.util.HumanReadable +import com.tencent.bkrepo.common.api.util.readJsonString +import com.tencent.bkrepo.common.api.util.toJsonString import com.tencent.bkrepo.common.mongo.constant.ID import com.tencent.bkrepo.common.mongo.constant.MIN_OBJECT_ID import com.tencent.bkrepo.common.service.log.LoggerHolder import com.tencent.bkrepo.job.config.properties.MongodbJobProperties import com.tencent.bkrepo.job.executor.BlockThreadPoolTaskExecutorDecorator import com.tencent.bkrepo.job.executor.IdentityTask +import com.tencent.bkrepo.job.pojo.TJobFailover import net.javacrumbs.shedlock.core.LockingTaskExecutor import org.bson.types.ObjectId import org.springframework.beans.factory.annotation.Autowired @@ -42,6 +45,9 @@ import org.springframework.data.mongodb.core.MongoTemplate import org.springframework.data.mongodb.core.find import org.springframework.data.mongodb.core.query.Criteria import org.springframework.data.mongodb.core.query.Query +import java.net.InetAddress +import java.time.LocalDateTime +import java.util.Collections import java.util.concurrent.CountDownLatch import kotlin.system.measureNanoTime @@ -50,7 +56,7 @@ import kotlin.system.measureNanoTime * */ abstract class MongoDbBatchJob( private val properties: MongodbJobProperties, -) : CenterNodeJob(properties) { +) : MongodbFailoverJob(properties) { /** * 需要操作的表名列表 * */ @@ -102,12 +108,31 @@ abstract class MongoDbBatchJob( @Autowired private lateinit var executor: BlockThreadPoolTaskExecutorDecorator + /** + * 是否存在异步任务 + * */ private var hasAsyncTask = false + /** + * 未执行列表 + * */ + private var undoList = Collections.synchronizedList(mutableListOf()) + + /** + * 恢复任务上下文 + * */ + private var recoverableJobContext = RecoverableMongodbJobContext(mutableListOf()) + + /** + * 是否是从故障中恢复 + * */ + private var recover = false + override fun doStart0(jobContext: Context) { try { hasAsyncTask = false - val collectionNames = collectionNames() + prepareContext(jobContext) + val collectionNames = undoList.toList() if (concurrentLevel == JobConcurrentLevel.COLLECTION) { // 使用闭锁来保证表异步生产任务的结束 val countDownLatch = CountDownLatch(collectionNames.size) @@ -128,12 +153,29 @@ abstract class MongoDbBatchJob( } } + /** + * 准备执行上下文 + * */ + private fun prepareContext(jobContext: Context) { + undoList.clear() + if (recover) { + jobContext.success = recoverableJobContext.success + jobContext.failed = recoverableJobContext.failed + jobContext.total = recoverableJobContext.total + undoList.addAll(recoverableJobContext.undoCollectionNames) + recover = false + } else { + recoverableJobContext.init(jobContext) + undoList.addAll(collectionNames()) + } + } + /** * 处理单个表数据 * */ private fun runCollection(collectionName: String, context: Context) { - if (!isRunning()) { - logger.info("Job[${getJobName()}] already stop.") + if (!shouldRun()) { + logger.info("Job[${getJobName()}] already stopped.") return } logger.info("Job[${getJobName()}]: Start collection $collectionName.") @@ -167,10 +209,11 @@ abstract class MongoDbBatchJob( querySize = data.size lastId = data.last()[ID] as ObjectId report(context) - } while (querySize == pageSize && isRunning()) + } while (querySize == pageSize && shouldRun()) }.apply { val elapsedTime = HumanReadable.time(this) onRunCollectionFinished(collectionName, context) + undoList.remove(collectionName) logger.info("Job[${getJobName()}]: collection $collectionName run completed,sum [$sum] elapse $elapsedTime") } } @@ -217,6 +260,35 @@ abstract class MongoDbBatchJob( } } + override fun capture(): TJobFailover { + return with(recoverableJobContext) { + TJobFailover( + name = getJobName(), + createdBy = hostName(), + createdDate = LocalDateTime.now(), + success = success.get(), + failed = failed.get(), + total = total.get(), + data = undoList.toJsonString(), + ) + } + } + + override fun reply(snapshot: TJobFailover) { + with(snapshot) { + recoverableJobContext.reset() + recoverableJobContext.success.addAndGet(success) + recoverableJobContext.failed.addAndGet(failed) + recoverableJobContext.total.addAndGet(total) + data?.let { data -> recoverableJobContext.undoCollectionNames.addAll(data.readJsonString()) } + } + recover = true + } + + private fun hostName(): String { + return InetAddress.getLocalHost().hostName + } + companion object { private val logger = LoggerHolder.jobLogger diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongodbFailoverJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongodbFailoverJob.kt new file mode 100644 index 0000000000..bf301265e2 --- /dev/null +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongodbFailoverJob.kt @@ -0,0 +1,50 @@ +package com.tencent.bkrepo.job.batch.base + +import com.tencent.bkrepo.job.config.properties.BatchJobProperties +import com.tencent.bkrepo.job.pojo.TJobFailover +import com.tencent.bkrepo.job.repository.JobSnapshotRepository +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Autowired + +/** + * 使用mongodb的故障转移实现 + * */ +abstract class MongodbFailoverJob(batchJobProperties: BatchJobProperties) : + CenterNodeJob(batchJobProperties), FailoverJob { + @Autowired + private lateinit var jobSnapshotRepository: JobSnapshotRepository + + private val name = getJobName() + + override fun failover() { + val jobFailover = capture() + jobSnapshotRepository.save(jobFailover) + logger.info("Job [$name] failover successful.") + } + + override fun isFailover(): Boolean { + return jobSnapshotRepository.findFirstByNameOrderByIdDesc(name) != null + } + + override fun recover() { + jobSnapshotRepository.findFirstByNameOrderByIdDesc(name)?.let { + reply(it) + logger.info("Job [$name] recover successful.") + jobSnapshotRepository.deleteByName(name) + } + } + + /** + * 发生故障时,捕获现场,以便后续恢复 + * */ + abstract fun capture(): TJobFailover + + /** + * 根据故障转移记录[jobFailover]恢复现场 + * */ + abstract fun reply(jobFailover: TJobFailover) + + companion object { + private val logger = LoggerFactory.getLogger(MongodbFailoverJob::class.java) + } +} diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/RecoverableMongodbJobContext.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/RecoverableMongodbJobContext.kt new file mode 100644 index 0000000000..9537f911bd --- /dev/null +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/RecoverableMongodbJobContext.kt @@ -0,0 +1,21 @@ +package com.tencent.bkrepo.job.batch.base + +/** + * 可恢复的mongodb job上下文 + * */ +open class RecoverableMongodbJobContext( + val undoCollectionNames: MutableList, +) : JobContext() { + fun init(jobContext: JobContext) { + success = jobContext.success + failed = jobContext.failed + total = jobContext.total + } + + fun reset() { + undoCollectionNames.clear() + success.set(0) + failed.set(0) + total.set(0) + } +} diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/ScheduledTaskConfigurer.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/ScheduledTaskConfigurer.kt index cc27a786be..a7b6cdd956 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/ScheduledTaskConfigurer.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/ScheduledTaskConfigurer.kt @@ -27,6 +27,7 @@ package com.tencent.bkrepo.job.config +import com.tencent.bkrepo.common.service.shutdown.ServiceShutdownHook import com.tencent.bkrepo.job.batch.base.BatchJob import com.tencent.bkrepo.job.config.properties.BatchJobProperties import org.slf4j.LoggerFactory @@ -43,7 +44,7 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar @Configuration class ScheduledTaskConfigurer( val jobs: List>, - val builder: TaskSchedulerBuilder + val builder: TaskSchedulerBuilder, ) : SchedulingConfigurer { init { scheduledTaskConfigurer = this @@ -57,6 +58,7 @@ class ScheduledTaskConfigurer( val taskScheduler = builder.build() taskScheduler.initialize() Companion.taskRegistrar.setTaskScheduler(taskScheduler) + ServiceShutdownHook.add { jobs.forEach { it.stop(it.batchJobProperties.stopTimeout, true) } } } jobs.filter { it.batchJobProperties.enabled }.forEach { val properties = it.batchJobProperties diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/BatchJobProperties.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/BatchJobProperties.kt index fcb4614337..d043dc2c3b 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/BatchJobProperties.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/BatchJobProperties.kt @@ -41,5 +41,10 @@ open class BatchJobProperties( open var cron: String = Scheduled.CRON_DISABLED, open var fixedDelay: Long = 0, open var fixedRate: Long = 0, - open var initialDelay: Long = 0 + open var initialDelay: Long = 0, + + /** + * 停止任务超时时间,查过该时间,则会强制停止任务 + * */ + var stopTimeout: Long = 30000, ) diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/controller/user/UserJobController.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/controller/user/UserJobController.kt index 229fffa1d4..2b86277efb 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/controller/user/UserJobController.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/controller/user/UserJobController.kt @@ -38,6 +38,7 @@ import org.springframework.web.bind.annotation.GetMapping import org.springframework.web.bind.annotation.PathVariable import org.springframework.web.bind.annotation.PutMapping import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.RequestParam import org.springframework.web.bind.annotation.RestController @RestController @@ -56,4 +57,10 @@ class UserJobController(val systemJobService: SystemJobService) { fun update(@PathVariable name: String, enabled: Boolean, running: Boolean): Response { return ResponseBuilder.success(systemJobService.update(name, enabled, running)) } + + @PutMapping("/stop") + fun stop(@RequestParam(required = false) name: String?, @RequestParam maxWaitTime: Long = 0): Response { + systemJobService.stop(name, maxWaitTime) + return ResponseBuilder.success() + } } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/pojo/TJobFailover.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/pojo/TJobFailover.kt new file mode 100644 index 0000000000..7eebf54d38 --- /dev/null +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/pojo/TJobFailover.kt @@ -0,0 +1,18 @@ +package com.tencent.bkrepo.job.pojo + +import org.springframework.data.mongodb.core.index.CompoundIndex +import org.springframework.data.mongodb.core.mapping.Document +import java.time.LocalDateTime + +@Document("job_failover") +@CompoundIndex(name = "name_idx", def = "{'name': 1}", background = true) +data class TJobFailover( + var id: String? = null, + var createdBy: String, + var createdDate: LocalDateTime, + var name: String, + var success: Long, + var failed: Long, + var total: Long, + var data: String?, +) diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/repository/JobSnapshotRepository.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/repository/JobSnapshotRepository.kt new file mode 100644 index 0000000000..06539275a6 --- /dev/null +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/repository/JobSnapshotRepository.kt @@ -0,0 +1,9 @@ +package com.tencent.bkrepo.job.repository + +import com.tencent.bkrepo.job.pojo.TJobFailover +import org.springframework.data.mongodb.repository.MongoRepository + +interface JobSnapshotRepository : MongoRepository { + fun findFirstByNameOrderByIdDesc(name: String): TJobFailover? + fun deleteByName(name: String) +} diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/service/SystemJobService.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/service/SystemJobService.kt index e3f1584a6c..c4032567a3 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/service/SystemJobService.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/service/SystemJobService.kt @@ -60,14 +60,15 @@ class SystemJobService(val jobs: List>) { fixedDelay = fixedDelay, fixedRate = fixedRate, initialDelay = initialDelay, - running = it.isRunning(), + running = it.inProcess(), lastBeginTime = it.lastBeginTime, lastEndTime = it.lastEndTime, lastExecuteTime = it.lastExecuteTime, - nextExecuteTime = getNextExecuteTime(it.batchJobProperties, - it.lastBeginTime, - it.lastEndTime - ) + nextExecuteTime = getNextExecuteTime( + it.batchJobProperties, + it.lastBeginTime, + it.lastEndTime, + ), ) jobDetails.add(jobDetail) } @@ -78,7 +79,7 @@ class SystemJobService(val jobs: List>) { private fun getNextExecuteTime( batchJobProperties: BatchJobProperties, lastBeginTime: LocalDateTime?, - lastEndTime: LocalDateTime? + lastEndTime: LocalDateTime?, ): LocalDateTime? { if (!batchJobProperties.enabled) { return null @@ -86,9 +87,11 @@ class SystemJobService(val jobs: List>) { val finalNextTime: Date val triggerContext = buildTriggerContext(lastBeginTime, lastEndTime) if (batchJobProperties.cron.equals(ScheduledTaskRegistrar.CRON_DISABLED)) { - finalNextTime = getNextTimeByPeriodic(batchJobProperties, - triggerContext, - lastBeginTime) + finalNextTime = getNextTimeByPeriodic( + batchJobProperties, + triggerContext, + lastBeginTime, + ) } else { finalNextTime = getNextTimeByCron(batchJobProperties, triggerContext) } @@ -100,7 +103,7 @@ class SystemJobService(val jobs: List>) { */ private fun buildTriggerContext( lastBeginTime: LocalDateTime?, - lastEndTime: LocalDateTime? + lastEndTime: LocalDateTime?, ): SimpleTriggerContext { val lastFinshTime = if (lastEndTime != null) { Date.from(lastEndTime.atZone(ZoneId.systemDefault()).toInstant()) @@ -120,7 +123,7 @@ class SystemJobService(val jobs: List>) { */ private fun getNextTimeByCron( batchJobProperties: BatchJobProperties, - triggerContext: SimpleTriggerContext + triggerContext: SimpleTriggerContext, ): Date { val cronTrigger = CronTrigger(batchJobProperties.cron) return cronTrigger.nextExecutionTime(triggerContext) @@ -132,7 +135,7 @@ class SystemJobService(val jobs: List>) { private fun getNextTimeByPeriodic( batchJobProperties: BatchJobProperties, triggerContext: SimpleTriggerContext, - lastBeginTime: LocalDateTime? + lastBeginTime: LocalDateTime?, ): Date { val fixRateStatus = if (batchJobProperties.fixedDelay != 0L) false else true val period = if (batchJobProperties.fixedDelay != 0L) { @@ -163,4 +166,12 @@ class SystemJobService(val jobs: List>) { } return true } + + fun stop(name: String?, maxWaitTime: Long) { + if (name == null) { + jobs.forEach { it.stop(maxWaitTime) } + } else { + jobs.first { it.getJobName() == name }.stop(maxWaitTime) + } + } } diff --git a/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJobTest.kt b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJobTest.kt index a4d19a6765..f6b2fc9b4e 100644 --- a/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJobTest.kt +++ b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJobTest.kt @@ -33,6 +33,7 @@ import com.tencent.bkrepo.common.service.util.SpringContextUtils import com.tencent.bkrepo.common.storage.core.StorageService import com.tencent.bkrepo.common.storage.credentials.InnerCosCredentials import com.tencent.bkrepo.job.SHARDING_COUNT +import com.tencent.bkrepo.job.repository.JobSnapshotRepository import com.tencent.bkrepo.repository.api.StorageCredentialsClient import io.mockk.every import io.mockk.mockk @@ -67,6 +68,8 @@ class FileReferenceCleanupJobTest : JobBaseTest() { @MockBean lateinit var archiveClient: ArchiveClient + @MockBean + lateinit var jobSnapshotRepository: JobSnapshotRepository @Autowired lateinit var mongoTemplate: MongoTemplate