Skip to content

Commit

Permalink
[Runtime] SOAR-0010: Event streams sequences (#91)
Browse files Browse the repository at this point in the history
### Motivation

Land changes approved in
apple/swift-openapi-generator#495.

### Modifications

Introduced the new APIs.

### Result

Easy use of event streams.

### Test Plan

Added unit tests for all.

---------

Co-authored-by: Si Beaumont <[email protected]>
  • Loading branch information
czechboy0 and simonjbeaumont authored Jan 8, 2024
1 parent fd101c3 commit 6be221f
Show file tree
Hide file tree
Showing 16 changed files with 1,915 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ enum ASCII {
/// The line feed `<LF>` character.
static let lf: UInt8 = 0x0a

/// The record separator `<RS>` character.
static let rs: UInt8 = 0x1e

/// The colon `:` character.
static let colon: UInt8 = 0x3a

Expand Down Expand Up @@ -122,3 +125,34 @@ extension RandomAccessCollection where Element: Equatable {
return .noMatch
}
}

/// A value returned by the `matchOfOneOf` method.
enum MatchOfOneOfResult<C: RandomAccessCollection> {

/// No match found at any position in self.
case noMatch

/// The first option matched.
case first(C.Index)

/// The second option matched.
case second(C.Index)
}

extension RandomAccessCollection where Element: Equatable {
/// Returns the index of the first match of one of two elements.
/// - Parameters:
/// - first: The first element to match.
/// - second: The second element to match.
/// - Returns: The result.
func matchOfOneOf(first: Element, second: Element) -> MatchOfOneOfResult<Self> {
var index = startIndex
while index < endIndex {
let element = self[index]
if element == first { return .first(index) }
if element == second { return .second(index) }
formIndex(after: &index)
}
return .noMatch
}
}
181 changes: 181 additions & 0 deletions Sources/OpenAPIRuntime/EventStreams/JSONLinesDecoding.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftOpenAPIGenerator open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

#if canImport(Darwin)
import class Foundation.JSONDecoder
#else
@preconcurrency import class Foundation.JSONDecoder
#endif
import struct Foundation.Data

/// A sequence that parses arbitrary byte chunks into lines using the JSON Lines format.
public struct JSONLinesDeserializationSequence<Upstream: AsyncSequence & Sendable>: Sendable
where Upstream.Element == ArraySlice<UInt8> {

/// The upstream sequence.
private let upstream: Upstream

/// Creates a new sequence.
/// - Parameter upstream: The upstream sequence of arbitrary byte chunks.
public init(upstream: Upstream) { self.upstream = upstream }
}

extension JSONLinesDeserializationSequence: AsyncSequence {

/// The type of element produced by this asynchronous sequence.
public typealias Element = ArraySlice<UInt8>

/// The iterator of `JSONLinesDeserializationSequence`.
public struct Iterator<UpstreamIterator: AsyncIteratorProtocol>: AsyncIteratorProtocol
where UpstreamIterator.Element == Element {

/// The upstream iterator of arbitrary byte chunks.
var upstream: UpstreamIterator

/// The state machine of the iterator.
var stateMachine: StateMachine = .init()

/// Asynchronously advances to the next element and returns it, or ends the
/// sequence if there is no next element.
public mutating func next() async throws -> ArraySlice<UInt8>? {
while true {
switch stateMachine.next() {
case .returnNil: return nil
case .emitLine(let line): return line
case .needsMore:
let value = try await upstream.next()
switch stateMachine.receivedValue(value) {
case .returnNil: return nil
case .emitLine(let line): return line
case .noop: continue
}
}
}
}
}

/// Creates the asynchronous iterator that produces elements of this
/// asynchronous sequence.
public func makeAsyncIterator() -> Iterator<Upstream.AsyncIterator> {
Iterator(upstream: upstream.makeAsyncIterator())
}
}

extension AsyncSequence where Element == ArraySlice<UInt8> {

/// Returns another sequence that decodes each JSON Lines event as the provided type using the provided decoder.
/// - Parameters:
/// - eventType: The type to decode the JSON event into.
/// - decoder: The JSON decoder to use.
/// - Returns: A sequence that provides the decoded JSON events.
public func asDecodedJSONLines<Event: Decodable>(
of eventType: Event.Type = Event.self,
decoder: JSONDecoder = .init()
) -> AsyncThrowingMapSequence<JSONLinesDeserializationSequence<Self>, Event> {
JSONLinesDeserializationSequence(upstream: self)
.map { line in try decoder.decode(Event.self, from: Data(line)) }
}
}

extension JSONLinesDeserializationSequence.Iterator {

/// A state machine representing the JSON Lines deserializer.
struct StateMachine {

/// The possible states of the state machine.
enum State: Hashable {

/// Is waiting for the end of line.
case waitingForDelimiter(buffer: [UInt8])

/// Finished, the terminal state.
case finished

/// Helper state to avoid copy-on-write copies.
case mutating
}

/// The current state of the state machine.
private(set) var state: State

/// Creates a new state machine.
init() { self.state = .waitingForDelimiter(buffer: []) }

/// An action returned by the `next` method.
enum NextAction {

/// Return nil to the caller, no more bytes.
case returnNil

/// Emit a full line.
case emitLine(ArraySlice<UInt8>)

/// The line is not complete yet, needs more bytes.
case needsMore
}

/// Read the next line parsed from upstream bytes.
/// - Returns: An action to perform.
mutating func next() -> NextAction {
switch state {
case .waitingForDelimiter(var buffer):
state = .mutating
guard let indexOfNewline = buffer.firstIndex(of: ASCII.lf) else {
state = .waitingForDelimiter(buffer: buffer)
return .needsMore
}
let line = buffer[..<indexOfNewline]
buffer.removeSubrange(...indexOfNewline)
state = .waitingForDelimiter(buffer: buffer)
return .emitLine(line)
case .finished: return .returnNil
case .mutating: preconditionFailure("Invalid state")
}
}

/// An action returned by the `receivedValue` method.
enum ReceivedValueAction {

/// Return nil to the caller, no more lines.
case returnNil

/// Emit a full line.
case emitLine(ArraySlice<UInt8>)

/// No action, rerun the parsing loop.
case noop
}

/// Ingest the provided bytes.
/// - Parameter value: A new byte chunk. If `nil`, then the source of bytes is finished.
/// - Returns: An action to perform.
mutating func receivedValue(_ value: ArraySlice<UInt8>?) -> ReceivedValueAction {
switch state {
case .waitingForDelimiter(var buffer):
if let value {
state = .mutating
buffer.append(contentsOf: value)
state = .waitingForDelimiter(buffer: buffer)
return .noop
} else {
let line = ArraySlice(buffer)
buffer = []
state = .finished
if line.isEmpty { return .returnNil } else { return .emitLine(line) }
}
case .finished, .mutating: preconditionFailure("Invalid state")
}
}
}
}
156 changes: 156 additions & 0 deletions Sources/OpenAPIRuntime/EventStreams/JSONLinesEncoding.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftOpenAPIGenerator open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

#if canImport(Darwin)
import class Foundation.JSONEncoder
#else
@preconcurrency import class Foundation.JSONEncoder
#endif

/// A sequence that serializes lines by concatenating them using the JSON Lines format.
public struct JSONLinesSerializationSequence<Upstream: AsyncSequence & Sendable>: Sendable
where Upstream.Element == ArraySlice<UInt8> {

/// The upstream sequence.
private let upstream: Upstream

/// Creates a new sequence.
/// - Parameter upstream: The upstream sequence of lines.
public init(upstream: Upstream) { self.upstream = upstream }
}

extension JSONLinesSerializationSequence: AsyncSequence {

/// The type of element produced by this asynchronous sequence.
public typealias Element = ArraySlice<UInt8>

/// The iterator of `JSONLinesSerializationSequence`.
public struct Iterator<UpstreamIterator: AsyncIteratorProtocol>: AsyncIteratorProtocol
where UpstreamIterator.Element == Element {

/// The upstream iterator of lines.
var upstream: UpstreamIterator

/// The state machine of the iterator.
var stateMachine: StateMachine = .init()

/// Asynchronously advances to the next element and returns it, or ends the
/// sequence if there is no next element.
public mutating func next() async throws -> ArraySlice<UInt8>? {
while true {
switch stateMachine.next() {
case .returnNil: return nil
case .needsMore:
let value = try await upstream.next()
switch stateMachine.receivedValue(value) {
case .returnNil: return nil
case .emitBytes(let bytes): return bytes
}
}
}
}
}

/// Creates the asynchronous iterator that produces elements of this
/// asynchronous sequence.
public func makeAsyncIterator() -> Iterator<Upstream.AsyncIterator> {
Iterator(upstream: upstream.makeAsyncIterator())
}
}

extension AsyncSequence where Element: Encodable & Sendable, Self: Sendable {

/// Returns another sequence that encodes the events using the provided encoder into JSON Lines.
/// - Parameter encoder: The JSON encoder to use.
/// - Returns: A sequence that provides the serialized JSON Lines.
public func asEncodedJSONLines(
encoder: JSONEncoder = {
let encoder = JSONEncoder()
encoder.outputFormatting = [.sortedKeys, .withoutEscapingSlashes]
return encoder
}()
) -> JSONLinesSerializationSequence<AsyncThrowingMapSequence<Self, ArraySlice<UInt8>>> {
.init(upstream: map { event in try ArraySlice(encoder.encode(event)) })
}
}

extension JSONLinesSerializationSequence.Iterator {

/// A state machine representing the JSON Lines serializer.
struct StateMachine {

/// The possible states of the state machine.
enum State {

/// Is emitting serialized JSON Lines events.
case running

/// Finished, the terminal state.
case finished
}

/// The current state of the state machine.
private(set) var state: State

/// Creates a new state machine.
init() { self.state = .running }

/// An action returned by the `next` method.
enum NextAction {

/// Return nil to the caller, no more bytes.
case returnNil

/// Needs more bytes.
case needsMore
}

/// Read the next byte chunk serialized from upstream lines.
/// - Returns: An action to perform.
mutating func next() -> NextAction {
switch state {
case .running: return .needsMore
case .finished: return .returnNil
}
}

/// An action returned by the `receivedValue` method.
enum ReceivedValueAction {

/// Return nil to the caller, no more bytes.
case returnNil

/// Emit the provided bytes.
case emitBytes(ArraySlice<UInt8>)
}

/// Ingest the provided line.
/// - Parameter value: A new line. If `nil`, then the source of line is finished.
/// - Returns: An action to perform.
mutating func receivedValue(_ value: ArraySlice<UInt8>?) -> ReceivedValueAction {
switch state {
case .running:
if let value {
var buffer = value
buffer.append(ASCII.lf)
return .emitBytes(ArraySlice(buffer))
} else {
state = .finished
return .returnNil
}
case .finished: preconditionFailure("Invalid state")
}
}
}
}
Loading

0 comments on commit 6be221f

Please sign in to comment.