Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: kickoff release #3840

Merged
merged 6 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Amplify/Core/Configuration/AmplifyOutputsData.swift
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ public struct AmplifyOutputsData: Codable {
public struct AmplifyOutputs {

/// A closure that resolves the `AmplifyOutputsData` configuration
let resolveConfiguration: () throws -> AmplifyOutputsData
@_spi(InternalAmplifyConfiguration)
public let resolveConfiguration: () throws -> AmplifyOutputsData

/// Resolves configuration with `amplify_outputs.json` in the main bundle.
public static let amplifyOutputs: AmplifyOutputs = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,15 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
// Placing the actual subscription work in a deferred task and
// promptly returning the filtered publisher for downstream consumption of all error messages.
defer {
Task { [weak self] in
let task = Task { [weak self] in
guard let self = self else { return }
if !(await self.isConnected) {
try await connect()
try await waitForState(.connected)
}
await self.bindCancellableToConnection(try await self.startSubscription(id))
}.toAnyCancellable.store(in: &cancellablesBindToConnection)
await self.storeInConnectionCancellables(try await self.startSubscription(id))
}
self.storeInConnectionCancellables(task.toAnyCancellable)
}

return filterAppSyncSubscriptionEvent(with: id)
Expand Down Expand Up @@ -236,24 +237,29 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
}

private func subscribeToWebSocketEvent() async {
await self.webSocketClient.publisher.sink { [weak self] _ in
let cancellable = await self.webSocketClient.publisher.sink { [weak self] _ in
self?.log.debug("[AppSyncRealTimeClient] WebSocketClient terminated")
} receiveValue: { webSocketEvent in
Task { [weak self] in
await self?.onWebSocketEvent(webSocketEvent)
}.toAnyCancellable.store(in: &self.cancellables)
let task = Task { [weak self] in
await self?.onWebSocketEvent(webSocketEvent)
}
await self?.storeInCancellables(task.toAnyCancellable)
}
}
.store(in: &cancellables)
self.storeInCancellables(cancellable)
}

private func resumeExistingSubscriptions() {
log.debug("[AppSyncRealTimeClient] Resuming existing subscriptions")
for (id, _) in self.subscriptions {
Task {
Task { [weak self] in
do {
try await self.startSubscription(id).store(in: &cancellablesBindToConnection)
if let cancellable = try await self?.startSubscription(id) {
await self?.storeInConnectionCancellables(cancellable)
}
} catch {
log.debug("[AppSyncRealTimeClient] Failed to resume existing subscription with id: (\(id))")
Self.log.debug("[AppSyncRealTimeClient] Failed to resume existing subscription with id: (\(id))")
}
}
}
Expand Down Expand Up @@ -286,7 +292,7 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
subject.filter {
switch $0 {
case .success(let response): return response.id == id || response.type == .connectionError
case .failure(let error): return true
case .failure: return true
}
}
.map { result -> AppSyncSubscriptionEvent? in
Expand Down Expand Up @@ -350,10 +356,6 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
return errors.compactMap(AppSyncRealTimeRequest.parseResponseError(error:))
}

private func bindCancellableToConnection(_ cancellable: AnyCancellable) {
cancellable.store(in: &cancellablesBindToConnection)
}

}

// MARK: - On WebSocket Events
Expand All @@ -366,8 +368,11 @@ extension AppSyncRealTimeClient {
if self.state.value == .connectionDropped {
log.debug("[AppSyncRealTimeClient] reconnecting appSyncClient after connection drop")
Task { [weak self] in
try? await self?.connect()
}.toAnyCancellable.store(in: &cancellablesBindToConnection)
let task = Task { [weak self] in
try? await self?.connect()
}
await self?.storeInConnectionCancellables(task.toAnyCancellable)
}
}

case let .disconnected(closeCode, reason): //
Expand Down Expand Up @@ -425,24 +430,37 @@ extension AppSyncRealTimeClient {
}
}

private func monitorHeartBeats(_ connectionAck: JSONValue?) {
func monitorHeartBeats(_ connectionAck: JSONValue?) {
let timeoutMs = connectionAck?.connectionTimeoutMs?.intValue ?? 0
log.debug("[AppSyncRealTimeClient] Starting heart beat monitor with interval \(timeoutMs) ms")
heartBeats.eraseToAnyPublisher()
let cancellable = heartBeats.eraseToAnyPublisher()
.debounce(for: .milliseconds(timeoutMs), scheduler: DispatchQueue.global())
.first()
.sink(receiveValue: {
self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting")
.sink(receiveValue: { [weak self] in
Self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting")
Task { [weak self] in
await self?.reconnect()
}.toAnyCancellable.store(in: &self.cancellables)
let task = Task { [weak self] in
await self?.reconnect()
}
await self?.storeInCancellables(task.toAnyCancellable)
}
})
.store(in: &cancellablesBindToConnection)
self.storeInConnectionCancellables(cancellable)
// start counting down
heartBeats.send(())
}
}

extension AppSyncRealTimeClient {
private func storeInCancellables(_ cancellable: AnyCancellable) {
self.cancellables.insert(cancellable)
}

private func storeInConnectionCancellables(_ cancellable: AnyCancellable) {
self.cancellablesBindToConnection.insert(cancellable)
}
}

extension Publisher where Output == AppSyncRealTimeSubscription.State, Failure == Never {
func toAppSyncSubscriptionEventStream() -> AnyPublisher<AppSyncSubscriptionEvent, Never> {
self.compactMap { subscriptionState -> AppSyncSubscriptionEvent? in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,4 +551,32 @@ class AppSyncRealTimeClientTests: XCTestCase {
await fulfillment(of: [startTriggered, errorReceived], timeout: 2)

}

func testReconnect_whenHeartBeatSignalIsNotReceived() async throws {
var cancellables = Set<AnyCancellable>()
let timeout = 1.0
let mockWebSocketClient = MockWebSocketClient()
let mockAppSyncRequestInterceptor = MockAppSyncRequestInterceptor()
let appSyncClient = AppSyncRealTimeClient(
endpoint: URL(string: "https://example.com")!,
requestInterceptor: mockAppSyncRequestInterceptor,
webSocketClient: mockWebSocketClient
)

// start monitoring
await appSyncClient.monitorHeartBeats(.object([
"connectionTimeoutMs": 100
]))

let reconnect = expectation(description: "webSocket triggers event to connection")
await mockWebSocketClient.actionSubject.sink { action in
switch action {
case .connect:
reconnect.fulfill()
default: break
}
}.store(in: &cancellables)
await fulfillment(of: [reconnect], timeout: 2)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Foundation
import Amplify // Amplify.Auth
import AWSPluginsCore // AuthAWSCredentialsProvider
import AWSClientRuntime // AWSClientRuntime.CredentialsProviding
import ClientRuntime // SdkHttpRequestBuilder
import AwsCommonRuntimeKit // CommonRuntimeKit.initialize()

extension AWSCognitoAuthPlugin {


/// Creates a AWS IAM SigV4 signer capable of signing AWS AppSync requests.
///
/// **Note**. Although this method is static, **Amplify.Auth** is required to be configured with **AWSCognitoAuthPlugin** as
/// it depends on the credentials provider from Cognito through `Amplify.Auth.fetchAuthSession()`. The static type allows
/// developers to simplify their callsite without having to access the method on the plugin instance.
///
/// - Parameter region: The region of the AWS AppSync API
/// - Returns: A closure that takes in a requestand returns a signed request.
public static func createAppSyncSigner(region: String) -> ((URLRequest) async throws -> URLRequest) {
return { request in
try await signAppSyncRequest(request,
region: region)
}
}

static func signAppSyncRequest(_ urlRequest: URLRequest,
region: Swift.String,
signingName: Swift.String = "appsync",
date: ClientRuntime.Date = Date()) async throws -> URLRequest {
CommonRuntimeKit.initialize()

// Convert URLRequest to SDK's HTTPRequest
guard let requestBuilder = try createAppSyncSdkHttpRequestBuilder(
urlRequest: urlRequest) else {
return urlRequest
}

// Retrieve the credentials from credentials provider
let credentials: AWSClientRuntime.AWSCredentials
let authSession = try await Amplify.Auth.fetchAuthSession()
if let awsCredentialsProvider = authSession as? AuthAWSCredentialsProvider {
let awsCredentials = try awsCredentialsProvider.getAWSCredentials().get()
credentials = awsCredentials.toAWSSDKCredentials()
} else {
let error = AuthError.unknown("Auth session does not include AWS credentials information")
throw error
}

// Prepare signing
let flags = SigningFlags(useDoubleURIEncode: true,
shouldNormalizeURIPath: true,
omitSessionToken: false)
let signedBodyHeader: AWSSignedBodyHeader = .none
let signedBodyValue: AWSSignedBodyValue = .empty
let signingConfig = AWSSigningConfig(credentials: credentials,
signedBodyHeader: signedBodyHeader,
signedBodyValue: signedBodyValue,
flags: flags,
date: date,
service: signingName,
region: region,
signatureType: .requestHeaders,
signingAlgorithm: .sigv4)

// Sign request
guard let httpRequest = await AWSSigV4Signer.sigV4SignedRequest(
requestBuilder: requestBuilder,

signingConfig: signingConfig
) else {
return urlRequest
}

// Update original request with new headers
return setHeaders(from: httpRequest, to: urlRequest)
}

static func setHeaders(from sdkRequest: SdkHttpRequest, to urlRequest: URLRequest) -> URLRequest {
var urlRequest = urlRequest
for header in sdkRequest.headers.headers {
urlRequest.setValue(header.value.joined(separator: ","), forHTTPHeaderField: header.name)
}
return urlRequest
}

static func createAppSyncSdkHttpRequestBuilder(urlRequest: URLRequest) throws -> SdkHttpRequestBuilder? {

guard let url = urlRequest.url,
let host = url.host else {
return nil
}

var headers = urlRequest.allHTTPHeaderFields ?? [:]
headers.updateValue(host, forKey: "host")

let httpMethod = (urlRequest.httpMethod?.uppercased())
.flatMap(HttpMethodType.init(rawValue:)) ?? .get

let queryItems = URLComponents(url: url, resolvingAgainstBaseURL: false)?.queryItems?
.map { ClientRuntime.SDKURLQueryItem(name: $0.name, value: $0.value)} ?? []

let requestBuilder = SdkHttpRequestBuilder()
.withHost(host)
.withPath(url.path)
.withQueryItems(queryItems)
.withMethod(httpMethod)
.withPort(443)
.withProtocol(.https)
.withHeaders(.init(headers))
.withBody(.data(urlRequest.httpBody))

return requestBuilder
}
}

extension AWSPluginsCore.AWSCredentials {

func toAWSSDKCredentials() -> AWSClientRuntime.AWSCredentials {
if let tempCredentials = self as? AWSTemporaryCredentials {
return AWSClientRuntime.AWSCredentials(
accessKey: tempCredentials.accessKeyId,
secret: tempCredentials.secretAccessKey,
expirationTimeout: tempCredentials.expiration,
sessionToken: tempCredentials.sessionToken)
} else {
return AWSClientRuntime.AWSCredentials(
accessKey: accessKeyId,
secret: secretAccessKey,
expirationTimeout: Date())
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ struct AWSCognitoAuthCredentialStore {
newIdentityConfigData != nil &&
oldIdentityPoolConfiguration == newIdentityConfigData
{

// retrieve data from the old namespace and save with the new namespace
if let oldCognitoCredentialsData = try? keychain._getData(oldNameSpace) {
try? keychain._set(oldCognitoCredentialsData, key: newNameSpace)
}
} else if oldAuthConfigData != currentAuthConfig {
} else if oldAuthConfigData != currentAuthConfig &&
oldNameSpace != newNameSpace {
// Clear the old credentials
try? keychain._remove(oldNameSpace)
}
Expand Down
Loading
Loading