Skip to content

Commit

Permalink
Add Meter for in-progress jobs (#139)
Browse files Browse the repository at this point in the history
* Add Gauge for in-progress jobs

* Whitespace nit

* Move the gauge to the right place

* Make `Gauge` a `Meter` and add test

* Change back job duration timer label
  • Loading branch information
ptoffy authored Nov 26, 2024
1 parent c502c4a commit 7511f2d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 3 deletions.
14 changes: 11 additions & 3 deletions Sources/Queues/QueueWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public struct QueueWorker: Sendable {
try await $0.didDequeue(jobId: id.string, eventLoop: self.queue.eventLoop).get()
}

Meter(
label: "jobs.in.progress.meter",
dimensions: [("queueName", self.queue.queueName.string)]
).increment()

try await self.runOneJob(id: id, job: job, jobData: data, logger: logger)
return true
}
Expand Down Expand Up @@ -122,17 +127,15 @@ public struct QueueWorker: Sendable {
queue: any Queue,
error: (any Error)? = nil
) {
// Checks how long the job took to complete
Timer(
label: "\(jobName).jobDurationTimer",
dimensions: [
("success", error == nil ? "true" : "false"),
("jobName", jobName),
],
preferredDisplayUnit: .seconds
preferredDisplayUnit: .milliseconds
).recordNanoseconds(DispatchTime.now().uptimeNanoseconds - startTime)

// Adds the completed job to a different counter depending on its result
if error != nil {
Counter(
label: "error.completed.jobs.counter",
Expand All @@ -144,5 +147,10 @@ public struct QueueWorker: Sendable {
dimensions: [("queueName", queue.queueName.string)]
).increment()
}

Meter(
label: "jobs.in.progress.meter",
dimensions: [("queueName", queue.queueName.string)]
).decrement()
}
}
25 changes: 25 additions & 0 deletions Tests/QueuesTests/MetricsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ final class MetricsTests: XCTestCase {
let counter = try XCTUnwrap(self.metrics.counters.first(where: { $0.label == "success.completed.jobs.counter" }))
let queueNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "queueName" }))
XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string)
XCTAssertEqual(counter.lastValue, 1)
}

func testErroringJobsCounter() async throws {
Expand All @@ -86,6 +87,7 @@ final class MetricsTests: XCTestCase {
let counter = try XCTUnwrap(self.metrics.counters.first(where: { $0.label == "error.completed.jobs.counter" }))
let queueNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "queueName" }))
XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string)
XCTAssertEqual(counter.lastValue, 1)
}

func testDispatchedJobsCounter() async throws {
Expand All @@ -110,5 +112,28 @@ final class MetricsTests: XCTestCase {
let jobNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "jobName" }))
XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string)
XCTAssertEqual(jobNameDimension.1, MyAsyncJob.name)
XCTAssertEqual(counter.totalValue, 2)
}

func testInProgressJobsGauge() async throws {
let promise = self.app.eventLoopGroup.next().makePromise(of: Void.self)
self.app.queues.add(MyAsyncJob(promise: promise))

self.app.get("foo") { req async throws in
try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar"))
return "done"
}

try await self.app.testable().test(.GET, "foo") { res async in
XCTAssertEqual(res.status, .ok)
XCTAssertEqual(res.body.string, "done")
}

try await self.app.queues.queue.worker.run()

let meter = try XCTUnwrap(self.metrics.meters.first(where: { $0.label == "jobs.in.progress.meter" }))
let queueNameDimension = try XCTUnwrap(meter.dimensions.first(where: { $0.0 == "queueName" }))
XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string)
XCTAssertEqual(meter.values, [1, 0])
}
}

0 comments on commit 7511f2d

Please sign in to comment.