Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse storage clients + destroy when no longer needed (fixes #734) #735

Merged
merged 6 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions lib/file_transfer_agent/azure_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ function azure_util(azure, filestream) {
*/
this.getFileHeader = async function (meta, filename) {
const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);

const containerClient = client.getContainerClient(azureLocation.containerName);
Expand Down Expand Up @@ -189,7 +189,7 @@ function azure_util(azure, filestream) {
}

const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);
const blobName = azureLocation.path + meta['dstFileName'];

Expand Down Expand Up @@ -233,7 +233,7 @@ function azure_util(azure, filestream) {
*/
this.nativeDownloadFile = async function (meta, fullDstPath, maxConcurrency) {
const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);
const blobName = azureLocation.path + meta['srcFileName'];

Expand Down
34 changes: 25 additions & 9 deletions lib/file_transfer_agent/file_transfer_agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,13 @@ function file_transfer_agent(context) {
meta['dstFileName'] = meta['srcFileName'];

var storageClient = getStorageClient(meta['stageLocationType']);
await storageClient.uploadOneFileStream(meta);
try {
await storageClient.uploadOneFileStream(meta);
} finally {
if (client.destroy) {
client.destroy();
}
}
} else {
parseCommand();
initFileMetadata();
Expand Down Expand Up @@ -312,6 +318,9 @@ function file_transfer_agent(context) {
if (largeFileMetas.length > 0) {
await uploadFilesinSequential(largeFileMetas);
}
if (client.destroy) {
sfc-gh-dheyman marked this conversation as resolved.
Show resolved Hide resolved
client.destroy();
}
}

/**
Expand Down Expand Up @@ -426,6 +435,9 @@ function file_transfer_agent(context) {
if (largeFileMetas.length > 0) {
await downloadFilesinSequential(largeFileMetas);
}
if (client.destroy) {
client.destroy();
}
}

/**
Expand Down Expand Up @@ -503,14 +515,18 @@ function file_transfer_agent(context) {
var client = SnowflakeRemoteStorageUtil.createClient(stageInfo, false);
var s3location = SnowflakeS3Util.extractBucketNameAndPath(stageInfo['location']);

await client.getBucketAccelerateConfiguration({ Bucket: s3location.bucketName })
.then(function (data) {
useAccelerateEndpoint = data['Status'] === 'Enabled';
}).catch(function (err) {
if (err['code'] === 'AccessDenied') {
return;
}
});
try {
await client.getBucketAccelerateConfiguration({ Bucket: s3location.bucketName })
.then(function (data) {
useAccelerateEndpoint = data['Status'] === 'Enabled';
}).catch(function (err) {
if (err['code'] === 'AccessDenied') {
return;
}
});
} finally {
client.destroy();
}
}
}

Expand Down
8 changes: 3 additions & 5 deletions lib/file_transfer_agent/s3_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ function s3_util(s3, filestream) {
*/
this.getFileHeader = async function (meta, filename) {
const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];
const s3location = this.extractBucketNameAndPath(stageInfo['location']);

const params = {
Expand Down Expand Up @@ -194,8 +194,7 @@ function s3_util(s3, filestream) {
s3Metadata[AMZ_MATDESC] = encryptionMetadata.matDesc;
}

const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];

const s3location = this.extractBucketNameAndPath(meta['stageInfo']['location']);

Expand Down Expand Up @@ -235,8 +234,7 @@ function s3_util(s3, filestream) {
* @param {Object} encryptionMetadata
*/
this.nativeDownloadFile = async function (meta, fullDstPath) {
const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];

const s3location = this.extractBucketNameAndPath(meta['stageInfo']['location']);

Expand Down
28 changes: 20 additions & 8 deletions test/unit/file_transfer_agent/azure_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,8 @@ describe('Azure client', function () {
let Azure = null;
let client = null;
let filestream = null;
let meta = null;
const dataFile = mockDataFile;
const meta = {
stageInfo: {
location: mockLocation,
path: mockTable + '/' + mockPath + '/',
creds: {}
},
SHA256_DIGEST: mockDigest,
};
const encryptionMetadata = {
key: mockKey,
iv: mockIv,
Expand Down Expand Up @@ -108,6 +101,18 @@ describe('Azure client', function () {
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
});
beforeEach(function () {
const stageInfo = {
location: mockLocation,
path: mockTable + '/' + mockPath + '/',
creds: {}
};
meta = {
stageInfo,
SHA256_DIGEST: mockDigest,
client: Azure.createClient(stageInfo),
};
});

it('extract bucket name and path', async function () {
verifyNameAndPath('sfc-eng-regression/test_sub_dir/', 'sfc-eng-regression', 'test_sub_dir/');
Expand All @@ -132,6 +137,7 @@ describe('Azure client', function () {

client = require('client');
Azure = new SnowflakeAzureUtil(client);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
Expand All @@ -147,6 +153,7 @@ describe('Azure client', function () {

client = require('client');
const Azure = new SnowflakeAzureUtil(client);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.NOT_FOUND_FILE);
Expand All @@ -162,6 +169,7 @@ describe('Azure client', function () {

client = require('client');
Azure = new SnowflakeAzureUtil(client);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
Expand All @@ -177,6 +185,7 @@ describe('Azure client', function () {

client = require('client');
Azure = new SnowflakeAzureUtil(client);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.ERROR);
Expand All @@ -193,6 +202,7 @@ describe('Azure client', function () {
client = require('client');
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.UPLOADED);
Expand All @@ -213,6 +223,7 @@ describe('Azure client', function () {
client = require('client');
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
Expand All @@ -233,6 +244,7 @@ describe('Azure client', function () {
client = require('client');
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.NEED_RETRY);
Expand Down
39 changes: 31 additions & 8 deletions test/unit/file_transfer_agent/s3_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,8 @@ describe('S3 client', function () {
let AWS;
let s3;
let filesystem;
let meta;
const dataFile = mockDataFile;
const meta = {
stageInfo: {
location: mockLocation,
path: mockTable + '/' + mockPath + '/',
creds: {}
},
SHA256_DIGEST: mockDigest,
};
const encryptionMetadata = {
key: mockKey,
iv: mockIv,
Expand Down Expand Up @@ -59,6 +52,7 @@ describe('S3 client', function () {

return new putObject;
};
this.destroy = function () {};
}

return new S3;
Expand All @@ -74,6 +68,21 @@ describe('S3 client', function () {

AWS = new SnowflakeS3Util(s3, filesystem);
});
beforeEach(function () {
const stageInfo = {
location: mockLocation,
path: mockTable + '/' + mockPath + '/',
creds: {}
};
meta = {
stageInfo,
SHA256_DIGEST: mockDigest,
client: AWS.createClient(stageInfo),
};
});
this.afterEach(function () {
meta['client'].destroy();
});

it('extract bucket name and path', async function () {
var result = AWS.extractBucketNameAndPath('sfc-eng-regression/test_sub_dir/');
Expand Down Expand Up @@ -117,13 +126,15 @@ describe('S3 client', function () {

return new getObject;
};
this.destroy = function () {};
}

return new S3;
}
});
s3 = require('s3');
const AWS = new SnowflakeS3Util(s3);
meta['client'] = AWS.createClient(meta['stageInfo']);

await AWS.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
Expand All @@ -144,13 +155,15 @@ describe('S3 client', function () {

return new getObject;
};
this.destroy = function () {};
}

return new S3;
}
});
s3 = require('s3');
const AWS = new SnowflakeS3Util(s3);
meta['client'] = AWS.createClient(meta['stageInfo']);

await AWS.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.NOT_FOUND_FILE);
Expand All @@ -171,13 +184,15 @@ describe('S3 client', function () {

return new getObject;
};
this.destroy = function () {};
}

return new S3;
}
});
s3 = require('s3');
const AWS = new SnowflakeS3Util(s3);
meta['client'] = AWS.createClient(meta['stageInfo']);

await AWS.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
Expand All @@ -198,13 +213,15 @@ describe('S3 client', function () {

return new getObject;
};
this.destroy = function () {};
}

return new S3;
}
});
s3 = require('s3');
const AWS = new SnowflakeS3Util(s3);
meta['client'] = AWS.createClient(meta['stageInfo']);

await AWS.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.ERROR);
Expand All @@ -230,6 +247,7 @@ describe('S3 client', function () {

return new putObject;
};
this.destroy = function () {};
}

return new S3;
Expand All @@ -243,6 +261,7 @@ describe('S3 client', function () {
s3 = require('s3');
filesystem = require('filesystem');
const AWS = new SnowflakeS3Util(s3, filesystem);
meta['client'] = AWS.createClient(meta['stageInfo']);

await AWS.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
Expand All @@ -263,6 +282,7 @@ describe('S3 client', function () {

return new putObject;
};
this.destroy = function () {};
}

return new S3;
Expand All @@ -276,6 +296,7 @@ describe('S3 client', function () {
s3 = require('s3');
filesystem = require('filesystem');
const AWS = new SnowflakeS3Util(s3, filesystem);
meta['client'] = AWS.createClient(meta['stageInfo']);

await AWS.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.NEED_RETRY_WITH_LOWER_CONCURRENCY);
Expand All @@ -296,6 +317,7 @@ describe('S3 client', function () {

return new putObject;
};
this.destroy = function () {};
}

return new S3;
Expand All @@ -309,6 +331,7 @@ describe('S3 client', function () {
s3 = require('s3');
filesystem = require('filesystem');
const AWS = new SnowflakeS3Util(s3, filesystem);
meta['client'] = AWS.createClient(meta['stageInfo']);

await AWS.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.NEED_RETRY);
Expand Down