Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task/leip 164 upload images #289

Merged
merged 36 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
4a77c24
Fix typo
hb0 Oct 23, 2023
f79ad53
Support NIO through desugaring
hb0 Oct 23, 2023
1865dea
Rename Default-/FileDao to Default-/FileIOHandler
hb0 Oct 23, 2023
74837a7
[LEIP-174] Add classes for the File table
hb0 Oct 23, 2023
d3b4df7
Add database migration and set version to 19
hb0 Oct 24, 2023
332ed29
Add update method to FileDao
hb0 Oct 24, 2023
87cfb46
Add MeasurementStatus UPLOADING
hb0 Oct 24, 2023
9cc8d74
Refactor onPerformSync to lower complexity
hb0 Oct 24, 2023
827cd8b
Rename UPLOADING state to a more precise SYNCABLE_ATTACHMENTS
hb0 Oct 24, 2023
c8b0a7f
Enable NIO in desugaring in remaining modules
hb0 Oct 25, 2023
d6ee15d
[LEIP-176] Upload attachments
hb0 Oct 25, 2023
b23552b
Add locationTimestamp to File table
hb0 Oct 26, 2023
79a1f05
[LEIP-179] Wrap attachments as cyf
hb0 Oct 26, 2023
48c7123
[LEIP-182] Add file counts to meta data
hb0 Oct 26, 2023
f883eab
Fix upload progress when continuing with later attachments
hb0 Oct 26, 2023
6c41145
Convert CapturingPersistenceBehaviour to kotlin
hb0 Oct 26, 2023
84837a4
[LEIP-186] Mark database operations as suspend
hb0 Oct 26, 2023
8fd0551
[LEIP-184] Add filesSize to metadata
hb0 Oct 26, 2023
35ae540
[LEIP-185] Fix resource failed to call end
hb0 Oct 26, 2023
25680f8
Add FileDaoTest
hb0 Oct 30, 2023
1d88bf9
Fix tests after changing dao functions to suspend
hb0 Oct 30, 2023
7013cc7
Convert ProtoTest to kotlin
hb0 Oct 30, 2023
8d3ec81
Add ProtoTest.testWithFileAttachments
hb0 Oct 30, 2023
236947e
Fix SyncPerformerTest
hb0 Oct 30, 2023
140a5b2
Add SyncPerformerTest.testSendData_withAttachments
hb0 Oct 30, 2023
66b8aa7
[LEIP-181] Add SyncAdapterTest.testOnPerformSync with/out existing at…
hb0 Oct 31, 2023
fcdd8c8
Upgrade Cyface dependencies to beta1
hb0 Oct 31, 2023
37277f7
Upload the attachments to files endpoint
hb0 Oct 31, 2023
2c120f8
Fix tests
hb0 Oct 31, 2023
bf21279
Ignore false-positive lint errors (we use desugaring for NIO)
hb0 Oct 31, 2023
241f558
Update code version
hb0 Oct 31, 2023
201cf86
Upgrade serialization and uploader to final version
hb0 Nov 13, 2023
3498fb1
Fix typo
hb0 Nov 13, 2023
e8a52ec
Adjust to renamed uploader interface
hb0 Nov 13, 2023
d8e2eee
Rename measurement.file to .attachment in persistence
hb0 Nov 13, 2023
c726a86
Rename measurement.files to .attachments
hb0 Nov 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2023 Cyface GmbH
*
* This file is part of the Cyface SDK for Android.
*
* The Cyface SDK for Android is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* The Cyface SDK for Android is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with the Cyface SDK for Android. If not, see <http://www.gnu.org/licenses/>.
*/
package de.cyface.persistence.serialization

import com.google.protobuf.ByteString
import de.cyface.persistence.DefaultPersistenceLayer
import de.cyface.persistence.model.File
import de.cyface.protos.model.File.FileType
import java.io.IOException
import java.nio.file.Files

/**
* Serializes a [File] in the [MeasurementSerializer.TRANSFER_FILE_FORMAT_VERSION].
*
* @author Armin Schnabel
* @version 1.0.0
* @since 7.10.0
*/
class FileSerializer {
hb0 marked this conversation as resolved.
Show resolved Hide resolved
/**
* The serialized data.
*/
private lateinit var file: de.cyface.protos.model.File

/**
* Loads the binary of a [File] and parses the [File] info with the binary data.
*
* @param file the [File] to load and parse the data for
*/
fun readFrom(file: File) {
val builder = de.cyface.protos.model.File.newBuilder()
require(file.type == FileType.CSV || file.type == FileType.JPG) { "Unsupported type: ${file.type}" }

// Ensure we only inject bytes from the correct file format version
// The current version of the file format used to persist File data. It's stored in each File
// database entry and allows to have stored and process files with different file format versions
// at the same time.
// Check fileFormatVersion for the specific type (right now both types only support 1)
require(file.fileFormatVersion == 1.toShort()) { "Unsupported format version (${file.fileFormatVersion}) for type ${file.type}" }

builder.timestamp = file.timestamp
builder.type = file.type
try {
val bytes = Files.readAllBytes(file.path)
builder.bytes = ByteString.copyFrom(bytes)
} catch (e: IOException) {
throw IllegalStateException("Could not read file (id ${file.id} at ${file.path}", e)
}

this.file = builder.build()
}

/**
* @return the data in the serialized format.
*/
fun result(): de.cyface.protos.model.File {
return file
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,49 @@ class MeasurementSerializer {
return compressedTempFile
}

/**
* Loads the [de.cyface.persistence.model.File] with the provided identifier from the persistence
* layer serialized in the [MeasurementSerializer.TRANSFER_FILE_FORMAT_VERSION] format and writes
* it to a temp file, ready to be transferred.
*
* **ATTENTION**: The caller needs to delete the file which is referenced by the returned `FileInputStream`
* when no longer needed or on program crash!
*
* @param file The [de.cyface.persistence.model.File] to load
* @param persistenceLayer The [PersistenceLayer] to load the data from
* @return A [File] pointing to a temporary file containing the serialized data for transfer.
*/
@Throws(CursorIsNullException::class)
suspend fun writeSerializedFile(
file: de.cyface.persistence.model.File,
persistenceLayer: PersistenceLayer<*>
): File? {

// Store the compressed bytes into a temp file to be able to read the byte size for transmission
val cacheDir = persistenceLayer.cacheDir
var tempFile: File? = null
try {
tempFile =
withContext(Dispatchers.IO) {
File.createTempFile(TRANSFER_FILE_PREFIX, ".tmp", cacheDir)
}
withContext(Dispatchers.IO) {
FileOutputStream(tempFile).use { fileOutputStream ->
loadSerializedFile(
fileOutputStream,
file
)
}
}
} catch (e: IOException) {
if (tempFile != null && tempFile.exists()) {
Validate.isTrue(tempFile.delete())
}
throw IllegalStateException(e)
}
return tempFile
}

/**
* Writes the [de.cyface.persistence.model.Measurement] with the provided identifier from the persistence layer serialized and compressed
* in the [MeasurementSerializer.TRANSFER_FILE_FORMAT_VERSION] format, ready to be transferred.
Expand Down Expand Up @@ -136,6 +179,32 @@ class MeasurementSerializer {
)
}

/**
* Writes the [de.cyface.persistence.model.File] with the provided identifier from the persistence
* layer serialized in the [MeasurementSerializer.TRANSFER_FILE_FORMAT_VERSION] format, ready to be
* transferred.
*
* No compression is used as we're mostly transferring JPG files right now which are pre-compressed.
*
* @param fileOutputStream the `FileInputStream` to write the compressed data to
* @param file The [de.cyface.persistence.model.File] to load
* @throws IOException When flushing or closing the [OutputStream] fails
*/
@Throws(IOException::class)
private suspend fun loadSerializedFile(
fileOutputStream: OutputStream,
file: de.cyface.persistence.model.File
) {
// These streams don't throw anything and, thus, it should be enough to close the outermost stream at the end

// Wrapping the streams with Buffered streams for performance reasons
BufferedOutputStream(fileOutputStream).use { outputStream ->
// Injecting the outputStream into which the serialized data is written to
TransferFileSerializer.loadSerializedFile(outputStream, file)
outputStream.flush()
}
}

companion object {
/**
* The current version of the transferred file. This is always specified by the first two bytes of the file
Expand Down Expand Up @@ -167,5 +236,10 @@ class MeasurementSerializer {
* The prefix of the filename used to store compressed files for serialization.
*/
private const val COMPRESSED_TRANSFER_FILE_PREFIX = "compressedTransferFile"

/**
* The prefix of the filename used to store temp files for serialization.
*/
private const val TRANSFER_FILE_PREFIX = "transferFile"
}
}
hb0 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import de.cyface.persistence.Constants.TAG
import de.cyface.persistence.DefaultPersistenceLayer
import de.cyface.persistence.PersistenceLayer
import de.cyface.persistence.content.AbstractCyfaceTable.Companion.DATABASE_QUERY_LIMIT
import de.cyface.persistence.model.File
import de.cyface.persistence.model.Measurement
import de.cyface.protos.model.Event
import de.cyface.protos.model.File.FileType
import de.cyface.protos.model.LocationRecords
import de.cyface.protos.model.MeasurementBytes
import de.cyface.serializer.DataSerializable
Expand Down Expand Up @@ -141,7 +143,7 @@ object TransferFileSerializer {
val transferFileHeader = DataSerializable.transferFileHeader()
val measurementBytes = builder.build().toByteArray()
try {
// The stream must be closed by the called in a finally catch
// The stream must be closed by the caller in a finally catch
withContext(Dispatchers.IO) {
bufferedOutputStream.write(transferFileHeader)
bufferedOutputStream.write(measurementBytes)
Expand Down Expand Up @@ -227,4 +229,89 @@ object TransferFileSerializer {
}
return serializer.result()
}

/**
* Loads and serializes a [File] from the persistence layer.
*
* @param file The reference of the entry to load
*/
@Throws(CursorIsNullException::class)
private fun loadFile(
file: File
): de.cyface.protos.model.File {
val serializer = FileSerializer()
try {
serializer.readFrom(file)
} catch (e: RemoteException) {
throw java.lang.IllegalStateException(e)
}
return serializer.result()
}

/**
* Implements the core algorithm of loading data of a [File] from the [PersistenceLayer]
* and serializing it into an array of bytes, ready to be transferred.
*
* We use the {@param loader} to access the measurement data. FIXME?
*
* We assemble the data using a buffer to avoid OOM exceptions.
*
* **ATTENTION:** The caller must make sure the {@param bufferedOutputStream} is closed when no longer needed
* or the app crashes.
*
* @param bufferedOutputStream The `OutputStream` to which the serialized data should be written. Injecting
* this allows us to compress the serialized data without the need to write it into a temporary file.
* We require a [BufferedOutputStream] for performance reasons.
* @param reference The [de.cyface.persistence.model.File] to load
* @throws CursorIsNullException If {@link ContentProvider} was inaccessible.
*/
@JvmStatic
@Throws(CursorIsNullException::class)
suspend fun loadSerializedFile(
bufferedOutputStream: BufferedOutputStream,
reference: File,
) {
val file = loadFile(reference)

val builder = de.cyface.protos.model.Measurement.newBuilder()
.setFormatVersion(MeasurementSerializer.TRANSFER_FILE_FORMAT_VERSION.toInt());
when (reference.type) {
FileType.CSV -> {
builder.capturingLog = file
}

FileType.JPG -> {
builder.addAllImages(mutableListOf(file))
}

else -> {
throw IllegalArgumentException("Unsupported type: ${reference.type}")
}
}

// Currently loading one image per transfer file into memory (~ 2-5 MB / image).
// - To add all high-res image data or video data in the future we cannot use the pre-compiled
// builder but have to stream the data without loading it into memory to avoid an OOM exception.
val transferFileHeader = DataSerializable.transferFileHeader()
val measurementBytes = builder.build().toByteArray()
try {
// The stream must be closed by the caller in a finally catch
withContext(Dispatchers.IO) {
bufferedOutputStream.write(transferFileHeader)
bufferedOutputStream.write(measurementBytes)
bufferedOutputStream.flush()
}
} catch (e: IOException) {
throw IllegalStateException(e)
}
Log.d(
TAG, String.format(
"Serialized file: %s",
DataSerializable.humanReadableSize(
(transferFileHeader.size + measurementBytes.size).toLong(),
true
)
)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ class SyncAdapter private constructor(

for (index in 0 until measurementCount) {
val measurement = measurements[index]
if (error) return

val fileCount = persistence.fileDao!!.countByMeasurementId(measurement.id)
Log.d(TAG, "Preparing to upload Measurement (id ${measurement.id}) with $fileCount attachments.")
Expand Down Expand Up @@ -282,27 +281,39 @@ class SyncAdapter private constructor(

for (fileIndex in 0 until syncableFileCount) {
val file = files[fileIndex]
if (error) return

Log.d(TAG, "Preparing to upload File (id ${file.id}).")
validateFileFormat(file)

val transferFile = file.path.toFile() // FIXME: Needs to be wrapped as cyf

if (isSyncRequestAborted(account, authority)) return

val indexWithinMeasurement = fileIndex + 1 // the core file is index 0
val progressListener = DefaultUploadProgressListener(measurementCount, index, measurement.id, fileCount, indexWithinMeasurement, progressListeners)
error = !syncAttachment(
file.id,
metaData,
syncPerformer,
transferFile,
syncResult,
fromBackground,
persistence,
progressListener
)
var transferTempFile: File? = null
try {
transferTempFile = serializeFile(file, persistence)

if (isSyncRequestAborted(account, authority)) return

val indexWithinMeasurement = fileIndex + 1 // the core file is index 0
val progressListener = DefaultUploadProgressListener(
measurementCount,
index,
measurement.id,
fileCount,
indexWithinMeasurement,
progressListeners
)
error = !syncAttachment(
file.id,
metaData,
syncPerformer,
transferTempFile,
syncResult,
fromBackground,
persistence,
progressListener
)
if (error) return
} finally {
delete(transferTempFile)
}
}

persistence.markSyncableAttachmentsAs(MeasurementStatus.SYNCED, measurement.id)
Expand All @@ -325,6 +336,14 @@ class SyncAdapter private constructor(
return compressedTransferTempFile
}

private fun serializeFile(file: de.cyface.persistence.model.File, persistence: DefaultPersistenceLayer<DefaultPersistenceBehaviour?>): File? {
var transferTempFile: File?
runBlocking {
transferTempFile = MeasurementSerializer().writeSerializedFile(file, persistence)
}
return transferTempFile
}

private suspend fun syncMeasurement(
measurement: Measurement,
metaData: RequestMetaData,
Expand Down
Loading