From e19b84f8427e01ae4d042b4dcfe48917198a08bf Mon Sep 17 00:00:00 2001 From: Yurii Zhuk Date: Sun, 29 Sep 2024 12:39:00 +0300 Subject: [PATCH] Refactored file upload process to increase speed by allowing more chunks in the stream without ACK. --- mobile-sdk/build.gradle | 2 +- .../mobile_sdk/data/chats/FileUploader.kt | 383 +++++++++--------- .../mobile_sdk/data/chats/UploadProcess.kt | 52 +++ .../mobile_sdk/data/chats/UploadRequest.kt | 13 + 4 files changed, 267 insertions(+), 183 deletions(-) create mode 100644 mobile-sdk/src/main/java/com/webitel/mobile_sdk/data/chats/UploadProcess.kt create mode 100644 mobile-sdk/src/main/java/com/webitel/mobile_sdk/data/chats/UploadRequest.kt diff --git a/mobile-sdk/build.gradle b/mobile-sdk/build.gradle index abaeecd..effba55 100644 --- a/mobile-sdk/build.gradle +++ b/mobile-sdk/build.gradle @@ -122,7 +122,7 @@ afterEvaluate { groupId = "com.webitel" artifactId = "mobile-sdk" - version = "0.16.1" + version = "0.16.2" } } } diff --git a/mobile-sdk/src/main/java/com/webitel/mobile_sdk/data/chats/FileUploader.kt b/mobile-sdk/src/main/java/com/webitel/mobile_sdk/data/chats/FileUploader.kt index 6cd7ef7..a36a63e 100644 --- a/mobile-sdk/src/main/java/com/webitel/mobile_sdk/data/chats/FileUploader.kt +++ b/mobile-sdk/src/main/java/com/webitel/mobile_sdk/data/chats/FileUploader.kt @@ -3,7 +3,6 @@ package com.webitel.mobile_sdk.data.chats import com.google.protobuf.ByteString import com.webitel.mobile_sdk.data.grps.ChatApi import com.webitel.mobile_sdk.data.portal.WebitelPortalClient.Companion.logger -import com.webitel.mobile_sdk.domain.CallbackListener import com.webitel.mobile_sdk.domain.Code import com.webitel.mobile_sdk.domain.Error import com.webitel.mobile_sdk.domain.File @@ -16,20 +15,29 @@ import io.grpc.StatusRuntimeException import io.grpc.stub.ClientCallStreamObserver import io.grpc.stub.ClientResponseObserver import webitel.portal.Media +import java.util.concurrent.atomic.AtomicInteger internal class FileUploader(val api: ChatApi) { + // The maximum number of active chunks waiting for a response + private val maxActiveChunks = 3 + private val activeChunksCount = AtomicInteger(0) + private var activeProcess: UploadProcess? = null private var isUploading = false + private var isChunkSending = false // Queue to manage upload requests private val uploadQueue: ArrayList = arrayListOf() - private var chunkSize = 1024 - private val maxChunkSize = 65536 - private val minChunkSize = 1024 - private var lastConfirmationTime = 0.0 + private var length: Int = 0 + private var lastConfirmationTime = 0.0 + + private var chunkSize = 2048 + private val maxChunkSize = 65536 + private val minChunkSize = 2048 + //private var startUploadAt = 0.0 @@ -64,7 +72,7 @@ internal class FileUploader(val api: ChatApi) { it.transferRequest.listener?.onCanceled() } } - if (request != null) { + if (request == null) { logger.warn( "Upload Cancel", "Cancellation failed - process not found, already canceled, or completed" @@ -86,69 +94,73 @@ internal class FileUploader(val api: ChatApi) { var call: ClientCallStreamObserver? = null val responseObserver = object : ClientResponseObserver { - override fun onNext(value: Media.UploadProgress?) { - if (value == null) return - - if (value.hasEnd()) { -// val timeSpend = (System.currentTimeMillis() / 1000.0) - startUploadAt -// logger.debug("timeSpend", "${timeSpend}") - logger.debug("uploadFile", "stream completion received - $value") - return - } + override fun onNext(value: Media.UploadProgress?) { + if (value == null) return - if (value.hasStat()) { - logger.debug("uploadFile", "File was uploaded - ${value.stat}") - isUploading = false - chunkSize = minChunkSize - call = null - process.setStream(null) - activeProcess = null - - val file = File( - id = value.stat.file.id, - fileName = value.stat.file.name, - type = value.stat.file.type, - size = value.stat.file.size - ) - val result = UploadResult(file, value.stat.hashMap) - request.callback.onSuccess(result) - releaseNextRequest() + if (value.hasEnd()) { +// val timeSpend = (System.currentTimeMillis() / 1000.0) - startUploadAt +// logger.debug("timeSpend", "${timeSpend}") + logger.debug("uploadFile", "stream completion received - $value") + return + } - } else { - if (value.part.pid.isNotEmpty()) { - request.transferRequest.listener?.onStarted(value.part.pid) - process.processId = value.part.pid - - if (value.part.size > 0) { - try { - request.transferRequest.stream.skip(value.part.size) - } catch (e: Exception) { - isUploading = false - chunkSize = minChunkSize - call?.cancel(e.message, e) - call = null - } - } - sendChunk(request, call) + if (value.hasStat()) { + logger.debug("uploadFile", "File was uploaded - ${value.stat}") + activeChunksCount.set(0) + isUploading = false + chunkSize = minChunkSize + call = null + process.setStream(null) + activeProcess = null + + val file = File( + id = value.stat.file.id, + fileName = value.stat.file.name, + type = value.stat.file.type, + size = value.stat.file.size + ) + val result = UploadResult(file, value.stat.hashMap) + request.callback.onSuccess(result) + releaseNextRequest() - } else { - handleChunkProgress(value.part.size, request, call) + } else { + if (value.part.pid.isNotEmpty()) { + request.transferRequest.listener?.onStarted(value.part.pid) + process.setPid(value.part.pid) + + if (value.part.size > 0) { + try { + request.transferRequest.stream.skip(value.part.size) + } catch (e: Exception) { + isUploading = false + call?.cancel(e.message, e) + call = null + } } + processQueue(process) + + } else { + handleChunkProgress(value.part.size, process) } } + } - override fun onError(t: Throwable) { - handleUploadError(t, request, process) - call = null - } + override fun onError(t: Throwable) { + handleUploadError(t, process) + process.setStream(null) + call = null + } - override fun onCompleted() {} + override fun onCompleted() { + process.setStream(null) + call = null + } - override fun beforeStart(requestStream: ClientCallStreamObserver?) { - call = requestStream - process.setStream(requestStream) - } + override fun beforeStart(requestStream: ClientCallStreamObserver?) { + call = requestStream + process.setStream(requestStream) } + } sendMetadata(request.transferRequest, responseObserver) } @@ -159,49 +171,22 @@ internal class FileUploader(val api: ChatApi) { */ private fun handleChunkProgress( partSize: Long, - request: UploadRequest, - call: ClientCallStreamObserver? + process: UploadProcess ) { - if (partSize > 0) { logger.debug("handleChunkProgress", "Chunk of size $partSize uploaded successfully.") - setUpChunkSize() - // Inform the listener about the progress of the upload. - request.transferRequest.listener?.onProgress(partSize) - } - - // Process the queue to upload the next chunk if there are more chunks to be sent. - sendChunk(request, call) - } - + if (activeChunksCount.get() > 0) { + // Decrement the count of active chunks since one chunk has been acknowledged. + activeChunksCount.decrementAndGet() + } - private fun setUpChunkSize() { - val now = System.currentTimeMillis() / 1000.0 - val elapsedTime = now - lastConfirmationTime - lastConfirmationTime = now - adjustChunkSize(elapsedTime) - } + setUpChunkSize() + processQueue(process) - private fun adjustChunkSize(elapsedTime: Double) { - val targetTime = 0.3 - if (elapsedTime < targetTime && chunkSize < maxChunkSize) { - val x = if (elapsedTime < 0.2) { 1.25 } - else { 1.1 } - logger.debug( - "Adjust Size", - "Chunk size adjustment: factor = $x, elapsed time = $elapsedTime" - ) - chunkSize = minOf((chunkSize * x).toInt(), maxChunkSize) - } else if (elapsedTime > targetTime && chunkSize > minChunkSize) { - val x = if (elapsedTime > 0.4) { 0.75 } - else { 0.9 } - logger.debug( - "Adjust Size", - "Chunk size adjustment: factor = $x, elapsed time = $elapsedTime" - ) - chunkSize = maxOf((chunkSize * x).toInt(), minChunkSize) + // Inform the listener about the progress of the upload. + process.request.transferRequest.listener?.onProgress(partSize) } } @@ -209,21 +194,30 @@ internal class FileUploader(val api: ChatApi) { /** * Handles errors during the upload process. */ - private fun handleUploadError(t: Throwable, request: UploadRequest, process: UploadProcess?) { + private fun handleUploadError(t: Throwable, process: UploadProcess) { + activeChunksCount.set(0) isUploading = false - chunkSize = minChunkSize - process?.setStream(null) + isChunkSending = false activeProcess = null + chunkSize = minChunkSize + sendErrorEvent(t, process) releaseNextRequest() + } + + + @Synchronized + private fun sendErrorEvent(t: Throwable, process: UploadProcess) { + if (process.isReported()) return + process.setReported() + val err = parseError(t) when (err.message) { cancel_file_transfer -> { - request.transferRequest.listener?.onCanceled() + process.request.transferRequest.listener?.onCanceled() logger.debug("sendFile", "File upload canceled") } - else -> { - request.callback.onError(err) + process.request.callback.onError(err) logger.error("sendFile", "File upload error - ${err.message}") } } @@ -234,9 +228,11 @@ internal class FileUploader(val api: ChatApi) { * Cancels the active upload process. */ private fun cancelActiveProcess(process: UploadProcess, cleanUp: Boolean) { + activeChunksCount.set(0) isUploading = false - chunkSize = minChunkSize + isChunkSending = false activeProcess = null + chunkSize = minChunkSize if (process.isCompleted()) { throw InvalidStateException( @@ -244,7 +240,12 @@ internal class FileUploader(val api: ChatApi) { ) } - process.call?.cancel( + val listener = + if (process.getStream() == null) + process.request.transferRequest.listener + else null + + process.getStream()?.cancel( cancel_file_transfer, Status.CANCELLED.asException() ) @@ -252,11 +253,7 @@ internal class FileUploader(val api: ChatApi) { if (cleanUp) { process.completed() - val listener = - if (process.call == null) process.request.transferRequest.listener - else null - - killUpload(process.processId, listener) + killUpload(process.getPid(), listener) } } @@ -265,7 +262,7 @@ internal class FileUploader(val api: ChatApi) { * Releases the next request from the queue and starts the upload. */ @Synchronized - fun releaseNextRequest() { + private fun releaseNextRequest() { val nextRequest = uploadQueue.removeFirstOrNull() nextRequest?.let { logger.debug("Next request to upload", "File: ${it.transferRequest.fileName}") @@ -278,7 +275,7 @@ internal class FileUploader(val api: ChatApi) { * Releases a specific request by ID from the queue. */ @Synchronized - fun releaseRequest(id: String): UploadRequest? { + private fun releaseRequest(id: String): UploadRequest? { val request = uploadQueue.firstOrNull { it.id == id } request?.let { uploadQueue.remove(it) } return request @@ -290,31 +287,30 @@ internal class FileUploader(val api: ChatApi) { */ private fun killUpload(pid: String, listener: UploadListener?) { var stream: ClientCallStreamObserver? = null - val responseObserver = - object : ClientResponseObserver { - override fun onNext(value: Media.UploadProgress?) { - if (!value?.part?.pid.isNullOrEmpty()) { - stream?.onNext( - Media.UploadRequest.newBuilder() - .setKill(Media.UploadRequest.Abort.newBuilder().build()) - .build() - ) - } + val responseObserver = object : ClientResponseObserver { + override fun onNext(value: Media.UploadProgress?) { + if (!value?.part?.pid.isNullOrEmpty()) { + stream?.onNext( + Media.UploadRequest.newBuilder() + .setKill(Media.UploadRequest.Abort.newBuilder().build()) + .build() + ) } + } - override fun onError(t: Throwable) { - logger.error("sendFile", "Kill upload error - ${parseError(t)}") - } + override fun onError(t: Throwable) { + logger.error("sendFile", "Kill upload error - ${parseError(t)}") + } - override fun onCompleted() { - listener?.onCanceled() - logger.debug("sendFile", "Data on the server is cleared") - } + override fun onCompleted() { + listener?.onCanceled() + logger.debug("sendFile", "Data on the server is cleared") + } - override fun beforeStart(requestStream: ClientCallStreamObserver?) { - stream = requestStream - } + override fun beforeStart(requestStream: ClientCallStreamObserver?) { + stream = requestStream } + } val req = Media.UploadRequest.newBuilder().setPid(pid).build() val st = api.uploadFile(responseObserver) @@ -323,39 +319,64 @@ internal class FileUploader(val api: ChatApi) { @Synchronized - private fun sendChunk( - request: UploadRequest, - call: ClientCallStreamObserver? - ) { - if (!isUploading) { + private fun processQueue(process: UploadProcess) { + if (!isUploading || activeChunksCount.get() > maxActiveChunks || isChunkSending) { logger.debug( "Chunk Upload", - "Upload completed or canceled - Uploading: $isUploading" + "Cannot start sending chunks - Uploading: $isUploading, " + + "Active chunks: ${activeChunksCount.get()}/$maxActiveChunks, Chunk sending: $isChunkSending" ) return } + sandChunk(process) + } + + + private fun sandChunk(process: UploadProcess) { + if (activeChunksCount.get() > maxActiveChunks) { + return + } - val data = ByteArray(chunkSize) try { - request.transferRequest.stream.read(data).also { length = it } - if (data.isNotEmpty()) { + var currentSize = chunkSize + var data = ByteArray(currentSize) + while (process.request.transferRequest.stream.read(data).also { length = it } > 0) { + isChunkSending = true logger.debug( "Chunk Upload", - "Chunk size: $length bytes" + "Chunk size: $length bytes, Active: ${activeChunksCount.get()}" ) + val chunk = Media.UploadRequest.newBuilder() .setPart(ByteString.copyFrom(data, 0, length)) .build() if (!isUploading) { - return + isChunkSending = false + break + } + + process.getStream()?.onNext(chunk) + + if (chunkSize != currentSize) { + logger.debug( + "Chunk Upload", + "currentSize - $currentSize; chunkSize: ${chunkSize}" + ) + currentSize = chunkSize + data = ByteArray(currentSize) + } + + if (activeChunksCount.incrementAndGet() > maxActiveChunks || !isUploading) { + isChunkSending = false + break } - call?.onNext(chunk) } + finalizeSendingIfNeeded(process) - finalizeSendingIfNeeded(request, call) } catch (e: Exception) { - handleUploadError(e, request, activeProcess) + handleUploadError(e, process) + process.getStream()?.cancel(e.message, e) } } @@ -363,18 +384,45 @@ internal class FileUploader(val api: ChatApi) { /** * Finalizes the sending process if all bytes are sent. */ - private fun finalizeSendingIfNeeded( - request: UploadRequest, - call: ClientCallStreamObserver? - ) { - if (isUploading && request.transferRequest.stream.available() == 0) { + private fun finalizeSendingIfNeeded(process: UploadProcess) { + if (isUploading && process.request.transferRequest.stream.available() == 0) { logger.debug("sendFile", "All bytes sent to stream") isUploading = false + isChunkSending = false chunkSize = minChunkSize try { - call?.onCompleted() - } catch (_: Exception) { - } + process.getStream()?.onCompleted() + } catch (_: Exception) {} + } else { + isChunkSending = false + } + } + + + private fun setUpChunkSize() { + val now = System.currentTimeMillis() / 1000.0 + val elapsedTime = now - lastConfirmationTime + lastConfirmationTime = now + adjustChunkSize(elapsedTime) + } + + + private fun adjustChunkSize(elapsedTime: Double) { + val targetTime = 0.7 + if (elapsedTime < targetTime && chunkSize < maxChunkSize) { + val x = 1.1 + logger.debug( + "Adjust Size", + "Chunk size adjustment: factor = $x, elapsed time = $elapsedTime" + ) + chunkSize = minOf((chunkSize * x).toInt(), maxChunkSize) + } else if (elapsedTime > targetTime && chunkSize > minChunkSize) { + val x = 0.9 + logger.debug( + "Adjust Size", + "Chunk size adjustment: factor = $x, elapsed time = $elapsedTime" + ) + chunkSize = maxOf((chunkSize * x).toInt(), minChunkSize) } } @@ -419,33 +467,4 @@ internal class FileUploader(val api: ChatApi) { ) } } - - - // Class representing an active upload process - data class UploadProcess(val request: UploadRequest) { - var call: ClientCallStreamObserver? = null - var processId = "" - - private var isCompleted = false - - fun isCompleted(): Boolean { - return isCompleted - } - - fun completed() { - isCompleted = true - } - - fun setStream(call: ClientCallStreamObserver?) { - this.call = call - } - } -} - - -internal data class UploadRequest( - val id: String, - val dialog: WebitelDialog, - val transferRequest: FileTransferRequest, - val callback: CallbackListener -) \ No newline at end of file +} \ No newline at end of file diff --git a/mobile-sdk/src/main/java/com/webitel/mobile_sdk/data/chats/UploadProcess.kt b/mobile-sdk/src/main/java/com/webitel/mobile_sdk/data/chats/UploadProcess.kt new file mode 100644 index 0000000..9c8cb75 --- /dev/null +++ b/mobile-sdk/src/main/java/com/webitel/mobile_sdk/data/chats/UploadProcess.kt @@ -0,0 +1,52 @@ +package com.webitel.mobile_sdk.data.chats + +import io.grpc.stub.ClientCallStreamObserver +import webitel.portal.Media + + +// Class representing an active upload process +internal data class UploadProcess(val request: UploadRequest) { + private var call: ClientCallStreamObserver? = null + private var pid = "" + + private var isCompleted = false + private var reported = false + + fun isCompleted(): Boolean { + return isCompleted + } + + fun completed() { + isCompleted = true + } + + + fun setReported() { + reported = true + } + + + fun setPid(pid: String) { + this.pid = pid + } + + + fun getPid(): String { + return pid + } + + + fun isReported(): Boolean { + return reported + } + + + fun getStream(): ClientCallStreamObserver? { + return call + } + + + fun setStream(call: ClientCallStreamObserver?) { + this.call = call + } +} \ No newline at end of file diff --git a/mobile-sdk/src/main/java/com/webitel/mobile_sdk/data/chats/UploadRequest.kt b/mobile-sdk/src/main/java/com/webitel/mobile_sdk/data/chats/UploadRequest.kt new file mode 100644 index 0000000..f320763 --- /dev/null +++ b/mobile-sdk/src/main/java/com/webitel/mobile_sdk/data/chats/UploadRequest.kt @@ -0,0 +1,13 @@ +package com.webitel.mobile_sdk.data.chats + +import com.webitel.mobile_sdk.domain.CallbackListener +import com.webitel.mobile_sdk.domain.FileTransferRequest +import com.webitel.mobile_sdk.domain.UploadResult + + +internal data class UploadRequest( + val id: String, + val dialog: WebitelDialog, + val transferRequest: FileTransferRequest, + val callback: CallbackListener +) \ No newline at end of file