diff --git a/Package.swift b/Package.swift index e3d8f64..6c3a2ed 100644 --- a/Package.swift +++ b/Package.swift @@ -11,11 +11,12 @@ let package = Package( ], products: [ .library(name: "Queues", targets: ["Queues"]), - .library(name: "XCTQueues", targets: ["XCTQueues"]) + .library(name: "XCTQueues", targets: ["XCTQueues"]), ], dependencies: [ - .package(url: "https://github.com/vapor/vapor.git", from: "4.101.1"), - .package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"), + .package(url: "https://github.com/vapor/vapor.git", from: "4.104.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.70.0"), + .package(url: "https://github.com/apple/swift-metrics.git", from: "2.5.0"), ], targets: [ .target( @@ -23,6 +24,7 @@ let package = Package( dependencies: [ .product(name: "Vapor", package: "vapor"), .product(name: "NIOCore", package: "swift-nio"), + .product(name: "Metrics", package: "swift-metrics"), ], swiftSettings: swiftSettings ), @@ -39,6 +41,7 @@ let package = Package( .target(name: "Queues"), .target(name: "XCTQueues"), .product(name: "XCTVapor", package: "vapor"), + .product(name: "MetricsTestKit", package: "swift-metrics"), ], swiftSettings: swiftSettings ), diff --git a/Package@swift-5.9.swift b/Package@swift-5.9.swift index 1092a83..f04268b 100644 --- a/Package@swift-5.9.swift +++ b/Package@swift-5.9.swift @@ -11,11 +11,12 @@ let package = Package( ], products: [ .library(name: "Queues", targets: ["Queues"]), - .library(name: "XCTQueues", targets: ["XCTQueues"]) + .library(name: "XCTQueues", targets: ["XCTQueues"]), ], dependencies: [ - .package(url: "https://github.com/vapor/vapor.git", from: "4.101.1"), - .package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"), + .package(url: "https://github.com/vapor/vapor.git", from: "4.104.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.70.0"), + .package(url: "https://github.com/apple/swift-metrics.git", from: "2.5.0"), ], targets: [ .target( @@ -23,6 +24,7 @@ let package = Package( dependencies: [ .product(name: "Vapor", package: "vapor"), .product(name: "NIOCore", package: "swift-nio"), + .product(name: "Metrics", package: "swift-metrics"), ], swiftSettings: swiftSettings ), @@ -39,6 +41,7 @@ let package = Package( .target(name: "Queues"), .target(name: "XCTQueues"), .product(name: "XCTVapor", package: "vapor"), + .product(name: "MetricsTestKit", package: "swift-metrics"), ], swiftSettings: swiftSettings ), diff --git a/Sources/Queues/AsyncQueue.swift b/Sources/Queues/AsyncQueue.swift index 5593408..577981c 100644 --- a/Sources/Queues/AsyncQueue.swift +++ b/Sources/Queues/AsyncQueue.swift @@ -1,28 +1,29 @@ import Foundation -import Vapor +import Metrics import NIOCore +import Vapor public protocol AsyncQueue: Queue { /// The job context var context: QueueContext { get } - + /// Gets the next job to be run /// - Parameter id: The ID of the job func get(_ id: JobIdentifier) async throws -> JobData - + /// Sets a job that should be run in the future /// - Parameters: /// - id: The ID of the job /// - data: Data for the job func set(_ id: JobIdentifier, to data: JobData) async throws - + /// Removes a job from the queue /// - Parameter id: The ID of the job func clear(_ id: JobIdentifier) async throws /// Pops the next job in the queue func pop() async throws -> JobIdentifier? - + /// Pushes the next job into a queue /// - Parameter id: The ID of the job func push(_ id: JobIdentifier) async throws @@ -32,19 +33,19 @@ extension AsyncQueue { public func get(_ id: JobIdentifier) -> EventLoopFuture { self.context.eventLoop.makeFutureWithTask { try await self.get(id) } } - + public func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture { self.context.eventLoop.makeFutureWithTask { try await self.set(id, to: data) } } - + public func clear(_ id: JobIdentifier) -> EventLoopFuture { self.context.eventLoop.makeFutureWithTask { try await self.clear(id) } } - + public func pop() -> EventLoopFuture { self.context.eventLoop.makeFutureWithTask { try await self.pop() } } - + public func push(_ id: JobIdentifier) -> EventLoopFuture { self.context.eventLoop.makeFutureWithTask { try await self.push(id) } } @@ -68,9 +69,9 @@ extension Queue { logger[metadataKey: "queue"] = "\(self.queueName.string)" logger[metadataKey: "job-id"] = "\(id.string)" logger[metadataKey: "job-name"] = "\(J.name)" - - let storage = JobData( - payload: try J.serializePayload(payload), + + let storage = try JobData( + payload: J.serializePayload(payload), maxRetryCount: maxRetryCount, jobName: J.name, delayUntil: delayUntil, @@ -82,7 +83,11 @@ extension Queue { logger.trace("Pusing job to queue") try await self.push(id).get() logger.info("Dispatched job") - + Counter(label: "dispatched.jobs.counter", dimensions: [ + ("queueName", self.queueName.string), + ("jobName", J.name), + ]).increment() + await self.sendNotification(of: "dispatch", logger: logger) { try await $0.dispatched(job: .init(id: id.string, queueName: self.queueName.string, jobData: storage), eventLoop: self.eventLoop).get() } diff --git a/Sources/Queues/Queue.swift b/Sources/Queues/Queue.swift index efee13e..ef7b420 100644 --- a/Sources/Queues/Queue.swift +++ b/Sources/Queues/Queue.swift @@ -1,29 +1,31 @@ -import NIOCore -import Logging + import Foundation +import Logging +import Metrics +import NIOCore /// A type that can store and retrieve jobs from a persistence layer public protocol Queue: Sendable { /// The job context var context: QueueContext { get } - + /// Gets the next job to be run /// - Parameter id: The ID of the job func get(_ id: JobIdentifier) -> EventLoopFuture - + /// Sets a job that should be run in the future /// - Parameters: /// - id: The ID of the job /// - data: Data for the job func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture - + /// Removes a job from the queue /// - Parameter id: The ID of the job func clear(_ id: JobIdentifier) -> EventLoopFuture /// Pops the next job in the queue func pop() -> EventLoopFuture - + /// Pushes the next job into a queue /// - Parameter id: The ID of the job func push(_ id: JobIdentifier) -> EventLoopFuture @@ -34,27 +36,27 @@ extension Queue { public var eventLoop: any EventLoop { self.context.eventLoop } - + /// A logger public var logger: Logger { self.context.logger } - + /// The configuration for the queue public var configuration: QueuesConfiguration { self.context.configuration } - + /// The queue's name public var queueName: QueueName { self.context.queueName } - + /// The key name of the queue public var key: String { self.queueName.makeKey(with: self.configuration.persistenceKey) } - + /// Dispatch a job into the queue for processing /// - Parameters: /// - job: The Job type @@ -94,7 +96,11 @@ extension Queue { logger.trace("Pusing job to queue") return self.push(id) }.flatMapWithEventLoop { _, eventLoop in - logger.info("Dispatched job") + Counter(label: "dispatched.jobs.counter", dimensions: [ + ("queueName", self.queueName.string), + ("jobName", J.name), + ]).increment() + self.logger.info("Dispatched queue job") return self.sendNotification(of: "dispatch", logger: logger) { $0.dispatched(job: .init(id: id.string, queueName: self.queueName.string, jobData: storage), eventLoop: eventLoop) } diff --git a/Sources/Queues/QueueWorker.swift b/Sources/Queues/QueueWorker.swift index c916b4e..f1a5bad 100644 --- a/Sources/Queues/QueueWorker.swift +++ b/Sources/Queues/QueueWorker.swift @@ -1,6 +1,8 @@ -import NIOCore -import Logging +import Dispatch import Foundation +import Logging +import Metrics +import NIOCore extension Queue { public var worker: QueueWorker { @@ -16,7 +18,7 @@ public struct QueueWorker: Sendable { /// This is a thin wrapper for ELF-style callers. public func run() -> EventLoopFuture { self.queue.eventLoop.makeFutureWithTask { - try await run() + try await self.run() } } @@ -25,14 +27,14 @@ public struct QueueWorker: Sendable { public func run() async throws { while try await self.runOneJob() {} } - + /// Pop a job off the queue and try to run it. If no jobs are available, do /// nothing. Returns whether a job was run. private func runOneJob() async throws -> Bool { var logger = self.queue.logger logger[metadataKey: "queue"] = "\(self.queue.queueName.string)" logger.trace("Popping job from queue") - + guard let id = try await self.queue.pop().get() else { // No job found, go around again. logger.trace("No pending jobs") @@ -41,7 +43,7 @@ public struct QueueWorker: Sendable { logger[metadataKey: "job-id"] = "\(id.string)" logger.trace("Found pending job") - + let data = try await self.queue.get(id).get() logger.trace("Received job data", metadata: ["job-data": "\(data)"]) logger[metadataKey: "job-name"] = "\(data.jobName)" @@ -68,11 +70,13 @@ public struct QueueWorker: Sendable { } private func runOneJob(id: JobIdentifier, job: any AnyJob, jobData: JobData, logger: Logger) async throws { + let startTime = DispatchTime.now().uptimeNanoseconds logger.info("Dequeing and running job", metadata: ["attempt": "\(jobData.currentAttempt)", "retries-left": "\(jobData.remainingAttempts)"]) do { try await job._dequeue(self.queue.context, id: id.string, payload: jobData.payload).get() logger.trace("Job ran successfully", metadata: ["attempts-made": "\(jobData.currentAttempt)"]) + self.updateMetrics(for: id, startTime: startTime, queue: self.queue) await self.queue.sendNotification(of: "success", logger: logger) { try await $0.success(jobId: id.string, eventLoop: self.queue.context.eventLoop).get() } @@ -82,6 +86,7 @@ public struct QueueWorker: Sendable { return try await self.retry(id: id, job: job, jobData: jobData, error: error, logger: logger) } else { logger.warning("Job failed, no retries remaining", metadata: ["error": "\(error)", "attempts-made": "\(jobData.currentAttempt)"]) + self.updateMetrics(for: id, startTime: startTime, queue: self.queue, error: error) try await job._error(self.queue.context, id: id.string, error, payload: jobData.payload).get() await self.queue.sendNotification(of: "failure", logger: logger) { @@ -102,12 +107,42 @@ public struct QueueWorker: Sendable { queuedAt: .init(), attempts: jobData.currentAttempt ) - + logger.warning("Job failed, retrying", metadata: [ - "retry-delay": "\(delay)", "error": "\(error)", "next-attempt": "\(updatedData.currentAttempt)", "retries-left": "\(updatedData.remainingAttempts)" + "retry-delay": "\(delay)", "error": "\(error)", "next-attempt": "\(updatedData.currentAttempt)", "retries-left": "\(updatedData.remainingAttempts)", ]) try await self.queue.clear(id).get() try await self.queue.set(id, to: updatedData).get() try await self.queue.push(id).get() } + + private func updateMetrics( + for id: JobIdentifier, + startTime: UInt64, + queue: any Queue, + error: (any Error)? = nil + ) { + // Checks how long the job took to complete + Timer( + label: "\(id.string).jobDurationTimer", + dimensions: [ + ("success", error == nil ? "true" : "false"), + ("id", id.string), + ], + preferredDisplayUnit: .seconds + ).recordNanoseconds(DispatchTime.now().uptimeNanoseconds - startTime) + + // Adds the completed job to a different counter depending on its result + if error != nil { + Counter( + label: "error.completed.jobs.counter", + dimensions: [("queueName", queue.queueName.string)] + ).increment() + } else { + Counter( + label: "success.completed.jobs.counter", + dimensions: [("queueName", queue.queueName.string)] + ).increment() + } + } } diff --git a/Sources/XCTQueues/TestQueueDriver.swift b/Sources/XCTQueues/TestQueueDriver.swift index ace6885..de6f335 100644 --- a/Sources/XCTQueues/TestQueueDriver.swift +++ b/Sources/XCTQueues/TestQueueDriver.swift @@ -1,7 +1,7 @@ +import NIOConcurrencyHelpers +import NIOCore import Queues import Vapor -import NIOCore -import NIOConcurrencyHelpers extension Application.Queues.Provider { public static var test: Self { @@ -25,7 +25,7 @@ struct TestQueuesDriver: QueuesDriver { func makeQueue(with context: QueueContext) -> any Queue { TestQueue(_context: .init(context)) } - + func shutdown() { // nothing } @@ -43,6 +43,7 @@ extension Application.Queues { var jobs: [JobIdentifier: JobData] = [:] var queue: [JobIdentifier] = [] } + private let box = NIOLockedValueBox(.init()) public var jobs: [JobIdentifier: JobData] { @@ -56,31 +57,31 @@ extension Application.Queues { } /// Returns the payloads of all jobs in the queue having type `J`. - public func all(_ job: J.Type) -> [J.Payload] { + public func all(_: J.Type) -> [J.Payload] { let filteredJobIds = self.jobs.filter { $1.jobName == J.name }.map { $0.0 } return self.queue .filter { filteredJobIds.contains($0) } - .compactMap { jobs[$0] } + .compactMap { self.jobs[$0] } .compactMap { try? J.parsePayload($0.payload) } } /// Returns the payload of the first job in the queue having type `J`. - public func first(_ job: J.Type) -> J.Payload? { - let filteredJobIds = jobs.filter { $1.jobName == J.name }.map { $0.0 } - + public func first(_: J.Type) -> J.Payload? { + let filteredJobIds = self.jobs.filter { $1.jobName == J.name }.map { $0.0 } + guard let queueJob = self.queue.first(where: { filteredJobIds.contains($0) }), let jobData = self.jobs[queueJob] else { return nil } return try? J.parsePayload(jobData.payload) } - + /// Checks whether a job of type `J` was dispatched to queue public func contains(_ job: J.Type) -> Bool { self.first(job) != nil } } - + struct TestQueueKey: StorageKey, LockKey { typealias Value = TestQueueStorage } @@ -96,7 +97,7 @@ extension Application.Queues { struct AsyncTestQueueKey: StorageKey, LockKey { typealias Value = TestQueueStorage } - + public var asyncTest: TestQueueStorage { self.application.storage[AsyncTestQueueKey.self]! } @@ -109,34 +110,34 @@ extension Application.Queues { struct TestQueue: Queue { let _context: NIOLockedValueBox var context: QueueContext { self._context.withLockedValue { $0 } } - + func get(_ id: JobIdentifier) -> EventLoopFuture { self._context.withLockedValue { context in context.eventLoop.makeSucceededFuture(context.application.queues.test.jobs[id]!) } } - + func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture { self._context.withLockedValue { context in context.application.queues.test.jobs[id] = data return context.eventLoop.makeSucceededVoidFuture() } } - + func clear(_ id: JobIdentifier) -> EventLoopFuture { self._context.withLockedValue { context in context.application.queues.test.jobs[id] = nil return context.eventLoop.makeSucceededVoidFuture() } } - + func pop() -> EventLoopFuture { self._context.withLockedValue { context in let last = context.application.queues.test.queue.popLast() return context.eventLoop.makeSucceededFuture(last) } } - + func push(_ id: JobIdentifier) -> EventLoopFuture { self._context.withLockedValue { context in context.application.queues.test.queue.append(id) @@ -148,7 +149,7 @@ struct TestQueue: Queue { struct AsyncTestQueue: AsyncQueue { let _context: NIOLockedValueBox var context: QueueContext { self._context.withLockedValue { $0 } } - + func get(_ id: JobIdentifier) async throws -> JobData { self._context.withLockedValue { $0.application.queues.asyncTest.jobs[id]! } } func set(_ id: JobIdentifier, to data: JobData) async throws { self._context.withLockedValue { $0.application.queues.asyncTest.jobs[id] = data } } func clear(_ id: JobIdentifier) async throws { self._context.withLockedValue { $0.application.queues.asyncTest.jobs[id] = nil } } diff --git a/Tests/QueuesTests/AsyncQueueTests.swift b/Tests/QueuesTests/AsyncQueueTests.swift index 52a2452..e1df95e 100644 --- a/Tests/QueuesTests/AsyncQueueTests.swift +++ b/Tests/QueuesTests/AsyncQueueTests.swift @@ -16,92 +16,80 @@ func XCTAssertNoThrowAsync( final class AsyncQueueTests: XCTestCase { var app: Application! - + override class func setUp() { XCTAssert(isLoggingConfigured) } override func setUp() async throws { - app = try await Application.make(.testing) + self.app = try await Application.make(.testing) } - + override func tearDown() async throws { - try await app.asyncShutdown() + try await self.app.asyncShutdown() } - + func testAsyncJobWithSyncQueue() async throws { - app.queues.use(.test) - - let promise = app.eventLoopGroup.any().makePromise(of: Void.self) - app.queues.add(MyAsyncJob(promise: promise)) - - app.get("foo") { req in + self.app.queues.use(.test) + + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) + self.app.queues.add(MyAsyncJob(promise: promise)) + + self.app.get("foo") { req in try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "baz")) try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "quux")) return "done" } - - try await app.testable().test(.GET, "foo") { res async in + + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } - - XCTAssertEqual(app.queues.test.queue.count, 3) - XCTAssertEqual(app.queues.test.jobs.count, 3) - let job = app.queues.test.first(MyAsyncJob.self) - XCTAssert(app.queues.test.contains(MyAsyncJob.self)) + + XCTAssertEqual(self.app.queues.test.queue.count, 3) + XCTAssertEqual(self.app.queues.test.jobs.count, 3) + let job = self.app.queues.test.first(MyAsyncJob.self) + XCTAssert(self.app.queues.test.contains(MyAsyncJob.self)) XCTAssertNotNil(job) XCTAssertEqual(job!.foo, "bar") - - try await app.queues.queue.worker.run().get() - XCTAssertEqual(app.queues.test.queue.count, 0) - XCTAssertEqual(app.queues.test.jobs.count, 0) - + + try await self.app.queues.queue.worker.run().get() + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) + await XCTAssertNoThrowAsync(try await promise.futureResult.get()) } func testAsyncJobWithAsyncQueue() async throws { - app.queues.use(.asyncTest) + self.app.queues.use(.asyncTest) - let promise = app.eventLoopGroup.any().makePromise(of: Void.self) - app.queues.add(MyAsyncJob(promise: promise)) - - app.get("foo") { req in + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) + self.app.queues.add(MyAsyncJob(promise: promise)) + + self.app.get("foo") { req in try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "baz")) try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "quux")) return "done" } - - try await app.testable().test(.GET, "foo") { res async in + + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } - - XCTAssertEqual(app.queues.asyncTest.queue.count, 3) - XCTAssertEqual(app.queues.asyncTest.jobs.count, 3) - let job = app.queues.asyncTest.first(MyAsyncJob.self) - XCTAssert(app.queues.asyncTest.contains(MyAsyncJob.self)) + + XCTAssertEqual(self.app.queues.asyncTest.queue.count, 3) + XCTAssertEqual(self.app.queues.asyncTest.jobs.count, 3) + let job = self.app.queues.asyncTest.first(MyAsyncJob.self) + XCTAssert(self.app.queues.asyncTest.contains(MyAsyncJob.self)) XCTAssertNotNil(job) XCTAssertEqual(job!.foo, "bar") - - try await app.queues.queue.worker.run().get() - XCTAssertEqual(app.queues.asyncTest.queue.count, 0) - XCTAssertEqual(app.queues.asyncTest.jobs.count, 0) - await XCTAssertNoThrowAsync(try await promise.futureResult.get()) - } -} + try await self.app.queues.queue.worker.run().get() + XCTAssertEqual(self.app.queues.asyncTest.queue.count, 0) + XCTAssertEqual(self.app.queues.asyncTest.jobs.count, 0) -struct MyAsyncJob: AsyncJob { - let promise: EventLoopPromise - - struct Data: Codable { - var foo: String - } - - func dequeue(_ context: QueueContext, _ payload: Data) async throws { - self.promise.succeed() + await XCTAssertNoThrowAsync(try await promise.futureResult.get()) } } diff --git a/Tests/QueuesTests/MetricsTests.swift b/Tests/QueuesTests/MetricsTests.swift new file mode 100644 index 0000000..74673f7 --- /dev/null +++ b/Tests/QueuesTests/MetricsTests.swift @@ -0,0 +1,114 @@ +@testable import CoreMetrics +import Metrics +import MetricsTestKit +import NIOConcurrencyHelpers +import Queues +@testable import Vapor +import XCTQueues +import XCTVapor + +final class MetricsTests: XCTestCase { + var app: Application! + var metrics: TestMetrics! + + override func setUp() async throws { + self.metrics = TestMetrics() + MetricsSystem.bootstrapInternal(self.metrics) + + self.app = try await Application.make(.testing) + self.app.queues.use(.test) + } + + override func tearDown() async throws { + try await self.app.asyncShutdown() + } + + func testJobDurationTimer() async throws { + let promise = self.app.eventLoopGroup.next().makePromise(of: Void.self) + self.app.queues.add(MyAsyncJob(promise: promise)) + + self.app.get("foo") { req async throws in + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar"), id: JobIdentifier(string: "some-id")) + return "done" + } + + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + try await self.app.queues.queue.worker.run() + + let timer = try XCTUnwrap(self.metrics.timers.first(where: { $0.label == "some-id.jobDurationTimer" })) + let successDimension = try XCTUnwrap(timer.dimensions.first(where: { $0.0 == "success" })) + let idDimension = try XCTUnwrap(timer.dimensions.first(where: { $0.0 == "id" })) + XCTAssertEqual(successDimension.1, "true") + XCTAssertEqual(idDimension.1, "some-id") + + try XCTAssertNoThrow(promise.futureResult.wait()) + } + + func testSuccessfullyCompletedJobsCounter() async throws { + let promise = self.app.eventLoopGroup.next().makePromise(of: Void.self) + self.app.queues.add(MyAsyncJob(promise: promise)) + + self.app.get("foo") { req async throws in + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) + return "done" + } + + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + try await self.app.queues.queue.worker.run() + let counter = try XCTUnwrap(self.metrics.counters.first(where: { $0.label == "success.completed.jobs.counter" })) + let queueNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "queueName" })) + XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) + } + + func testErroringJobsCounter() async throws { + let promise = self.app.eventLoopGroup.next().makePromise(of: Void.self) + self.app.queues.add(FailingAsyncJob(promise: promise)) + + self.app.get("foo") { req async throws in + try await req.queue.dispatch(FailingAsyncJob.self, .init(foo: "bar")) + return "done" + } + + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + try await self.app.queues.queue.worker.run() + let counter = try XCTUnwrap(self.metrics.counters.first(where: { $0.label == "error.completed.jobs.counter" })) + let queueNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "queueName" })) + XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) + } + + func testDispatchedJobsCounter() async throws { + let promise = self.app.eventLoopGroup.next().makePromise(of: Void.self) + self.app.queues.add(MyAsyncJob(promise: promise)) + + self.app.get("foo") { req async throws in + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar"), id: JobIdentifier(string: "first")) + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "rab"), id: JobIdentifier(string: "second")) + return "done" + } + + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + try await self.app.queues.queue.worker.run() + + let counter = try XCTUnwrap(self.metrics.counters.first(where: { $0.label == "dispatched.jobs.counter" })) + let queueNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "queueName" })) + let jobNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "jobName" })) + XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) + XCTAssertEqual(jobNameDimension.1, MyAsyncJob.name) + } +} diff --git a/Tests/QueuesTests/QueueTests.swift b/Tests/QueuesTests/QueueTests.swift index cf90318..031fea1 100644 --- a/Tests/QueuesTests/QueueTests.swift +++ b/Tests/QueuesTests/QueueTests.swift @@ -1,9 +1,9 @@ import Atomics +import NIOConcurrencyHelpers import Queues import XCTest -import XCTVapor import XCTQueues -import NIOConcurrencyHelpers +import XCTVapor func XCTAssertEqualAsync( _ expression1: @autoclosure () async throws -> T, @@ -56,19 +56,19 @@ final class QueueTests: XCTestCase { self.app = try await Application.make(.testing) self.app.queues.use(.test) } - + override func tearDown() async throws { try await self.app.asyncShutdown() self.app = nil } - + func testVaporIntegrationWithInProcessJob() async throws { let jobSignal1 = self.app.eventLoopGroup.any().makePromise(of: String.self) self.app.queues.add(Foo1(promise: jobSignal1)) let jobSignal2 = self.app.eventLoopGroup.any().makePromise(of: String.self) self.app.queues.add(Foo2(promise: jobSignal2)) try self.app.queues.startInProcessJobs(on: .default) - + self.app.get("bar1") { req in try await req.queue.dispatch(Foo1.self, .init(foo: "Bar payload")).get() return "job bar dispatched" @@ -78,7 +78,7 @@ final class QueueTests: XCTestCase { try await req.queue.dispatch(Foo2.self, .init(foo: "Bar payload")) return "job bar dispatched" } - + try await self.app.testable().test(.GET, "bar1") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "job bar dispatched") @@ -90,7 +90,7 @@ final class QueueTests: XCTestCase { await XCTAssertEqualAsync(try await jobSignal1.futureResult.get(), "Bar payload") await XCTAssertEqualAsync(try await jobSignal2.futureResult.get(), "Bar payload") } - + func testVaporIntegration() async throws { let promise = self.app.eventLoopGroup.any().makePromise(of: String.self) self.app.queues.add(Foo1(promise: promise)) @@ -99,23 +99,23 @@ final class QueueTests: XCTestCase { try await req.queue.dispatch(Foo1.self, .init(foo: "bar")) return "done" } - + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } - + XCTAssertEqual(self.app.queues.test.queue.count, 1) XCTAssertEqual(self.app.queues.test.jobs.count, 1) let job = self.app.queues.test.first(Foo1.self) XCTAssert(self.app.queues.test.contains(Foo1.self)) XCTAssertNotNil(job) XCTAssertEqual(job!.foo, "bar") - + try await self.app.queues.queue.worker.run() XCTAssertEqual(self.app.queues.test.queue.count, 0) XCTAssertEqual(self.app.queues.test.jobs.count, 0) - + await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar") } @@ -155,23 +155,23 @@ final class QueueTests: XCTestCase { try await req.queue.dispatch(Foo1.self, .init(foo: "bar"), id: JobIdentifier(string: "my-custom-id")) return "done" } - + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } - + XCTAssertEqual(self.app.queues.test.queue.count, 1) XCTAssertEqual(self.app.queues.test.jobs.count, 1) XCTAssert(self.app.queues.test.jobs.keys.map(\.string).contains("my-custom-id")) - + try await self.app.queues.queue.worker.run() XCTAssertEqual(self.app.queues.test.queue.count, 0) XCTAssertEqual(self.app.queues.test.jobs.count, 0) - + await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar") } - + func testScheduleBuilderAPI() async throws { // yearly self.app.queues.schedule(Cleanup()).yearly().in(.may).on(23).at(.noon) @@ -194,19 +194,19 @@ final class QueueTests: XCTestCase { // hourly self.app.queues.schedule(Cleanup()).hourly().at(30) } - + func testRepeatingScheduledJob() async throws { let scheduledJob = TestingScheduledJob() XCTAssertEqual(scheduledJob.count.load(ordering: .relaxed), 0) self.app.queues.schedule(scheduledJob).everySecond() try self.app.queues.startScheduledJobs() - + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) self.app.eventLoopGroup.any().scheduleTask(in: .seconds(5)) { XCTAssert(scheduledJob.count.load(ordering: .relaxed) > 4) promise.succeed() } - + try await promise.futureResult.get() } @@ -215,20 +215,20 @@ final class QueueTests: XCTestCase { XCTAssertEqual(scheduledJob.count.load(ordering: .relaxed), 0) self.app.queues.schedule(scheduledJob).everySecond() try self.app.queues.startScheduledJobs() - + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) self.app.eventLoopGroup.any().scheduleTask(in: .seconds(5)) { XCTAssert(scheduledJob.count.load(ordering: .relaxed) > 4) promise.succeed() } - + try await promise.futureResult.get() } func testFailingScheduledJob() async throws { self.app.queues.schedule(FailingScheduledJob()).everySecond() try self.app.queues.startScheduledJobs() - + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) self.app.eventLoopGroup.any().scheduleTask(in: .seconds(1)) { promise.succeed() @@ -239,7 +239,7 @@ final class QueueTests: XCTestCase { func testAsyncFailingScheduledJob() async throws { self.app.queues.schedule(AsyncFailingScheduledJob()).everySecond() try self.app.queues.startScheduledJobs() - + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) self.app.eventLoopGroup.any().scheduleTask(in: .seconds(1)) { promise.succeed() @@ -250,7 +250,7 @@ final class QueueTests: XCTestCase { func testCustomWorkerCount() async throws { // Setup custom ELG with 4 threads let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 4) - + do { let count = self.app.eventLoopGroup.any().makePromise(of: Int.self) self.app.queues.use(custom: WorkerCountDriver(count: count)) @@ -306,7 +306,7 @@ final class QueueTests: XCTestCase { XCTAssertEqual(self.app.queues.test.queue.count, 0) XCTAssertEqual(self.app.queues.test.jobs.count, 0) XCTAssertTrue(dequeuedHook.successHit) - + await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar") } @@ -361,7 +361,7 @@ final class QueueTests: XCTestCase { let errorHook = ErrorHook() self.app.queues.add(successHook) self.app.queues.add(errorHook) - + self.app.get("foo") { req in try await req.queue.dispatch(Bar.self, .init(foo: "bar"), maxRetryCount: 3) return "done" @@ -397,7 +397,7 @@ final class QueueTests: XCTestCase { let errorHook = AsyncErrorHook() self.app.queues.add(successHook) self.app.queues.add(errorHook) - + self.app.get("foo") { req in try await req.queue.dispatch(Bar.self, .init(foo: "bar"), maxRetryCount: 3) return "done" @@ -522,7 +522,7 @@ final class QueueTests: XCTestCase { final class DispatchHook: JobEventDelegate, @unchecked Sendable { var successHit = false - func dispatched(job: JobEventData, eventLoop: any EventLoop) -> EventLoopFuture { + func dispatched(job _: JobEventData, eventLoop: any EventLoop) -> EventLoopFuture { self.successHit = true return eventLoop.makeSucceededVoidFuture() } @@ -531,7 +531,7 @@ final class DispatchHook: JobEventDelegate, @unchecked Sendable { final class SuccessHook: JobEventDelegate, @unchecked Sendable { var successHit = false - func success(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { + func success(jobId _: String, eventLoop: any EventLoop) -> EventLoopFuture { self.successHit = true return eventLoop.makeSucceededVoidFuture() } @@ -540,7 +540,7 @@ final class SuccessHook: JobEventDelegate, @unchecked Sendable { final class ErrorHook: JobEventDelegate, @unchecked Sendable { var errorCount = 0 - func error(jobId: String, error: any Error, eventLoop: any EventLoop) -> EventLoopFuture { + func error(jobId _: String, error _: any Error, eventLoop: any EventLoop) -> EventLoopFuture { self.errorCount += 1 return eventLoop.makeSucceededVoidFuture() } @@ -549,7 +549,7 @@ final class ErrorHook: JobEventDelegate, @unchecked Sendable { final class DequeuedHook: JobEventDelegate, @unchecked Sendable { var successHit = false - func didDequeue(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { + func didDequeue(jobId _: String, eventLoop: any EventLoop) -> EventLoopFuture { self.successHit = true return eventLoop.makeSucceededVoidFuture() } @@ -557,22 +557,22 @@ final class DequeuedHook: JobEventDelegate, @unchecked Sendable { actor AsyncDispatchHook: AsyncJobEventDelegate { var successHit = false - func dispatched(job: JobEventData) async throws { self.successHit = true } + func dispatched(job _: JobEventData) async throws { self.successHit = true } } actor AsyncSuccessHook: AsyncJobEventDelegate { var successHit = false - func success(jobId: String) async throws { self.successHit = true } + func success(jobId _: String) async throws { self.successHit = true } } actor AsyncErrorHook: AsyncJobEventDelegate { var errorCount = 0 - func error(jobId: String, error: any Error) async throws { self.errorCount += 1 } + func error(jobId _: String, error _: any Error) async throws { self.errorCount += 1 } } actor AsyncDequeuedHook: AsyncJobEventDelegate { var successHit = false - func didDequeue(jobId: String) async throws { self.successHit = true } + func didDequeue(jobId _: String) async throws { self.successHit = true } } final class WorkerCountDriver: QueuesDriver, @unchecked Sendable { @@ -609,30 +609,29 @@ final class WorkerCountDriver: QueuesDriver, @unchecked Sendable { let driver: WorkerCountDriver var context: QueueContext - func get(_ id: JobIdentifier) -> EventLoopFuture { fatalError() } - func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture { fatalError() } - func clear(_ id: JobIdentifier) -> EventLoopFuture { fatalError() } + func get(_: JobIdentifier) -> EventLoopFuture { fatalError() } + func set(_: JobIdentifier, to _: JobData) -> EventLoopFuture { fatalError() } + func clear(_: JobIdentifier) -> EventLoopFuture { fatalError() } func pop() -> EventLoopFuture { self.driver.record(eventLoop: self.context.eventLoop) return self.context.eventLoop.makeSucceededFuture(nil) } - func push(_ id: JobIdentifier) -> EventLoopFuture { fatalError() } + + func push(_: JobIdentifier) -> EventLoopFuture { fatalError() } } } -struct Failure: Error {} - struct FailingScheduledJob: ScheduledJob { func run(context: QueueContext) -> EventLoopFuture { context.eventLoop.makeFailedFuture(Failure()) } } struct AsyncFailingScheduledJob: AsyncScheduledJob { - func run(context: QueueContext) async throws { throw Failure() } + func run(context _: QueueContext) async throws { throw Failure() } } struct TestingScheduledJob: ScheduledJob { var count = ManagedAtomic(0) - + func run(context: QueueContext) -> EventLoopFuture { self.count.wrappingIncrement(ordering: .relaxed) return context.eventLoop.makeSucceededVoidFuture() @@ -641,22 +640,22 @@ struct TestingScheduledJob: ScheduledJob { struct AsyncTestingScheduledJob: AsyncScheduledJob { var count = ManagedAtomic(0) - func run(context: QueueContext) async throws { self.count.wrappingIncrement(ordering: .relaxed) } + func run(context _: QueueContext) async throws { self.count.wrappingIncrement(ordering: .relaxed) } } struct Foo1: Job { let promise: EventLoopPromise - + struct Data: Codable { var foo: String } - + func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture { self.promise.succeed(data.foo) return context.eventLoop.makeSucceededVoidFuture() } - - func error(_ context: QueueContext, _ error: any Error, _ data: Data) -> EventLoopFuture { + + func error(_ context: QueueContext, _ error: any Error, _: Data) -> EventLoopFuture { self.promise.fail(error) return context.eventLoop.makeSucceededVoidFuture() } @@ -664,17 +663,17 @@ struct Foo1: Job { struct Foo2: Job { let promise: EventLoopPromise - + struct Data: Codable { var foo: String } - + func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture { self.promise.succeed(data.foo) return context.eventLoop.makeSucceededVoidFuture() } - - func error(_ context: QueueContext, _ error: any Error, _ data: Data) -> EventLoopFuture { + + func error(_ context: QueueContext, _ error: any Error, _: Data) -> EventLoopFuture { self.promise.fail(error) return context.eventLoop.makeSucceededVoidFuture() } @@ -685,11 +684,11 @@ struct Bar: Job { var foo: String } - func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture { + func dequeue(_ context: QueueContext, _: Data) -> EventLoopFuture { context.eventLoop.makeFailedFuture(Abort(.badRequest)) } - func error(_ context: QueueContext, _ error: any Error, _ data: Data) -> EventLoopFuture { + func error(_ context: QueueContext, _: any Error, _: Data) -> EventLoopFuture { context.eventLoop.makeSucceededVoidFuture() } } @@ -699,11 +698,11 @@ struct Baz: Job { var foo: String } - func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture { + func dequeue(_ context: QueueContext, _: Data) -> EventLoopFuture { context.eventLoop.makeFailedFuture(Abort(.badRequest)) } - func error(_ context: QueueContext, _ error: any Error, _ data: Data) -> EventLoopFuture { + func error(_ context: QueueContext, _: any Error, _: Data) -> EventLoopFuture { context.eventLoop.makeSucceededVoidFuture() } diff --git a/Tests/QueuesTests/Utilities/FailingAsyncJob.swift b/Tests/QueuesTests/Utilities/FailingAsyncJob.swift new file mode 100644 index 0000000..2e17bdf --- /dev/null +++ b/Tests/QueuesTests/Utilities/FailingAsyncJob.swift @@ -0,0 +1,15 @@ +import Queues + +struct FailingAsyncJob: AsyncJob { + let promise: EventLoopPromise + + struct Payload: Codable { + var foo: String + } + + func dequeue(_: QueueContext, _: Payload) async throws { + let failure = Failure() + self.promise.fail(failure) + throw failure + } +} diff --git a/Tests/QueuesTests/Utilities/Failure.swift b/Tests/QueuesTests/Utilities/Failure.swift new file mode 100644 index 0000000..6ec8021 --- /dev/null +++ b/Tests/QueuesTests/Utilities/Failure.swift @@ -0,0 +1 @@ +struct Failure: Error {} diff --git a/Tests/QueuesTests/Utilities/MyAsyncJob.swift b/Tests/QueuesTests/Utilities/MyAsyncJob.swift new file mode 100644 index 0000000..23ba31f --- /dev/null +++ b/Tests/QueuesTests/Utilities/MyAsyncJob.swift @@ -0,0 +1,13 @@ +import Queues + +struct MyAsyncJob: AsyncJob { + let promise: EventLoopPromise + + struct Payload: Codable { + var foo: String + } + + func dequeue(_: QueueContext, _: Payload) async throws { + self.promise.succeed() + } +}