Skip to content

Commit

Permalink
[STAD-615] Improve Serialization Performance (#178)
Browse files Browse the repository at this point in the history
Obviously it takes very long to fetch a folder on iOS. Previously this was done on each new SensorValue. Now it is done at the beginning of a measurement and kept for the whole measurement, to avoid the overhead. This should slower devices enable to actually use apps developed on the Cyface SDK:
  • Loading branch information
muthenberg authored Jun 20, 2024
2 parents b22fee2 + ae9433f commit e18d0e3
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import OSLog
A `Measurement` can be synchronized to a Cyface server via an instance of `Synchronizer`.

- Author: Klemens Muthmann
- Version: 2.0.0
- since: 11.0.0
*/
public class FinishedMeasurement: Hashable, Equatable {
/// A device wide unique identifier for this measurement. Usually set by incrementing a counter.
Expand Down Expand Up @@ -75,9 +73,19 @@ public class FinishedMeasurement: Hashable, Equatable {
- throws: `InconstantData.locationOrderViolation` if the timestamps of the locations in this measurement are not strongly monotonically increasing.
*/
public convenience init(managedObject: MeasurementMO) throws {
let accelerationFile = SensorValueFile(fileType: .accelerationValueType, qualifier: String(managedObject.unsignedIdentifier))
let directionFile = SensorValueFile(fileType: .directionValueType, qualifier: String(managedObject.unsignedIdentifier))
let rotationFile = SensorValueFile(fileType: .rotationValueType, qualifier: String(managedObject.unsignedIdentifier))
let sensorValueFileFactory = DefaultSensorValueFileFactory()
let accelerationFile = try sensorValueFileFactory.create(
fileType: .accelerationValueType,
qualifier: String(managedObject.unsignedIdentifier)
)
let directionFile = try sensorValueFileFactory.create(
fileType: .directionValueType,
qualifier: String(managedObject.unsignedIdentifier)
)
let rotationFile = try sensorValueFileFactory.create(
fileType: .rotationValueType,
qualifier: String(managedObject.unsignedIdentifier)
)

self.init(
identifier: managedObject.unsignedIdentifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import CoreData
Implementations of this protocol are capable of storing captured data to some kind of permanent storage.

- author: Klemens Muthmann
- version: 1.0.0
- Since: 12.0.0
*/
public protocol CapturedDataStorage {
/// Subscribe to a running measurement and store the data produced by that measurement.
Expand All @@ -45,8 +43,6 @@ public protocol CapturedDataStorage {
An implementation of `CapturedDataStorage` for storing the data to a CoreData database.

- author: Klemens Muthmann
- version: 1.0.1
- Since: 12.0.0
*/
public class CapturedCoreDataStorage<SVFF: SensorValueFileFactory> where SVFF.Serializable == [SensorValue] {
// MARK: - Properties
Expand Down Expand Up @@ -93,47 +89,84 @@ public class CapturedCoreDataStorage<SVFF: SensorValueFileFactory> where SVFF.Se
return identifier
}
}
}

private func load(measurement identifier: UInt64, from context: NSManagedObjectContext) throws -> MeasurementMO {
guard let measurementRequest = context.persistentStoreCoordinator?.managedObjectModel.fetchRequestFromTemplate(
withName: "measurementByIdentifier",
substitutionVariables: ["identifier": identifier]
) else {
os_log(
"Unable to load measurement fetch request.",
log: OSLog.persistence,
type: .debug
)
throw PersistenceError.measurementNotLoadable(identifier)
}
guard let measurementMO = try measurementRequest.execute().first as? MeasurementMO else {
os_log(
"Unable to load measurement to store to",
log: OSLog.persistence,
type: .debug
)
throw PersistenceError.measurementNotLoadable(identifier)
}
// MARK: - Implementation of CapturedDataStorage Protocol
extension CapturedCoreDataStorage: CapturedDataStorage {

return measurementMO
/// Recievie updates from the provided ``Measurement`` and store the data to a ``DataStoreStack``.
public func subscribe(
to measurement: Measurement,
_ initialMode: String,
_ receiveCompletion: @escaping ((_ databaseIdentifier: UInt64) async -> Void)
) throws -> UInt64 {
let measurementIdentifier = try createMeasurement(initialMode)
let messageHandler = try MessageHandler(fileFactory: sensorValueFileFactory, measurementIdentifier: measurementIdentifier, dataStoreStack: dataStoreStack)

let cachedFlow = measurement.measurementMessages.collect(.byTime(cachingQueue, 1.0))
cachedFlow
.sink(receiveCompletion: { status in
switch status {
case .finished:
os_log(
"Completing storage flow.",
log: OSLog.persistence,
type: .debug
)
Task {
await receiveCompletion(measurementIdentifier)
}
case .failure(let error):
os_log("Unable to complete measurement %@", log: OSLog.persistence, type: .error, error.localizedDescription)
}
}
) { messages in
do {
try messageHandler.handle(messages: messages)
} catch {
os_log("Unable to store data! Error %{PUBLIC}@",log: OSLog.persistence ,type: .error, error.localizedDescription)
}
}.store(in: &cancellables)

return measurementIdentifier
}

private func handle(messages: [Message], measurement identifier: UInt64) throws {
try self.dataStoreStack.wrapInContext { context in
let measurementMo = try self.load(measurement: identifier, from: context)
public func unsubscribe() {
cancellables.removeAll(keepingCapacity: true)
}
}

let accelerationsFile = self.sensorValueFileFactory.create(
fileType: SensorValueFileType.accelerationValueType,
qualifier: String(measurementMo.unsignedIdentifier)
)
let rotationsFile = self.sensorValueFileFactory.create(
fileType: SensorValueFileType.rotationValueType,
qualifier: String(measurementMo.unsignedIdentifier)
)
let directionsFile = self.sensorValueFileFactory.create(
fileType: SensorValueFileType.directionValueType,
qualifier: String(measurementMo.unsignedIdentifier)
)
struct MessageHandler<SVFF: SensorValueFileFactory> where SVFF.Serializable == [SensorValue] {
// MARK: - Properties
public let measurementIdentifier: UInt64
public let accelerationsFile: SVFF.FileType
public let rotationsFile: SVFF.FileType
public let directionsFile: SVFF.FileType
let dataStoreStack: DataStoreStack

// MARK: - Initializers
init(fileFactory: SVFF, measurementIdentifier: UInt64, dataStoreStack: DataStoreStack) throws {
self.dataStoreStack = dataStoreStack
self.measurementIdentifier = measurementIdentifier
self.accelerationsFile = try fileFactory.create(
fileType: SensorValueFileType.accelerationValueType,
qualifier: String(measurementIdentifier)
)
self.rotationsFile = try fileFactory.create(
fileType: SensorValueFileType.rotationValueType,
qualifier: String(measurementIdentifier)
)
self.directionsFile = try fileFactory.create(
fileType: SensorValueFileType.directionValueType,
qualifier: String(measurementIdentifier)
)
}

// MARK: - Methods

func handle(messages: [Message]) throws {
try self.dataStoreStack.wrapInContext { context in
let measurementMo = try self.load(measurement: measurementIdentifier, from: context)

try messages.forEach { message in
switch message {
Expand Down Expand Up @@ -167,78 +200,77 @@ public class CapturedCoreDataStorage<SVFF: SensorValueFileFactory> where SVFF.Se
}
}

// MARK: - Private Methods
/// Load a measurement from the database. This should only be executed within a valid CoreData context.
private func load(measurement identifier: UInt64, from context: NSManagedObjectContext) throws -> MeasurementMO {
guard let measurementRequest = context.persistentStoreCoordinator?.managedObjectModel.fetchRequestFromTemplate(
withName: "measurementByIdentifier",
substitutionVariables: ["identifier": identifier]
) else {
os_log(
"Unable to load measurement fetch request.",
log: OSLog.persistence,
type: .debug
)
throw PersistenceError.measurementNotLoadable(identifier)
}
guard let measurementMO = try measurementRequest.execute().first as? MeasurementMO else {
os_log(
"Unable to load measurement to store to",
log: OSLog.persistence,
type: .debug
)
throw PersistenceError.measurementNotLoadable(identifier)
}

return measurementMO
}

/// Store a ``GeoLocation`` to the database.
///
/// - Parameter location: The location to store.
/// - Parameter measurementMo: The measurement to store the location to.
private func store(location: GeoLocation, to measurementMo: MeasurementMO, _ context: NSManagedObjectContext) {
os_log("Storing location to database.", log: OSLog.persistence, type: .debug)
if let lastTrack = measurementMo.typedTracks().last {
lastTrack.addToLocations(GeoLocationMO(location: location, context: context))
}
}

/// Store an ``Altitude`` to the database.
///
/// - Parameter altitude: The altitude to store.
/// - Parameter measurementMo: The measurement to store the altitude to.
private func store(altitude: Altitude, to measurementMo: MeasurementMO, _ context: NSManagedObjectContext) {
if let lastTrack = measurementMo.typedTracks().last {
lastTrack.addToAltitudes(AltitudeMO(altitude: altitude, context: context))
}
}

/// Store a sensor value (e.g. direction, rotation, acceleration) to a file on the local disk.
///
/// - Parameter value: The value to store to the file.
/// - Parameter to: The file to store the value to.
private func store<SVF: FileSupport>(_ value: SensorValue, to file: SVF) throws where SVF.Serializable == SVFF.Serializable {
do {
_ = try file.write(serializable: [value])
} catch {
debugPrint("Unable to write data to file \(file.fileName)!")
debugPrint("Unable to write data to file \(file.qualifiedPath)!")
throw error
}
}

/// Callback, called when the measurement has been stopped and all values have been stored.
///
/// - Parameter measurement: The measurement that was finished.
/// - Parameter context: The database context used to communicate with the database.
/// - Parameter time: The time for the final stop event.
/// - Attention: Only call this within a valid CoreData context (same thread as the one that opened the provided context).
private func onStop(measurement measurementMo: MeasurementMO, _ context: NSManagedObjectContext, _ time: Date) throws {
os_log("Storing stopped event to database.", log: OSLog.persistence, type: .debug)
measurementMo.addToEvents(EventMO(event: Event(time: time, type: .lifecycleStop), context: context))
measurementMo.synchronizable = true
try context.save()
os_log("Stored finished measurement.", log: OSLog.persistence, type: .debug)
}

}

// MARK: - Implementation of CapturedDataStorage Protocol
extension CapturedCoreDataStorage: CapturedDataStorage {

/// Recievie updates from the provided ``Measurement`` and store the data to a ``DataStoreStack``.
public func subscribe(
to measurement: Measurement,
_ initialMode: String,
_ receiveCompletion: @escaping ((_ databaseIdentifier: UInt64) async -> Void)
) throws -> UInt64 {
let measurementIdentifier = try createMeasurement(initialMode)

let cachedFlow = measurement.measurementMessages.collect(.byTime(cachingQueue, 1.0))
cachedFlow
.sink(receiveCompletion: { status in
switch status {
case .finished:
os_log(
"Completing storage flow.",
log: OSLog.persistence,
type: .debug
)
Task {
await receiveCompletion(measurementIdentifier)
}
case .failure(let error):
os_log("Unable to complete measurement %@", log: OSLog.persistence, type: .error, error.localizedDescription)
}
}
) { [weak self] (messages: [Message]) in
do {
try self?.handle(messages: messages, measurement: measurementIdentifier)
} catch {
os_log("Unable to store data! Error %{PUBLIC}@",log: OSLog.persistence ,type: .error, error.localizedDescription)
}
}.store(in: &cancellables)

return measurementIdentifier
}

public func unsubscribe() {
cancellables.removeAll(keepingCapacity: true)
}
}
Loading

0 comments on commit e18d0e3

Please sign in to comment.