diff --git a/Sources/Queues/QueueWorker.swift b/Sources/Queues/QueueWorker.swift index 96d4970..ff585ea 100644 --- a/Sources/Queues/QueueWorker.swift +++ b/Sources/Queues/QueueWorker.swift @@ -65,6 +65,11 @@ public struct QueueWorker: Sendable { try await $0.didDequeue(jobId: id.string, eventLoop: self.queue.eventLoop).get() } + Meter( + label: "jobs.in.progress.meter", + dimensions: [("queueName", self.queue.queueName.string)] + ).increment() + try await self.runOneJob(id: id, job: job, jobData: data, logger: logger) return true } @@ -122,17 +127,15 @@ public struct QueueWorker: Sendable { queue: any Queue, error: (any Error)? = nil ) { - // Checks how long the job took to complete Timer( label: "\(jobName).jobDurationTimer", dimensions: [ ("success", error == nil ? "true" : "false"), ("jobName", jobName), ], - preferredDisplayUnit: .seconds + preferredDisplayUnit: .milliseconds ).recordNanoseconds(DispatchTime.now().uptimeNanoseconds - startTime) - // Adds the completed job to a different counter depending on its result if error != nil { Counter( label: "error.completed.jobs.counter", @@ -144,5 +147,10 @@ public struct QueueWorker: Sendable { dimensions: [("queueName", queue.queueName.string)] ).increment() } + + Meter( + label: "jobs.in.progress.meter", + dimensions: [("queueName", queue.queueName.string)] + ).decrement() } } diff --git a/Tests/QueuesTests/MetricsTests.swift b/Tests/QueuesTests/MetricsTests.swift index cfef6f8..6051c92 100644 --- a/Tests/QueuesTests/MetricsTests.swift +++ b/Tests/QueuesTests/MetricsTests.swift @@ -66,6 +66,7 @@ final class MetricsTests: XCTestCase { let counter = try XCTUnwrap(self.metrics.counters.first(where: { $0.label == "success.completed.jobs.counter" })) let queueNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "queueName" })) XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) + XCTAssertEqual(counter.lastValue, 1) } func testErroringJobsCounter() async throws { @@ -86,6 +87,7 @@ final class MetricsTests: XCTestCase { let counter = try XCTUnwrap(self.metrics.counters.first(where: { $0.label == "error.completed.jobs.counter" })) let queueNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "queueName" })) XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) + XCTAssertEqual(counter.lastValue, 1) } func testDispatchedJobsCounter() async throws { @@ -110,5 +112,28 @@ final class MetricsTests: XCTestCase { let jobNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "jobName" })) XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) XCTAssertEqual(jobNameDimension.1, MyAsyncJob.name) + XCTAssertEqual(counter.totalValue, 2) + } + + func testInProgressJobsGauge() async throws { + let promise = self.app.eventLoopGroup.next().makePromise(of: Void.self) + self.app.queues.add(MyAsyncJob(promise: promise)) + + self.app.get("foo") { req async throws in + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) + return "done" + } + + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + try await self.app.queues.queue.worker.run() + + let meter = try XCTUnwrap(self.metrics.meters.first(where: { $0.label == "jobs.in.progress.meter" })) + let queueNameDimension = try XCTUnwrap(meter.dimensions.first(where: { $0.0 == "queueName" })) + XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) + XCTAssertEqual(meter.values, [1, 0]) } }