From b5f0dd532704da82163afeb7e969235f9891cea9 Mon Sep 17 00:00:00 2001 From: Di Wu Date: Tue, 21 May 2024 15:36:02 +0000 Subject: [PATCH] refactor: use makeStream for creating AsyncStream with continuation (#3710) --- Amplify/Core/Support/AmplifyAsyncSequence.swift | 8 +++----- .../Support/AmplifyAsyncThrowingSequence.swift | 8 +++----- Amplify/Core/Support/TaskQueue.swift | 7 +++---- .../CancellableAsyncStream.swift | 16 ++++++++-------- 4 files changed, 17 insertions(+), 22 deletions(-) diff --git a/Amplify/Core/Support/AmplifyAsyncSequence.swift b/Amplify/Core/Support/AmplifyAsyncSequence.swift index b98c0f17bb..0320b6c9a3 100644 --- a/Amplify/Core/Support/AmplifyAsyncSequence.swift +++ b/Amplify/Core/Support/AmplifyAsyncSequence.swift @@ -11,8 +11,8 @@ public typealias WeakAmplifyAsyncSequenceRef = WeakRef: AsyncSequence, Cancellable { public typealias Iterator = AsyncStream.Iterator - private var asyncStream: AsyncStream! = nil - private var continuation: AsyncStream.Continuation! = nil + private let asyncStream: AsyncStream + private let continuation: AsyncStream.Continuation private var parent: Cancellable? public private(set) var isCancelled: Bool = false @@ -20,9 +20,7 @@ public class AmplifyAsyncSequence: AsyncSequence, Cancellable public init(parent: Cancellable? = nil, bufferingPolicy: AsyncStream.Continuation.BufferingPolicy = .unbounded) { self.parent = parent - asyncStream = AsyncStream(Element.self, bufferingPolicy: bufferingPolicy) { continuation in - self.continuation = continuation - } + (asyncStream, continuation) = AsyncStream.makeStream(of: Element.self, bufferingPolicy: bufferingPolicy) } public func makeAsyncIterator() -> Iterator { diff --git a/Amplify/Core/Support/AmplifyAsyncThrowingSequence.swift b/Amplify/Core/Support/AmplifyAsyncThrowingSequence.swift index 38772392da..3bb58d9f64 100644 --- a/Amplify/Core/Support/AmplifyAsyncThrowingSequence.swift +++ b/Amplify/Core/Support/AmplifyAsyncThrowingSequence.swift @@ -11,8 +11,8 @@ public typealias WeakAmplifyAsyncThrowingSequenceRef = WeakRef: AsyncSequence, Cancellable { public typealias Iterator = AsyncThrowingStream.Iterator - private var asyncStream: AsyncThrowingStream! = nil - private var continuation: AsyncThrowingStream.Continuation! = nil + private let asyncStream: AsyncThrowingStream + private let continuation: AsyncThrowingStream.Continuation private var parent: Cancellable? public private(set) var isCancelled: Bool = false @@ -20,9 +20,7 @@ public class AmplifyAsyncThrowingSequence: AsyncSequence, Can public init(parent: Cancellable? = nil, bufferingPolicy: AsyncThrowingStream.Continuation.BufferingPolicy = .unbounded) { self.parent = parent - asyncStream = AsyncThrowingStream(Element.self, bufferingPolicy: bufferingPolicy, { continuation in - self.continuation = continuation - }) + (asyncStream, continuation) = AsyncThrowingStream.makeStream(of: Element.self, bufferingPolicy: bufferingPolicy) } public func makeAsyncIterator() -> Iterator { diff --git a/Amplify/Core/Support/TaskQueue.swift b/Amplify/Core/Support/TaskQueue.swift index a09bfcc4d8..b7ba5c0553 100644 --- a/Amplify/Core/Support/TaskQueue.swift +++ b/Amplify/Core/Support/TaskQueue.swift @@ -10,12 +10,11 @@ import Foundation /// A helper for executing asynchronous work serially. public class TaskQueue { typealias Block = @Sendable () async -> Void - private var streamContinuation: AsyncStream.Continuation! + private let streamContinuation: AsyncStream.Continuation public init() { - let stream = AsyncStream.init { continuation in - streamContinuation = continuation - } + let (stream, continuation) = AsyncStream.makeStream(of: Block.self) + self.streamContinuation = continuation Task { for await block in stream { diff --git a/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/StateMachine/hierarchical-state-machine-swift/CancellableAsyncStream.swift b/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/StateMachine/hierarchical-state-machine-swift/CancellableAsyncStream.swift index 35ce4b65c1..e25c67408f 100644 --- a/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/StateMachine/hierarchical-state-machine-swift/CancellableAsyncStream.swift +++ b/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/StateMachine/hierarchical-state-machine-swift/CancellableAsyncStream.swift @@ -23,14 +23,14 @@ class CancellableAsyncStream: AsyncSequence { } convenience init(with publisher: AnyPublisher) { - var cancellable: AnyCancellable? - self.init(asyncStream: AsyncStream { continuation in - cancellable = publisher.sink { _ in - continuation.finish() - } receiveValue: { - continuation.yield($0) - } - }, cancellable: cancellable) + let (asyncStream, contiuation) = AsyncStream.makeStream(of: Element.self) + let cancellable = publisher.sink { _ in + contiuation.finish() + } receiveValue: { + contiuation.yield($0) + } + + self.init(asyncStream: asyncStream, cancellable: cancellable) } func makeAsyncIterator() -> AsyncStream.AsyncIterator {