From 6be221fe0ae025d01a49cd733df746db0e3e8fc9 Mon Sep 17 00:00:00 2001 From: Honza Dvorsky Date: Mon, 8 Jan 2024 14:06:57 +0000 Subject: [PATCH] [Runtime] SOAR-0010: Event streams sequences (#91) ### Motivation Land changes approved in https://github.com/apple/swift-openapi-generator/pull/495. ### Modifications Introduced the new APIs. ### Result Easy use of event streams. ### Test Plan Added unit tests for all. --------- Co-authored-by: Si Beaumont --- .../{Multipart => Base}/ByteUtilities.swift | 34 ++ .../EventStreams/JSONLinesDecoding.swift | 181 +++++++ .../EventStreams/JSONLinesEncoding.swift | 156 ++++++ .../EventStreams/JSONSequenceDecoding.swift | 236 +++++++++ .../EventStreams/JSONSequenceEncoding.swift | 157 ++++++ .../EventStreams/ServerSentEvents.swift | 87 ++++ .../ServerSentEventsDecoding.swift | 451 ++++++++++++++++++ .../ServerSentEventsEncoding.swift | 190 ++++++++ .../MultipartFramesToBytesSequence.swift | 1 + .../EventStreams/Test_JSONLinesDecoding.swift | 33 ++ .../EventStreams/Test_JSONLinesEncoding.swift | 29 ++ .../Test_JSONSequenceDecoding.swift | 32 ++ .../Test_JSONSequenceEncoding.swift | 30 ++ .../Test_ServerSentEventsDecoding.swift | 144 ++++++ .../Test_ServerSentEventsEncoding.swift | 103 ++++ Tests/OpenAPIRuntimeTests/Test_Runtime.swift | 59 ++- 16 files changed, 1915 insertions(+), 8 deletions(-) rename Sources/OpenAPIRuntime/{Multipart => Base}/ByteUtilities.swift (81%) create mode 100644 Sources/OpenAPIRuntime/EventStreams/JSONLinesDecoding.swift create mode 100644 Sources/OpenAPIRuntime/EventStreams/JSONLinesEncoding.swift create mode 100644 Sources/OpenAPIRuntime/EventStreams/JSONSequenceDecoding.swift create mode 100644 Sources/OpenAPIRuntime/EventStreams/JSONSequenceEncoding.swift create mode 100644 Sources/OpenAPIRuntime/EventStreams/ServerSentEvents.swift create mode 100644 Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift create mode 100644 Sources/OpenAPIRuntime/EventStreams/ServerSentEventsEncoding.swift create mode 100644 Tests/OpenAPIRuntimeTests/EventStreams/Test_JSONLinesDecoding.swift create mode 100644 Tests/OpenAPIRuntimeTests/EventStreams/Test_JSONLinesEncoding.swift create mode 100644 Tests/OpenAPIRuntimeTests/EventStreams/Test_JSONSequenceDecoding.swift create mode 100644 Tests/OpenAPIRuntimeTests/EventStreams/Test_JSONSequenceEncoding.swift create mode 100644 Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift create mode 100644 Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsEncoding.swift diff --git a/Sources/OpenAPIRuntime/Multipart/ByteUtilities.swift b/Sources/OpenAPIRuntime/Base/ByteUtilities.swift similarity index 81% rename from Sources/OpenAPIRuntime/Multipart/ByteUtilities.swift rename to Sources/OpenAPIRuntime/Base/ByteUtilities.swift index 9ae1c6a5..039c03f2 100644 --- a/Sources/OpenAPIRuntime/Multipart/ByteUtilities.swift +++ b/Sources/OpenAPIRuntime/Base/ByteUtilities.swift @@ -24,6 +24,9 @@ enum ASCII { /// The line feed `` character. static let lf: UInt8 = 0x0a + /// The record separator `` character. + static let rs: UInt8 = 0x1e + /// The colon `:` character. static let colon: UInt8 = 0x3a @@ -122,3 +125,34 @@ extension RandomAccessCollection where Element: Equatable { return .noMatch } } + +/// A value returned by the `matchOfOneOf` method. +enum MatchOfOneOfResult { + + /// No match found at any position in self. + case noMatch + + /// The first option matched. + case first(C.Index) + + /// The second option matched. + case second(C.Index) +} + +extension RandomAccessCollection where Element: Equatable { + /// Returns the index of the first match of one of two elements. + /// - Parameters: + /// - first: The first element to match. + /// - second: The second element to match. + /// - Returns: The result. + func matchOfOneOf(first: Element, second: Element) -> MatchOfOneOfResult { + var index = startIndex + while index < endIndex { + let element = self[index] + if element == first { return .first(index) } + if element == second { return .second(index) } + formIndex(after: &index) + } + return .noMatch + } +} diff --git a/Sources/OpenAPIRuntime/EventStreams/JSONLinesDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/JSONLinesDecoding.swift new file mode 100644 index 00000000..eed9acdc --- /dev/null +++ b/Sources/OpenAPIRuntime/EventStreams/JSONLinesDecoding.swift @@ -0,0 +1,181 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftOpenAPIGenerator open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if canImport(Darwin) +import class Foundation.JSONDecoder +#else +@preconcurrency import class Foundation.JSONDecoder +#endif +import struct Foundation.Data + +/// A sequence that parses arbitrary byte chunks into lines using the JSON Lines format. +public struct JSONLinesDeserializationSequence: Sendable +where Upstream.Element == ArraySlice { + + /// The upstream sequence. + private let upstream: Upstream + + /// Creates a new sequence. + /// - Parameter upstream: The upstream sequence of arbitrary byte chunks. + public init(upstream: Upstream) { self.upstream = upstream } +} + +extension JSONLinesDeserializationSequence: AsyncSequence { + + /// The type of element produced by this asynchronous sequence. + public typealias Element = ArraySlice + + /// The iterator of `JSONLinesDeserializationSequence`. + public struct Iterator: AsyncIteratorProtocol + where UpstreamIterator.Element == Element { + + /// The upstream iterator of arbitrary byte chunks. + var upstream: UpstreamIterator + + /// The state machine of the iterator. + var stateMachine: StateMachine = .init() + + /// Asynchronously advances to the next element and returns it, or ends the + /// sequence if there is no next element. + public mutating func next() async throws -> ArraySlice? { + while true { + switch stateMachine.next() { + case .returnNil: return nil + case .emitLine(let line): return line + case .needsMore: + let value = try await upstream.next() + switch stateMachine.receivedValue(value) { + case .returnNil: return nil + case .emitLine(let line): return line + case .noop: continue + } + } + } + } + } + + /// Creates the asynchronous iterator that produces elements of this + /// asynchronous sequence. + public func makeAsyncIterator() -> Iterator { + Iterator(upstream: upstream.makeAsyncIterator()) + } +} + +extension AsyncSequence where Element == ArraySlice { + + /// Returns another sequence that decodes each JSON Lines event as the provided type using the provided decoder. + /// - Parameters: + /// - eventType: The type to decode the JSON event into. + /// - decoder: The JSON decoder to use. + /// - Returns: A sequence that provides the decoded JSON events. + public func asDecodedJSONLines( + of eventType: Event.Type = Event.self, + decoder: JSONDecoder = .init() + ) -> AsyncThrowingMapSequence, Event> { + JSONLinesDeserializationSequence(upstream: self) + .map { line in try decoder.decode(Event.self, from: Data(line)) } + } +} + +extension JSONLinesDeserializationSequence.Iterator { + + /// A state machine representing the JSON Lines deserializer. + struct StateMachine { + + /// The possible states of the state machine. + enum State: Hashable { + + /// Is waiting for the end of line. + case waitingForDelimiter(buffer: [UInt8]) + + /// Finished, the terminal state. + case finished + + /// Helper state to avoid copy-on-write copies. + case mutating + } + + /// The current state of the state machine. + private(set) var state: State + + /// Creates a new state machine. + init() { self.state = .waitingForDelimiter(buffer: []) } + + /// An action returned by the `next` method. + enum NextAction { + + /// Return nil to the caller, no more bytes. + case returnNil + + /// Emit a full line. + case emitLine(ArraySlice) + + /// The line is not complete yet, needs more bytes. + case needsMore + } + + /// Read the next line parsed from upstream bytes. + /// - Returns: An action to perform. + mutating func next() -> NextAction { + switch state { + case .waitingForDelimiter(var buffer): + state = .mutating + guard let indexOfNewline = buffer.firstIndex(of: ASCII.lf) else { + state = .waitingForDelimiter(buffer: buffer) + return .needsMore + } + let line = buffer[..) + + /// No action, rerun the parsing loop. + case noop + } + + /// Ingest the provided bytes. + /// - Parameter value: A new byte chunk. If `nil`, then the source of bytes is finished. + /// - Returns: An action to perform. + mutating func receivedValue(_ value: ArraySlice?) -> ReceivedValueAction { + switch state { + case .waitingForDelimiter(var buffer): + if let value { + state = .mutating + buffer.append(contentsOf: value) + state = .waitingForDelimiter(buffer: buffer) + return .noop + } else { + let line = ArraySlice(buffer) + buffer = [] + state = .finished + if line.isEmpty { return .returnNil } else { return .emitLine(line) } + } + case .finished, .mutating: preconditionFailure("Invalid state") + } + } + } +} diff --git a/Sources/OpenAPIRuntime/EventStreams/JSONLinesEncoding.swift b/Sources/OpenAPIRuntime/EventStreams/JSONLinesEncoding.swift new file mode 100644 index 00000000..f1d9b9b8 --- /dev/null +++ b/Sources/OpenAPIRuntime/EventStreams/JSONLinesEncoding.swift @@ -0,0 +1,156 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftOpenAPIGenerator open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if canImport(Darwin) +import class Foundation.JSONEncoder +#else +@preconcurrency import class Foundation.JSONEncoder +#endif + +/// A sequence that serializes lines by concatenating them using the JSON Lines format. +public struct JSONLinesSerializationSequence: Sendable +where Upstream.Element == ArraySlice { + + /// The upstream sequence. + private let upstream: Upstream + + /// Creates a new sequence. + /// - Parameter upstream: The upstream sequence of lines. + public init(upstream: Upstream) { self.upstream = upstream } +} + +extension JSONLinesSerializationSequence: AsyncSequence { + + /// The type of element produced by this asynchronous sequence. + public typealias Element = ArraySlice + + /// The iterator of `JSONLinesSerializationSequence`. + public struct Iterator: AsyncIteratorProtocol + where UpstreamIterator.Element == Element { + + /// The upstream iterator of lines. + var upstream: UpstreamIterator + + /// The state machine of the iterator. + var stateMachine: StateMachine = .init() + + /// Asynchronously advances to the next element and returns it, or ends the + /// sequence if there is no next element. + public mutating func next() async throws -> ArraySlice? { + while true { + switch stateMachine.next() { + case .returnNil: return nil + case .needsMore: + let value = try await upstream.next() + switch stateMachine.receivedValue(value) { + case .returnNil: return nil + case .emitBytes(let bytes): return bytes + } + } + } + } + } + + /// Creates the asynchronous iterator that produces elements of this + /// asynchronous sequence. + public func makeAsyncIterator() -> Iterator { + Iterator(upstream: upstream.makeAsyncIterator()) + } +} + +extension AsyncSequence where Element: Encodable & Sendable, Self: Sendable { + + /// Returns another sequence that encodes the events using the provided encoder into JSON Lines. + /// - Parameter encoder: The JSON encoder to use. + /// - Returns: A sequence that provides the serialized JSON Lines. + public func asEncodedJSONLines( + encoder: JSONEncoder = { + let encoder = JSONEncoder() + encoder.outputFormatting = [.sortedKeys, .withoutEscapingSlashes] + return encoder + }() + ) -> JSONLinesSerializationSequence>> { + .init(upstream: map { event in try ArraySlice(encoder.encode(event)) }) + } +} + +extension JSONLinesSerializationSequence.Iterator { + + /// A state machine representing the JSON Lines serializer. + struct StateMachine { + + /// The possible states of the state machine. + enum State { + + /// Is emitting serialized JSON Lines events. + case running + + /// Finished, the terminal state. + case finished + } + + /// The current state of the state machine. + private(set) var state: State + + /// Creates a new state machine. + init() { self.state = .running } + + /// An action returned by the `next` method. + enum NextAction { + + /// Return nil to the caller, no more bytes. + case returnNil + + /// Needs more bytes. + case needsMore + } + + /// Read the next byte chunk serialized from upstream lines. + /// - Returns: An action to perform. + mutating func next() -> NextAction { + switch state { + case .running: return .needsMore + case .finished: return .returnNil + } + } + + /// An action returned by the `receivedValue` method. + enum ReceivedValueAction { + + /// Return nil to the caller, no more bytes. + case returnNil + + /// Emit the provided bytes. + case emitBytes(ArraySlice) + } + + /// Ingest the provided line. + /// - Parameter value: A new line. If `nil`, then the source of line is finished. + /// - Returns: An action to perform. + mutating func receivedValue(_ value: ArraySlice?) -> ReceivedValueAction { + switch state { + case .running: + if let value { + var buffer = value + buffer.append(ASCII.lf) + return .emitBytes(ArraySlice(buffer)) + } else { + state = .finished + return .returnNil + } + case .finished: preconditionFailure("Invalid state") + } + } + } +} diff --git a/Sources/OpenAPIRuntime/EventStreams/JSONSequenceDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/JSONSequenceDecoding.swift new file mode 100644 index 00000000..4b34658c --- /dev/null +++ b/Sources/OpenAPIRuntime/EventStreams/JSONSequenceDecoding.swift @@ -0,0 +1,236 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftOpenAPIGenerator open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if canImport(Darwin) +import class Foundation.JSONDecoder +#else +@preconcurrency import class Foundation.JSONDecoder +#endif +import protocol Foundation.LocalizedError +import struct Foundation.Data + +/// A sequence that parses arbitrary byte chunks into lines using the JSON Sequence format. +public struct JSONSequenceDeserializationSequence: Sendable +where Upstream.Element == ArraySlice { + + /// The upstream sequence. + private let upstream: Upstream + + /// Creates a new sequence. + /// - Parameter upstream: The upstream sequence of arbitrary byte chunks. + public init(upstream: Upstream) { self.upstream = upstream } +} + +extension JSONSequenceDeserializationSequence: AsyncSequence { + + /// The type of element produced by this asynchronous sequence. + public typealias Element = ArraySlice + + /// An error thrown by the deserializer. + struct DeserializerError: Swift.Error, CustomStringConvertible, + LocalizedError + where UpstreamIterator.Element == Element { + + /// The underlying error emitted by the state machine. + let error: Iterator.StateMachine.ActionError + + var description: String { + switch error { + case .missingInitialRS: return "Missing an initial character, the bytes might not be a JSON Sequence." + } + } + + var errorDescription: String? { description } + } + + /// The iterator of `JSONSequenceDeserializationSequence`. + public struct Iterator: AsyncIteratorProtocol + where UpstreamIterator.Element == Element { + + /// The upstream iterator of arbitrary byte chunks. + var upstream: UpstreamIterator + + /// The state machine of the iterator. + var stateMachine: StateMachine = .init() + + /// Asynchronously advances to the next element and returns it, or ends the + /// sequence if there is no next element. + public mutating func next() async throws -> ArraySlice? { + while true { + switch stateMachine.next() { + case .returnNil: return nil + case .emitLine(let line): return line + case .needsMore: + let value = try await upstream.next() + switch stateMachine.receivedValue(value) { + case .returnNil: return nil + case .emitLine(let line): return line + case .noop: continue + } + case .emitError(let error): throw DeserializerError(error: error) + case .noop: continue + } + } + } + } + + /// Creates the asynchronous iterator that produces elements of this + /// asynchronous sequence. + public func makeAsyncIterator() -> Iterator { + Iterator(upstream: upstream.makeAsyncIterator()) + } +} + +extension AsyncSequence where Element == ArraySlice { + + /// Returns another sequence that decodes each JSON Sequence event as the provided type using the provided decoder. + /// - Parameters: + /// - eventType: The type to decode the JSON event into. + /// - decoder: The JSON decoder to use. + /// - Returns: A sequence that provides the decoded JSON events. + public func asDecodedJSONSequence( + of eventType: Event.Type = Event.self, + decoder: JSONDecoder = .init() + ) -> AsyncThrowingMapSequence, Event> { + JSONSequenceDeserializationSequence(upstream: self) + .map { line in try decoder.decode(Event.self, from: Data(line)) } + } +} + +extension JSONSequenceDeserializationSequence.Iterator { + + /// A state machine representing the JSON Lines deserializer. + struct StateMachine { + + /// The possible states of the state machine. + enum State: Hashable { + + /// Has not yet fully parsed the initial boundary. + case initial(buffer: [UInt8]) + + /// Is parsing a line, waiting for the end newline. + case parsingLine(buffer: [UInt8]) + + /// Finished, the terminal state. + case finished + + /// Helper state to avoid copy-on-write copies. + case mutating + } + + /// The current state of the state machine. + private(set) var state: State + + /// Creates a new state machine. + init() { self.state = .initial(buffer: []) } + + /// An error returned by the state machine. + enum ActionError { + + /// The initial boundary `` was not found. + case missingInitialRS + } + + /// An action returned by the `next` method. + enum NextAction { + + /// Return nil to the caller, no more bytes. + case returnNil + + /// Emit a full line. + case emitLine(ArraySlice) + + /// Emit an error. + case emitError(ActionError) + + /// The line is not complete yet, needs more bytes. + case needsMore + + /// Rerun the parsing loop. + case noop + } + + /// Read the next line parsed from upstream bytes. + /// - Returns: An action to perform. + mutating func next() -> NextAction { + switch state { + case .initial(var buffer): + guard !buffer.isEmpty else { return .needsMore } + guard buffer.first! == ASCII.rs else { return .emitError(.missingInitialRS) } + state = .mutating + buffer.removeFirst() + state = .parsingLine(buffer: buffer) + return .noop + case .parsingLine(var buffer): + state = .mutating + guard let indexOfRecordSeparator = buffer.firstIndex(of: ASCII.rs) else { + state = .parsingLine(buffer: buffer) + return .needsMore + } + let line = buffer[..) + + /// No action, rerun the parsing loop. + case noop + } + + /// Ingest the provided bytes. + /// - Parameter value: A new byte chunk. If `nil`, then the source of bytes is finished. + /// - Returns: An action to perform. + mutating func receivedValue(_ value: ArraySlice?) -> ReceivedValueAction { + switch state { + case .initial(var buffer): + if let value { + state = .mutating + buffer.append(contentsOf: value) + state = .initial(buffer: buffer) + return .noop + } else { + let line = ArraySlice(buffer) + buffer = [] + state = .finished + if line.isEmpty { return .returnNil } else { return .emitLine(line) } + } + case .parsingLine(var buffer): + if let value { + state = .mutating + buffer.append(contentsOf: value) + state = .parsingLine(buffer: buffer) + return .noop + } else { + let line = ArraySlice(buffer) + buffer = [] + state = .finished + if line.isEmpty { return .returnNil } else { return .emitLine(line) } + } + case .finished, .mutating: preconditionFailure("Invalid state") + } + } + } +} diff --git a/Sources/OpenAPIRuntime/EventStreams/JSONSequenceEncoding.swift b/Sources/OpenAPIRuntime/EventStreams/JSONSequenceEncoding.swift new file mode 100644 index 00000000..a6ffe940 --- /dev/null +++ b/Sources/OpenAPIRuntime/EventStreams/JSONSequenceEncoding.swift @@ -0,0 +1,157 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftOpenAPIGenerator open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if canImport(Darwin) +import class Foundation.JSONEncoder +#else +@preconcurrency import class Foundation.JSONEncoder +#endif + +/// A sequence that serializes lines by concatenating them using the JSON Sequence format. +public struct JSONSequenceSerializationSequence: Sendable +where Upstream.Element == ArraySlice { + + /// The upstream sequence. + private let upstream: Upstream + + /// Creates a new sequence. + /// - Parameter upstream: The upstream sequence of lines. + public init(upstream: Upstream) { self.upstream = upstream } +} + +extension JSONSequenceSerializationSequence: AsyncSequence { + + /// The type of element produced by this asynchronous sequence. + public typealias Element = ArraySlice + + /// The iterator of `JSONSequenceSerializationSequence`. + public struct Iterator: AsyncIteratorProtocol + where UpstreamIterator.Element == Element { + + /// The upstream iterator of lines. + var upstream: UpstreamIterator + + /// The state machine of the iterator. + var stateMachine: StateMachine = .init() + + /// Asynchronously advances to the next element and returns it, or ends the + /// sequence if there is no next element. + public mutating func next() async throws -> ArraySlice? { + while true { + switch stateMachine.next() { + case .returnNil: return nil + case .needsMore: + let value = try await upstream.next() + switch stateMachine.receivedValue(value) { + case .returnNil: return nil + case .emitBytes(let bytes): return bytes + } + } + } + } + } + + /// Creates the asynchronous iterator that produces elements of this + /// asynchronous sequence. + public func makeAsyncIterator() -> Iterator { + Iterator(upstream: upstream.makeAsyncIterator()) + } +} + +extension AsyncSequence where Element: Encodable & Sendable, Self: Sendable { + + /// Returns another sequence that encodes the events using the provided encoder into a JSON Sequence. + /// - Parameter encoder: The JSON encoder to use. + /// - Returns: A sequence that provides the serialized JSON Sequence. + public func asEncodedJSONSequence( + encoder: JSONEncoder = { + let encoder = JSONEncoder() + encoder.outputFormatting = [.sortedKeys, .withoutEscapingSlashes] + return encoder + }() + ) -> JSONSequenceSerializationSequence>> { + .init(upstream: map { event in try ArraySlice(encoder.encode(event)) }) + } +} + +extension JSONSequenceSerializationSequence.Iterator { + + /// A state machine representing the JSON Sequence serializer. + struct StateMachine { + + /// The possible states of the state machine. + enum State { + + /// Is emitting serialized JSON Sequence events. + case running + + /// Finished, the terminal state. + case finished + } + + /// The current state of the state machine. + private(set) var state: State + /// Creates a new state machine. + init() { self.state = .running } + /// An action returned by the `next` method. + enum NextAction { + + /// Return nil to the caller, no more bytes. + case returnNil + + /// Needs more bytes. + case needsMore + } + + /// Read the next byte chunk serialized from upstream lines. + /// - Returns: An action to perform. + mutating func next() -> NextAction { + switch state { + case .running: return .needsMore + case .finished: return .returnNil + } + } + + /// An action returned by the `receivedValue` method. + enum ReceivedValueAction { + + /// Return nil to the caller, no more bytes. + case returnNil + + /// Emit the provided bytes. + case emitBytes(ArraySlice) + } + + /// Ingest the provided line. + /// - Parameter value: A new line. If `nil`, then the source of line is finished. + /// - Returns: An action to perform. + mutating func receivedValue(_ value: ArraySlice?) -> ReceivedValueAction { + switch state { + case .running: + if let value { + var buffer: [UInt8] = [] + buffer.reserveCapacity(value.count + 2) + buffer.append(ASCII.rs) + buffer.append(contentsOf: value) + buffer.append(ASCII.lf) + return .emitBytes(ArraySlice(buffer)) + } else { + state = .finished + return .returnNil + } + case .finished: preconditionFailure("Invalid state") + } + } + } +} diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEvents.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEvents.swift new file mode 100644 index 00000000..bced26f9 --- /dev/null +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEvents.swift @@ -0,0 +1,87 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftOpenAPIGenerator open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// An event sent by the server that has a JSON payload in the data field. +/// +/// https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation +public struct ServerSentEventWithJSONData: Sendable, Hashable { + + /// A type of the event, helps inform how to interpret the data. + public var event: String? + + /// The payload of the event. + public var data: JSONDataType? + + /// A unique identifier of the event, can be used to resume an interrupted stream by + /// making a new request with the `Last-Event-ID` header field set to this value. + /// + /// https://html.spec.whatwg.org/multipage/server-sent-events.html#the-last-event-id-header + public var id: String? + + /// The amount of time, in milliseconds, the client should wait before reconnecting in case + /// of an interruption. + /// + /// https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface + public var retry: Int64? + + /// Creates a new event. + /// - Parameters: + /// - event: A type of the event, helps inform how to interpret the data. + /// - data: The payload of the event. + /// - id: A unique identifier of the event. + /// - retry: The amount of time, in milliseconds, to wait before retrying. + public init(event: String? = nil, data: JSONDataType? = nil, id: String? = nil, retry: Int64? = nil) { + self.event = event + self.data = data + self.id = id + self.retry = retry + } +} + +/// An event sent by the server. +/// +/// https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation +public struct ServerSentEvent: Sendable, Hashable { + + /// A unique identifier of the event, can be used to resume an interrupted stream by + /// making a new request with the `Last-Event-ID` header field set to this value. + /// + /// https://html.spec.whatwg.org/multipage/server-sent-events.html#the-last-event-id-header + public var id: String? + + /// A type of the event, helps inform how to interpret the data. + public var event: String? + + /// The payload of the event. + public var data: String? + + /// The amount of time, in milliseconds, the client should wait before reconnecting in case + /// of an interruption. + /// + /// https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface + public var retry: Int64? + + /// Creates a new event. + /// - Parameters: + /// - id: A unique identifier of the event. + /// - event: A type of the event, helps inform how to interpret the data. + /// - data: The payload of the event. + /// - retry: The amount of time, in milliseconds, to wait before retrying. + public init(id: String? = nil, event: String? = nil, data: String? = nil, retry: Int64? = nil) { + self.id = id + self.event = event + self.data = data + self.retry = retry + } +} diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift new file mode 100644 index 00000000..421e5319 --- /dev/null +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -0,0 +1,451 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftOpenAPIGenerator open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if canImport(Darwin) +import class Foundation.JSONDecoder +#else +@preconcurrency import class Foundation.JSONDecoder +#endif +import struct Foundation.Data + +/// A sequence that parses arbitrary byte chunks into events using the Server-sent Events format. +/// +/// https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events +public struct ServerSentEventsDeserializationSequence: Sendable +where Upstream.Element == ArraySlice { + + /// The upstream sequence. + private let upstream: Upstream + + /// Creates a new sequence. + /// - Parameter upstream: The upstream sequence of arbitrary byte chunks. + public init(upstream: Upstream) { self.upstream = upstream } +} + +extension ServerSentEventsDeserializationSequence: AsyncSequence { + + /// The type of element produced by this asynchronous sequence. + public typealias Element = ServerSentEvent + + /// The iterator of `ServerSentEventsDeserializationSequence`. + public struct Iterator: AsyncIteratorProtocol + where UpstreamIterator.Element == ArraySlice { + + /// The upstream iterator of arbitrary byte chunks. + var upstream: UpstreamIterator + + /// The state machine of the iterator. + var stateMachine: StateMachine = .init() + + /// Asynchronously advances to the next element and returns it, or ends the + /// sequence if there is no next element. + public mutating func next() async throws -> ServerSentEvent? { + while true { + switch stateMachine.next() { + case .returnNil: return nil + case .emitEvent(let event): return event + case .noop: continue + case .needsMore: + let value = try await upstream.next() + switch stateMachine.receivedValue(value) { + case .returnNil: return nil + case .noop: continue + } + } + } + } + } + + /// Creates the asynchronous iterator that produces elements of this + /// asynchronous sequence. + public func makeAsyncIterator() -> Iterator { + Iterator(upstream: upstream.makeAsyncIterator()) + } +} + +extension AsyncSequence where Element == ArraySlice, Self: Sendable { + + /// Returns another sequence that decodes each event's data as the provided type using the provided decoder. + /// + /// Use this method if the event's `data` field is not JSON, or if you don't want to parse it using `asDecodedServerSentEventsWithJSONData`. + /// - Returns: A sequence that provides the events. + public func asDecodedServerSentEvents() -> ServerSentEventsDeserializationSequence< + ServerSentEventsLineDeserializationSequence + > { .init(upstream: ServerSentEventsLineDeserializationSequence(upstream: self)) } + + /// Returns another sequence that decodes each event's data as the provided type using the provided decoder. + /// + /// Use this method if the event's `data` field is JSON. + /// - Parameters: + /// - dataType: The type to decode the JSON data into. + /// - decoder: The JSON decoder to use. + /// - Returns: A sequence that provides the events with the decoded JSON data. + public func asDecodedServerSentEventsWithJSONData( + of dataType: JSONDataType.Type = JSONDataType.self, + decoder: JSONDecoder = .init() + ) -> AsyncThrowingMapSequence< + ServerSentEventsDeserializationSequence>, + ServerSentEventWithJSONData + > { + asDecodedServerSentEvents() + .map { event in + ServerSentEventWithJSONData( + event: event.event, + data: try event.data.flatMap { stringData in + try decoder.decode(JSONDataType.self, from: Data(stringData.utf8)) + }, + id: event.id, + retry: event.retry + ) + } + } +} + +extension ServerSentEventsDeserializationSequence.Iterator { + + /// A state machine representing the Server-sent Events deserializer. + struct StateMachine { + + /// The possible states of the state machine. + enum State: Hashable { + + /// Accumulating an event, which hasn't been emitted yet. + case accumulatingEvent(ServerSentEvent, buffer: [ArraySlice]) + + /// Finished, the terminal state. + case finished + + /// Helper state to avoid copy-on-write copies. + case mutating + } + + /// The current state of the state machine. + private(set) var state: State + + /// Creates a new state machine. + init() { self.state = .accumulatingEvent(.init(), buffer: []) } + + /// An action returned by the `next` method. + enum NextAction { + + /// Return nil to the caller, no more bytes. + case returnNil + + /// Emit a completed event. + case emitEvent(ServerSentEvent) + + /// The line is not complete yet, needs more bytes. + case needsMore + + /// Rerun the parsing loop. + case noop + } + + /// Read the next line parsed from upstream bytes. + /// - Returns: An action to perform. + mutating func next() -> NextAction { + switch state { + case .accumulatingEvent(var event, var buffer): + guard let line = buffer.first else { return .needsMore } + state = .mutating + buffer.removeFirst() + if line.isEmpty { + // Dispatch the accumulated event. + state = .accumulatingEvent(.init(), buffer: buffer) + // If the last character of data is a newline, strip it. + if event.data?.hasSuffix("\n") ?? false { event.data?.removeLast() } + return .emitEvent(event) + } + if line.first! == ASCII.colon { + // A comment, skip this line. + state = .accumulatingEvent(event, buffer: buffer) + return .noop + } + // Parse the field name and value. + let field: String + let value: String? + if let indexOfFirstColon = line.firstIndex(of: ASCII.colon) { + field = String(decoding: line[.. + if valueBytes.isEmpty { + resolvedValueBytes = [] + } else if valueBytes.first! == ASCII.space { + resolvedValueBytes = valueBytes.dropFirst() + } else { + resolvedValueBytes = valueBytes + } + value = String(decoding: resolvedValueBytes, as: UTF8.self) + } else { + field = String(decoding: line, as: UTF8.self) + value = nil + } + guard let value else { + // An unknown type of event, skip. + state = .accumulatingEvent(event, buffer: buffer) + return .noop + } + // Process the field. + switch field { + case "event": event.event = value + case "data": + var data = event.data ?? "" + data.append(value) + data.append("\n") + event.data = data + case "id": event.id = value + case "retry": + if let retry = Int64(value) { + event.retry = retry + } else { + // Skip this line. + fallthrough + } + default: + // An unknown or invalid field, skip. + state = .accumulatingEvent(event, buffer: buffer) + return .noop + } + // Processed the field, continue. + state = .accumulatingEvent(event, buffer: buffer) + return .noop + case .finished: return .returnNil + case .mutating: preconditionFailure("Invalid state") + } + } + + /// An action returned by the `receivedValue` method. + enum ReceivedValueAction { + + /// Return nil to the caller, no more lines. + case returnNil + + /// No action, rerun the parsing loop. + case noop + } + + /// Ingest the provided bytes. + /// - Parameter value: A new byte chunk. If `nil`, then the source of bytes is finished. + /// - Returns: An action to perform. + mutating func receivedValue(_ value: ArraySlice?) -> ReceivedValueAction { + switch state { + case .accumulatingEvent(let event, var buffer): + if let value { + state = .mutating + buffer.append(value) + state = .accumulatingEvent(event, buffer: buffer) + return .noop + } else { + // If no value is received, drop the existing event on the floor. + // The specification explicitly states this. + // > Once the end of the file is reached, any pending data must be discarded. + // > (If the file ends in the middle of an event, before the final empty line, + // > the incomplete event is not dispatched.) + // Source: https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation + state = .finished + return .returnNil + } + case .finished, .mutating: preconditionFailure("Invalid state") + } + } + } +} + +/// A sequence that parses arbitrary byte chunks into lines using the Server-sent Events format. +public struct ServerSentEventsLineDeserializationSequence: Sendable +where Upstream.Element == ArraySlice { + + /// The upstream sequence. + private let upstream: Upstream + + /// Creates a new sequence. + /// - Parameter upstream: The upstream sequence of arbitrary byte chunks. + public init(upstream: Upstream) { self.upstream = upstream } +} + +extension ServerSentEventsLineDeserializationSequence: AsyncSequence { + + /// The type of element produced by this asynchronous sequence. + public typealias Element = ArraySlice + + /// The iterator of `ServerSentEventsLineDeserializationSequence`. + public struct Iterator: AsyncIteratorProtocol + where UpstreamIterator.Element == Element { + + /// The upstream iterator of arbitrary byte chunks. + var upstream: UpstreamIterator + + /// The state machine of the iterator. + var stateMachine: StateMachine = .init() + + /// Asynchronously advances to the next element and returns it, or ends the + /// sequence if there is no next element. + public mutating func next() async throws -> ArraySlice? { + while true { + switch stateMachine.next() { + case .returnNil: return nil + case .emitLine(let line): return line + case .noop: continue + case .needsMore: + let value = try await upstream.next() + switch stateMachine.receivedValue(value) { + case .returnNil: return nil + case .emitLine(let line): return line + case .noop: continue + } + } + } + } + } + + /// Creates the asynchronous iterator that produces elements of this + /// asynchronous sequence. + public func makeAsyncIterator() -> Iterator { + Iterator(upstream: upstream.makeAsyncIterator()) + } +} + +extension ServerSentEventsLineDeserializationSequence.Iterator { + + /// A state machine for parsing lines in Server-Sent Events. + /// + /// https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream + /// + /// This is not trivial to do with a streaming parser, as the end of line can be: + /// - LF + /// - CR + /// - CRLF + /// + /// So when we get CR, but have no more data, we want to be able to emit the previous line, + /// however we need to discard a LF if one comes. + struct StateMachine { + + /// A state machine representing the Server-sent Events deserializer. + enum State { + + /// Is waiting for the end of line. + case waitingForEndOfLine(buffer: [UInt8]) + + /// Consumed a `` character, so possibly the end of line. + case consumedCR(buffer: [UInt8]) + + /// Finished, the terminal state. + case finished + + /// Helper state to avoid copy-on-write copies. + case mutating + } + + /// The current state of the state machine. + private(set) var state: State + + /// Creates a new state machine. + init() { self.state = .waitingForEndOfLine(buffer: []) } + + /// An action returned by the `next` method. + enum NextAction { + + /// Return nil to the caller, no more bytes. + case returnNil + + /// Emit a full line. + case emitLine(ArraySlice) + + /// The line is not complete yet, needs more bytes. + case needsMore + + /// No action, rerun the parsing loop. + case noop + } + + mutating func next() -> NextAction { + switch state { + case .waitingForEndOfLine(var buffer): + switch buffer.matchOfOneOf(first: ASCII.lf, second: ASCII.cr) { + case .noMatch: return .needsMore + case .first(let index): + // Just a LF, so consume the line and move onto the next line. + state = .mutating + let line = buffer[..) + + /// No action, rerun the parsing loop. + case noop + } + + /// Ingest the provided bytes. + /// - Parameter value: A new byte chunk. If `nil`, then the source of bytes is finished. + /// - Returns: An action to perform. + mutating func receivedValue(_ value: ArraySlice?) -> ReceivedValueAction { + switch state { + case .waitingForEndOfLine(var buffer): + if let value { + state = .mutating + buffer.append(contentsOf: value) + state = .waitingForEndOfLine(buffer: buffer) + return .noop + } else { + let line = ArraySlice(buffer) + buffer = [] + state = .finished + if line.isEmpty { return .returnNil } else { return .emitLine(line) } + } + case .consumedCR(var buffer): + if let value { + state = .mutating + buffer.append(contentsOf: value) + state = .consumedCR(buffer: buffer) + return .noop + } else { + let line = ArraySlice(buffer) + buffer = [] + state = .finished + if line.isEmpty { return .returnNil } else { return .emitLine(line) } + } + case .finished, .mutating: preconditionFailure("Invalid state") + } + } + } +} diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsEncoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsEncoding.swift new file mode 100644 index 00000000..853d76d2 --- /dev/null +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsEncoding.swift @@ -0,0 +1,190 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftOpenAPIGenerator open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if canImport(Darwin) +import class Foundation.JSONEncoder +#else +@preconcurrency import class Foundation.JSONEncoder +#endif + +/// A sequence that serializes Server-sent Events. +public struct ServerSentEventsSerializationSequence: Sendable +where Upstream.Element == ServerSentEvent { + + /// The upstream sequence. + private let upstream: Upstream + + /// Creates a new sequence. + /// - Parameter upstream: The upstream sequence of events. + public init(upstream: Upstream) { self.upstream = upstream } +} + +extension ServerSentEventsSerializationSequence: AsyncSequence { + + /// The type of element produced by this asynchronous sequence. + public typealias Element = ArraySlice + + /// The iterator of `ServerSentEventsSerializationSequence`. + public struct Iterator: AsyncIteratorProtocol + where UpstreamIterator.Element == ServerSentEvent { + + /// The upstream iterator of lines. + var upstream: UpstreamIterator + + /// The state machine of the iterator. + var stateMachine: StateMachine = .init() + + /// Asynchronously advances to the next element and returns it, or ends the + /// sequence if there is no next element. + public mutating func next() async throws -> ArraySlice? { + while true { + switch stateMachine.next() { + case .returnNil: return nil + case .needsMore: + let value = try await upstream.next() + switch stateMachine.receivedValue(value) { + case .returnNil: return nil + case .returnBytes(let bytes): return bytes + } + } + } + } + } + + /// Creates the asynchronous iterator that produces elements of this + /// asynchronous sequence. + public func makeAsyncIterator() -> Iterator { + Iterator(upstream: upstream.makeAsyncIterator()) + } +} + +extension AsyncSequence { + + /// Returns another sequence that encodes Server-sent Events with generic data in the data field. + /// - Returns: A sequence that provides the serialized Server-sent Events. + public func asEncodedServerSentEvents() -> ServerSentEventsSerializationSequence + where Element == ServerSentEvent { .init(upstream: self) } + + /// Returns another sequence that encodes Server-sent Events that have a JSON value in the data field. + /// - Parameter encoder: The JSON encoder to use. + /// - Returns: A sequence that provides the serialized Server-sent Events. + public func asEncodedServerSentEventsWithJSONData( + encoder: JSONEncoder = { + let encoder = JSONEncoder() + encoder.outputFormatting = [.sortedKeys, .withoutEscapingSlashes] + return encoder + }() + ) -> ServerSentEventsSerializationSequence> + where Element == ServerSentEventWithJSONData { + ServerSentEventsSerializationSequence( + upstream: map { event in + ServerSentEvent( + id: event.id, + event: event.event, + data: try event.data.flatMap { try String(decoding: encoder.encode($0), as: UTF8.self) }, + retry: event.retry + ) + } + ) + } +} + +extension ServerSentEventsSerializationSequence.Iterator { + + /// A state machine representing the JSON Lines serializer. + struct StateMachine { + + /// The possible states of the state machine. + enum State { + + /// Is emitting serialized JSON Lines events. + case running + + /// Finished, the terminal state. + case finished + } + + /// The current state of the state machine. + private(set) var state: State + + /// Creates a new state machine. + init() { self.state = .running } + + /// An action returned by the `next` method. + enum NextAction { + + /// Return nil to the caller, no more bytes. + case returnNil + + /// Needs more bytes. + case needsMore + } + + /// Read the next byte chunk serialized from upstream lines. + /// - Returns: An action to perform. + mutating func next() -> NextAction { + switch state { + case .running: return .needsMore + case .finished: return .returnNil + } + } + + /// An action returned by the `receivedValue` method. + enum ReceivedValueAction { + + /// Return nil to the caller, no more bytes. + case returnNil + + /// Emit the provided bytes. + case returnBytes(ArraySlice) + } + + /// Ingest the provided event. + /// - Parameter value: A new event. If `nil`, then the source of events is finished. + /// - Returns: An action to perform. + mutating func receivedValue(_ value: ServerSentEvent?) -> ReceivedValueAction { + switch state { + case .running: + if let value { + var buffer: [UInt8] = [] + func encodeField(name: String, value: some StringProtocol) { + buffer.append(contentsOf: name.utf8) + buffer.append(ASCII.colon) + buffer.append(ASCII.space) + buffer.append(contentsOf: value.utf8) + buffer.append(ASCII.lf) + } + if let id = value.id { encodeField(name: "id", value: id) } + if let event = value.event { encodeField(name: "event", value: event) } + if let retry = value.retry { encodeField(name: "retry", value: String(retry)) } + if let data = value.data { + // Normalize the data section by replacing CRLF and CR with just LF. + // Then split the section into individual field/value pairs. + let lines = data.replacingOccurrences(of: "\r\n", with: "\n") + .replacingOccurrences(of: "\r", with: "\n") + .split(separator: "\n", omittingEmptySubsequences: false) + for line in lines { encodeField(name: "data", value: line) } + } + // End the event. + buffer.append(ASCII.lf) + return .returnBytes(ArraySlice(buffer)) + } else { + state = .finished + return .returnNil + } + case .finished: preconditionFailure("Invalid state") + } + } + } +} diff --git a/Sources/OpenAPIRuntime/Multipart/MultipartFramesToBytesSequence.swift b/Sources/OpenAPIRuntime/Multipart/MultipartFramesToBytesSequence.swift index 441c85fd..07538233 100644 --- a/Sources/OpenAPIRuntime/Multipart/MultipartFramesToBytesSequence.swift +++ b/Sources/OpenAPIRuntime/Multipart/MultipartFramesToBytesSequence.swift @@ -90,6 +90,7 @@ struct MultipartSerializer { self.stateMachine = .init() self.outBuffer = [] } + /// Requests the next byte chunk. /// - Parameter fetchFrame: A closure that is called when the serializer is ready to serialize the next frame. /// - Returns: A byte chunk. diff --git a/Tests/OpenAPIRuntimeTests/EventStreams/Test_JSONLinesDecoding.swift b/Tests/OpenAPIRuntimeTests/EventStreams/Test_JSONLinesDecoding.swift new file mode 100644 index 00000000..a841dc67 --- /dev/null +++ b/Tests/OpenAPIRuntimeTests/EventStreams/Test_JSONLinesDecoding.swift @@ -0,0 +1,33 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftOpenAPIGenerator open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import XCTest +@_spi(Generated) @testable import OpenAPIRuntime +import Foundation + +final class Test_JSONLinesDecoding: Test_Runtime { + func testParsed() async throws { + let upstream = asOneBytePerElementSequence(ArraySlice("hello\nworld\n".utf8)) + let sequence = JSONLinesDeserializationSequence(upstream: upstream) + let lines = try await [ArraySlice](collecting: sequence) + XCTAssertEqual(lines.count, 2) + XCTAssertEqualData(lines[0], "hello".utf8) + XCTAssertEqualData(lines[1], "world".utf8) + } + + func testTyped() async throws { + let sequence = testJSONLinesOneBytePerElementSequence.asDecodedJSONLines(of: TestPet.self) + let events = try await [TestPet](collecting: sequence) + XCTAssertEqual(events, testEvents) + } +} diff --git a/Tests/OpenAPIRuntimeTests/EventStreams/Test_JSONLinesEncoding.swift b/Tests/OpenAPIRuntimeTests/EventStreams/Test_JSONLinesEncoding.swift new file mode 100644 index 00000000..d7a44319 --- /dev/null +++ b/Tests/OpenAPIRuntimeTests/EventStreams/Test_JSONLinesEncoding.swift @@ -0,0 +1,29 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftOpenAPIGenerator open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import XCTest +@_spi(Generated) @testable import OpenAPIRuntime +import Foundation + +final class Test_JSONLinesEncoding: Test_Runtime { + func testSerialized() async throws { + let upstream = WrappedSyncSequence(sequence: [ArraySlice("hello".utf8), ArraySlice("world".utf8)]) + let sequence = JSONLinesSerializationSequence(upstream: upstream) + try await XCTAssertEqualAsyncData(sequence, "hello\nworld\n".utf8) + } + + func testTyped() async throws { + let sequence = testEventsAsyncSequence.asEncodedJSONLines() + try await XCTAssertEqualAsyncData(sequence, testJSONLinesBytes) + } +} diff --git a/Tests/OpenAPIRuntimeTests/EventStreams/Test_JSONSequenceDecoding.swift b/Tests/OpenAPIRuntimeTests/EventStreams/Test_JSONSequenceDecoding.swift new file mode 100644 index 00000000..ab967e1e --- /dev/null +++ b/Tests/OpenAPIRuntimeTests/EventStreams/Test_JSONSequenceDecoding.swift @@ -0,0 +1,32 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftOpenAPIGenerator open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import XCTest +@_spi(Generated) @testable import OpenAPIRuntime +import Foundation + +final class Test_JSONSequenceDecoding: Test_Runtime { + func testParsed() async throws { + let upstream = testJSONSequenceOneBytePerElementSequence + let sequence = JSONSequenceDeserializationSequence(upstream: upstream) + let events = try await [ArraySlice](collecting: sequence) + XCTAssertEqual(events.count, 2) + XCTAssertEqualData(events[0], "{\"name\":\"Rover\"}\n".utf8) + XCTAssertEqualData(events[1], "{\"name\":\"Pancake\"}\n".utf8) + } + func testTyped() async throws { + let sequence = testJSONSequenceOneBytePerElementSequence.asDecodedJSONSequence(of: TestPet.self) + let events = try await [TestPet](collecting: sequence) + XCTAssertEqual(events, testEvents) + } +} diff --git a/Tests/OpenAPIRuntimeTests/EventStreams/Test_JSONSequenceEncoding.swift b/Tests/OpenAPIRuntimeTests/EventStreams/Test_JSONSequenceEncoding.swift new file mode 100644 index 00000000..f03e00cd --- /dev/null +++ b/Tests/OpenAPIRuntimeTests/EventStreams/Test_JSONSequenceEncoding.swift @@ -0,0 +1,30 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftOpenAPIGenerator open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import XCTest +@_spi(Generated) @testable import OpenAPIRuntime +import Foundation + +final class Test_JSONSequenceEncoding: Test_Runtime { + func testSerialized() async throws { + let upstream = WrappedSyncSequence(sequence: [ + ArraySlice(#"{"name":"Rover"}"#.utf8), ArraySlice(#"{"name":"Pancake"}"#.utf8), + ]) + let sequence = JSONSequenceSerializationSequence(upstream: upstream) + try await XCTAssertEqualAsyncData(sequence, testJSONSequenceBytes) + } + func testTyped() async throws { + let sequence = testEventsAsyncSequence.asEncodedJSONSequence() + try await XCTAssertEqualAsyncData(sequence, testJSONSequenceBytes) + } +} diff --git a/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift b/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift new file mode 100644 index 00000000..be98e6f1 --- /dev/null +++ b/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift @@ -0,0 +1,144 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftOpenAPIGenerator open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import XCTest +@_spi(Generated) @testable import OpenAPIRuntime +import Foundation + +final class Test_ServerSentEventsDecoding: Test_Runtime { + func _test(input: String, output: [ServerSentEvent], file: StaticString = #file, line: UInt = #line) async throws { + let sequence = asOneBytePerElementSequence(ArraySlice(input.utf8)).asDecodedServerSentEvents() + let events = try await [ServerSentEvent](collecting: sequence) + XCTAssertEqual(events.count, output.count, file: file, line: line) + for (index, linePair) in zip(events, output).enumerated() { + let (actualEvent, expectedEvent) = linePair + XCTAssertEqual(actualEvent, expectedEvent, "Event: \(index)", file: file, line: line) + } + } + func test() async throws { + // Simple event. + try await _test( + input: #""" + data: hello + data: world + + + """#, + output: [.init(data: "hello\nworld")] + ) + // Two simple events. + try await _test( + input: #""" + data: hello + data: world + + data: hello2 + data: world2 + + + """#, + output: [.init(data: "hello\nworld"), .init(data: "hello2\nworld2")] + ) + // Incomplete event is not emitted. + try await _test( + input: #""" + data: hello + """#, + output: [] + ) + // A few events. + try await _test( + input: #""" + retry: 5000 + + data: This is the first message. + + data: This is the second + data: message. + + event: customEvent + data: This is a custom event message. + + id: 123 + data: This is a message with an ID. + + + """#, + output: [ + .init(retry: 5000), .init(data: "This is the first message."), + .init(data: "This is the second\nmessage."), + .init(event: "customEvent", data: "This is a custom event message."), + .init(id: "123", data: "This is a message with an ID."), + ] + ) + } + func _testJSONData( + input: String, + output: [ServerSentEventWithJSONData], + file: StaticString = #file, + line: UInt = #line + ) async throws { + let sequence = asOneBytePerElementSequence(ArraySlice(input.utf8)) + .asDecodedServerSentEventsWithJSONData(of: JSONType.self) + let events = try await [ServerSentEventWithJSONData](collecting: sequence) + XCTAssertEqual(events.count, output.count, file: file, line: line) + for (index, linePair) in zip(events, output).enumerated() { + let (actualEvent, expectedEvent) = linePair + XCTAssertEqual(actualEvent, expectedEvent, "Event: \(index)", file: file, line: line) + } + } + struct TestEvent: Decodable, Hashable, Sendable { var index: Int } + func testJSONData() async throws { + // Simple event. + try await _testJSONData( + input: #""" + event: event1 + id: 1 + data: {"index":1} + + event: event2 + id: 2 + data: { + data: "index": 2 + data: } + + + """#, + output: [ + .init(event: "event1", data: TestEvent(index: 1), id: "1"), + .init(event: "event2", data: TestEvent(index: 2), id: "2"), + ] + ) + } +} + +final class Test_ServerSentEventsDecoding_Lines: Test_Runtime { + func _test(input: String, output: [String], file: StaticString = #file, line: UInt = #line) async throws { + let upstream = asOneBytePerElementSequence(ArraySlice(input.utf8)) + let sequence = ServerSentEventsLineDeserializationSequence(upstream: upstream) + let lines = try await [ArraySlice](collecting: sequence) + XCTAssertEqual(lines.count, output.count, file: file, line: line) + for (index, linePair) in zip(lines, output).enumerated() { + let (actualLine, expectedLine) = linePair + XCTAssertEqualData(actualLine, expectedLine.utf8, "Line: \(index)", file: file, line: line) + } + } + func test() async throws { + // LF + try await _test(input: "hello\nworld\n", output: ["hello", "world"]) + // CR + try await _test(input: "hello\rworld\r", output: ["hello", "world"]) + // CRLF + try await _test(input: "hello\r\nworld\r\n", output: ["hello", "world"]) + } +} diff --git a/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsEncoding.swift b/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsEncoding.swift new file mode 100644 index 00000000..db88cd60 --- /dev/null +++ b/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsEncoding.swift @@ -0,0 +1,103 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftOpenAPIGenerator open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import XCTest +@_spi(Generated) @testable import OpenAPIRuntime +import Foundation + +final class Test_ServerSentEventsEncoding: Test_Runtime { + func _test(input: [ServerSentEvent], output: String, file: StaticString = #file, line: UInt = #line) async throws { + let sequence = WrappedSyncSequence(sequence: input).asEncodedServerSentEvents() + try await XCTAssertEqualAsyncData(sequence, output.utf8, file: file, line: line) + } + func test() async throws { + // Simple event. + try await _test( + input: [.init(data: "hello\nworld")], + output: #""" + data: hello + data: world + + + """# + ) + // Two simple events. + try await _test( + input: [.init(data: "hello\nworld"), .init(data: "hello2\nworld2")], + output: #""" + data: hello + data: world + + data: hello2 + data: world2 + + + """# + ) + // A few events. + try await _test( + input: [ + .init(retry: 5000), .init(data: "This is the first message."), + .init(data: "This is the second\nmessage."), + .init(event: "customEvent", data: "This is a custom event message."), + .init(id: "123", data: "This is a message with an ID."), + ], + output: #""" + retry: 5000 + + data: This is the first message. + + data: This is the second + data: message. + + event: customEvent + data: This is a custom event message. + + id: 123 + data: This is a message with an ID. + + + """# + ) + } + func _testJSONData( + input: [ServerSentEventWithJSONData], + output: String, + file: StaticString = #file, + line: UInt = #line + ) async throws { + let sequence = WrappedSyncSequence(sequence: input).asEncodedServerSentEventsWithJSONData() + try await XCTAssertEqualAsyncData(sequence, output.utf8, file: file, line: line) + } + struct TestEvent: Encodable, Hashable, Sendable { var index: Int } + func testJSONData() async throws { + // Simple event. + try await _testJSONData( + input: [ + .init(event: "event1", data: TestEvent(index: 1), id: "1"), + .init(event: "event2", data: TestEvent(index: 2), id: "2"), + ], + output: #""" + id: 1 + event: event1 + data: {"index":1} + + id: 2 + event: event2 + data: {"index":2} + + + """# + ) + } +} diff --git a/Tests/OpenAPIRuntimeTests/Test_Runtime.swift b/Tests/OpenAPIRuntimeTests/Test_Runtime.swift index 0d7d108e..7f1f2255 100644 --- a/Tests/OpenAPIRuntimeTests/Test_Runtime.swift +++ b/Tests/OpenAPIRuntimeTests/Test_Runtime.swift @@ -114,6 +114,29 @@ class Test_Runtime: XCTestCase { var testStructURLFormData: Data { Data(testStructURLFormString.utf8) } + var testEvents: [TestPet] { [.init(name: "Rover"), .init(name: "Pancake")] } + var testEventsAsyncSequence: WrappedSyncSequence<[TestPet]> { WrappedSyncSequence(sequence: testEvents) } + + var testJSONLinesBytes: ArraySlice { + let encoder = JSONEncoder() + let bytes = try! testEvents.map { try encoder.encode($0) + [ASCII.lf] }.joined() + return ArraySlice(bytes) + } + var testJSONSequenceBytes: ArraySlice { + let encoder = JSONEncoder() + let bytes = try! testEvents.map { try [ASCII.rs] + encoder.encode($0) + [ASCII.lf] }.joined() + return ArraySlice(bytes) + } + + func asOneBytePerElementSequence(_ source: ArraySlice) -> HTTPBody { + HTTPBody( + WrappedSyncSequence(sequence: source).map { ArraySlice([$0]) }, + length: .known(Int64(source.count)), + iterationBehavior: .multiple + ) + } + var testJSONLinesOneBytePerElementSequence: HTTPBody { asOneBytePerElementSequence(testJSONLinesBytes) } + var testJSONSequenceOneBytePerElementSequence: HTTPBody { asOneBytePerElementSequence(testJSONSequenceBytes) } @discardableResult func _testPrettyEncoded(_ value: Value, expectedJSON: String) throws -> String { let encoder = JSONEncoder() @@ -147,7 +170,7 @@ func chunkFromStringLines(_ strings: [String], addExtraCRLFs: Int = 0) -> ArrayS func chunkFromString(_ string: String, addCRLFs: Int = 0) -> ArraySlice { var slice = ArraySlice(string.utf8) - for _ in 0.. [UInt8] { Array(string.utf8) } extension ArraySlice { mutating func append(_ string: String) { append(contentsOf: chunkFromString(string)) } - mutating func appendCRLF() { append(contentsOf: [0x0d, 0x0a]) } + mutating func appendCRLF() { append(contentsOf: ASCII.crlf) } } struct TestError: Error, Equatable {} @@ -338,8 +361,9 @@ fileprivate extension UInt8 { var asHex: String { let original: String switch self { - case 0x0d: original = "CR" - case 0x0a: original = "LF" + case ASCII.cr: original = "CR" + case ASCII.lf: original = "LF" + case ASCII.rs: original = "RS" default: original = "\(UnicodeScalar(self)) " } return String(format: "%02x \(original)", self) @@ -395,17 +419,36 @@ public func XCTAssertEqualData( } /// Asserts that the data matches the expected value. -public func XCTAssertEqualData( - _ expression1: @autoclosure () throws -> HTTPBody?, +public func XCTAssertEqualAsyncData( + _ expression1: @autoclosure () throws -> AS?, _ expression2: @autoclosure () throws -> C, _ message: @autoclosure () -> String = "Data doesn't match.", file: StaticString = #filePath, line: UInt = #line -) async throws where C.Element == UInt8 { +) async throws where C.Element == UInt8, AS.Element == ArraySlice { guard let actualBytesBody = try expression1() else { XCTFail("First value is nil", file: file, line: line) return } - let actualBytes = try await [UInt8](collecting: actualBytesBody, upTo: .max) + let actualBytes = try await [ArraySlice](collecting: actualBytesBody).flatMap { $0 } XCTAssertEqualData(actualBytes, try expression2(), file: file, line: line) } + +/// Asserts that the data matches the expected value. +public func XCTAssertEqualData( + _ expression1: @autoclosure () throws -> HTTPBody?, + _ expression2: @autoclosure () throws -> C, + _ message: @autoclosure () -> String = "Data doesn't match.", + file: StaticString = #filePath, + line: UInt = #line +) async throws where C.Element == UInt8 { + try await XCTAssertEqualAsyncData(try expression1(), try expression2(), file: file, line: line) +} + +extension Array { + init(collecting source: Source) async throws where Source.Element == Element { + var elements: [Element] = [] + for try await element in source { elements.append(element) } + self = elements + } +}