From 8d9ee352d691e80aa56a4262f7852cd355f93142 Mon Sep 17 00:00:00 2001 From: pt Date: Thu, 6 Oct 2022 17:26:18 +0200 Subject: [PATCH 1/6] Added dependency to swift-metrics --- Package.swift | 4 +- Sources/Queues/AsyncQueue.swift | 28 +-- Sources/Queues/Queue.swift | 31 ++-- Sources/Queues/QueueWorker.swift | 54 +++++- Sources/XCTQueues/TestQueueDriver.swift | 35 ++-- Tests/QueuesTests/AsyncQueueTests.swift | 90 +++++----- Tests/QueuesTests/MetricsTests.swift | 103 +++++++++++ .../Utilities/CapturingMetricsSystem.swift | 160 ++++++++++++++++++ Tests/QueuesTests/Utilities/MyAsyncJob.swift | 13 ++ 9 files changed, 416 insertions(+), 102 deletions(-) create mode 100644 Tests/QueuesTests/MetricsTests.swift create mode 100644 Tests/QueuesTests/Utilities/CapturingMetricsSystem.swift create mode 100644 Tests/QueuesTests/Utilities/MyAsyncJob.swift diff --git a/Package.swift b/Package.swift index e3d8f64..7fae7b9 100644 --- a/Package.swift +++ b/Package.swift @@ -11,11 +11,12 @@ let package = Package( ], products: [ .library(name: "Queues", targets: ["Queues"]), - .library(name: "XCTQueues", targets: ["XCTQueues"]) + .library(name: "XCTQueues", targets: ["XCTQueues"]), ], dependencies: [ .package(url: "https://github.com/vapor/vapor.git", from: "4.101.1"), .package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"), + .package(url: "https://github.com/apple/swift-metrics.git", "2.0.0"), ], targets: [ .target( @@ -23,6 +24,7 @@ let package = Package( dependencies: [ .product(name: "Vapor", package: "vapor"), .product(name: "NIOCore", package: "swift-nio"), + .product(name: "Metrics", package: "swift-metrics"), ], swiftSettings: swiftSettings ), diff --git a/Sources/Queues/AsyncQueue.swift b/Sources/Queues/AsyncQueue.swift index 5593408..a0bd785 100644 --- a/Sources/Queues/AsyncQueue.swift +++ b/Sources/Queues/AsyncQueue.swift @@ -1,28 +1,29 @@ import Foundation -import Vapor +import Metrics import NIOCore +import Vapor public protocol AsyncQueue: Queue { /// The job context var context: QueueContext { get } - + /// Gets the next job to be run /// - Parameter id: The ID of the job func get(_ id: JobIdentifier) async throws -> JobData - + /// Sets a job that should be run in the future /// - Parameters: /// - id: The ID of the job /// - data: Data for the job func set(_ id: JobIdentifier, to data: JobData) async throws - + /// Removes a job from the queue /// - Parameter id: The ID of the job func clear(_ id: JobIdentifier) async throws /// Pops the next job in the queue func pop() async throws -> JobIdentifier? - + /// Pushes the next job into a queue /// - Parameter id: The ID of the job func push(_ id: JobIdentifier) async throws @@ -32,19 +33,19 @@ extension AsyncQueue { public func get(_ id: JobIdentifier) -> EventLoopFuture { self.context.eventLoop.makeFutureWithTask { try await self.get(id) } } - + public func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture { self.context.eventLoop.makeFutureWithTask { try await self.set(id, to: data) } } - + public func clear(_ id: JobIdentifier) -> EventLoopFuture { self.context.eventLoop.makeFutureWithTask { try await self.clear(id) } } - + public func pop() -> EventLoopFuture { self.context.eventLoop.makeFutureWithTask { try await self.pop() } } - + public func push(_ id: JobIdentifier) -> EventLoopFuture { self.context.eventLoop.makeFutureWithTask { try await self.push(id) } } @@ -68,9 +69,9 @@ extension Queue { logger[metadataKey: "queue"] = "\(self.queueName.string)" logger[metadataKey: "job-id"] = "\(id.string)" logger[metadataKey: "job-name"] = "\(J.name)" - - let storage = JobData( - payload: try J.serializePayload(payload), + + let storage = try JobData( + payload: J.serializePayload(payload), maxRetryCount: maxRetryCount, jobName: J.name, delayUntil: delayUntil, @@ -82,7 +83,8 @@ extension Queue { logger.trace("Pusing job to queue") try await self.push(id).get() logger.info("Dispatched job") - + Counter(label: "dispatched.jobs.counter", dimensions: [("queueName", self.queueName.string)]).increment() + await self.sendNotification(of: "dispatch", logger: logger) { try await $0.dispatched(job: .init(id: id.string, queueName: self.queueName.string, jobData: storage), eventLoop: self.eventLoop).get() } diff --git a/Sources/Queues/Queue.swift b/Sources/Queues/Queue.swift index efee13e..6df5de1 100644 --- a/Sources/Queues/Queue.swift +++ b/Sources/Queues/Queue.swift @@ -1,29 +1,31 @@ -import NIOCore -import Logging + import Foundation +import Logging +import Metrics +import NIOCore /// A type that can store and retrieve jobs from a persistence layer public protocol Queue: Sendable { /// The job context var context: QueueContext { get } - + /// Gets the next job to be run /// - Parameter id: The ID of the job func get(_ id: JobIdentifier) -> EventLoopFuture - + /// Sets a job that should be run in the future /// - Parameters: /// - id: The ID of the job /// - data: Data for the job func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture - + /// Removes a job from the queue /// - Parameter id: The ID of the job func clear(_ id: JobIdentifier) -> EventLoopFuture /// Pops the next job in the queue func pop() -> EventLoopFuture - + /// Pushes the next job into a queue /// - Parameter id: The ID of the job func push(_ id: JobIdentifier) -> EventLoopFuture @@ -34,27 +36,27 @@ extension Queue { public var eventLoop: any EventLoop { self.context.eventLoop } - + /// A logger public var logger: Logger { self.context.logger } - + /// The configuration for the queue public var configuration: QueuesConfiguration { self.context.configuration } - + /// The queue's name public var queueName: QueueName { self.context.queueName } - + /// The key name of the queue public var key: String { self.queueName.makeKey(with: self.configuration.persistenceKey) } - + /// Dispatch a job into the queue for processing /// - Parameters: /// - job: The Job type @@ -94,7 +96,12 @@ extension Queue { logger.trace("Pusing job to queue") return self.push(id) }.flatMapWithEventLoop { _, eventLoop in - logger.info("Dispatched job") + Counter(label: "dispatched.jobs.counter", dimensions: [("queueName", self.queueName.string)]).increment() + self.logger.info("Dispatched queue job", metadata: [ + "job_id": .string(id.string), + "job_name": .string(job.name), + "queue": .string(self.queueName.string), + ]) return self.sendNotification(of: "dispatch", logger: logger) { $0.dispatched(job: .init(id: id.string, queueName: self.queueName.string, jobData: storage), eventLoop: eventLoop) } diff --git a/Sources/Queues/QueueWorker.swift b/Sources/Queues/QueueWorker.swift index c916b4e..a626696 100644 --- a/Sources/Queues/QueueWorker.swift +++ b/Sources/Queues/QueueWorker.swift @@ -1,6 +1,8 @@ -import NIOCore -import Logging +import Dispatch import Foundation +import Logging +import Metrics +import NIOCore extension Queue { public var worker: QueueWorker { @@ -16,7 +18,7 @@ public struct QueueWorker: Sendable { /// This is a thin wrapper for ELF-style callers. public func run() -> EventLoopFuture { self.queue.eventLoop.makeFutureWithTask { - try await run() + try await self.run() } } @@ -25,14 +27,14 @@ public struct QueueWorker: Sendable { public func run() async throws { while try await self.runOneJob() {} } - + /// Pop a job off the queue and try to run it. If no jobs are available, do /// nothing. Returns whether a job was run. private func runOneJob() async throws -> Bool { var logger = self.queue.logger logger[metadataKey: "queue"] = "\(self.queue.queueName.string)" logger.trace("Popping job from queue") - + guard let id = try await self.queue.pop().get() else { // No job found, go around again. logger.trace("No pending jobs") @@ -41,7 +43,7 @@ public struct QueueWorker: Sendable { logger[metadataKey: "job-id"] = "\(id.string)" logger.trace("Found pending job") - + let data = try await self.queue.get(id).get() logger.trace("Received job data", metadata: ["job-data": "\(data)"]) logger[metadataKey: "job-name"] = "\(data.jobName)" @@ -68,11 +70,13 @@ public struct QueueWorker: Sendable { } private func runOneJob(id: JobIdentifier, job: any AnyJob, jobData: JobData, logger: Logger) async throws { + let startTime = DispatchTime.now().uptimeNanoseconds logger.info("Dequeing and running job", metadata: ["attempt": "\(jobData.currentAttempt)", "retries-left": "\(jobData.remainingAttempts)"]) do { try await job._dequeue(self.queue.context, id: id.string, payload: jobData.payload).get() logger.trace("Job ran successfully", metadata: ["attempts-made": "\(jobData.currentAttempt)"]) + self.updateMetrics(for: id, startTime: startTime, queue: self.queue) await self.queue.sendNotification(of: "success", logger: logger) { try await $0.success(jobId: id.string, eventLoop: self.queue.context.eventLoop).get() } @@ -82,6 +86,7 @@ public struct QueueWorker: Sendable { return try await self.retry(id: id, job: job, jobData: jobData, error: error, logger: logger) } else { logger.warning("Job failed, no retries remaining", metadata: ["error": "\(error)", "attempts-made": "\(jobData.currentAttempt)"]) + self.updateMetrics(for: id, startTime: startTime, queue: self.queue, error: error) try await job._error(self.queue.context, id: id.string, error, payload: jobData.payload).get() await self.queue.sendNotification(of: "failure", logger: logger) { @@ -102,12 +107,45 @@ public struct QueueWorker: Sendable { queuedAt: .init(), attempts: jobData.currentAttempt ) - + logger.warning("Job failed, retrying", metadata: [ - "retry-delay": "\(delay)", "error": "\(error)", "next-attempt": "\(updatedData.currentAttempt)", "retries-left": "\(updatedData.remainingAttempts)" + "retry-delay": "\(delay)", "error": "\(error)", "next-attempt": "\(updatedData.currentAttempt)", "retries-left": "\(updatedData.remainingAttempts)", ]) try await self.queue.clear(id).get() try await self.queue.set(id, to: updatedData).get() try await self.queue.push(id).get() } + + private func updateMetrics( + for id: JobIdentifier, + startTime: UInt64, + queue: any Queue, + error: (any Error)? = nil + ) { + // Checks how long the job took to complete + Timer( + label: "\(id.string).jobDurationTimer", + dimensions: [ + ("success", "true"), + ("id", id.string), + ], + preferredDisplayUnit: .seconds + ).recordNanoseconds(DispatchTime.now().uptimeNanoseconds - startTime) + + // Adds the completed job to a different counter depending on its result + if let error = error { + Counter( + label: "error.completed.jobs.counter", + dimensions: [ + ("id", id.string), + ("error", error.localizedDescription), + ] + ).increment() + } else { + Counter( + label: "success.completed.jobs.counter", + dimensions: [("queueName", queue.queueName.string)] + ).increment() + } + } } diff --git a/Sources/XCTQueues/TestQueueDriver.swift b/Sources/XCTQueues/TestQueueDriver.swift index ace6885..de6f335 100644 --- a/Sources/XCTQueues/TestQueueDriver.swift +++ b/Sources/XCTQueues/TestQueueDriver.swift @@ -1,7 +1,7 @@ +import NIOConcurrencyHelpers +import NIOCore import Queues import Vapor -import NIOCore -import NIOConcurrencyHelpers extension Application.Queues.Provider { public static var test: Self { @@ -25,7 +25,7 @@ struct TestQueuesDriver: QueuesDriver { func makeQueue(with context: QueueContext) -> any Queue { TestQueue(_context: .init(context)) } - + func shutdown() { // nothing } @@ -43,6 +43,7 @@ extension Application.Queues { var jobs: [JobIdentifier: JobData] = [:] var queue: [JobIdentifier] = [] } + private let box = NIOLockedValueBox(.init()) public var jobs: [JobIdentifier: JobData] { @@ -56,31 +57,31 @@ extension Application.Queues { } /// Returns the payloads of all jobs in the queue having type `J`. - public func all(_ job: J.Type) -> [J.Payload] { + public func all(_: J.Type) -> [J.Payload] { let filteredJobIds = self.jobs.filter { $1.jobName == J.name }.map { $0.0 } return self.queue .filter { filteredJobIds.contains($0) } - .compactMap { jobs[$0] } + .compactMap { self.jobs[$0] } .compactMap { try? J.parsePayload($0.payload) } } /// Returns the payload of the first job in the queue having type `J`. - public func first(_ job: J.Type) -> J.Payload? { - let filteredJobIds = jobs.filter { $1.jobName == J.name }.map { $0.0 } - + public func first(_: J.Type) -> J.Payload? { + let filteredJobIds = self.jobs.filter { $1.jobName == J.name }.map { $0.0 } + guard let queueJob = self.queue.first(where: { filteredJobIds.contains($0) }), let jobData = self.jobs[queueJob] else { return nil } return try? J.parsePayload(jobData.payload) } - + /// Checks whether a job of type `J` was dispatched to queue public func contains(_ job: J.Type) -> Bool { self.first(job) != nil } } - + struct TestQueueKey: StorageKey, LockKey { typealias Value = TestQueueStorage } @@ -96,7 +97,7 @@ extension Application.Queues { struct AsyncTestQueueKey: StorageKey, LockKey { typealias Value = TestQueueStorage } - + public var asyncTest: TestQueueStorage { self.application.storage[AsyncTestQueueKey.self]! } @@ -109,34 +110,34 @@ extension Application.Queues { struct TestQueue: Queue { let _context: NIOLockedValueBox var context: QueueContext { self._context.withLockedValue { $0 } } - + func get(_ id: JobIdentifier) -> EventLoopFuture { self._context.withLockedValue { context in context.eventLoop.makeSucceededFuture(context.application.queues.test.jobs[id]!) } } - + func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture { self._context.withLockedValue { context in context.application.queues.test.jobs[id] = data return context.eventLoop.makeSucceededVoidFuture() } } - + func clear(_ id: JobIdentifier) -> EventLoopFuture { self._context.withLockedValue { context in context.application.queues.test.jobs[id] = nil return context.eventLoop.makeSucceededVoidFuture() } } - + func pop() -> EventLoopFuture { self._context.withLockedValue { context in let last = context.application.queues.test.queue.popLast() return context.eventLoop.makeSucceededFuture(last) } } - + func push(_ id: JobIdentifier) -> EventLoopFuture { self._context.withLockedValue { context in context.application.queues.test.queue.append(id) @@ -148,7 +149,7 @@ struct TestQueue: Queue { struct AsyncTestQueue: AsyncQueue { let _context: NIOLockedValueBox var context: QueueContext { self._context.withLockedValue { $0 } } - + func get(_ id: JobIdentifier) async throws -> JobData { self._context.withLockedValue { $0.application.queues.asyncTest.jobs[id]! } } func set(_ id: JobIdentifier, to data: JobData) async throws { self._context.withLockedValue { $0.application.queues.asyncTest.jobs[id] = data } } func clear(_ id: JobIdentifier) async throws { self._context.withLockedValue { $0.application.queues.asyncTest.jobs[id] = nil } } diff --git a/Tests/QueuesTests/AsyncQueueTests.swift b/Tests/QueuesTests/AsyncQueueTests.swift index 52a2452..e1df95e 100644 --- a/Tests/QueuesTests/AsyncQueueTests.swift +++ b/Tests/QueuesTests/AsyncQueueTests.swift @@ -16,92 +16,80 @@ func XCTAssertNoThrowAsync( final class AsyncQueueTests: XCTestCase { var app: Application! - + override class func setUp() { XCTAssert(isLoggingConfigured) } override func setUp() async throws { - app = try await Application.make(.testing) + self.app = try await Application.make(.testing) } - + override func tearDown() async throws { - try await app.asyncShutdown() + try await self.app.asyncShutdown() } - + func testAsyncJobWithSyncQueue() async throws { - app.queues.use(.test) - - let promise = app.eventLoopGroup.any().makePromise(of: Void.self) - app.queues.add(MyAsyncJob(promise: promise)) - - app.get("foo") { req in + self.app.queues.use(.test) + + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) + self.app.queues.add(MyAsyncJob(promise: promise)) + + self.app.get("foo") { req in try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "baz")) try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "quux")) return "done" } - - try await app.testable().test(.GET, "foo") { res async in + + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } - - XCTAssertEqual(app.queues.test.queue.count, 3) - XCTAssertEqual(app.queues.test.jobs.count, 3) - let job = app.queues.test.first(MyAsyncJob.self) - XCTAssert(app.queues.test.contains(MyAsyncJob.self)) + + XCTAssertEqual(self.app.queues.test.queue.count, 3) + XCTAssertEqual(self.app.queues.test.jobs.count, 3) + let job = self.app.queues.test.first(MyAsyncJob.self) + XCTAssert(self.app.queues.test.contains(MyAsyncJob.self)) XCTAssertNotNil(job) XCTAssertEqual(job!.foo, "bar") - - try await app.queues.queue.worker.run().get() - XCTAssertEqual(app.queues.test.queue.count, 0) - XCTAssertEqual(app.queues.test.jobs.count, 0) - + + try await self.app.queues.queue.worker.run().get() + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) + await XCTAssertNoThrowAsync(try await promise.futureResult.get()) } func testAsyncJobWithAsyncQueue() async throws { - app.queues.use(.asyncTest) + self.app.queues.use(.asyncTest) - let promise = app.eventLoopGroup.any().makePromise(of: Void.self) - app.queues.add(MyAsyncJob(promise: promise)) - - app.get("foo") { req in + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) + self.app.queues.add(MyAsyncJob(promise: promise)) + + self.app.get("foo") { req in try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "baz")) try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "quux")) return "done" } - - try await app.testable().test(.GET, "foo") { res async in + + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } - - XCTAssertEqual(app.queues.asyncTest.queue.count, 3) - XCTAssertEqual(app.queues.asyncTest.jobs.count, 3) - let job = app.queues.asyncTest.first(MyAsyncJob.self) - XCTAssert(app.queues.asyncTest.contains(MyAsyncJob.self)) + + XCTAssertEqual(self.app.queues.asyncTest.queue.count, 3) + XCTAssertEqual(self.app.queues.asyncTest.jobs.count, 3) + let job = self.app.queues.asyncTest.first(MyAsyncJob.self) + XCTAssert(self.app.queues.asyncTest.contains(MyAsyncJob.self)) XCTAssertNotNil(job) XCTAssertEqual(job!.foo, "bar") - - try await app.queues.queue.worker.run().get() - XCTAssertEqual(app.queues.asyncTest.queue.count, 0) - XCTAssertEqual(app.queues.asyncTest.jobs.count, 0) - await XCTAssertNoThrowAsync(try await promise.futureResult.get()) - } -} + try await self.app.queues.queue.worker.run().get() + XCTAssertEqual(self.app.queues.asyncTest.queue.count, 0) + XCTAssertEqual(self.app.queues.asyncTest.jobs.count, 0) -struct MyAsyncJob: AsyncJob { - let promise: EventLoopPromise - - struct Data: Codable { - var foo: String - } - - func dequeue(_ context: QueueContext, _ payload: Data) async throws { - self.promise.succeed() + await XCTAssertNoThrowAsync(try await promise.futureResult.get()) } } diff --git a/Tests/QueuesTests/MetricsTests.swift b/Tests/QueuesTests/MetricsTests.swift new file mode 100644 index 0000000..543738d --- /dev/null +++ b/Tests/QueuesTests/MetricsTests.swift @@ -0,0 +1,103 @@ +@testable import CoreMetrics +import Metrics +import NIOConcurrencyHelpers +import Queues +@testable import Vapor +import XCTQueues +import XCTVapor + +final class MetricsTests: XCTestCase { + var app: Application! + var metrics: CapturingMetricsSystem! + + override func setUp() async throws { + self.metrics = CapturingMetricsSystem() + MetricsSystem.bootstrapInternal(self.metrics) + + self.app = try await Application.make(.testing) + self.app.queues.use(.test) + } + + override func tearDown() async throws { + try await self.app.asyncShutdown() + } + + func testJobDurationTimer() async throws { + let promise = self.app.eventLoopGroup.next().makePromise(of: Void.self) + self.app.queues.add(MyAsyncJob(promise: promise)) + + self.app.get("foo") { req async throws in + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar"), id: JobIdentifier(string: "some-id")) + return "done" + } + + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + try await self.app.queues.queue.worker.run() + + let timer = try XCTUnwrap(self.metrics.timers["some-id.jobDurationTimer"] as? TestTimer) + let successDimension = try XCTUnwrap(timer.dimensions.first(where: { $0.0 == "success" })) + let idDimension = try XCTUnwrap(timer.dimensions.first(where: { $0.0 == "id" })) + XCTAssertEqual(successDimension.1, "true") + XCTAssertEqual(idDimension.1, "some-id") + + try XCTAssertNoThrow(promise.futureResult.wait()) + } + + func testSuccessfullyCompletedJobsCounter() async throws { + let promise = self.app.eventLoopGroup.next().makePromise(of: Void.self) + let successHook = SuccessHook() + let errorHook = ErrorHook() + + self.app.queues.add(MyAsyncJob(promise: promise)) + self.app.queues.add(successHook) + self.app.queues.add(errorHook) + + self.app.get("foo") { req async throws in + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar"), id: JobIdentifier(string: "first")) + return "done" + } + + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + try await self.app.queues.queue.worker.run() + XCTAssertEqual(successHook.successHit, true) + XCTAssertEqual(errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) + + let counter = try XCTUnwrap(self.metrics.counters["success.completed.jobs.counter"] as? TestCounter) + let queueNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "queueName" })) + XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) + try XCTAssertNoThrow(promise.futureResult.wait()) + } + + func testDispatchedJobsCounter() async throws { + let promise = self.app.eventLoopGroup.next().makePromise(of: Void.self) + self.app.queues.add(MyAsyncJob(promise: promise)) + + self.app.get("foo") { req async throws in + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar"), id: JobIdentifier(string: "first")) + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "rab"), id: JobIdentifier(string: "second")) + return "done" + } + + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + try await self.app.queues.queue.worker.run() + + let counter = try XCTUnwrap(self.metrics.counters["dispatched.jobs.counter"] as? TestCounter) + let queueNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "queueName" })) + XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) + try XCTAssertNoThrow(promise.futureResult.wait()) + } +} diff --git a/Tests/QueuesTests/Utilities/CapturingMetricsSystem.swift b/Tests/QueuesTests/Utilities/CapturingMetricsSystem.swift new file mode 100644 index 0000000..27afcca --- /dev/null +++ b/Tests/QueuesTests/Utilities/CapturingMetricsSystem.swift @@ -0,0 +1,160 @@ +@testable import CoreMetrics +import NIOConcurrencyHelpers +import Vapor + +final class CapturingMetricsSystem: MetricsFactory { + private let lock = NIOLock() + var counters = [String: any CounterHandler]() + var recorders = [String: any RecorderHandler]() + var timers = [String: any TimerHandler]() + + public func makeCounter(label: String, dimensions: [(String, String)]) -> any CounterHandler { + return self.make(label: label, dimensions: dimensions, registry: &self.counters, maker: TestCounter.init) + } + + public func makeRecorder(label: String, dimensions: [(String, String)], aggregate: Bool) -> any RecorderHandler { + let maker = { (label: String, dimensions: [(String, String)]) in + TestRecorder(label: label, dimensions: dimensions, aggregate: aggregate) + } + return self.make(label: label, dimensions: dimensions, registry: &self.recorders, maker: maker) + } + + public func makeTimer(label: String, dimensions: [(String, String)]) -> any TimerHandler { + return self.make(label: label, dimensions: dimensions, registry: &self.timers, maker: TestTimer.init) + } + + private func make(label: String, dimensions: [(String, String)], registry: inout [String: Item], maker: (String, [(String, String)]) -> Item) -> Item { + return self.lock.withLock { + let item = maker(label, dimensions) + registry[label] = item + return item + } + } + + func destroyCounter(_ handler: any CounterHandler) { + if let testCounter = handler as? TestCounter { + self.counters.removeValue(forKey: testCounter.label) + } + } + + func destroyRecorder(_ handler: any RecorderHandler) { + if let testRecorder = handler as? TestRecorder { + self.recorders.removeValue(forKey: testRecorder.label) + } + } + + func destroyTimer(_ handler: any TimerHandler) { + if let testTimer = handler as? TestTimer { + self.timers.removeValue(forKey: testTimer.label) + } + } +} + +final class TestCounter: CounterHandler, Equatable { + let id: String + let label: String + let dimensions: [(String, String)] + + let lock = NIOLock() + var values = [(Date, Int64)]() + + init(label: String, dimensions: [(String, String)]) { + self.id = UUID().uuidString + self.label = label + self.dimensions = dimensions + } + + func increment(by amount: Int64) { + self.lock.withLock { + self.values.append((Date(), amount)) + } + print("adding \(amount) to \(self.label)") + } + + func reset() { + self.lock.withLock { + self.values = [] + } + print("resetting \(self.label)") + } + + public static func == (lhs: TestCounter, rhs: TestCounter) -> Bool { + return lhs.id == rhs.id + } +} + +final class TestRecorder: RecorderHandler, Equatable { + let id: String + let label: String + let dimensions: [(String, String)] + let aggregate: Bool + + let lock = NIOLock() + var values = [(Date, Double)]() + + init(label: String, dimensions: [(String, String)], aggregate: Bool) { + self.id = UUID().uuidString + self.label = label + self.dimensions = dimensions + self.aggregate = aggregate + } + + func record(_ value: Int64) { + self.record(Double(value)) + } + + func record(_ value: Double) { + self.lock.withLock { + self.values.append((Date(), value)) + } + print("recording \(value) in \(self.label)") + } + + public static func == (lhs: TestRecorder, rhs: TestRecorder) -> Bool { + return lhs.id == rhs.id + } +} + +final class TestTimer: TimerHandler, Equatable { + let id: String + let label: String + var displayUnit: TimeUnit? + let dimensions: [(String, String)] + + let lock = NIOLock() + var values = [(Date, Int64)]() + + init(label: String, dimensions: [(String, String)]) { + self.id = UUID().uuidString + self.label = label + self.displayUnit = nil + self.dimensions = dimensions + } + + func preferDisplayUnit(_ unit: TimeUnit) { + self.lock.withLock { + self.displayUnit = unit + } + } + + func retrieveValueInPreferredUnit(atIndex i: Int) -> Double { + return self.lock.withLock { + let value = self.values[i].1 + guard let displayUnit = self.displayUnit else { + return Double(value) + } + return Double(value) / Double(displayUnit.scaleFromNanoseconds) + } + } + + func recordNanoseconds(_ duration: Int64) { + self.lock.withLock { + self.values.append((Date(), duration)) + } + print("recording \(duration) \(self.label)") + } + + public static func == (lhs: TestTimer, rhs: TestTimer) -> Bool { + return lhs.id == rhs.id + } +} diff --git a/Tests/QueuesTests/Utilities/MyAsyncJob.swift b/Tests/QueuesTests/Utilities/MyAsyncJob.swift new file mode 100644 index 0000000..23ba31f --- /dev/null +++ b/Tests/QueuesTests/Utilities/MyAsyncJob.swift @@ -0,0 +1,13 @@ +import Queues + +struct MyAsyncJob: AsyncJob { + let promise: EventLoopPromise + + struct Payload: Codable { + var foo: String + } + + func dequeue(_: QueueContext, _: Payload) async throws { + self.promise.succeed() + } +} From a285407af47ccee659997f5c58661079d712e6ff Mon Sep 17 00:00:00 2001 From: pt Date: Thu, 6 Oct 2022 17:40:30 +0200 Subject: [PATCH 2/6] Removed useless metric dimensions --- Sources/Queues/QueueWorker.swift | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/Sources/Queues/QueueWorker.swift b/Sources/Queues/QueueWorker.swift index a626696..f1a5bad 100644 --- a/Sources/Queues/QueueWorker.swift +++ b/Sources/Queues/QueueWorker.swift @@ -126,20 +126,17 @@ public struct QueueWorker: Sendable { Timer( label: "\(id.string).jobDurationTimer", dimensions: [ - ("success", "true"), + ("success", error == nil ? "true" : "false"), ("id", id.string), ], preferredDisplayUnit: .seconds ).recordNanoseconds(DispatchTime.now().uptimeNanoseconds - startTime) // Adds the completed job to a different counter depending on its result - if let error = error { + if error != nil { Counter( label: "error.completed.jobs.counter", - dimensions: [ - ("id", id.string), - ("error", error.localizedDescription), - ] + dimensions: [("queueName", queue.queueName.string)] ).increment() } else { Counter( From 2b04e88d49d34080f3537efbb9ecbd5cd1d0a3da Mon Sep 17 00:00:00 2001 From: Paul Toffoloni Date: Wed, 28 Aug 2024 12:26:11 +0200 Subject: [PATCH 3/6] Add failing job test --- Tests/QueuesTests/MetricsTests.swift | 34 ++++-- Tests/QueuesTests/QueueTests.swift | 111 +++++++++--------- .../Utilities/FailingAsyncJob.swift | 15 +++ Tests/QueuesTests/Utilities/Failure.swift | 1 + 4 files changed, 92 insertions(+), 69 deletions(-) create mode 100644 Tests/QueuesTests/Utilities/FailingAsyncJob.swift create mode 100644 Tests/QueuesTests/Utilities/Failure.swift diff --git a/Tests/QueuesTests/MetricsTests.swift b/Tests/QueuesTests/MetricsTests.swift index 543738d..4bea5a4 100644 --- a/Tests/QueuesTests/MetricsTests.swift +++ b/Tests/QueuesTests/MetricsTests.swift @@ -49,15 +49,10 @@ final class MetricsTests: XCTestCase { func testSuccessfullyCompletedJobsCounter() async throws { let promise = self.app.eventLoopGroup.next().makePromise(of: Void.self) - let successHook = SuccessHook() - let errorHook = ErrorHook() - self.app.queues.add(MyAsyncJob(promise: promise)) - self.app.queues.add(successHook) - self.app.queues.add(errorHook) self.app.get("foo") { req async throws in - try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar"), id: JobIdentifier(string: "first")) + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) return "done" } @@ -67,15 +62,29 @@ final class MetricsTests: XCTestCase { } try await self.app.queues.queue.worker.run() - XCTAssertEqual(successHook.successHit, true) - XCTAssertEqual(errorHook.errorCount, 0) - XCTAssertEqual(self.app.queues.test.queue.count, 0) - XCTAssertEqual(self.app.queues.test.jobs.count, 0) - let counter = try XCTUnwrap(self.metrics.counters["success.completed.jobs.counter"] as? TestCounter) let queueNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "queueName" })) XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) - try XCTAssertNoThrow(promise.futureResult.wait()) + } + + func testErroringJobsCounter() async throws { + let promise = self.app.eventLoopGroup.next().makePromise(of: Void.self) + self.app.queues.add(FailingAsyncJob(promise: promise)) + + self.app.get("foo") { req async throws in + try await req.queue.dispatch(FailingAsyncJob.self, .init(foo: "bar")) + return "done" + } + + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + try await self.app.queues.queue.worker.run() + let counter = try XCTUnwrap(self.metrics.counters["error.completed.jobs.counter"] as? TestCounter) + let queueNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "queueName" })) + XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) } func testDispatchedJobsCounter() async throws { @@ -98,6 +107,5 @@ final class MetricsTests: XCTestCase { let counter = try XCTUnwrap(self.metrics.counters["dispatched.jobs.counter"] as? TestCounter) let queueNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "queueName" })) XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) - try XCTAssertNoThrow(promise.futureResult.wait()) } } diff --git a/Tests/QueuesTests/QueueTests.swift b/Tests/QueuesTests/QueueTests.swift index cf90318..031fea1 100644 --- a/Tests/QueuesTests/QueueTests.swift +++ b/Tests/QueuesTests/QueueTests.swift @@ -1,9 +1,9 @@ import Atomics +import NIOConcurrencyHelpers import Queues import XCTest -import XCTVapor import XCTQueues -import NIOConcurrencyHelpers +import XCTVapor func XCTAssertEqualAsync( _ expression1: @autoclosure () async throws -> T, @@ -56,19 +56,19 @@ final class QueueTests: XCTestCase { self.app = try await Application.make(.testing) self.app.queues.use(.test) } - + override func tearDown() async throws { try await self.app.asyncShutdown() self.app = nil } - + func testVaporIntegrationWithInProcessJob() async throws { let jobSignal1 = self.app.eventLoopGroup.any().makePromise(of: String.self) self.app.queues.add(Foo1(promise: jobSignal1)) let jobSignal2 = self.app.eventLoopGroup.any().makePromise(of: String.self) self.app.queues.add(Foo2(promise: jobSignal2)) try self.app.queues.startInProcessJobs(on: .default) - + self.app.get("bar1") { req in try await req.queue.dispatch(Foo1.self, .init(foo: "Bar payload")).get() return "job bar dispatched" @@ -78,7 +78,7 @@ final class QueueTests: XCTestCase { try await req.queue.dispatch(Foo2.self, .init(foo: "Bar payload")) return "job bar dispatched" } - + try await self.app.testable().test(.GET, "bar1") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "job bar dispatched") @@ -90,7 +90,7 @@ final class QueueTests: XCTestCase { await XCTAssertEqualAsync(try await jobSignal1.futureResult.get(), "Bar payload") await XCTAssertEqualAsync(try await jobSignal2.futureResult.get(), "Bar payload") } - + func testVaporIntegration() async throws { let promise = self.app.eventLoopGroup.any().makePromise(of: String.self) self.app.queues.add(Foo1(promise: promise)) @@ -99,23 +99,23 @@ final class QueueTests: XCTestCase { try await req.queue.dispatch(Foo1.self, .init(foo: "bar")) return "done" } - + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } - + XCTAssertEqual(self.app.queues.test.queue.count, 1) XCTAssertEqual(self.app.queues.test.jobs.count, 1) let job = self.app.queues.test.first(Foo1.self) XCTAssert(self.app.queues.test.contains(Foo1.self)) XCTAssertNotNil(job) XCTAssertEqual(job!.foo, "bar") - + try await self.app.queues.queue.worker.run() XCTAssertEqual(self.app.queues.test.queue.count, 0) XCTAssertEqual(self.app.queues.test.jobs.count, 0) - + await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar") } @@ -155,23 +155,23 @@ final class QueueTests: XCTestCase { try await req.queue.dispatch(Foo1.self, .init(foo: "bar"), id: JobIdentifier(string: "my-custom-id")) return "done" } - + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } - + XCTAssertEqual(self.app.queues.test.queue.count, 1) XCTAssertEqual(self.app.queues.test.jobs.count, 1) XCTAssert(self.app.queues.test.jobs.keys.map(\.string).contains("my-custom-id")) - + try await self.app.queues.queue.worker.run() XCTAssertEqual(self.app.queues.test.queue.count, 0) XCTAssertEqual(self.app.queues.test.jobs.count, 0) - + await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar") } - + func testScheduleBuilderAPI() async throws { // yearly self.app.queues.schedule(Cleanup()).yearly().in(.may).on(23).at(.noon) @@ -194,19 +194,19 @@ final class QueueTests: XCTestCase { // hourly self.app.queues.schedule(Cleanup()).hourly().at(30) } - + func testRepeatingScheduledJob() async throws { let scheduledJob = TestingScheduledJob() XCTAssertEqual(scheduledJob.count.load(ordering: .relaxed), 0) self.app.queues.schedule(scheduledJob).everySecond() try self.app.queues.startScheduledJobs() - + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) self.app.eventLoopGroup.any().scheduleTask(in: .seconds(5)) { XCTAssert(scheduledJob.count.load(ordering: .relaxed) > 4) promise.succeed() } - + try await promise.futureResult.get() } @@ -215,20 +215,20 @@ final class QueueTests: XCTestCase { XCTAssertEqual(scheduledJob.count.load(ordering: .relaxed), 0) self.app.queues.schedule(scheduledJob).everySecond() try self.app.queues.startScheduledJobs() - + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) self.app.eventLoopGroup.any().scheduleTask(in: .seconds(5)) { XCTAssert(scheduledJob.count.load(ordering: .relaxed) > 4) promise.succeed() } - + try await promise.futureResult.get() } func testFailingScheduledJob() async throws { self.app.queues.schedule(FailingScheduledJob()).everySecond() try self.app.queues.startScheduledJobs() - + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) self.app.eventLoopGroup.any().scheduleTask(in: .seconds(1)) { promise.succeed() @@ -239,7 +239,7 @@ final class QueueTests: XCTestCase { func testAsyncFailingScheduledJob() async throws { self.app.queues.schedule(AsyncFailingScheduledJob()).everySecond() try self.app.queues.startScheduledJobs() - + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) self.app.eventLoopGroup.any().scheduleTask(in: .seconds(1)) { promise.succeed() @@ -250,7 +250,7 @@ final class QueueTests: XCTestCase { func testCustomWorkerCount() async throws { // Setup custom ELG with 4 threads let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 4) - + do { let count = self.app.eventLoopGroup.any().makePromise(of: Int.self) self.app.queues.use(custom: WorkerCountDriver(count: count)) @@ -306,7 +306,7 @@ final class QueueTests: XCTestCase { XCTAssertEqual(self.app.queues.test.queue.count, 0) XCTAssertEqual(self.app.queues.test.jobs.count, 0) XCTAssertTrue(dequeuedHook.successHit) - + await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar") } @@ -361,7 +361,7 @@ final class QueueTests: XCTestCase { let errorHook = ErrorHook() self.app.queues.add(successHook) self.app.queues.add(errorHook) - + self.app.get("foo") { req in try await req.queue.dispatch(Bar.self, .init(foo: "bar"), maxRetryCount: 3) return "done" @@ -397,7 +397,7 @@ final class QueueTests: XCTestCase { let errorHook = AsyncErrorHook() self.app.queues.add(successHook) self.app.queues.add(errorHook) - + self.app.get("foo") { req in try await req.queue.dispatch(Bar.self, .init(foo: "bar"), maxRetryCount: 3) return "done" @@ -522,7 +522,7 @@ final class QueueTests: XCTestCase { final class DispatchHook: JobEventDelegate, @unchecked Sendable { var successHit = false - func dispatched(job: JobEventData, eventLoop: any EventLoop) -> EventLoopFuture { + func dispatched(job _: JobEventData, eventLoop: any EventLoop) -> EventLoopFuture { self.successHit = true return eventLoop.makeSucceededVoidFuture() } @@ -531,7 +531,7 @@ final class DispatchHook: JobEventDelegate, @unchecked Sendable { final class SuccessHook: JobEventDelegate, @unchecked Sendable { var successHit = false - func success(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { + func success(jobId _: String, eventLoop: any EventLoop) -> EventLoopFuture { self.successHit = true return eventLoop.makeSucceededVoidFuture() } @@ -540,7 +540,7 @@ final class SuccessHook: JobEventDelegate, @unchecked Sendable { final class ErrorHook: JobEventDelegate, @unchecked Sendable { var errorCount = 0 - func error(jobId: String, error: any Error, eventLoop: any EventLoop) -> EventLoopFuture { + func error(jobId _: String, error _: any Error, eventLoop: any EventLoop) -> EventLoopFuture { self.errorCount += 1 return eventLoop.makeSucceededVoidFuture() } @@ -549,7 +549,7 @@ final class ErrorHook: JobEventDelegate, @unchecked Sendable { final class DequeuedHook: JobEventDelegate, @unchecked Sendable { var successHit = false - func didDequeue(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { + func didDequeue(jobId _: String, eventLoop: any EventLoop) -> EventLoopFuture { self.successHit = true return eventLoop.makeSucceededVoidFuture() } @@ -557,22 +557,22 @@ final class DequeuedHook: JobEventDelegate, @unchecked Sendable { actor AsyncDispatchHook: AsyncJobEventDelegate { var successHit = false - func dispatched(job: JobEventData) async throws { self.successHit = true } + func dispatched(job _: JobEventData) async throws { self.successHit = true } } actor AsyncSuccessHook: AsyncJobEventDelegate { var successHit = false - func success(jobId: String) async throws { self.successHit = true } + func success(jobId _: String) async throws { self.successHit = true } } actor AsyncErrorHook: AsyncJobEventDelegate { var errorCount = 0 - func error(jobId: String, error: any Error) async throws { self.errorCount += 1 } + func error(jobId _: String, error _: any Error) async throws { self.errorCount += 1 } } actor AsyncDequeuedHook: AsyncJobEventDelegate { var successHit = false - func didDequeue(jobId: String) async throws { self.successHit = true } + func didDequeue(jobId _: String) async throws { self.successHit = true } } final class WorkerCountDriver: QueuesDriver, @unchecked Sendable { @@ -609,30 +609,29 @@ final class WorkerCountDriver: QueuesDriver, @unchecked Sendable { let driver: WorkerCountDriver var context: QueueContext - func get(_ id: JobIdentifier) -> EventLoopFuture { fatalError() } - func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture { fatalError() } - func clear(_ id: JobIdentifier) -> EventLoopFuture { fatalError() } + func get(_: JobIdentifier) -> EventLoopFuture { fatalError() } + func set(_: JobIdentifier, to _: JobData) -> EventLoopFuture { fatalError() } + func clear(_: JobIdentifier) -> EventLoopFuture { fatalError() } func pop() -> EventLoopFuture { self.driver.record(eventLoop: self.context.eventLoop) return self.context.eventLoop.makeSucceededFuture(nil) } - func push(_ id: JobIdentifier) -> EventLoopFuture { fatalError() } + + func push(_: JobIdentifier) -> EventLoopFuture { fatalError() } } } -struct Failure: Error {} - struct FailingScheduledJob: ScheduledJob { func run(context: QueueContext) -> EventLoopFuture { context.eventLoop.makeFailedFuture(Failure()) } } struct AsyncFailingScheduledJob: AsyncScheduledJob { - func run(context: QueueContext) async throws { throw Failure() } + func run(context _: QueueContext) async throws { throw Failure() } } struct TestingScheduledJob: ScheduledJob { var count = ManagedAtomic(0) - + func run(context: QueueContext) -> EventLoopFuture { self.count.wrappingIncrement(ordering: .relaxed) return context.eventLoop.makeSucceededVoidFuture() @@ -641,22 +640,22 @@ struct TestingScheduledJob: ScheduledJob { struct AsyncTestingScheduledJob: AsyncScheduledJob { var count = ManagedAtomic(0) - func run(context: QueueContext) async throws { self.count.wrappingIncrement(ordering: .relaxed) } + func run(context _: QueueContext) async throws { self.count.wrappingIncrement(ordering: .relaxed) } } struct Foo1: Job { let promise: EventLoopPromise - + struct Data: Codable { var foo: String } - + func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture { self.promise.succeed(data.foo) return context.eventLoop.makeSucceededVoidFuture() } - - func error(_ context: QueueContext, _ error: any Error, _ data: Data) -> EventLoopFuture { + + func error(_ context: QueueContext, _ error: any Error, _: Data) -> EventLoopFuture { self.promise.fail(error) return context.eventLoop.makeSucceededVoidFuture() } @@ -664,17 +663,17 @@ struct Foo1: Job { struct Foo2: Job { let promise: EventLoopPromise - + struct Data: Codable { var foo: String } - + func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture { self.promise.succeed(data.foo) return context.eventLoop.makeSucceededVoidFuture() } - - func error(_ context: QueueContext, _ error: any Error, _ data: Data) -> EventLoopFuture { + + func error(_ context: QueueContext, _ error: any Error, _: Data) -> EventLoopFuture { self.promise.fail(error) return context.eventLoop.makeSucceededVoidFuture() } @@ -685,11 +684,11 @@ struct Bar: Job { var foo: String } - func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture { + func dequeue(_ context: QueueContext, _: Data) -> EventLoopFuture { context.eventLoop.makeFailedFuture(Abort(.badRequest)) } - func error(_ context: QueueContext, _ error: any Error, _ data: Data) -> EventLoopFuture { + func error(_ context: QueueContext, _: any Error, _: Data) -> EventLoopFuture { context.eventLoop.makeSucceededVoidFuture() } } @@ -699,11 +698,11 @@ struct Baz: Job { var foo: String } - func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture { + func dequeue(_ context: QueueContext, _: Data) -> EventLoopFuture { context.eventLoop.makeFailedFuture(Abort(.badRequest)) } - func error(_ context: QueueContext, _ error: any Error, _ data: Data) -> EventLoopFuture { + func error(_ context: QueueContext, _: any Error, _: Data) -> EventLoopFuture { context.eventLoop.makeSucceededVoidFuture() } diff --git a/Tests/QueuesTests/Utilities/FailingAsyncJob.swift b/Tests/QueuesTests/Utilities/FailingAsyncJob.swift new file mode 100644 index 0000000..2e17bdf --- /dev/null +++ b/Tests/QueuesTests/Utilities/FailingAsyncJob.swift @@ -0,0 +1,15 @@ +import Queues + +struct FailingAsyncJob: AsyncJob { + let promise: EventLoopPromise + + struct Payload: Codable { + var foo: String + } + + func dequeue(_: QueueContext, _: Payload) async throws { + let failure = Failure() + self.promise.fail(failure) + throw failure + } +} diff --git a/Tests/QueuesTests/Utilities/Failure.swift b/Tests/QueuesTests/Utilities/Failure.swift new file mode 100644 index 0000000..6ec8021 --- /dev/null +++ b/Tests/QueuesTests/Utilities/Failure.swift @@ -0,0 +1 @@ +struct Failure: Error {} From cbc2196b7add3afb476cb00635e97f7da578b005 Mon Sep 17 00:00:00 2001 From: Paul Toffoloni Date: Wed, 28 Aug 2024 14:58:43 +0200 Subject: [PATCH 4/6] Address feedback --- Package.swift | 2 +- Package@swift-5.9.swift | 4 +- Sources/Queues/AsyncQueue.swift | 5 +- Sources/Queues/Queue.swift | 11 +- Tests/QueuesTests/MetricsTests.swift | 14 +- .../Utilities/CapturingMetricsSystem.swift | 160 ----- Tests/QueuesTests/Utilities/TestMetrics.swift | 591 ++++++++++++++++++ 7 files changed, 612 insertions(+), 175 deletions(-) delete mode 100644 Tests/QueuesTests/Utilities/CapturingMetricsSystem.swift create mode 100644 Tests/QueuesTests/Utilities/TestMetrics.swift diff --git a/Package.swift b/Package.swift index 7fae7b9..9edbf4f 100644 --- a/Package.swift +++ b/Package.swift @@ -16,7 +16,7 @@ let package = Package( dependencies: [ .package(url: "https://github.com/vapor/vapor.git", from: "4.101.1"), .package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"), - .package(url: "https://github.com/apple/swift-metrics.git", "2.0.0"), + .package(url: "https://github.com/apple/swift-metrics.git", from: "2.5.0"), ], targets: [ .target( diff --git a/Package@swift-5.9.swift b/Package@swift-5.9.swift index 1092a83..79feedf 100644 --- a/Package@swift-5.9.swift +++ b/Package@swift-5.9.swift @@ -11,11 +11,12 @@ let package = Package( ], products: [ .library(name: "Queues", targets: ["Queues"]), - .library(name: "XCTQueues", targets: ["XCTQueues"]) + .library(name: "XCTQueues", targets: ["XCTQueues"]), ], dependencies: [ .package(url: "https://github.com/vapor/vapor.git", from: "4.101.1"), .package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"), + .package(url: "https://github.com/apple/swift-metrics.git", from: "2.5.0"), ], targets: [ .target( @@ -23,6 +24,7 @@ let package = Package( dependencies: [ .product(name: "Vapor", package: "vapor"), .product(name: "NIOCore", package: "swift-nio"), + .product(name: "Metrics", package: "swift-metrics"), ], swiftSettings: swiftSettings ), diff --git a/Sources/Queues/AsyncQueue.swift b/Sources/Queues/AsyncQueue.swift index a0bd785..577981c 100644 --- a/Sources/Queues/AsyncQueue.swift +++ b/Sources/Queues/AsyncQueue.swift @@ -83,7 +83,10 @@ extension Queue { logger.trace("Pusing job to queue") try await self.push(id).get() logger.info("Dispatched job") - Counter(label: "dispatched.jobs.counter", dimensions: [("queueName", self.queueName.string)]).increment() + Counter(label: "dispatched.jobs.counter", dimensions: [ + ("queueName", self.queueName.string), + ("jobName", J.name), + ]).increment() await self.sendNotification(of: "dispatch", logger: logger) { try await $0.dispatched(job: .init(id: id.string, queueName: self.queueName.string, jobData: storage), eventLoop: self.eventLoop).get() diff --git a/Sources/Queues/Queue.swift b/Sources/Queues/Queue.swift index 6df5de1..ef7b420 100644 --- a/Sources/Queues/Queue.swift +++ b/Sources/Queues/Queue.swift @@ -96,12 +96,11 @@ extension Queue { logger.trace("Pusing job to queue") return self.push(id) }.flatMapWithEventLoop { _, eventLoop in - Counter(label: "dispatched.jobs.counter", dimensions: [("queueName", self.queueName.string)]).increment() - self.logger.info("Dispatched queue job", metadata: [ - "job_id": .string(id.string), - "job_name": .string(job.name), - "queue": .string(self.queueName.string), - ]) + Counter(label: "dispatched.jobs.counter", dimensions: [ + ("queueName", self.queueName.string), + ("jobName", J.name), + ]).increment() + self.logger.info("Dispatched queue job") return self.sendNotification(of: "dispatch", logger: logger) { $0.dispatched(job: .init(id: id.string, queueName: self.queueName.string, jobData: storage), eventLoop: eventLoop) } diff --git a/Tests/QueuesTests/MetricsTests.swift b/Tests/QueuesTests/MetricsTests.swift index 4bea5a4..c09036d 100644 --- a/Tests/QueuesTests/MetricsTests.swift +++ b/Tests/QueuesTests/MetricsTests.swift @@ -8,10 +8,10 @@ import XCTVapor final class MetricsTests: XCTestCase { var app: Application! - var metrics: CapturingMetricsSystem! + var metrics: TestMetrics! override func setUp() async throws { - self.metrics = CapturingMetricsSystem() + self.metrics = TestMetrics() MetricsSystem.bootstrapInternal(self.metrics) self.app = try await Application.make(.testing) @@ -38,7 +38,7 @@ final class MetricsTests: XCTestCase { try await self.app.queues.queue.worker.run() - let timer = try XCTUnwrap(self.metrics.timers["some-id.jobDurationTimer"] as? TestTimer) + let timer = try XCTUnwrap(self.metrics.timers.first(where: { $0.label == "some-id.jobDurationTimer" })) let successDimension = try XCTUnwrap(timer.dimensions.first(where: { $0.0 == "success" })) let idDimension = try XCTUnwrap(timer.dimensions.first(where: { $0.0 == "id" })) XCTAssertEqual(successDimension.1, "true") @@ -62,7 +62,7 @@ final class MetricsTests: XCTestCase { } try await self.app.queues.queue.worker.run() - let counter = try XCTUnwrap(self.metrics.counters["success.completed.jobs.counter"] as? TestCounter) + let counter = try XCTUnwrap(self.metrics.counters.first(where: { $0.label == "success.completed.jobs.counter" })) let queueNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "queueName" })) XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) } @@ -82,7 +82,7 @@ final class MetricsTests: XCTestCase { } try await self.app.queues.queue.worker.run() - let counter = try XCTUnwrap(self.metrics.counters["error.completed.jobs.counter"] as? TestCounter) + let counter = try XCTUnwrap(self.metrics.counters.first(where: { $0.label == "error.completed.jobs.counter" })) let queueNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "queueName" })) XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) } @@ -104,8 +104,10 @@ final class MetricsTests: XCTestCase { try await self.app.queues.queue.worker.run() - let counter = try XCTUnwrap(self.metrics.counters["dispatched.jobs.counter"] as? TestCounter) + let counter = try XCTUnwrap(self.metrics.counters.first(where: { $0.label == "dispatched.jobs.counter" })) let queueNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "queueName" })) + let jobNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "jobName" })) XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) + XCTAssertEqual(jobNameDimension.1, MyAsyncJob.name) } } diff --git a/Tests/QueuesTests/Utilities/CapturingMetricsSystem.swift b/Tests/QueuesTests/Utilities/CapturingMetricsSystem.swift deleted file mode 100644 index 27afcca..0000000 --- a/Tests/QueuesTests/Utilities/CapturingMetricsSystem.swift +++ /dev/null @@ -1,160 +0,0 @@ -@testable import CoreMetrics -import NIOConcurrencyHelpers -import Vapor - -final class CapturingMetricsSystem: MetricsFactory { - private let lock = NIOLock() - var counters = [String: any CounterHandler]() - var recorders = [String: any RecorderHandler]() - var timers = [String: any TimerHandler]() - - public func makeCounter(label: String, dimensions: [(String, String)]) -> any CounterHandler { - return self.make(label: label, dimensions: dimensions, registry: &self.counters, maker: TestCounter.init) - } - - public func makeRecorder(label: String, dimensions: [(String, String)], aggregate: Bool) -> any RecorderHandler { - let maker = { (label: String, dimensions: [(String, String)]) in - TestRecorder(label: label, dimensions: dimensions, aggregate: aggregate) - } - return self.make(label: label, dimensions: dimensions, registry: &self.recorders, maker: maker) - } - - public func makeTimer(label: String, dimensions: [(String, String)]) -> any TimerHandler { - return self.make(label: label, dimensions: dimensions, registry: &self.timers, maker: TestTimer.init) - } - - private func make(label: String, dimensions: [(String, String)], registry: inout [String: Item], maker: (String, [(String, String)]) -> Item) -> Item { - return self.lock.withLock { - let item = maker(label, dimensions) - registry[label] = item - return item - } - } - - func destroyCounter(_ handler: any CounterHandler) { - if let testCounter = handler as? TestCounter { - self.counters.removeValue(forKey: testCounter.label) - } - } - - func destroyRecorder(_ handler: any RecorderHandler) { - if let testRecorder = handler as? TestRecorder { - self.recorders.removeValue(forKey: testRecorder.label) - } - } - - func destroyTimer(_ handler: any TimerHandler) { - if let testTimer = handler as? TestTimer { - self.timers.removeValue(forKey: testTimer.label) - } - } -} - -final class TestCounter: CounterHandler, Equatable { - let id: String - let label: String - let dimensions: [(String, String)] - - let lock = NIOLock() - var values = [(Date, Int64)]() - - init(label: String, dimensions: [(String, String)]) { - self.id = UUID().uuidString - self.label = label - self.dimensions = dimensions - } - - func increment(by amount: Int64) { - self.lock.withLock { - self.values.append((Date(), amount)) - } - print("adding \(amount) to \(self.label)") - } - - func reset() { - self.lock.withLock { - self.values = [] - } - print("resetting \(self.label)") - } - - public static func == (lhs: TestCounter, rhs: TestCounter) -> Bool { - return lhs.id == rhs.id - } -} - -final class TestRecorder: RecorderHandler, Equatable { - let id: String - let label: String - let dimensions: [(String, String)] - let aggregate: Bool - - let lock = NIOLock() - var values = [(Date, Double)]() - - init(label: String, dimensions: [(String, String)], aggregate: Bool) { - self.id = UUID().uuidString - self.label = label - self.dimensions = dimensions - self.aggregate = aggregate - } - - func record(_ value: Int64) { - self.record(Double(value)) - } - - func record(_ value: Double) { - self.lock.withLock { - self.values.append((Date(), value)) - } - print("recording \(value) in \(self.label)") - } - - public static func == (lhs: TestRecorder, rhs: TestRecorder) -> Bool { - return lhs.id == rhs.id - } -} - -final class TestTimer: TimerHandler, Equatable { - let id: String - let label: String - var displayUnit: TimeUnit? - let dimensions: [(String, String)] - - let lock = NIOLock() - var values = [(Date, Int64)]() - - init(label: String, dimensions: [(String, String)]) { - self.id = UUID().uuidString - self.label = label - self.displayUnit = nil - self.dimensions = dimensions - } - - func preferDisplayUnit(_ unit: TimeUnit) { - self.lock.withLock { - self.displayUnit = unit - } - } - - func retrieveValueInPreferredUnit(atIndex i: Int) -> Double { - return self.lock.withLock { - let value = self.values[i].1 - guard let displayUnit = self.displayUnit else { - return Double(value) - } - return Double(value) / Double(displayUnit.scaleFromNanoseconds) - } - } - - func recordNanoseconds(_ duration: Int64) { - self.lock.withLock { - self.values.append((Date(), duration)) - } - print("recording \(duration) \(self.label)") - } - - public static func == (lhs: TestTimer, rhs: TestTimer) -> Bool { - return lhs.id == rhs.id - } -} diff --git a/Tests/QueuesTests/Utilities/TestMetrics.swift b/Tests/QueuesTests/Utilities/TestMetrics.swift new file mode 100644 index 0000000..40b0243 --- /dev/null +++ b/Tests/QueuesTests/Utilities/TestMetrics.swift @@ -0,0 +1,591 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Metrics API open source project +// +// Copyright (c) 2021 Apple Inc. and the Swift Metrics API project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift Metrics API project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Cluster Membership open source project +// +// Copyright (c) 2020 Apple Inc. and the Swift Cluster Membership project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Cluster Membership project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import CoreMetrics +import Metrics +import XCTest + +/// Taken directly from `swift-cluster-memberships`'s own test target package, which +/// adopts the `TestMetrics` from `swift-metrics`. +/// +/// Metrics factory which allows inspecting recorded metrics programmatically. +/// Only intended for tests of the Metrics API itself. +/// +/// Created Handlers will store Metrics until they are explicitly destroyed. +/// +public final class TestMetrics: MetricsFactory { + private let lock = NSLock() + + public typealias Label = String + public typealias Dimensions = String + + public struct FullKey: Sendable { + let label: Label + let dimensions: [(String, String)] + } + + private var _counters = [FullKey: TestCounter]() + private var _meters = [FullKey: TestMeter]() + private var _recorders = [FullKey: TestRecorder]() + private var _timers = [FullKey: TestTimer]() + + public init() { + // nothing to do + } + + /// Reset method to destroy all created ``TestCounter``, ``TestMeter``, ``TestRecorder`` and ``TestTimer``. + /// Invoke this method in between test runs to verify that Counters are created as needed. + public func reset() { + self.lock.withLock { + self._counters = [:] + self._recorders = [:] + self._meters = [:] + self._timers = [:] + } + } + + public func makeCounter(label: String, dimensions: [(String, String)]) -> any CounterHandler { + return self.lock.withLock { () -> any CounterHandler in + if let existing = self._counters[.init(label: label, dimensions: dimensions)] { + return existing + } + let item = TestCounter(label: label, dimensions: dimensions) + self._counters[.init(label: label, dimensions: dimensions)] = item + return item + } + } + + public func makeMeter(label: String, dimensions: [(String, String)]) -> any MeterHandler { + return self.lock.withLock { () -> any MeterHandler in + if let existing = self._meters[.init(label: label, dimensions: dimensions)] { + return existing + } + let item = TestMeter(label: label, dimensions: dimensions) + self._meters[.init(label: label, dimensions: dimensions)] = item + return item + } + } + + public func makeRecorder(label: String, dimensions: [(String, String)], aggregate: Bool) -> any RecorderHandler { + return self.lock.withLock { () -> any RecorderHandler in + if let existing = self._recorders[.init(label: label, dimensions: dimensions)] { + return existing + } + let item = TestRecorder(label: label, dimensions: dimensions, aggregate: aggregate) + self._recorders[.init(label: label, dimensions: dimensions)] = item + return item + } + } + + public func makeTimer(label: String, dimensions: [(String, String)]) -> any TimerHandler { + return self.lock.withLock { () -> any TimerHandler in + if let existing = self._timers[.init(label: label, dimensions: dimensions)] { + return existing + } + let item = TestTimer(label: label, dimensions: dimensions) + self._timers[.init(label: label, dimensions: dimensions)] = item + return item + } + } + + public func destroyCounter(_ handler: any CounterHandler) { + if let testCounter = handler as? TestCounter { + self.lock.withLock { + self._counters.removeValue(forKey: testCounter.key) + } + } + } + + public func destroyMeter(_ handler: any MeterHandler) { + if let testMeter = handler as? TestMeter { + self.lock.withLock { () in + self._meters.removeValue(forKey: testMeter.key) + } + } + } + + public func destroyRecorder(_ handler: any RecorderHandler) { + if let testRecorder = handler as? TestRecorder { + self.lock.withLock { + self._recorders.removeValue(forKey: testRecorder.key) + } + } + } + + public func destroyTimer(_ handler: any TimerHandler) { + if let testTimer = handler as? TestTimer { + self.lock.withLock { + self._timers.removeValue(forKey: testTimer.key) + } + } + } +} + +extension TestMetrics.FullKey: Hashable { + public func hash(into hasher: inout Hasher) { + self.label.hash(into: &hasher) + for dim in self.dimensions { + dim.0.hash(into: &hasher) + dim.1.hash(into: &hasher) + } + } + + public static func == (lhs: TestMetrics.FullKey, rhs: TestMetrics.FullKey) -> Bool { + return lhs.label == rhs.label && + Dictionary(uniqueKeysWithValues: lhs.dimensions) == Dictionary(uniqueKeysWithValues: rhs.dimensions) + } +} + +// MARK: - Assertions + +extension TestMetrics { + // MARK: - Counter + + public func expectCounter(_ metric: Counter) throws -> TestCounter { + guard let counter = metric._handler as? TestCounter else { + throw TestMetricsError.illegalMetricType(metric: metric._handler, expected: "\(TestCounter.self)") + } + return counter + } + + public func expectCounter(_ label: String, _ dimensions: [(String, String)] = []) throws -> TestCounter { + let maybeItem = self.lock.withLock { + self._counters[.init(label: label, dimensions: dimensions)] + } + guard let testCounter = maybeItem else { + throw TestMetricsError.missingMetric(label: label, dimensions: dimensions) + } + return testCounter + } + + /// All the counters which have been created and not destroyed + public var counters: [TestCounter] { + let counters = self.lock.withLock { + self._counters + } + return Array(counters.values) + } + + // MARK: - Gauge + + public func expectGauge(_ metric: Gauge) throws -> TestRecorder { + return try self.expectRecorder(metric) + } + + public func expectGauge(_ label: String, _ dimensions: [(String, String)] = []) throws -> TestRecorder { + return try self.expectRecorder(label, dimensions) + } + + // MARK: - Meter + + public func expectMeter(_ metric: Meter) throws -> TestMeter { + guard let meter = metric._handler as? TestMeter else { + throw TestMetricsError.illegalMetricType(metric: metric._handler, expected: "\(TestMeter.self)") + } + return meter + } + + public func expectMeter(_ label: String, _ dimensions: [(String, String)] = []) throws -> TestMeter { + let maybeItem = self.lock.withLock { + self._meters[.init(label: label, dimensions: dimensions)] + } + guard let testMeter = maybeItem else { + throw TestMetricsError.missingMetric(label: label, dimensions: dimensions) + } + return testMeter + } + + /// All the meters which have been created and not destroyed + public var meters: [TestMeter] { + let meters = self.lock.withLock { + self._meters + } + return Array(meters.values) + } + + // MARK: - Recorder + + public func expectRecorder(_ metric: Recorder) throws -> TestRecorder { + guard let recorder = metric._handler as? TestRecorder else { + throw TestMetricsError.illegalMetricType(metric: metric._handler, expected: "\(TestRecorder.self)") + } + return recorder + } + + public func expectRecorder(_ label: String, _ dimensions: [(String, String)] = []) throws -> TestRecorder { + let maybeItem = self.lock.withLock { + self._recorders[.init(label: label, dimensions: dimensions)] + } + guard let testRecorder = maybeItem else { + throw TestMetricsError.missingMetric(label: label, dimensions: dimensions) + } + return testRecorder + } + + /// All the recorders which have been created and not destroyed + public var recorders: [TestRecorder] { + let recorders = self.lock.withLock { + self._recorders + } + return Array(recorders.values) + } + + // MARK: - Timer + + public func expectTimer(_ metric: CoreMetrics.Timer) throws -> TestTimer { + guard let timer = metric._handler as? TestTimer else { + throw TestMetricsError.illegalMetricType(metric: metric._handler, expected: "\(TestTimer.self)") + } + return timer + } + + public func expectTimer(_ label: String, _ dimensions: [(String, String)] = []) throws -> TestTimer { + let maybeItem = self.lock.withLock { + self._timers[.init(label: label, dimensions: dimensions)] + } + guard let testTimer = maybeItem else { + throw TestMetricsError.missingMetric(label: label, dimensions: dimensions) + } + return testTimer + } + + /// All the timers which have been created and not destroyed + public var timers: [TestTimer] { + let timers = self.lock.withLock { + self._timers + } + return Array(timers.values) + } +} + +// MARK: - Metric type implementations + +public protocol TestMetric { + associatedtype Value + + var key: TestMetrics.FullKey { get } + + var lastValue: Value? { get } + var last: (Date, Value)? { get } +} + +public final class TestCounter: TestMetric, CounterHandler, Equatable { + public let id: String + public let label: String + public let dimensions: [(String, String)] + + public var key: TestMetrics.FullKey { + return TestMetrics.FullKey(label: self.label, dimensions: self.dimensions) + } + + let lock = NSLock() + private var _values = [(Date, Int64)]() + + init(label: String, dimensions: [(String, String)]) { + self.id = UUID().uuidString + self.label = label + self.dimensions = dimensions + } + + public func increment(by amount: Int64) { + self.lock.withLock { + self._values.append((Date(), amount)) + } + } + + public func reset() { + return self.lock.withLock { + self._values = [] + } + } + + public var lastValue: Int64? { + return self.last?.1 + } + + public var totalValue: Int64 { + return self.values.reduce(0, +) + } + + public var last: (Date, Int64)? { + return self.lock.withLock { + self._values.last + } + } + + public var values: [Int64] { + return self.lock.withLock { + self._values.map { $0.1 } + } + } + + public static func == (lhs: TestCounter, rhs: TestCounter) -> Bool { + return lhs.id == rhs.id + } +} + +public final class TestMeter: TestMetric, MeterHandler, Equatable { + public let id: String + public let label: String + public let dimensions: [(String, String)] + + public var key: TestMetrics.FullKey { + return TestMetrics.FullKey(label: self.label, dimensions: self.dimensions) + } + + let lock = NSLock() + private var _values = [(Date, Double)]() + + init(label: String, dimensions: [(String, String)]) { + self.id = UUID().uuidString + self.label = label + self.dimensions = dimensions + } + + public func set(_ value: Int64) { + self.set(Double(value)) + } + + public func set(_ value: Double) { + self.lock.withLock { + // this may lose precision but good enough as an example + self._values.append((Date(), Double(value))) + } + } + + public func increment(by amount: Double) { + // Drop illegal values + // - cannot increment by NaN + guard !amount.isNaN else { + return + } + // - cannot increment by infinite quantities + guard !amount.isInfinite else { + return + } + // - cannot increment by negative values + guard amount.sign == .plus else { + return + } + // - cannot increment by zero + guard !amount.isZero else { + return + } + + self.lock.withLock { + let lastValue: Double = self._values.last?.1 ?? 0 + let newValue = lastValue + amount + self._values.append((Date(), newValue)) + } + } + + public func decrement(by amount: Double) { + // Drop illegal values + // - cannot decrement by NaN + guard !amount.isNaN else { + return + } + // - cannot decrement by infinite quantities + guard !amount.isInfinite else { + return + } + // - cannot decrement by negative values + guard amount.sign == .plus else { + return + } + // - cannot decrement by zero + guard !amount.isZero else { + return + } + + self.lock.withLock { + let lastValue: Double = self._values.last?.1 ?? 0 + let newValue = lastValue - amount + self._values.append((Date(), newValue)) + } + } + + public var lastValue: Double? { + return self.last?.1 + } + + public var last: (Date, Double)? { + return self.lock.withLock { + self._values.last + } + } + + public var values: [Double] { + return self.lock.withLock { + self._values.map { $0.1 } + } + } + + public static func == (lhs: TestMeter, rhs: TestMeter) -> Bool { + return lhs.id == rhs.id + } +} + +public final class TestRecorder: TestMetric, RecorderHandler, Equatable { + public let id: String + public let label: String + public let dimensions: [(String, String)] + public let aggregate: Bool + + public var key: TestMetrics.FullKey { + return TestMetrics.FullKey(label: self.label, dimensions: self.dimensions) + } + + let lock = NSLock() + private var _values = [(Date, Double)]() + + init(label: String, dimensions: [(String, String)], aggregate: Bool) { + self.id = UUID().uuidString + self.label = label + self.dimensions = dimensions + self.aggregate = aggregate + } + + public func record(_ value: Int64) { + self.record(Double(value)) + } + + public func record(_ value: Double) { + self.lock.withLock { + // this may lose precision but good enough as an example + self._values.append((Date(), Double(value))) + } + } + + public var lastValue: Double? { + return self.last?.1 + } + + public var last: (Date, Double)? { + return self.lock.withLock { + self._values.last + } + } + + public var values: [Double] { + return self.lock.withLock { + self._values.map { $0.1 } + } + } + + public static func == (lhs: TestRecorder, rhs: TestRecorder) -> Bool { + return lhs.id == rhs.id + } +} + +public final class TestTimer: TestMetric, TimerHandler, Equatable { + public let id: String + public let label: String + public var displayUnit: TimeUnit? + public let dimensions: [(String, String)] + + public var key: TestMetrics.FullKey { + return TestMetrics.FullKey(label: self.label, dimensions: self.dimensions) + } + + let lock = NSLock() + private var _values = [(Date, Int64)]() + + init(label: String, dimensions: [(String, String)]) { + self.id = UUID().uuidString + self.label = label + self.displayUnit = nil + self.dimensions = dimensions + } + + public func preferDisplayUnit(_ unit: TimeUnit) { + self.lock.withLock { + self.displayUnit = unit + } + } + + public func valueInPreferredUnit(atIndex i: Int) -> Double { + let value = self.values[i] + guard let displayUnit = self.displayUnit else { + return Double(value) + } + return Double(value) / Double(displayUnit.scaleFromNanoseconds) + } + + public func recordNanoseconds(_ duration: Int64) { + self.lock.withLock { + self._values.append((Date(), duration)) + } + } + + public var lastValue: Int64? { + return self.last?.1 + } + + public var values: [Int64] { + return self.lock.withLock { + self._values.map { $0.1 } + } + } + + public var last: (Date, Int64)? { + return self.lock.withLock { + self._values.last + } + } + + public static func == (lhs: TestTimer, rhs: TestTimer) -> Bool { + return lhs.id == rhs.id + } +} + +extension NSLock { + @discardableResult + fileprivate func withLock(_ body: () -> T) -> T { + self.lock() + defer { + self.unlock() + } + return body() + } +} + +// MARK: - Errors + +public enum TestMetricsError: Error { + case missingMetric(label: String, dimensions: [(String, String)]) + case illegalMetricType(metric: any Sendable, expected: String) +} + +// MARK: - Sendable support + +// ideally we would not be using @unchecked here, but concurrency-safety checks do not recognize locks +extension TestMetrics: @unchecked Sendable {} +extension TestCounter: @unchecked Sendable {} +extension TestMeter: @unchecked Sendable {} +extension TestRecorder: @unchecked Sendable {} +extension TestTimer: @unchecked Sendable {} From 439eb3eb7a29646d2e33ff3184ae30cc3b7fa1df Mon Sep 17 00:00:00 2001 From: Paul Toffoloni Date: Wed, 28 Aug 2024 17:39:37 +0200 Subject: [PATCH 5/6] Remove useless test file --- Package.swift | 1 + Package@swift-5.9.swift | 5 +- Tests/QueuesTests/MetricsTests.swift | 1 + Tests/QueuesTests/Utilities/TestMetrics.swift | 591 ------------------ 4 files changed, 5 insertions(+), 593 deletions(-) delete mode 100644 Tests/QueuesTests/Utilities/TestMetrics.swift diff --git a/Package.swift b/Package.swift index 9edbf4f..410e747 100644 --- a/Package.swift +++ b/Package.swift @@ -41,6 +41,7 @@ let package = Package( .target(name: "Queues"), .target(name: "XCTQueues"), .product(name: "XCTVapor", package: "vapor"), + .product(name: "MetricsTestKit", package: "swift-metrics"), ], swiftSettings: swiftSettings ), diff --git a/Package@swift-5.9.swift b/Package@swift-5.9.swift index 79feedf..f04268b 100644 --- a/Package@swift-5.9.swift +++ b/Package@swift-5.9.swift @@ -14,8 +14,8 @@ let package = Package( .library(name: "XCTQueues", targets: ["XCTQueues"]), ], dependencies: [ - .package(url: "https://github.com/vapor/vapor.git", from: "4.101.1"), - .package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"), + .package(url: "https://github.com/vapor/vapor.git", from: "4.104.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.70.0"), .package(url: "https://github.com/apple/swift-metrics.git", from: "2.5.0"), ], targets: [ @@ -41,6 +41,7 @@ let package = Package( .target(name: "Queues"), .target(name: "XCTQueues"), .product(name: "XCTVapor", package: "vapor"), + .product(name: "MetricsTestKit", package: "swift-metrics"), ], swiftSettings: swiftSettings ), diff --git a/Tests/QueuesTests/MetricsTests.swift b/Tests/QueuesTests/MetricsTests.swift index c09036d..74673f7 100644 --- a/Tests/QueuesTests/MetricsTests.swift +++ b/Tests/QueuesTests/MetricsTests.swift @@ -1,5 +1,6 @@ @testable import CoreMetrics import Metrics +import MetricsTestKit import NIOConcurrencyHelpers import Queues @testable import Vapor diff --git a/Tests/QueuesTests/Utilities/TestMetrics.swift b/Tests/QueuesTests/Utilities/TestMetrics.swift deleted file mode 100644 index 40b0243..0000000 --- a/Tests/QueuesTests/Utilities/TestMetrics.swift +++ /dev/null @@ -1,591 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift Metrics API open source project -// -// Copyright (c) 2021 Apple Inc. and the Swift Metrics API project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of Swift Metrics API project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift Cluster Membership open source project -// -// Copyright (c) 2020 Apple Inc. and the Swift Cluster Membership project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.md for the list of Swift Cluster Membership project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import CoreMetrics -import Metrics -import XCTest - -/// Taken directly from `swift-cluster-memberships`'s own test target package, which -/// adopts the `TestMetrics` from `swift-metrics`. -/// -/// Metrics factory which allows inspecting recorded metrics programmatically. -/// Only intended for tests of the Metrics API itself. -/// -/// Created Handlers will store Metrics until they are explicitly destroyed. -/// -public final class TestMetrics: MetricsFactory { - private let lock = NSLock() - - public typealias Label = String - public typealias Dimensions = String - - public struct FullKey: Sendable { - let label: Label - let dimensions: [(String, String)] - } - - private var _counters = [FullKey: TestCounter]() - private var _meters = [FullKey: TestMeter]() - private var _recorders = [FullKey: TestRecorder]() - private var _timers = [FullKey: TestTimer]() - - public init() { - // nothing to do - } - - /// Reset method to destroy all created ``TestCounter``, ``TestMeter``, ``TestRecorder`` and ``TestTimer``. - /// Invoke this method in between test runs to verify that Counters are created as needed. - public func reset() { - self.lock.withLock { - self._counters = [:] - self._recorders = [:] - self._meters = [:] - self._timers = [:] - } - } - - public func makeCounter(label: String, dimensions: [(String, String)]) -> any CounterHandler { - return self.lock.withLock { () -> any CounterHandler in - if let existing = self._counters[.init(label: label, dimensions: dimensions)] { - return existing - } - let item = TestCounter(label: label, dimensions: dimensions) - self._counters[.init(label: label, dimensions: dimensions)] = item - return item - } - } - - public func makeMeter(label: String, dimensions: [(String, String)]) -> any MeterHandler { - return self.lock.withLock { () -> any MeterHandler in - if let existing = self._meters[.init(label: label, dimensions: dimensions)] { - return existing - } - let item = TestMeter(label: label, dimensions: dimensions) - self._meters[.init(label: label, dimensions: dimensions)] = item - return item - } - } - - public func makeRecorder(label: String, dimensions: [(String, String)], aggregate: Bool) -> any RecorderHandler { - return self.lock.withLock { () -> any RecorderHandler in - if let existing = self._recorders[.init(label: label, dimensions: dimensions)] { - return existing - } - let item = TestRecorder(label: label, dimensions: dimensions, aggregate: aggregate) - self._recorders[.init(label: label, dimensions: dimensions)] = item - return item - } - } - - public func makeTimer(label: String, dimensions: [(String, String)]) -> any TimerHandler { - return self.lock.withLock { () -> any TimerHandler in - if let existing = self._timers[.init(label: label, dimensions: dimensions)] { - return existing - } - let item = TestTimer(label: label, dimensions: dimensions) - self._timers[.init(label: label, dimensions: dimensions)] = item - return item - } - } - - public func destroyCounter(_ handler: any CounterHandler) { - if let testCounter = handler as? TestCounter { - self.lock.withLock { - self._counters.removeValue(forKey: testCounter.key) - } - } - } - - public func destroyMeter(_ handler: any MeterHandler) { - if let testMeter = handler as? TestMeter { - self.lock.withLock { () in - self._meters.removeValue(forKey: testMeter.key) - } - } - } - - public func destroyRecorder(_ handler: any RecorderHandler) { - if let testRecorder = handler as? TestRecorder { - self.lock.withLock { - self._recorders.removeValue(forKey: testRecorder.key) - } - } - } - - public func destroyTimer(_ handler: any TimerHandler) { - if let testTimer = handler as? TestTimer { - self.lock.withLock { - self._timers.removeValue(forKey: testTimer.key) - } - } - } -} - -extension TestMetrics.FullKey: Hashable { - public func hash(into hasher: inout Hasher) { - self.label.hash(into: &hasher) - for dim in self.dimensions { - dim.0.hash(into: &hasher) - dim.1.hash(into: &hasher) - } - } - - public static func == (lhs: TestMetrics.FullKey, rhs: TestMetrics.FullKey) -> Bool { - return lhs.label == rhs.label && - Dictionary(uniqueKeysWithValues: lhs.dimensions) == Dictionary(uniqueKeysWithValues: rhs.dimensions) - } -} - -// MARK: - Assertions - -extension TestMetrics { - // MARK: - Counter - - public func expectCounter(_ metric: Counter) throws -> TestCounter { - guard let counter = metric._handler as? TestCounter else { - throw TestMetricsError.illegalMetricType(metric: metric._handler, expected: "\(TestCounter.self)") - } - return counter - } - - public func expectCounter(_ label: String, _ dimensions: [(String, String)] = []) throws -> TestCounter { - let maybeItem = self.lock.withLock { - self._counters[.init(label: label, dimensions: dimensions)] - } - guard let testCounter = maybeItem else { - throw TestMetricsError.missingMetric(label: label, dimensions: dimensions) - } - return testCounter - } - - /// All the counters which have been created and not destroyed - public var counters: [TestCounter] { - let counters = self.lock.withLock { - self._counters - } - return Array(counters.values) - } - - // MARK: - Gauge - - public func expectGauge(_ metric: Gauge) throws -> TestRecorder { - return try self.expectRecorder(metric) - } - - public func expectGauge(_ label: String, _ dimensions: [(String, String)] = []) throws -> TestRecorder { - return try self.expectRecorder(label, dimensions) - } - - // MARK: - Meter - - public func expectMeter(_ metric: Meter) throws -> TestMeter { - guard let meter = metric._handler as? TestMeter else { - throw TestMetricsError.illegalMetricType(metric: metric._handler, expected: "\(TestMeter.self)") - } - return meter - } - - public func expectMeter(_ label: String, _ dimensions: [(String, String)] = []) throws -> TestMeter { - let maybeItem = self.lock.withLock { - self._meters[.init(label: label, dimensions: dimensions)] - } - guard let testMeter = maybeItem else { - throw TestMetricsError.missingMetric(label: label, dimensions: dimensions) - } - return testMeter - } - - /// All the meters which have been created and not destroyed - public var meters: [TestMeter] { - let meters = self.lock.withLock { - self._meters - } - return Array(meters.values) - } - - // MARK: - Recorder - - public func expectRecorder(_ metric: Recorder) throws -> TestRecorder { - guard let recorder = metric._handler as? TestRecorder else { - throw TestMetricsError.illegalMetricType(metric: metric._handler, expected: "\(TestRecorder.self)") - } - return recorder - } - - public func expectRecorder(_ label: String, _ dimensions: [(String, String)] = []) throws -> TestRecorder { - let maybeItem = self.lock.withLock { - self._recorders[.init(label: label, dimensions: dimensions)] - } - guard let testRecorder = maybeItem else { - throw TestMetricsError.missingMetric(label: label, dimensions: dimensions) - } - return testRecorder - } - - /// All the recorders which have been created and not destroyed - public var recorders: [TestRecorder] { - let recorders = self.lock.withLock { - self._recorders - } - return Array(recorders.values) - } - - // MARK: - Timer - - public func expectTimer(_ metric: CoreMetrics.Timer) throws -> TestTimer { - guard let timer = metric._handler as? TestTimer else { - throw TestMetricsError.illegalMetricType(metric: metric._handler, expected: "\(TestTimer.self)") - } - return timer - } - - public func expectTimer(_ label: String, _ dimensions: [(String, String)] = []) throws -> TestTimer { - let maybeItem = self.lock.withLock { - self._timers[.init(label: label, dimensions: dimensions)] - } - guard let testTimer = maybeItem else { - throw TestMetricsError.missingMetric(label: label, dimensions: dimensions) - } - return testTimer - } - - /// All the timers which have been created and not destroyed - public var timers: [TestTimer] { - let timers = self.lock.withLock { - self._timers - } - return Array(timers.values) - } -} - -// MARK: - Metric type implementations - -public protocol TestMetric { - associatedtype Value - - var key: TestMetrics.FullKey { get } - - var lastValue: Value? { get } - var last: (Date, Value)? { get } -} - -public final class TestCounter: TestMetric, CounterHandler, Equatable { - public let id: String - public let label: String - public let dimensions: [(String, String)] - - public var key: TestMetrics.FullKey { - return TestMetrics.FullKey(label: self.label, dimensions: self.dimensions) - } - - let lock = NSLock() - private var _values = [(Date, Int64)]() - - init(label: String, dimensions: [(String, String)]) { - self.id = UUID().uuidString - self.label = label - self.dimensions = dimensions - } - - public func increment(by amount: Int64) { - self.lock.withLock { - self._values.append((Date(), amount)) - } - } - - public func reset() { - return self.lock.withLock { - self._values = [] - } - } - - public var lastValue: Int64? { - return self.last?.1 - } - - public var totalValue: Int64 { - return self.values.reduce(0, +) - } - - public var last: (Date, Int64)? { - return self.lock.withLock { - self._values.last - } - } - - public var values: [Int64] { - return self.lock.withLock { - self._values.map { $0.1 } - } - } - - public static func == (lhs: TestCounter, rhs: TestCounter) -> Bool { - return lhs.id == rhs.id - } -} - -public final class TestMeter: TestMetric, MeterHandler, Equatable { - public let id: String - public let label: String - public let dimensions: [(String, String)] - - public var key: TestMetrics.FullKey { - return TestMetrics.FullKey(label: self.label, dimensions: self.dimensions) - } - - let lock = NSLock() - private var _values = [(Date, Double)]() - - init(label: String, dimensions: [(String, String)]) { - self.id = UUID().uuidString - self.label = label - self.dimensions = dimensions - } - - public func set(_ value: Int64) { - self.set(Double(value)) - } - - public func set(_ value: Double) { - self.lock.withLock { - // this may lose precision but good enough as an example - self._values.append((Date(), Double(value))) - } - } - - public func increment(by amount: Double) { - // Drop illegal values - // - cannot increment by NaN - guard !amount.isNaN else { - return - } - // - cannot increment by infinite quantities - guard !amount.isInfinite else { - return - } - // - cannot increment by negative values - guard amount.sign == .plus else { - return - } - // - cannot increment by zero - guard !amount.isZero else { - return - } - - self.lock.withLock { - let lastValue: Double = self._values.last?.1 ?? 0 - let newValue = lastValue + amount - self._values.append((Date(), newValue)) - } - } - - public func decrement(by amount: Double) { - // Drop illegal values - // - cannot decrement by NaN - guard !amount.isNaN else { - return - } - // - cannot decrement by infinite quantities - guard !amount.isInfinite else { - return - } - // - cannot decrement by negative values - guard amount.sign == .plus else { - return - } - // - cannot decrement by zero - guard !amount.isZero else { - return - } - - self.lock.withLock { - let lastValue: Double = self._values.last?.1 ?? 0 - let newValue = lastValue - amount - self._values.append((Date(), newValue)) - } - } - - public var lastValue: Double? { - return self.last?.1 - } - - public var last: (Date, Double)? { - return self.lock.withLock { - self._values.last - } - } - - public var values: [Double] { - return self.lock.withLock { - self._values.map { $0.1 } - } - } - - public static func == (lhs: TestMeter, rhs: TestMeter) -> Bool { - return lhs.id == rhs.id - } -} - -public final class TestRecorder: TestMetric, RecorderHandler, Equatable { - public let id: String - public let label: String - public let dimensions: [(String, String)] - public let aggregate: Bool - - public var key: TestMetrics.FullKey { - return TestMetrics.FullKey(label: self.label, dimensions: self.dimensions) - } - - let lock = NSLock() - private var _values = [(Date, Double)]() - - init(label: String, dimensions: [(String, String)], aggregate: Bool) { - self.id = UUID().uuidString - self.label = label - self.dimensions = dimensions - self.aggregate = aggregate - } - - public func record(_ value: Int64) { - self.record(Double(value)) - } - - public func record(_ value: Double) { - self.lock.withLock { - // this may lose precision but good enough as an example - self._values.append((Date(), Double(value))) - } - } - - public var lastValue: Double? { - return self.last?.1 - } - - public var last: (Date, Double)? { - return self.lock.withLock { - self._values.last - } - } - - public var values: [Double] { - return self.lock.withLock { - self._values.map { $0.1 } - } - } - - public static func == (lhs: TestRecorder, rhs: TestRecorder) -> Bool { - return lhs.id == rhs.id - } -} - -public final class TestTimer: TestMetric, TimerHandler, Equatable { - public let id: String - public let label: String - public var displayUnit: TimeUnit? - public let dimensions: [(String, String)] - - public var key: TestMetrics.FullKey { - return TestMetrics.FullKey(label: self.label, dimensions: self.dimensions) - } - - let lock = NSLock() - private var _values = [(Date, Int64)]() - - init(label: String, dimensions: [(String, String)]) { - self.id = UUID().uuidString - self.label = label - self.displayUnit = nil - self.dimensions = dimensions - } - - public func preferDisplayUnit(_ unit: TimeUnit) { - self.lock.withLock { - self.displayUnit = unit - } - } - - public func valueInPreferredUnit(atIndex i: Int) -> Double { - let value = self.values[i] - guard let displayUnit = self.displayUnit else { - return Double(value) - } - return Double(value) / Double(displayUnit.scaleFromNanoseconds) - } - - public func recordNanoseconds(_ duration: Int64) { - self.lock.withLock { - self._values.append((Date(), duration)) - } - } - - public var lastValue: Int64? { - return self.last?.1 - } - - public var values: [Int64] { - return self.lock.withLock { - self._values.map { $0.1 } - } - } - - public var last: (Date, Int64)? { - return self.lock.withLock { - self._values.last - } - } - - public static func == (lhs: TestTimer, rhs: TestTimer) -> Bool { - return lhs.id == rhs.id - } -} - -extension NSLock { - @discardableResult - fileprivate func withLock(_ body: () -> T) -> T { - self.lock() - defer { - self.unlock() - } - return body() - } -} - -// MARK: - Errors - -public enum TestMetricsError: Error { - case missingMetric(label: String, dimensions: [(String, String)]) - case illegalMetricType(metric: any Sendable, expected: String) -} - -// MARK: - Sendable support - -// ideally we would not be using @unchecked here, but concurrency-safety checks do not recognize locks -extension TestMetrics: @unchecked Sendable {} -extension TestCounter: @unchecked Sendable {} -extension TestMeter: @unchecked Sendable {} -extension TestRecorder: @unchecked Sendable {} -extension TestTimer: @unchecked Sendable {} From aa9123e71cadd50a3008f640868eed8ff9400486 Mon Sep 17 00:00:00 2001 From: Paul Toffoloni Date: Wed, 28 Aug 2024 17:44:07 +0200 Subject: [PATCH 6/6] Update 5.8 manifest --- Package.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Package.swift b/Package.swift index 410e747..6c3a2ed 100644 --- a/Package.swift +++ b/Package.swift @@ -14,8 +14,8 @@ let package = Package( .library(name: "XCTQueues", targets: ["XCTQueues"]), ], dependencies: [ - .package(url: "https://github.com/vapor/vapor.git", from: "4.101.1"), - .package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"), + .package(url: "https://github.com/vapor/vapor.git", from: "4.104.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.70.0"), .package(url: "https://github.com/apple/swift-metrics.git", from: "2.5.0"), ], targets: [