diff --git a/lib/file_transfer_agent/azure_util.js b/lib/file_transfer_agent/azure_util.js index 68c6692f0..987dedd4f 100644 --- a/lib/file_transfer_agent/azure_util.js +++ b/lib/file_transfer_agent/azure_util.js @@ -89,7 +89,7 @@ function azure_util(azure, filestream) { */ this.getFileHeader = async function (meta, filename) { const stageInfo = meta['stageInfo']; - const client = this.createClient(stageInfo); + const client = meta['client']; const azureLocation = this.extractContainerNameAndPath(stageInfo['location']); const containerClient = client.getContainerClient(azureLocation.containerName); @@ -189,7 +189,7 @@ function azure_util(azure, filestream) { } const stageInfo = meta['stageInfo']; - const client = this.createClient(stageInfo); + const client = meta['client']; const azureLocation = this.extractContainerNameAndPath(stageInfo['location']); const blobName = azureLocation.path + meta['dstFileName']; @@ -233,7 +233,7 @@ function azure_util(azure, filestream) { */ this.nativeDownloadFile = async function (meta, fullDstPath, maxConcurrency) { const stageInfo = meta['stageInfo']; - const client = this.createClient(stageInfo); + const client = meta['client']; const azureLocation = this.extractContainerNameAndPath(stageInfo['location']); const blobName = azureLocation.path + meta['srcFileName']; diff --git a/lib/file_transfer_agent/file_transfer_agent.js b/lib/file_transfer_agent/file_transfer_agent.js index 0060d454f..6c56f3b07 100644 --- a/lib/file_transfer_agent/file_transfer_agent.js +++ b/lib/file_transfer_agent/file_transfer_agent.js @@ -150,7 +150,11 @@ function file_transfer_agent(context) { meta['dstFileName'] = meta['srcFileName']; var storageClient = getStorageClient(meta['stageLocationType']); - await storageClient.uploadOneFileStream(meta); + try { + await storageClient.uploadOneFileStream(meta); + } finally { + storageClient.destroyClient(stageInfo, client); + } } else { parseCommand(); initFileMetadata(); @@ -305,12 +309,16 @@ function file_transfer_agent(context) { meta['client'] = client; } - if (smallFileMetas.length > 0) { - //await uploadFilesinParallel(smallFileMetas); - await uploadFilesinSequential(smallFileMetas); - } - if (largeFileMetas.length > 0) { - await uploadFilesinSequential(largeFileMetas); + try { + if (smallFileMetas.length > 0) { + //await uploadFilesinParallel(smallFileMetas); + await uploadFilesinSequential(smallFileMetas); + } + if (largeFileMetas.length > 0) { + await uploadFilesinSequential(largeFileMetas); + } + } finally { + storageClient.destroyClient(stageInfo, client); } } @@ -419,12 +427,16 @@ function file_transfer_agent(context) { meta['client'] = client; } - if (smallFileMetas.length > 0) { - //await downloadFilesinParallel(smallFileMetas); - await downloadFilesinSequential(smallFileMetas); - } - if (largeFileMetas.length > 0) { - await downloadFilesinSequential(largeFileMetas); + try { + if (smallFileMetas.length > 0) { + //await downloadFilesinParallel(smallFileMetas); + await downloadFilesinSequential(smallFileMetas); + } + if (largeFileMetas.length > 0) { + await downloadFilesinSequential(largeFileMetas); + } + } finally { + storageClient.destroyClient(stageInfo, client); } } @@ -503,14 +515,18 @@ function file_transfer_agent(context) { var client = SnowflakeRemoteStorageUtil.createClient(stageInfo, false); var s3location = SnowflakeS3Util.extractBucketNameAndPath(stageInfo['location']); - await client.getBucketAccelerateConfiguration({ Bucket: s3location.bucketName }) - .then(function (data) { - useAccelerateEndpoint = data['Status'] === 'Enabled'; - }).catch(function (err) { - if (err['code'] === 'AccessDenied') { - return; - } - }); + try { + await client.getBucketAccelerateConfiguration({ Bucket: s3location.bucketName }) + .then(function (data) { + useAccelerateEndpoint = data['Status'] === 'Enabled'; + }).catch(function (err) { + if (err['code'] === 'AccessDenied') { + return; + } + }); + } finally { + SnowflakeRemoteStorageUtil.destroyClient(stageInfo, client); + } } } diff --git a/lib/file_transfer_agent/local_util.js b/lib/file_transfer_agent/local_util.js index 3ade0cefc..c2fc6b724 100644 --- a/lib/file_transfer_agent/local_util.js +++ b/lib/file_transfer_agent/local_util.js @@ -18,6 +18,9 @@ function local_util() { return null; }; + this.destroyClient = function (stageInfo, client) { + }; + /** * Write file to upload. * diff --git a/lib/file_transfer_agent/remote_storage_util.js b/lib/file_transfer_agent/remote_storage_util.js index d5d3a3d51..09d01307a 100644 --- a/lib/file_transfer_agent/remote_storage_util.js +++ b/lib/file_transfer_agent/remote_storage_util.js @@ -69,6 +69,19 @@ function remote_storage_util() { return utilClass.createClient(stageInfo, useAccelerateEndpoint); }; + /** + * Destroys a client based on the location type. + * + * @param {Object} stageInfo + * @param {Object} client + */ + this.destroyClient = function (stageInfo, client) { + var utilClass = this.getForStorageType(stageInfo['locationType']); + if (utilClass.destroyClient) { + utilClass.destroyClient(client); + } + }; + /** * Encrypt then upload one file stream. * diff --git a/lib/file_transfer_agent/s3_util.js b/lib/file_transfer_agent/s3_util.js index 89c6c196d..f9f9c09a9 100644 --- a/lib/file_transfer_agent/s3_util.js +++ b/lib/file_transfer_agent/s3_util.js @@ -76,6 +76,15 @@ function s3_util(s3, filestream) { return new AWS.S3(config); }; + /** + * Destroys an AWS S3 client. + * + * @param {AWS.S3} client + */ + this.destroyClient = function (client) { + client.destroy(); + }; + /** * Extract the bucket name and path from the metadata's stage location. * @@ -114,7 +123,7 @@ function s3_util(s3, filestream) { */ this.getFileHeader = async function (meta, filename) { const stageInfo = meta['stageInfo']; - const client = this.createClient(stageInfo); + const client = meta['client']; const s3location = this.extractBucketNameAndPath(stageInfo['location']); const params = { @@ -194,8 +203,7 @@ function s3_util(s3, filestream) { s3Metadata[AMZ_MATDESC] = encryptionMetadata.matDesc; } - const stageInfo = meta['stageInfo']; - const client = this.createClient(stageInfo); + const client = meta['client']; const s3location = this.extractBucketNameAndPath(meta['stageInfo']['location']); @@ -235,8 +243,7 @@ function s3_util(s3, filestream) { * @param {Object} encryptionMetadata */ this.nativeDownloadFile = async function (meta, fullDstPath) { - const stageInfo = meta['stageInfo']; - const client = this.createClient(stageInfo); + const client = meta['client']; const s3location = this.extractBucketNameAndPath(meta['stageInfo']['location']); diff --git a/test/unit/file_transfer_agent/azure_test.js b/test/unit/file_transfer_agent/azure_test.js index 0a621de0e..3f9c409b9 100644 --- a/test/unit/file_transfer_agent/azure_test.js +++ b/test/unit/file_transfer_agent/azure_test.js @@ -20,15 +20,8 @@ describe('Azure client', function () { let Azure = null; let client = null; let filestream = null; + let meta = null; const dataFile = mockDataFile; - const meta = { - stageInfo: { - location: mockLocation, - path: mockTable + '/' + mockPath + '/', - creds: {} - }, - SHA256_DIGEST: mockDigest, - }; const encryptionMetadata = { key: mockKey, iv: mockIv, @@ -108,6 +101,18 @@ describe('Azure client', function () { filestream = require('filestream'); Azure = new SnowflakeAzureUtil(client, filestream); }); + beforeEach(function () { + const stageInfo = { + location: mockLocation, + path: mockTable + '/' + mockPath + '/', + creds: {} + }; + meta = { + stageInfo, + SHA256_DIGEST: mockDigest, + client: Azure.createClient(stageInfo), + }; + }); it('extract bucket name and path', async function () { verifyNameAndPath('sfc-eng-regression/test_sub_dir/', 'sfc-eng-regression', 'test_sub_dir/'); @@ -132,6 +137,7 @@ describe('Azure client', function () { client = require('client'); Azure = new SnowflakeAzureUtil(client); + meta['client'] = Azure.createClient(meta['stageInfo']); await Azure.getFileHeader(meta, dataFile); assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN); @@ -147,6 +153,7 @@ describe('Azure client', function () { client = require('client'); const Azure = new SnowflakeAzureUtil(client); + meta['client'] = Azure.createClient(meta['stageInfo']); await Azure.getFileHeader(meta, dataFile); assert.strictEqual(meta['resultStatus'], resultStatus.NOT_FOUND_FILE); @@ -162,6 +169,7 @@ describe('Azure client', function () { client = require('client'); Azure = new SnowflakeAzureUtil(client); + meta['client'] = Azure.createClient(meta['stageInfo']); await Azure.getFileHeader(meta, dataFile); assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN); @@ -177,6 +185,7 @@ describe('Azure client', function () { client = require('client'); Azure = new SnowflakeAzureUtil(client); + meta['client'] = Azure.createClient(meta['stageInfo']); await Azure.getFileHeader(meta, dataFile); assert.strictEqual(meta['resultStatus'], resultStatus.ERROR); @@ -193,6 +202,7 @@ describe('Azure client', function () { client = require('client'); filestream = require('filestream'); Azure = new SnowflakeAzureUtil(client, filestream); + meta['client'] = Azure.createClient(meta['stageInfo']); await Azure.uploadFile(dataFile, meta, encryptionMetadata); assert.strictEqual(meta['resultStatus'], resultStatus.UPLOADED); @@ -213,6 +223,7 @@ describe('Azure client', function () { client = require('client'); filestream = require('filestream'); Azure = new SnowflakeAzureUtil(client, filestream); + meta['client'] = Azure.createClient(meta['stageInfo']); await Azure.uploadFile(dataFile, meta, encryptionMetadata); assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN); @@ -233,6 +244,7 @@ describe('Azure client', function () { client = require('client'); filestream = require('filestream'); Azure = new SnowflakeAzureUtil(client, filestream); + meta['client'] = Azure.createClient(meta['stageInfo']); await Azure.uploadFile(dataFile, meta, encryptionMetadata); assert.strictEqual(meta['resultStatus'], resultStatus.NEED_RETRY); diff --git a/test/unit/file_transfer_agent/s3_test.js b/test/unit/file_transfer_agent/s3_test.js index 3a759ba1a..0bf8e23b1 100644 --- a/test/unit/file_transfer_agent/s3_test.js +++ b/test/unit/file_transfer_agent/s3_test.js @@ -20,15 +20,8 @@ describe('S3 client', function () { let AWS; let s3; let filesystem; + let meta; const dataFile = mockDataFile; - const meta = { - stageInfo: { - location: mockLocation, - path: mockTable + '/' + mockPath + '/', - creds: {} - }, - SHA256_DIGEST: mockDigest, - }; const encryptionMetadata = { key: mockKey, iv: mockIv, @@ -59,6 +52,7 @@ describe('S3 client', function () { return new putObject; }; + this.destroy = function () {}; } return new S3; @@ -74,6 +68,21 @@ describe('S3 client', function () { AWS = new SnowflakeS3Util(s3, filesystem); }); + beforeEach(function () { + const stageInfo = { + location: mockLocation, + path: mockTable + '/' + mockPath + '/', + creds: {} + }; + meta = { + stageInfo, + SHA256_DIGEST: mockDigest, + client: AWS.createClient(stageInfo), + }; + }); + this.afterEach(function () { + AWS.destroyClient(meta['client']); + }); it('extract bucket name and path', async function () { var result = AWS.extractBucketNameAndPath('sfc-eng-regression/test_sub_dir/'); @@ -117,6 +126,7 @@ describe('S3 client', function () { return new getObject; }; + this.destroy = function () {}; } return new S3; @@ -124,6 +134,7 @@ describe('S3 client', function () { }); s3 = require('s3'); const AWS = new SnowflakeS3Util(s3); + meta['client'] = AWS.createClient(meta['stageInfo']); await AWS.getFileHeader(meta, dataFile); assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN); @@ -144,6 +155,7 @@ describe('S3 client', function () { return new getObject; }; + this.destroy = function () {}; } return new S3; @@ -151,6 +163,7 @@ describe('S3 client', function () { }); s3 = require('s3'); const AWS = new SnowflakeS3Util(s3); + meta['client'] = AWS.createClient(meta['stageInfo']); await AWS.getFileHeader(meta, dataFile); assert.strictEqual(meta['resultStatus'], resultStatus.NOT_FOUND_FILE); @@ -171,6 +184,7 @@ describe('S3 client', function () { return new getObject; }; + this.destroy = function () {}; } return new S3; @@ -178,6 +192,7 @@ describe('S3 client', function () { }); s3 = require('s3'); const AWS = new SnowflakeS3Util(s3); + meta['client'] = AWS.createClient(meta['stageInfo']); await AWS.getFileHeader(meta, dataFile); assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN); @@ -198,6 +213,7 @@ describe('S3 client', function () { return new getObject; }; + this.destroy = function () {}; } return new S3; @@ -205,6 +221,7 @@ describe('S3 client', function () { }); s3 = require('s3'); const AWS = new SnowflakeS3Util(s3); + meta['client'] = AWS.createClient(meta['stageInfo']); await AWS.getFileHeader(meta, dataFile); assert.strictEqual(meta['resultStatus'], resultStatus.ERROR); @@ -230,6 +247,7 @@ describe('S3 client', function () { return new putObject; }; + this.destroy = function () {}; } return new S3; @@ -243,6 +261,7 @@ describe('S3 client', function () { s3 = require('s3'); filesystem = require('filesystem'); const AWS = new SnowflakeS3Util(s3, filesystem); + meta['client'] = AWS.createClient(meta['stageInfo']); await AWS.uploadFile(dataFile, meta, encryptionMetadata); assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN); @@ -263,6 +282,7 @@ describe('S3 client', function () { return new putObject; }; + this.destroy = function () {}; } return new S3; @@ -276,6 +296,7 @@ describe('S3 client', function () { s3 = require('s3'); filesystem = require('filesystem'); const AWS = new SnowflakeS3Util(s3, filesystem); + meta['client'] = AWS.createClient(meta['stageInfo']); await AWS.uploadFile(dataFile, meta, encryptionMetadata); assert.strictEqual(meta['resultStatus'], resultStatus.NEED_RETRY_WITH_LOWER_CONCURRENCY); @@ -296,6 +317,7 @@ describe('S3 client', function () { return new putObject; }; + this.destroy = function () {}; } return new S3; @@ -309,6 +331,7 @@ describe('S3 client', function () { s3 = require('s3'); filesystem = require('filesystem'); const AWS = new SnowflakeS3Util(s3, filesystem); + meta['client'] = AWS.createClient(meta['stageInfo']); await AWS.uploadFile(dataFile, meta, encryptionMetadata); assert.strictEqual(meta['resultStatus'], resultStatus.NEED_RETRY);