diff --git a/Sources/Queues/QueuesCommand.swift b/Sources/Queues/QueuesCommand.swift index b9ff152..bd78037 100644 --- a/Sources/Queues/QueuesCommand.swift +++ b/Sources/Queues/QueuesCommand.swift @@ -196,7 +196,7 @@ public final class QueuesCommand: Command { } // stop all scheduled jobs self.scheduledTasks.values.forEach { - $0.task.syncCancel(on: self.eventLoopGroup.next()) + $0.syncCancel(on: self.eventLoopGroup.next()) } } diff --git a/Sources/Queues/ScheduledJob.swift b/Sources/Queues/ScheduledJob.swift index 6c57c72..d6ca992 100644 --- a/Sources/Queues/ScheduledJob.swift +++ b/Sources/Queues/ScheduledJob.swift @@ -22,29 +22,133 @@ class AnyScheduledJob { } } +protocol AnyScheduledJobTask { + var done: EventLoopFuture { get } + + func cancel(promise: EventLoopPromise?) +} + +extension AnyScheduledJobTask { + func cancel() { + cancel(promise: nil) + } + + func syncCancel(on eventLoop: EventLoop) { + do { + let promise = eventLoop.makePromise(of: Void.self) + self.cancel(promise: promise) + try promise.futureResult.wait() + } catch { + print("failed cancelling repeated task \(error)") + } + } +} + extension AnyScheduledJob { - struct Task { - let task: RepeatedTask + typealias Task = AnyScheduledJobTask + + final class TaskImpl: Task { + let eventLoop: EventLoop + let done: EventLoopFuture + + var innerTask: RepeatedTask? { + get { + eventLoop.assertInEventLoop() + return _innerTask + } + set { + eventLoop.assertInEventLoop() + _innerTask = newValue + } + } + + var isCancelled: Bool { + get { + eventLoop.assertInEventLoop() + return _isCancelled + } + set { + eventLoop.assertInEventLoop() + _isCancelled = newValue + } + } + + private var _innerTask: RepeatedTask? + private var _isCancelled: Bool = false + + init( + eventLoop: EventLoop, + done: EventLoopFuture + ) { + self.eventLoop = eventLoop + self.done = done + } + + func cancel(promise: EventLoopPromise?) { + if eventLoop.inEventLoop { + cancel0(promise: promise) + } else { + eventLoop.execute { + self.cancel0(promise: promise) + } + } + } + + private func cancel0(promise: EventLoopPromise?) { + eventLoop.assertInEventLoop() + _isCancelled = true + _innerTask?.cancel(promise: promise) + } } func schedule(context: QueueContext) -> Task? { context.logger.trace("Beginning the scheduler process") + guard let date = self.scheduler.nextDate() else { context.logger.debug("No date scheduled for \(self.job.name)") return nil } + context.logger.debug("Scheduling \(self.job.name) to run at \(date)") - let promise = context.eventLoop.makePromise(of: Void.self) - let task = context.eventLoop.scheduleRepeatedTask( + + let eventLoop = context.eventLoop + let promise = eventLoop.makePromise(of: Void.self) + let task = TaskImpl(eventLoop: eventLoop, done: promise.futureResult) + + func recurse() { + if task.isCancelled { return } + + let nioTask = nioSchedule(eventLoop: eventLoop, date: date) { + let now = Date() + if now < date { + // It still doesn't reach scheduled date, reschedule. + recurse() + return + } + + context.logger.trace("Running the scheduled job \(self.job.name)") + self.job.run(context: context).cascade(to: promise) + } + + task.innerTask = nioTask + } + + eventLoop.execute { + recurse() + } + + return task + } + + private func nioSchedule(eventLoop: EventLoop, date: Date, completion: @escaping () -> Void) -> RepeatedTask { + return eventLoop.scheduleRepeatedTask( initialDelay: .microseconds(Int64(date.timeIntervalSinceNow * 1_000_000)), delay: .seconds(0) - ) { task in + ) { (nioTask) in // always cancel - task.cancel() - context.logger.trace("Running the scheduled job \(self.job.name)") - self.job.run(context: context).cascade(to: promise) + nioTask.cancel() + completion() } - return .init(task: task, done: promise.futureResult) } }