From 378aaeac1b945659446c203061d7524d83d8fb2f Mon Sep 17 00:00:00 2001 From: Dominik Przybysz Date: Mon, 11 Mar 2024 08:31:38 +0100 Subject: [PATCH] SNOW-1215393: Test chunk fetching causing out of memory --- ci/container/test_component.sh | 6 +++--- lib/connection/result/chunk.js | 15 ++++++++++++++- lib/connection/result/result_stream.js | 1 + lib/connection/result/row_stream.js | 1 + package.json | 2 +- test/integration/testLargeResultSet.js | 20 +++++++++++++++----- 6 files changed, 35 insertions(+), 10 deletions(-) diff --git a/ci/container/test_component.sh b/ci/container/test_component.sh index 28c62832e..6814b2584 100755 --- a/ci/container/test_component.sh +++ b/ci/container/test_component.sh @@ -7,7 +7,7 @@ THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" export WORKSPACE=${WORKSPACE:-/mnt/workspace} export SOURCE_ROOT=${SOURCE_ROOT:-/mnt/host} export DRIVER_NAME=nodejs -export TIMEOUT=180000 +export TIMEOUT=360000 export SF_OCSP_TEST_OCSP_RESPONDER_TIMEOUT=1000 [[ -z "$GIT_BRANCH" ]] && echo "Set GIT_BRANCH to test" && exit 1 @@ -70,11 +70,11 @@ python3 $THIS_DIR/hang_webserver.py 12345 > hang_webserver.out 2>&1 & if [[ "$SHOULD_GENERATE_COVERAGE_REPORT" == "1" ]]; then MOCHA_CMD=( - "npx" "nyc" "--reporter=lcov" "--reporter=text" "mocha" "--exit" "--timeout" "$TIMEOUT" "--recursive" "--full-trace" + "npx" "nyc" "--reporter=lcov" "--reporter=text" "mocha" "--exit" "--timeout" "$TIMEOUT" "--recursive" "--full-trace" "--max-old-space-size=150" ) else MOCHA_CMD=( - "mocha" "--timeout" "$TIMEOUT" "--recursive" "--full-trace" + "mocha" "--timeout" "$TIMEOUT" "--recursive" "--full-trace" "--max-old-space-size=150" ) fi diff --git a/lib/connection/result/chunk.js b/lib/connection/result/chunk.js index c96b77f9e..d8a424ebe 100644 --- a/lib/connection/result/chunk.js +++ b/lib/connection/result/chunk.js @@ -160,6 +160,8 @@ Chunk.prototype.clearRows = function (force) { // clear out all row and rowset related fields this._rowsetAsString = this._rowset = this._rows = undefined; } + this._isProcessed = true; + console.log(`Cleared chunk ${this._id}`); }; /** @@ -199,6 +201,16 @@ Chunk.prototype.isLoading = function () { * @param callback */ Chunk.prototype.load = function (callback) { + // if (this.isLoaded()) { + // console.log(`Ordered fetch of chunk ${this._id} when is already loaded - skipping`); + // return; + // } + if (this._isProcessed) { + console.log(`Ordered fetch of chunk ${this._id} when is already processed`); + } + if (this.isLoaded()) { + console.log(`Ordered fetch of chunk ${this._id} when is already loaded`); + } // we've started loading this._isLoading = true; @@ -237,6 +249,7 @@ Chunk.prototype.load = function (callback) { // if the request succeeded, save the // body as a string version of the rowset if (!err) { + console.log(`Fetched chunk ${self._id}`); self._rowsetAsString = body; } @@ -257,7 +270,7 @@ Chunk.prototype.load = function (callback) { * @private */ function buildId(startIndex, endIndex) { - return Util.format('s=%d, e=%d', startIndex, endIndex); + return Util.format('s=%d, e=%d size=%d', startIndex, endIndex, endIndex - startIndex + 1); } /** diff --git a/lib/connection/result/result_stream.js b/lib/connection/result/result_stream.js index 82fb675c4..b1ccae53a 100644 --- a/lib/connection/result/result_stream.js +++ b/lib/connection/result/result_stream.js @@ -84,6 +84,7 @@ function ResultStream(options) { for (index = currChunk; index < chunks.length && index <= (currChunk + prefetchSize); index++) { chunk = chunks[index]; if (!chunk.isLoading()) { + console.log(`Fetching chunk ${chunk._id}`); chunk.load(); } } diff --git a/lib/connection/result/row_stream.js b/lib/connection/result/row_stream.js index 0c72627d5..dee6b9da7 100644 --- a/lib/connection/result/row_stream.js +++ b/lib/connection/result/row_stream.js @@ -196,6 +196,7 @@ function RowStream(statement, context, options) { rowBuffer = chunk.getRows().slice( Math.max(chunkStart, start) - chunkStart, Math.min(chunkEnd, end) + 1 - chunkStart); + console.log(`Row buffer length is ${rowBuffer.length}` ); // reset the row index rowIndex = 0; diff --git a/package.json b/package.json index 430b58813..82c636d2b 100644 --- a/package.json +++ b/package.json @@ -61,7 +61,7 @@ "lint:check:all:errorsOnly": "npm run lint:check:all -- --quiet", "lint:fix": "eslint --fix", "test": "mocha -timeout 180000 --recursive --full-trace test/unit/**/*.js test/unit/*.js", - "test:integration": "mocha -timeout 180000 --recursive --full-trace test/integration/**/*.js test/integration/*.js", + "test:integration": "mocha -timeout 360000 --recursive --full-trace test/integration/**/*.js test/integration/*.js", "test:single": "mocha -timeout 180000 --full-trace", "test:system": "mocha -timeout 180000 --recursive --full-trace system_test/*.js", "test:unit": "mocha -timeout 180000 --recursive --full-trace test/unit/**/*.js test/unit/*.js", diff --git a/test/integration/testLargeResultSet.js b/test/integration/testLargeResultSet.js index 460fb5c04..d12dbfd78 100644 --- a/test/integration/testLargeResultSet.js +++ b/test/integration/testLargeResultSet.js @@ -5,31 +5,41 @@ const assert = require('assert'); const async = require('async'); const testUtil = require('./testUtil'); const { randomizeName } = require('./testUtil'); +const { writeHeapSnapshot } = require('node:v8'); describe('Large result Set Tests', function () { - const sourceRowCount = 10000; - + const sourceRowCount = 1000000; let connection; const selectAllFromOrders = `select randstr(1000,random()) from table(generator(rowcount=>${sourceRowCount}))`; - + // const selectAllFromOrders = `select * from fake_sample_data.public.customer limit 400000`; + // const selectAllFromOrders = `select * from fake_sample_data.public.customer`; before(async () => { - connection = testUtil.createConnection(); + // configureLogger('TRACE'); + connection = testUtil.createConnection({ + resultPrefetch: 2, + }); await testUtil.connectAsync(connection); + await testUtil.executeCmdAsync(connection, "alter session set CLIENT_RESULT_CHUNK_SIZE=48"); }); after(async () => { await testUtil.destroyConnectionAsync(connection); }); - it('testSimpleLarge', function (done) { + it.only('testSimpleLarge', function (done) { connection.execute({ sqlText: selectAllFromOrders, + streamResult: true, complete: function (err, stmt) { testUtil.checkError(err); const stream = stmt.streamRows(); let rowCount = 0; stream.on('data', function () { rowCount++; + if (rowCount % 5000 === 0) { + console.log(`Row count: ${rowCount}`); + // console.log(writeHeapSnapshot()); + } }); stream.on('error', function (err) { testUtil.checkError(err);