Skip to content

Commit

Permalink
Add AsyncStream-based serial queue
Browse files Browse the repository at this point in the history
  • Loading branch information
mattmassicotte committed Dec 6, 2023
1 parent 8065ac4 commit c7880b3
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 0 deletions.
70 changes: 70 additions & 0 deletions Sources/Queue/AsyncSerialQueue.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
actor AsyncSerialQueue {
public actor QueueTask {
var cancelled = false
let operation: @Sendable () async throws -> Void

init(operation: @escaping @Sendable () async throws -> Void) {
self.operation = operation
}

public nonisolated func cancel() {
Task { await internalCancel() }
}

private func internalCancel() {
cancelled = true
}

func run() async throws {
if cancelled {
return
}

try await operation()
}
}

typealias Stream = AsyncStream<QueueTask>

private let continuation: Stream.Continuation

public init() {
let (stream, continuation) = Stream.makeStream()

self.continuation = continuation

Task {
for await item in stream {
try? await item.run()
}
}
}

deinit {
continuation.finish()
}

/// Submit a throwing operation to the queue.
@discardableResult
public nonisolated func addOperation(
@_inheritActorContext operation: @escaping @Sendable () async throws -> Void
) -> QueueTask {
let queueTask = QueueTask(operation: operation)

continuation.yield(queueTask)

return queueTask
}

/// Submit an operation to the queue.
@discardableResult
public nonisolated func addOperation(
@_inheritActorContext operation: @escaping @Sendable () async -> Void
) -> QueueTask {
let queueTask = QueueTask(operation: operation)

continuation.yield(queueTask)

return queueTask
}
}
23 changes: 23 additions & 0 deletions Tests/QueueTests/AsyncSerialQueueTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import XCTest
@testable import Queue

final class AsyncSerialQueueTests: XCTestCase {

func testEnqueuePerformance() {
let queue = AsyncSerialQueue()

measure {
// techincally measuring the actor creation time too, but I don't think that is a big deal
let s = Counter()

for i in 1 ... 1_000 {
queue.addOperation { [i] in
let result = await s.increment()
if i != result {
print(i, "does not match", result)
}
}
}
}
}
}

0 comments on commit c7880b3

Please sign in to comment.