diff --git a/lib/connection/connection.js b/lib/connection/connection.js index 7e1b31a02..88af4eb77 100644 --- a/lib/connection/connection.js +++ b/lib/connection/connection.js @@ -5,6 +5,7 @@ const { v4: uuidv4 } = require('uuid'); const Url = require('url'); const QueryString = require('querystring'); const GSErrors = require('../constants/gs_errors') +const QueryStatus = require('../constants/query_status').code; var Util = require('../util'); var Errors = require('../errors'); @@ -37,6 +38,15 @@ function Connection(context) // generate an id for the connection var id = uuidv4(); + // async max retry and retry pattern from python connector + const asyncNoDataMaxRetry = 24; + const asyncRetryPattern = [1, 1, 2, 3, 4, 8, 10]; + const asyncRetryInMilliseconds = 500; + + // Custom regex based on uuid validate + // Unable to directly use uuid validate because the queryId returned from the server doesn't match the regex + const queryIdRegex = new RegExp(/^(?:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}|00000000-0000-0000-0000-000000000000)$/i); + //Make session tokens available for testing this.getTokens = function () { @@ -411,6 +421,181 @@ function Connection(context) return this; }; + /** + * Gets the response containing the status of the query based on queryId. + * + * @param {String} queryId + * + * @returns {Object} the query response + */ + async function getQueryResponse(queryId) { + // Check if queryId exists and is valid uuid + Errors.checkArgumentExists(Util.exists(queryId), + ErrorCodes.ERR_CONN_FETCH_RESULT_MISSING_QUERY_ID); + Errors.checkArgumentValid(queryIdRegex.test(queryId), + ErrorCodes.ERR_GET_RESPONSE_QUERY_INVALID_UUID, queryId); + + // Form the request options + const options = + { + method: 'GET', + url: Url.format( + { + pathname: `/monitoring/queries/${queryId}` + }), + }; + + // Get the response containing the query status + const response = await services.sf.requestAsync(options); + + return response['body']; + } + + /** + * Extracts the status of the query from the query response. + * + * @param {Object} queryResponse + * + * @returns {String} the query status. + */ + function extractQueryStatus(queryResponse) { + const queries = queryResponse['data']['queries']; + let status = QueryStatus.NO_DATA; // default status + if (queries.length > 0) { + status = queries[0]['status']; + } + + return status; + } + + /** + * Gets the status of the query based on queryId. + * + * @param {String} queryId + * + * @returns {String} the query status. + */ + this.getQueryStatus = async function (queryId) { + return extractQueryStatus(await getQueryResponse(queryId)); + }; + + /** + * Gets the status of the query based on queryId and throws if there's an error. + * + * @param {String} queryId + * + * @returns {String} the query status. + */ + this.getQueryStatusThrowIfError = async function (queryId) { + const response = await getQueryResponse(queryId); + const queries = response['data']['queries']; + const status = extractQueryStatus(response); + let message, code, sqlState = null; + + if (this.isAnError(status)) { + message = response['message'] || ''; + code = response['code'] || -1; + + if (response['data']) { + message += queries.length > 0 ? queries[0]['errorMessage'] : ''; + sqlState = response['data']['sqlState']; + } + + throw Errors.createOperationFailedError( + code, response, message, sqlState); + } + + return status; + }; + + /** + * Gets the results from a previously ran query based on queryId + * + * @param {Object} options + * + * @returns {Object} + */ + this.getResultsFromQueryId = async function (options) { + const queryId = options.queryId; + let status, noDataCounter = 0, retryPatternPos = 0; + + // Wait until query has finished executing + for (;;) { + // Check if query is still running and trigger exception if it failed + status = await this.getQueryStatusThrowIfError(queryId); + if (!this.isStillRunning(status)) { + break; + } + + // Timeout based on python connector + await new Promise((resolve) => { + setTimeout(() => resolve(), asyncRetryInMilliseconds * asyncRetryPattern[retryPatternPos]); + }); + + // If no data, increment the no data counter + if (QueryStatus[status] == QueryStatus.NO_DATA) { + noDataCounter++; + // Check if retry for no data is exceeded + if (noDataCounter > asyncNoDataMaxRetry) { + throw Errors.createClientError( + ErrorCodes.ERR_GET_RESULTS_QUERY_ID_NO_DATA, true, queryId); + } + } + + if (retryPatternPos < asyncRetryPattern.length - 1) { + retryPatternPos++; + } + } + + if (QueryStatus[status] != QueryStatus.SUCCESS) { + throw Errors.createClientError( + ErrorCodes.ERR_GET_RESULTS_QUERY_ID_NOT_SUCCESS_STATUS, true, queryId, status); + } + + return this.fetchResult(options); + }; + + /** + * Checks whether the given status is currently running. + * + * @param {String} status + * + * @returns {Boolean} + */ + this.isStillRunning = function (status) { + const stillRunning = + [ + QueryStatus.RUNNING, + QueryStatus.RESUMING_WAREHOUSE, + QueryStatus.QUEUED, + QueryStatus.QUEUED_REPARING_WAREHOUSE, + QueryStatus.NO_DATA, + ].includes(QueryStatus[status]); + + return stillRunning; + }; + + /** + * Checks whether the given status means that there has been an error. + * + * @param {String} status + * + * @returns {Boolean} + */ + this.isAnError = function (status) { + const isAnError = + [ + QueryStatus.ABORTING, + QueryStatus.FAILED_WITH_ERROR, + QueryStatus.ABORTED, + QueryStatus.FAILED_WITH_INCIDENT, + QueryStatus.DISCONNECTED, + QueryStatus.BLOCKED, + ].includes(QueryStatus[status]); + + return isAnError; + }; + /** * Returns a serialized version of this connection. * diff --git a/lib/connection/statement.js b/lib/connection/statement.js index 130d6bc30..a96c7f7d1 100644 --- a/lib/connection/statement.js +++ b/lib/connection/statement.js @@ -33,6 +33,11 @@ var statementTypes = FILE_POST_EXEC: 'FILE_POST_EXEC' }; +const queryCodes = { + QUERY_IN_PROGRESS: '333333', // GS code: the query is in progress + QUERY_IN_PROGRESS_ASYNC: '333334' // GS code: the query is detached +}; + exports.createContext = function ( options, services, connectionConfig) { @@ -187,6 +192,12 @@ exports.createStatementPostExec = function ( JSON.stringify(fetchAsString[invalidValueIndex])); } + // check for invalid asyncExec + if (Util.exists(statementOptions.asyncExec)) { + Errors.checkArgumentValid(Util.isBoolean(statementOptions.asyncExec), + ErrorCodes.ERR_CONN_CREATE_INVALID_ASYNC_EXEC); + } + const rowMode = statementOptions.rowMode; if (Util.exists(rowMode)) { RowMode.checkRowModeValid(rowMode); @@ -206,6 +217,7 @@ exports.createStatementPostExec = function ( statementContext.multiResultIds = statementOptions.multiResultIds; statementContext.multiCurId = statementOptions.multiCurId; statementContext.rowMode = statementOptions.rowMode; + statementContext.asyncExec = statementOptions.asyncExec; // set the statement type statementContext.type = (statementContext.type == statementTypes.ROW_PRE_EXEC) ? statementTypes.ROW_POST_EXEC : statementTypes.FILE_POST_EXEC; @@ -364,6 +376,12 @@ function createContextPreExec( RowMode.checkRowModeValid(rowMode); } + // if an asyncExec flag is specified, make sure it's boolean + if (Util.exists(statementOptions.asyncExec)) { + Errors.checkArgumentValid(Util.isBoolean(statementOptions.asyncExec), + ErrorCodes.ERR_CONN_CREATE_INVALID_ASYNC_EXEC); + } + // create a statement context var statementContext = createStatementContext(); @@ -374,6 +392,7 @@ function createContextPreExec( statementContext.multiResultIds = statementOptions.multiResultIds; statementContext.multiCurId = statementOptions.multiCurId; statementContext.rowMode = statementOptions.rowMode; + statementContext.asyncExec = statementOptions.asyncExec; // if a binds array is specified, add it to the statement context if (Util.exists(statementOptions.binds)) @@ -682,10 +701,13 @@ function invokeStatementComplete(statement, context) streamResult = context.connectionConfig.getStreamResult(); } - // if the result will be streamed later, + // if the result will be streamed later or in asyncExec mode, // invoke the complete callback right away if (streamResult) { context.complete(Errors.externalize(context.resultError), statement); + } else if (context.asyncExec) { + // return the result object with the query ID inside. + context.complete(null, statement, context.result); } else { process.nextTick(function () { // aggregate all the rows into an array and pass this @@ -779,6 +801,12 @@ function createOnStatementRequestSuccRow(statement, context) // if we don't already have a result if (!context.result) { + if (body.code === queryCodes.QUERY_IN_PROGRESS_ASYNC) { + context.result = { + queryId: body.data.queryId + }; + return; + } if (body.data.resultIds != undefined && body.data.resultIds.length > 0) { //multi statements @@ -1330,6 +1358,11 @@ function sendRequestPreExec(statementContext, onResultAvailable) json.queryContextDTO = statementContext.services.sf.getQueryContextDTO(); } + // include the asyncExec flag if a value was specified + if (Util.exists(statementContext.asyncExec)) { + json.asyncExec = statementContext.asyncExec; + } + // use the snowflake service to issue the request sendSfRequest(statementContext, { @@ -1645,9 +1678,15 @@ function buildResultRequestCallback( statementContext.queryId = body.data.queryId; // if the result is not ready yet, extract the result url from the response - // and issue a GET request to try to fetch the result again - if (body && (body.code === '333333' || body.code === '333334')) - { + // and issue a GET request to try to fetch the result again unless asyncExec is enabled. + if (body && (body.code === queryCodes.QUERY_IN_PROGRESS + || body.code === queryCodes.QUERY_IN_PROGRESS_ASYNC)) { + + if (statementContext.asyncExec) { + await onResultAvailable.call(null, err, body); + return; + } + // extract the result url from the response and try to get the result // again sendSfRequest(statementContext, diff --git a/lib/constants/error_messages.js b/lib/constants/error_messages.js index ad6b407c2..d8990e88d 100644 --- a/lib/constants/error_messages.js +++ b/lib/constants/error_messages.js @@ -68,6 +68,7 @@ exports[404040] = 'Invalid browser timeout value. The specified value must be a exports[404041] = 'Invalid disablQueryContextCache. The specified value must be a boolean.'; exports[404042] = 'Invalid includeRetryReason. The specified value must be a boolean.' exports[404043] = 'Invalid clientConfigFile value. The specified value must be a string.'; +exports[404044] = 'Invalid asyncExec. The specified value must be a boolean.' // 405001 exports[405001] = 'Invalid callback. The specified value must be a function.'; @@ -113,8 +114,8 @@ exports[409013] = 'Invalid requestId. The specified value must be a string.'; // 410001 exports[410001] = 'Fetch-result options must be specified.'; exports[410002] = 'Invalid options. The specified value must be an object.'; -exports[410003] = 'A statement id must be specified.'; -exports[410004] = 'Invalid statement id. The specified value must be a string.'; +exports[410003] = 'A query id/statement id must be specified.'; +exports[410004] = 'Invalid query id/statement id. The specified value must be a string.'; exports[410005] = 'Invalid complete callback. The specified value must be a function.'; exports[410006] = 'Invalid streamResult flag. The specified value must be a boolean.'; exports[410007] = 'Invalid fetchAsString value. The specified value must be an Array.'; @@ -151,3 +152,8 @@ exports[450004] = 'Invalid each() callback. The specified value must be a functi exports[450005] = 'An end() callback must be specified.'; exports[450006] = 'Invalid end() callback. The specified value must be a function.'; exports[450007] = 'Operation failed because the statement is still in progress.'; + +// 460001 +exports[460001] = 'Invalid queryId: %s'; +exports[460002] = 'Cannot retrieve data. No information returned from server for query %s'; +exports[460003] = 'Status of query %s is %s, results are unavailable'; diff --git a/lib/constants/query_status.js b/lib/constants/query_status.js new file mode 100644 index 000000000..2449aeff0 --- /dev/null +++ b/lib/constants/query_status.js @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2023 Snowflake Computing Inc. All rights reserved. + */ + +const code = {}; + +code.RUNNING = 'RUNNING'; +code.ABORTING = 'ABORTING'; +code.SUCCESS = 'SUCCESS'; +code.FAILED_WITH_ERROR = 'FAILED_WITH_ERROR'; +code.ABORTED = 'ABORTED'; +code.QUEUED = 'QUEUED'; +code.FAILED_WITH_INCIDENT = 'FAILED_WITH_INCIDENT'; +code.DISCONNECTED = 'DISCONNECTED'; +code.RESUMING_WAREHOUSE = 'RESUMING_WAREHOUSE'; +// purposeful typo.Is present in QueryDTO.java +code.QUEUED_REPARING_WAREHOUSE = 'QUEUED_REPARING_WAREHOUSE'; +code.RESTARTED = 'RESTARTED'; +code.BLOCKED = 'BLOCKED'; +code.NO_DATA = 'NO_DATA'; + +exports.code = code; diff --git a/lib/errors.js b/lib/errors.js index c04fe7b5a..b4f15f3f5 100644 --- a/lib/errors.js +++ b/lib/errors.js @@ -73,6 +73,7 @@ codes.ERR_CONN_CREATE_INVALID_BROWSER_TIMEOUT = 404040; codes.ERR_CONN_CREATE_INVALID_DISABLED_QUERY_CONTEXT_CACHE = 404041 codes.ERR_CONN_CREATE_INVALID_INCLUDE_RETRY_REASON =404042 codes.ERR_CONN_CREATE_INVALID_CLIENT_CONFIG_FILE = 404043; +codes.ERR_CONN_CREATE_INVALID_ASYNC_EXEC = 404044 // 405001 codes.ERR_CONN_CONNECT_INVALID_CALLBACK = 405001; @@ -158,6 +159,11 @@ codes.ERR_STMT_FETCH_ROWS_MISSING_END = 450005; codes.ERR_STMT_FETCH_ROWS_INVALID_END = 450006; codes.ERR_STMT_FETCH_ROWS_FETCHING_RESULT = 450007; +// 460001 +codes.ERR_GET_RESPONSE_QUERY_INVALID_UUID = 460001; +codes.ERR_GET_RESULTS_QUERY_ID_NO_DATA = 460002; +codes.ERR_GET_RESULTS_QUERY_ID_NOT_SUCCESS_STATUS = 460003; + exports.codes = codes; /** diff --git a/lib/http/base.js b/lib/http/base.js index 2d92f9650..2f83ebdf9 100644 --- a/lib/http/base.js +++ b/lib/http/base.js @@ -88,6 +88,44 @@ HttpClient.prototype.requestAsync = async function (options) return axios.request(requestOptions); }; +/** + * Issues an async HTTP request. + * + * @param {Object} options + * + * @returns {Object} an object representing the request that was issued. + */ +HttpClient.prototype.requestAsync = async function (options) { + // validate input + Errors.assertInternal(Util.isObject(options)); + Errors.assertInternal(Util.isString(options.method)); + Errors.assertInternal(Util.isString(options.url)); + Errors.assertInternal(!Util.exists(options.headers) || + Util.isObject(options.headers)); + Errors.assertInternal(!Util.exists(options.json) || + Util.isObject(options.json)); + + // normalize the headers + const headers = normalizeHeaders(options.headers); + + const timeout = options.timeout || + this._connectionConfig.getTimeout() || + DEFAULT_REQUEST_TIMEOUT; + + const requestOptions = + { + method: options.method, + url: options.url, + headers: headers, + data: options.json, + timeout: timeout + }; + + const response = await axios.request(requestOptions); + + return normalizeResponse(response); +} + /** * @abstract * Returns the module to use when making HTTP requests. Subclasses must override diff --git a/lib/services/sf.js b/lib/services/sf.js index c183bad70..82a2d43c3 100644 --- a/lib/services/sf.js +++ b/lib/services/sf.js @@ -258,6 +258,15 @@ function SnowflakeService(connectionConfig, httpClient, config) new OperationRequest(options).validate().execute(); }; + /** + * Issues a generic async request to Snowflake. + * + * @param {Object} options + */ + this.requestAsync = async function (options) { + return await new OperationRequest(options).validate().executeAsync(); + }; + /** * Terminates the current connection to Snowflake. * @@ -407,9 +416,8 @@ function SnowflakeService(connectionConfig, httpClient, config) /** * @inheritDoc */ - OperationRequest.prototype.executeAsync = function () - { - return currentState.requestAsync(this.options); + OperationRequest.prototype.executeAsync = async function () { + return await currentState.requestAsync(this.options); }; /** @@ -737,24 +745,24 @@ function StateAbstract(options) } /** - * Sends out the post request. + * Sends out the request. * * @returns {Object} the request that was issued. */ - Request.prototype.sendAsync = function () - { + Request.prototype.sendAsync = async function () { // pre-process the request options this.preprocessOptions(this.requestOptions); - var url = this.requestOptions.absoluteUrl; - var header = this.requestOptions.headers; - var body = this.requestOptions.json; + const options = + { + method: this.requestOptions.method, + headers: this.requestOptions.headers, + url: this.requestOptions.absoluteUrl, + json: this.requestOptions.json + }; - // issue the http request - return axios - .post(url, body, { - headers: header - }); + // issue the async http request + return await httpClient.requestAsync(options); }; /** @@ -1339,10 +1347,9 @@ StateConnected.prototype.connect = function (options) }); }; -StateConnected.prototype.requestAsync = function (options) -{ +StateConnected.prototype.requestAsync = async function (options) { // create a session token request from the options and send out the request - return this.createSessionTokenRequest(options).sendAsync(); + return await this.createSessionTokenRequest(options).sendAsync(); }; /**