Skip to content

Commit

Permalink
Refactor processQueue: add exit condition to chunk sending loop
Browse files Browse the repository at this point in the history
  • Loading branch information
yurizhuk committed Sep 8, 2024
1 parent ca46b88 commit 8aa104c
Showing 1 changed file with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ internal class FileUploader(val api: ChatApi) {
} catch (e: Exception) {
isUploading = false
call?.cancel(e.message, e)
call = null
}
}
processQueue(request, call)
Expand Down Expand Up @@ -311,6 +312,7 @@ internal class FileUploader(val api: ChatApi) {
)
return
}

handler.make {
if (activeChunksCount.get() > maxActiveChunks) {
return@make
Expand All @@ -319,20 +321,30 @@ internal class FileUploader(val api: ChatApi) {
try {
while (request.transferRequest.stream.read(fiveKB).also { length = it } > 0) {
isChunkSending = true
logger.debug("Chunk Upload", "Chunk size: $length bytes, Active: ${activeChunksCount.get()}")
logger.debug(
"Chunk Upload",
"Chunk size: $length bytes, Active: ${activeChunksCount.get()}"
)

val chunk = Media.UploadRequest.newBuilder()
.setPart(ByteString.copyFrom(fiveKB, 0, length))
.build()

if (!isUploading) {
isChunkSending = false
break
}

call?.onNext(chunk)
if (activeChunksCount.incrementAndGet() > maxActiveChunks) {

if (activeChunksCount.incrementAndGet() > maxActiveChunks || !isUploading) {
isChunkSending = false
break
}
}

finalizeSendingIfNeeded(request, call)

} catch (e: Exception) {
handleUploadError(e, request, activeProcess)
}
Expand Down

0 comments on commit 8aa104c

Please sign in to comment.