Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(DataStore): improve MutationEvent resiliency to interruptions #3492

Merged
merged 8 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,18 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
condition: QueryPredicate? = nil,
eagerLoad: Bool = true,
completion: DataStoreCallback<M>) {
completion(save(model,
modelSchema: modelSchema,
condition: condition,
eagerLoad: eagerLoad))
}

func save<M: Model>(_ model: M,
modelSchema: ModelSchema,
condition: QueryPredicate? = nil,
eagerLoad: Bool = true) -> DataStoreResult<M> {
guard let connection = connection else {
completion(.failure(DataStoreError.nilSQLiteConnection()))
return
return .failure(DataStoreError.nilSQLiteConnection())
}
do {
let modelType = type(of: model)
Expand All @@ -162,8 +171,7 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
let dataStoreError = DataStoreError.invalidCondition(
"Cannot apply a condition on model which does not exist.",
"Save the model instance without a condition first.")
completion(.failure(causedBy: dataStoreError))
return
return .failure(causedBy: dataStoreError)
}

let statement = InsertStatement(model: model, modelSchema: modelSchema)
Expand All @@ -179,9 +187,7 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
let dataStoreError = DataStoreError.invalidCondition(
"Save failed due to condition did not match existing model instance.",
"The save will continue to fail until the model instance is updated.")
completion(.failure(causedBy: dataStoreError))

return
return .failure(causedBy: dataStoreError)
}
}

Expand All @@ -192,23 +198,22 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
}

// load the recent saved instance and pass it back to the callback
query(modelType, modelSchema: modelSchema,
predicate: model.identifier(schema: modelSchema).predicate,
eagerLoad: eagerLoad) {
switch $0 {
case .success(let result):
if let saved = result.first {
completion(.success(saved))
} else {
completion(.failure(.nonUniqueResult(model: modelType.modelName,
count: result.count)))
}
case .failure(let error):
completion(.failure(error))
let queryResult = query(modelType, modelSchema: modelSchema,
predicate: model.identifier(schema: modelSchema).predicate,
eagerLoad: eagerLoad)
switch queryResult {
case .success(let result):
if let saved = result.first {
return .success(saved)
} else {
return .failure(.nonUniqueResult(model: modelType.modelName,
count: result.count))
}
case .failure(let error):
return .failure(error)
}
} catch {
completion(.failure(causedBy: error))
return .failure(causedBy: error)
}
}

Expand Down Expand Up @@ -321,9 +326,22 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
paginationInput: QueryPaginationInput? = nil,
eagerLoad: Bool = true,
completion: DataStoreCallback<[M]>) {
completion(query(modelType,
modelSchema: modelSchema,
predicate: predicate,
sort: sort,
paginationInput: paginationInput,
eagerLoad: eagerLoad))
}

private func query<M: Model>(_ modelType: M.Type,
modelSchema: ModelSchema,
predicate: QueryPredicate? = nil,
sort: [QuerySortDescriptor]? = nil,
paginationInput: QueryPaginationInput? = nil,
eagerLoad: Bool = true) -> DataStoreResult<[M]> {
guard let connection = connection else {
completion(.failure(DataStoreError.nilSQLiteConnection()))
return
return .failure(DataStoreError.nilSQLiteConnection())
}
do {
let statement = SelectStatement(from: modelSchema,
Expand All @@ -336,9 +354,9 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
withSchema: modelSchema,
using: statement,
eagerLoad: eagerLoad)
completion(.success(result))
return .success(result)
} catch {
completion(.failure(causedBy: error))
return .failure(causedBy: error)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,31 +208,41 @@ final class StorageEngine: StorageEngineBehavior {
completion(.failure(causedBy: dataStoreError))
}

let wrappedCompletion: DataStoreCallback<M> = { result in
guard modelSchema.isSyncable, let syncEngine = self.syncEngine else {
completion(result)
return
}

guard case .success(let savedModel) = result else {
completion(result)
return
do {
try storageAdapter.transaction {
lawmicha marked this conversation as resolved.
Show resolved Hide resolved
let result = self.storageAdapter.save(model,
modelSchema: modelSchema,
condition: condition,
eagerLoad: eagerLoad)
guard modelSchema.isSyncable else {
completion(result)
return
}

guard case .success(let savedModel) = result else {
completion(result)
return
}

guard let syncEngine else {
let message = "No SyncEngine available to sync mutation event, rollback save."
self.log.verbose("\(#function) \(message) : \(savedModel)")
throw DataStoreError.internalOperation(
message,
"`DataStore.save()` was interrupted. `DataStore.stop()` may have been called.",
nil)
}
self.log.verbose("\(#function) syncing mutation for \(savedModel)")
self.syncMutation(of: savedModel,
modelSchema: modelSchema,
mutationType: mutationType,
predicate: condition,
syncEngine: syncEngine,
completion: completion)
}

self.log.verbose("\(#function) syncing mutation for \(savedModel)")
self.syncMutation(of: savedModel,
modelSchema: modelSchema,
mutationType: mutationType,
predicate: condition,
syncEngine: syncEngine,
completion: completion)
} catch {
completion(.failure(causedBy: error))
}

storageAdapter.save(model,
modelSchema: modelSchema,
condition: condition,
eagerLoad: eagerLoad,
completion: wrappedCompletion)
}

func save<M: Model>(_ model: M,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ protocol StorageEngineAdapter: AnyObject, ModelStorageBehavior, ModelStorageErro

// MARK: - Synchronous APIs

func save<M: Model>(_ model: M,
modelSchema: ModelSchema,
condition: QueryPredicate?,
eagerLoad: Bool) -> DataStoreResult<M>

func exists(_ modelSchema: ModelSchema,
withIdentifier id: ModelIdentifierProtocol,
predicate: QueryPredicate?) throws -> Bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,55 @@ import Amplify
import Combine

extension AWSMutationDatabaseAdapter: MutationEventSource {

/// DataStore implements a FIFO queue of MutationEvents by using the local database
/// and querying for the earliest MutationEvent by its `createdAt` field.
///
/// **Note**: In a previous revision of this code, this query used to filter on `InProcess` == `false` MutationEvents.
/// This was to skip over already in-flight mutation events and grab the next one. However, it was observed in highly
/// concurrent calls to `DataStore.start()` /`stop()` / `save()` that it will interrupt the
/// **OutgoingMutationQueue** of processing and deleting a **MutationEvent** . `DataStore.start()`,
/// which starts the remote sync engine, should perform a step to move all `InProcess` **MutationEvents** back
/// to false, however there's a timing issue that is difficult to pinpoint. **OutgoingMutationQueue**'s query manages
/// to pick up the second MutationEvent in the queue and sends it off, while the first one that is marked as `inProcess`
/// isn't being processed, likely that process was already cancelled. The query below was updated to always dequeue the
/// first regardless of `InProcess` in the [PR #3492](https://github.com/aws-amplify/amplify-swift/pull/3492).
/// By removing the filter, there is a small chance that the same event may be sent twice. Sending the event twice is idempotent
/// and the second response will be `ConditionalCheckFailed`. The `InProcess` flag is still needed for the
/// handling consecutive update scenarios.
///
/// - Parameter completion: The first MutationEvent in the FIFO queue.
func getNextMutationEvent(completion: @escaping DataStoreCallback<MutationEvent>) {
log.verbose(#function)

guard let storageAdapter = storageAdapter else {
completion(.failure(DataStoreError.nilStorageAdapter()))
return
}

let fields = MutationEvent.keys
let predicate = fields.inProcess == false || fields.inProcess == nil
let sort = QuerySortDescriptor(fieldName: MutationEvent.keys.createdAt.stringValue, order: .ascending)
storageAdapter.query(MutationEvent.self,
predicate: predicate,
sort: [sort],
paginationInput: nil,
eagerLoad: true) { result in
switch result {
case .failure(let dataStoreError):
completion(.failure(dataStoreError))
case .success(let mutationEvents):
guard let notInProcessEvent = mutationEvents.first else {
self.nextEventPromise.set(completion)
return
}
self.markInProcess(mutationEvent: notInProcessEvent,
storageAdapter: storageAdapter,
completion: completion)
}
storageAdapter.query(
lawmicha marked this conversation as resolved.
Show resolved Hide resolved
MutationEvent.self,
predicate: nil,
sort: [sort],
paginationInput: nil,
eagerLoad: true) { result in
switch result {
case .failure(let dataStoreError):
completion(.failure(dataStoreError))
case .success(let mutationEvents):
guard let mutationEvent = mutationEvents.first else {
self.nextEventPromise.set(completion)
return
}
if mutationEvent.inProcess {
log.verbose("The head of the MutationEvent queue was already inProcess (most likely interrupted process): \(mutationEvent)")
completion(.success(mutationEvent))
} else {
self.markInProcess(mutationEvent: mutationEvent,
storageAdapter: storageAdapter,
completion: completion)
}
}

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,48 @@ final class StorageEngineTestsPostComment4V2Tests: StorageEngineTestsBase, Share
}
}

func testSavePostAndSyncSuccess() async throws {
let receivedMutationEvent = expectation(description: "Mutation Events submitted to sync engine")
let expectedSuccess = expectation(description: "Simulated success on mutation event submitted to sync engine")
let post = ParentPost4V2(
id: "postId1",
title: "title1")

syncEngine.setCallbackOnSubmit { submittedMutationEvent, completion in
receivedMutationEvent.fulfill()
if submittedMutationEvent.modelId == post.id {
expectedSuccess.fulfill()
completion(.success(submittedMutationEvent))
} else {
XCTFail("Unexpected submitted MutationEvent \(submittedMutationEvent)")
completion(.failure(.internalOperation("mockError", "", nil)))
}
}
try await saveAsync(post)
await fulfillment(of: [receivedMutationEvent, expectedSuccess], timeout: 1)

}

/// A save should fail if the corresponding MutationEvent could not be submitted to the syncEngine.
func testSavePostFailDueToSyncEngineMissing() async throws {
storageEngine.syncEngine = nil
do {
try await saveAsync(
ParentPost4V2(
id: "postId1",
title: "title1"))
XCTFail("Expected to fail when sync engine is `nil`")
} catch {
guard let dataStoreError = error as? DataStoreError else {
XCTFail("Unexpected type of error \(error)")
return
}
XCTAssertEqual(
dataStoreError.errorDescription,
"No SyncEngine available to sync mutation event, rollback save.")
}
}

func testSaveCommentThenQueryComment() async throws {
let comment = ChildComment4V2(content: "content")
let savedComment = try await saveAsync(comment)
Expand Down
Loading
Loading