Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse storage clients + destroy when no longer needed (fixes #734) #735

Merged
merged 6 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions lib/file_transfer_agent/azure_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ function azure_util(azure, filestream) {
*/
this.getFileHeader = async function (meta, filename) {
const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);

const containerClient = client.getContainerClient(azureLocation.containerName);
Expand Down Expand Up @@ -189,7 +189,7 @@ function azure_util(azure, filestream) {
}

const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);
const blobName = azureLocation.path + meta['dstFileName'];

Expand Down Expand Up @@ -233,7 +233,7 @@ function azure_util(azure, filestream) {
*/
this.nativeDownloadFile = async function (meta, fullDstPath, maxConcurrency) {
const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);
const blobName = azureLocation.path + meta['srcFileName'];

Expand Down
58 changes: 37 additions & 21 deletions lib/file_transfer_agent/file_transfer_agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@
meta['dstFileName'] = meta['srcFileName'];

var storageClient = getStorageClient(meta['stageLocationType']);
await storageClient.uploadOneFileStream(meta);
try {
await storageClient.uploadOneFileStream(meta);
} finally {
storageClient.destroyClient(stageInfo, client);
}
} else {
parseCommand();
initFileMetadata();
Expand Down Expand Up @@ -305,12 +309,16 @@
meta['client'] = client;
}

if (smallFileMetas.length > 0) {
//await uploadFilesinParallel(smallFileMetas);
await uploadFilesinSequential(smallFileMetas);
}
if (largeFileMetas.length > 0) {
await uploadFilesinSequential(largeFileMetas);
try {
if (smallFileMetas.length > 0) {
//await uploadFilesinParallel(smallFileMetas);
await uploadFilesinSequential(smallFileMetas);
}
if (largeFileMetas.length > 0) {
await uploadFilesinSequential(largeFileMetas);
}
} finally {
storageClient.destroyClient(stageInfo, client);
}
}

Expand Down Expand Up @@ -419,12 +427,16 @@
meta['client'] = client;
}

if (smallFileMetas.length > 0) {
//await downloadFilesinParallel(smallFileMetas);
await downloadFilesinSequential(smallFileMetas);
}
if (largeFileMetas.length > 0) {
await downloadFilesinSequential(largeFileMetas);
try {
if (smallFileMetas.length > 0) {
//await downloadFilesinParallel(smallFileMetas);
await downloadFilesinSequential(smallFileMetas);
}
if (largeFileMetas.length > 0) {
await downloadFilesinSequential(largeFileMetas);

Check warning on line 436 in lib/file_transfer_agent/file_transfer_agent.js

View check run for this annotation

Codecov / codecov/patch

lib/file_transfer_agent/file_transfer_agent.js#L436

Added line #L436 was not covered by tests
}
} finally {
storageClient.destroyClient(stageInfo, client);
}
}

Expand Down Expand Up @@ -503,14 +515,18 @@
var client = SnowflakeRemoteStorageUtil.createClient(stageInfo, false);
var s3location = SnowflakeS3Util.extractBucketNameAndPath(stageInfo['location']);

await client.getBucketAccelerateConfiguration({ Bucket: s3location.bucketName })
.then(function (data) {
useAccelerateEndpoint = data['Status'] === 'Enabled';
}).catch(function (err) {
if (err['code'] === 'AccessDenied') {
return;
}
});
try {
await client.getBucketAccelerateConfiguration({ Bucket: s3location.bucketName })
.then(function (data) {
useAccelerateEndpoint = data['Status'] === 'Enabled';

Check warning on line 521 in lib/file_transfer_agent/file_transfer_agent.js

View check run for this annotation

Codecov / codecov/patch

lib/file_transfer_agent/file_transfer_agent.js#L521

Added line #L521 was not covered by tests
}).catch(function (err) {
if (err['code'] === 'AccessDenied') {
return;

Check warning on line 524 in lib/file_transfer_agent/file_transfer_agent.js

View check run for this annotation

Codecov / codecov/patch

lib/file_transfer_agent/file_transfer_agent.js#L524

Added line #L524 was not covered by tests
}
});
} finally {
SnowflakeRemoteStorageUtil.destroyClient(stageInfo, client);
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions lib/file_transfer_agent/local_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ function local_util() {
return null;
};

this.destroyClient = function (stageInfo, client) {
};

/**
* Write file to upload.
*
Expand Down
13 changes: 13 additions & 0 deletions lib/file_transfer_agent/remote_storage_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,19 @@ function remote_storage_util() {
return utilClass.createClient(stageInfo, useAccelerateEndpoint);
};

/**
* Destroys a client based on the location type.
*
* @param {Object} stageInfo
* @param {Object} client
*/
this.destroyClient = function (stageInfo, client) {
var utilClass = this.getForStorageType(stageInfo['locationType']);
if (utilClass.destroyClient) {
utilClass.destroyClient(client);
}
};

/**
* Encrypt then upload one file stream.
*
Expand Down
17 changes: 12 additions & 5 deletions lib/file_transfer_agent/s3_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ function s3_util(s3, filestream) {
return new AWS.S3(config);
};

/**
* Destroys an AWS S3 client.
*
* @param {AWS.S3} client
*/
this.destroyClient = function (client) {
client.destroy();
};

/**
* Extract the bucket name and path from the metadata's stage location.
*
Expand Down Expand Up @@ -114,7 +123,7 @@ function s3_util(s3, filestream) {
*/
this.getFileHeader = async function (meta, filename) {
const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];
const s3location = this.extractBucketNameAndPath(stageInfo['location']);

const params = {
Expand Down Expand Up @@ -194,8 +203,7 @@ function s3_util(s3, filestream) {
s3Metadata[AMZ_MATDESC] = encryptionMetadata.matDesc;
}

const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];

const s3location = this.extractBucketNameAndPath(meta['stageInfo']['location']);

Expand Down Expand Up @@ -235,8 +243,7 @@ function s3_util(s3, filestream) {
* @param {Object} encryptionMetadata
*/
this.nativeDownloadFile = async function (meta, fullDstPath) {
const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];

const s3location = this.extractBucketNameAndPath(meta['stageInfo']['location']);

Expand Down
28 changes: 20 additions & 8 deletions test/unit/file_transfer_agent/azure_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,8 @@ describe('Azure client', function () {
let Azure = null;
let client = null;
let filestream = null;
let meta = null;
const dataFile = mockDataFile;
const meta = {
stageInfo: {
location: mockLocation,
path: mockTable + '/' + mockPath + '/',
creds: {}
},
SHA256_DIGEST: mockDigest,
};
const encryptionMetadata = {
key: mockKey,
iv: mockIv,
Expand Down Expand Up @@ -108,6 +101,18 @@ describe('Azure client', function () {
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
});
beforeEach(function () {
const stageInfo = {
location: mockLocation,
path: mockTable + '/' + mockPath + '/',
creds: {}
};
meta = {
stageInfo,
SHA256_DIGEST: mockDigest,
client: Azure.createClient(stageInfo),
};
});

it('extract bucket name and path', async function () {
verifyNameAndPath('sfc-eng-regression/test_sub_dir/', 'sfc-eng-regression', 'test_sub_dir/');
Expand All @@ -132,6 +137,7 @@ describe('Azure client', function () {

client = require('client');
Azure = new SnowflakeAzureUtil(client);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
Expand All @@ -147,6 +153,7 @@ describe('Azure client', function () {

client = require('client');
const Azure = new SnowflakeAzureUtil(client);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.NOT_FOUND_FILE);
Expand All @@ -162,6 +169,7 @@ describe('Azure client', function () {

client = require('client');
Azure = new SnowflakeAzureUtil(client);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
Expand All @@ -177,6 +185,7 @@ describe('Azure client', function () {

client = require('client');
Azure = new SnowflakeAzureUtil(client);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.ERROR);
Expand All @@ -193,6 +202,7 @@ describe('Azure client', function () {
client = require('client');
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.UPLOADED);
Expand All @@ -213,6 +223,7 @@ describe('Azure client', function () {
client = require('client');
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
Expand All @@ -233,6 +244,7 @@ describe('Azure client', function () {
client = require('client');
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.NEED_RETRY);
Expand Down
Loading
Loading