From c48318ccfbc15a92977f67d5e26959efb6f1dd84 Mon Sep 17 00:00:00 2001 From: Anton Piliugin Date: Wed, 6 Jul 2022 07:14:42 +0500 Subject: [PATCH] Streams bug fix, version 1.0.32 --- package-lock.json | 2 +- package.json | 4 ++-- src/Response.js | 42 ++++++++++++++++++++++++++---------------- 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/package-lock.json b/package-lock.json index e816e1e..72ae7fc 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "uquik", - "version": "1.0.25", + "version": "1.0.32", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index b44b8a3..04897af 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "uquik", - "version": "1.0.31", + "version": "1.0.32", "description": "uQuik HTTP(S) framework", "main": "index.js", "scripts": { @@ -44,4 +44,4 @@ "eslint-plugin-promise": "^6.0.0", "nanobench": "^2.1.1" } -} \ No newline at end of file +} diff --git a/src/Response.js b/src/Response.js index f6c523c..d8159cc 100644 --- a/src/Response.js +++ b/src/Response.js @@ -330,7 +330,7 @@ class Response extends Writable { // Attempt to write the body to the client and end the response if (!this.streaming && !body) { // Add Content-Length where it's needed - if (!this.hasContentLength && this.wrapped_request.method !== 'HEAD') this.raw_response.writeHeader('Content-Length', '0') + if (!this.chunked && !this.hasContentLength && this.wrapped_request.method !== 'HEAD') this.raw_response.writeHeader('Content-Length', '0') // Send the response with the uWS.HttpResponse.endWithoutBody(length, close_connection) method as we have no body data // NOTE: This method is completely undocumented by uWS but exists in the source code to solve the problem of no body being sent with a custom content-length this.raw_response.endWithoutBody() @@ -417,22 +417,32 @@ class Response extends Writable { // Pause the readable stream to prevent any further data from being read stream.pause() + this.raw_response.stream_lastOffset = lastOffset + this.raw_response.stream_chunk = chunk + // Bind a drain handler which will resume the once the backpressure is cleared this.drain((offset) => { - // On failure the timeout will start - const [ok, done] = this.raw_response.tryEnd(chunk.slice(offset - lastOffset), totalSize) - if (done) { - if (!stream.destroyed) stream.destroy() - } else if (ok) { - // We sent a chunk and it was not the last one, so let's resume reading. - // Timeout is still disabled, so we can spend any amount of time waiting - // for more chunks to send. - if (stream.isPaused()) stream.resume() + if (this.completed) return !stream.destroyed && stream.destroy() + + if (totalSize) { + const [ok, done] = this.raw_response.tryEnd(this.raw_response.stream_chunk.slice(offset - this.raw_response.stream_lastOffset), totalSize) + if (done) { + if (!stream.destroyed) stream.destroy() + } else if (ok) { + // We sent a chunk and it was not the last one, so let's resume reading. + // Timeout is still disabled, so we can spend any amount of time waiting + // for more chunks to send. + if (stream.readable && stream.isPaused()) stream.resume() + } + + // We always have to return true/false in onWritable. + // If you did not send anything, return true for success. + return ok + } else { + if (stream.readable && stream.isPaused()) stream.resume() + + return !stream.isPaused() } - - // We always have to return true/false in onWritable. - // If you did not send anything, return true for success. - return ok }) } } @@ -452,6 +462,7 @@ class Response extends Writable { // Do not allow streaming if response has already been aborted or completed if (!this.completed) { + this.chunked = true // Bind an 'abort' event handler which will destroy the consumed stream if request is aborted this.on('abort', () => { if (!readable.destroyed) readable.destroy() @@ -465,8 +476,7 @@ class Response extends Writable { // Bind listeners to end request on stream closure if no total size was specified and thus we delivered with chunked transfer if (!totalSize) { - const endRequest = () => this.send() - readable.once('end', endRequest) + readable.once('end', () => this.send()) } } }