Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(DataStore): improve MutationEvent resiliency to interruptions #3492

Merged
merged 8 commits into from
Feb 20, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,18 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
condition: QueryPredicate? = nil,
eagerLoad: Bool = true,
completion: DataStoreCallback<M>) {
completion(save(model,
modelSchema: modelSchema,
condition: condition,
eagerLoad: eagerLoad))
}

func save<M: Model>(_ model: M,
modelSchema: ModelSchema,
condition: QueryPredicate? = nil,
eagerLoad: Bool = true) -> DataStoreResult<M> {
guard let connection = connection else {
completion(.failure(DataStoreError.nilSQLiteConnection()))
return
return .failure(DataStoreError.nilSQLiteConnection())
}
do {
let modelType = type(of: model)
Expand All @@ -162,8 +171,7 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
let dataStoreError = DataStoreError.invalidCondition(
"Cannot apply a condition on model which does not exist.",
"Save the model instance without a condition first.")
completion(.failure(causedBy: dataStoreError))
return
return .failure(causedBy: dataStoreError)
}

let statement = InsertStatement(model: model, modelSchema: modelSchema)
Expand All @@ -179,9 +187,7 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
let dataStoreError = DataStoreError.invalidCondition(
"Save failed due to condition did not match existing model instance.",
"The save will continue to fail until the model instance is updated.")
completion(.failure(causedBy: dataStoreError))

return
return .failure(causedBy: dataStoreError)
}
}

Expand All @@ -192,23 +198,22 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
}

// load the recent saved instance and pass it back to the callback
query(modelType, modelSchema: modelSchema,
predicate: model.identifier(schema: modelSchema).predicate,
eagerLoad: eagerLoad) {
switch $0 {
case .success(let result):
if let saved = result.first {
completion(.success(saved))
} else {
completion(.failure(.nonUniqueResult(model: modelType.modelName,
count: result.count)))
}
case .failure(let error):
completion(.failure(error))
let queryResult = query(modelType, modelSchema: modelSchema,
predicate: model.identifier(schema: modelSchema).predicate,
eagerLoad: eagerLoad)
switch queryResult {
case .success(let result):
if let saved = result.first {
return .success(saved)
} else {
return .failure(.nonUniqueResult(model: modelType.modelName,
count: result.count))
}
case .failure(let error):
return .failure(error)
}
} catch {
completion(.failure(causedBy: error))
return .failure(causedBy: error)
}
}

Expand Down Expand Up @@ -321,9 +326,22 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
paginationInput: QueryPaginationInput? = nil,
eagerLoad: Bool = true,
completion: DataStoreCallback<[M]>) {
completion(query(modelType,
modelSchema: modelSchema,
predicate: predicate,
sort: sort,
paginationInput: paginationInput,
eagerLoad: eagerLoad))
}

func query<M: Model>(_ modelType: M.Type,
lawmicha marked this conversation as resolved.
Show resolved Hide resolved
modelSchema: ModelSchema,
predicate: QueryPredicate? = nil,
sort: [QuerySortDescriptor]? = nil,
paginationInput: QueryPaginationInput? = nil,
eagerLoad: Bool = true) -> DataStoreResult<[M]> {
guard let connection = connection else {
completion(.failure(DataStoreError.nilSQLiteConnection()))
return
return .failure(DataStoreError.nilSQLiteConnection())
}
do {
let statement = SelectStatement(from: modelSchema,
Expand All @@ -336,9 +354,9 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
withSchema: modelSchema,
using: statement,
eagerLoad: eagerLoad)
completion(.success(result))
return .success(result)
} catch {
completion(.failure(causedBy: error))
return .failure(causedBy: error)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,31 +208,41 @@ final class StorageEngine: StorageEngineBehavior {
completion(.failure(causedBy: dataStoreError))
}

let wrappedCompletion: DataStoreCallback<M> = { result in
guard modelSchema.isSyncable, let syncEngine = self.syncEngine else {
completion(result)
return
}

guard case .success(let savedModel) = result else {
completion(result)
return
do {
try storageAdapter.transaction {
lawmicha marked this conversation as resolved.
Show resolved Hide resolved
let result = self.storageAdapter.save(model,
modelSchema: modelSchema,
condition: condition,
eagerLoad: eagerLoad)
guard modelSchema.isSyncable else {
completion(result)
return
}

guard case .success(let savedModel) = result else {
completion(result)
return
}

guard let syncEngine else {
let message = "No SyncEngine available to sync mutation event, rollback save."
self.log.verbose("\(#function) \(message) : \(savedModel)")
throw DataStoreError.internalOperation(
message,
"Save was interrupted. `DataStore.stop()` may have been called.",
lawmicha marked this conversation as resolved.
Show resolved Hide resolved
nil)
}
self.log.verbose("\(#function) syncing mutation for \(savedModel)")
self.syncMutation(of: savedModel,
modelSchema: modelSchema,
mutationType: mutationType,
predicate: condition,
syncEngine: syncEngine,
completion: completion)
}

self.log.verbose("\(#function) syncing mutation for \(savedModel)")
self.syncMutation(of: savedModel,
modelSchema: modelSchema,
mutationType: mutationType,
predicate: condition,
syncEngine: syncEngine,
completion: completion)
} catch {
completion(.failure(causedBy: error))
}

storageAdapter.save(model,
modelSchema: modelSchema,
condition: condition,
eagerLoad: eagerLoad,
completion: wrappedCompletion)
}

func save<M: Model>(_ model: M,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ protocol StorageEngineAdapter: AnyObject, ModelStorageBehavior, ModelStorageErro

// MARK: - Synchronous APIs

func save<M: Model>(_ model: M,
modelSchema: ModelSchema,
condition: QueryPredicate?,
eagerLoad: Bool) -> DataStoreResult<M>

func exists(_ modelSchema: ModelSchema,
withIdentifier id: ModelIdentifierProtocol,
predicate: QueryPredicate?) throws -> Bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,30 @@ extension AWSMutationDatabaseAdapter: MutationEventSource {
completion(.failure(DataStoreError.nilStorageAdapter()))
return
}

let fields = MutationEvent.keys
let predicate = fields.inProcess == false || fields.inProcess == nil
let sort = QuerySortDescriptor(fieldName: MutationEvent.keys.createdAt.stringValue, order: .ascending)
storageAdapter.query(MutationEvent.self,
predicate: predicate,
sort: [sort],
paginationInput: nil,
eagerLoad: true) { result in
switch result {
case .failure(let dataStoreError):
completion(.failure(dataStoreError))
case .success(let mutationEvents):
guard let notInProcessEvent = mutationEvents.first else {
self.nextEventPromise.set(completion)
return
}
self.markInProcess(mutationEvent: notInProcessEvent,
storageAdapter: storageAdapter,
completion: completion)
}
storageAdapter.query(
lawmicha marked this conversation as resolved.
Show resolved Hide resolved
MutationEvent.self,
predicate: nil,
sort: [sort],
paginationInput: nil,
eagerLoad: true) { result in
switch result {
case .failure(let dataStoreError):
completion(.failure(dataStoreError))
case .success(let mutationEvents):
guard let mutationEvent = mutationEvents.first else {
self.nextEventPromise.set(completion)
return
}
if mutationEvent.inProcess {
log.verbose("The head of the MutationEvent queue was already inProcess (most likely interrupted process): \(mutationEvent)")
completion(.success(mutationEvent))
} else {
self.markInProcess(mutationEvent: mutationEvent,
storageAdapter: storageAdapter,
completion: completion)
}
}

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,14 @@ class MockSQLiteStorageEngineAdapter: StorageEngineAdapter {
: completion(.success(model))
}

func save<M: Model>(_ model: M,
modelSchema: ModelSchema,
condition: QueryPredicate?,
eagerLoad: Bool) -> DataStoreResult<M> {
XCTFail("Not yet implemented")
return .failure(.internalOperation("", "", nil))
}

func save<M: Model>(_ model: M,
modelSchema: ModelSchema,
condition where: QueryPredicate?,
Expand Down
Loading