Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert commit of reuse storage clients #747

Merged
merged 1 commit into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions lib/file_transfer_agent/azure_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ function AzureUtil(azure, filestream) {
*/
this.getFileHeader = async function (meta, filename) {
const stageInfo = meta['stageInfo'];
const client = meta['client'];
const client = this.createClient(stageInfo);
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);

const containerClient = client.getContainerClient(azureLocation.containerName);
Expand Down Expand Up @@ -188,7 +188,7 @@ function AzureUtil(azure, filestream) {
}

const stageInfo = meta['stageInfo'];
const client = meta['client'];
const client = this.createClient(stageInfo);
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);
const blobName = azureLocation.path + meta['dstFileName'];

Expand Down Expand Up @@ -230,7 +230,7 @@ function AzureUtil(azure, filestream) {
*/
this.nativeDownloadFile = async function (meta, fullDstPath) {
const stageInfo = meta['stageInfo'];
const client = meta['client'];
const client = this.createClient(stageInfo);
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);
const blobName = azureLocation.path + meta['srcFileName'];

Expand Down
61 changes: 22 additions & 39 deletions lib/file_transfer_agent/file_transfer_agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
}

//upload
let storageClient = getStorageClient(stageLocationType);
const storageClient = getStorageClient(stageLocationType);
const client = storageClient.createClient(stageInfo, false);
const meta = fileMetadata[0];
meta['parallel'] = parallel;
Expand All @@ -148,12 +148,7 @@
meta['requireCompress'] = false;
meta['dstFileName'] = meta['srcFileName'];

storageClient = getStorageClient(meta['stageLocationType']);
try {
await storageClient.uploadOneFileStream(meta);
} finally {
storageClient.destroyClient(stageInfo, client);
}
await storageClient.uploadOneFileStream(meta);
} else {
parseCommand();
initFileMetadata();
Expand Down Expand Up @@ -308,16 +303,12 @@
meta['client'] = client;
}

try {
if (smallFileMetas.length > 0) {
//await uploadFilesinParallel(smallFileMetas);
await uploadFilesinSequential(smallFileMetas);
}
if (largeFileMetas.length > 0) {
await uploadFilesinSequential(largeFileMetas);
}
} finally {
storageClient.destroyClient(stageInfo, client);
if (smallFileMetas.length > 0) {
//await uploadFilesinParallel(smallFileMetas);
await uploadFilesinSequential(smallFileMetas);
}
if (largeFileMetas.length > 0) {
await uploadFilesinSequential(largeFileMetas);
}
}

Expand Down Expand Up @@ -426,16 +417,12 @@
meta['client'] = client;
}

try {
if (smallFileMetas.length > 0) {
//await downloadFilesinParallel(smallFileMetas);
await downloadFilesinSequential(smallFileMetas);
}
if (largeFileMetas.length > 0) {
await downloadFilesinSequential(largeFileMetas);
}
} finally {
storageClient.destroyClient(stageInfo, client);
if (smallFileMetas.length > 0) {
//await downloadFilesinParallel(smallFileMetas);
await downloadFilesinSequential(smallFileMetas);
}
if (largeFileMetas.length > 0) {
await downloadFilesinSequential(largeFileMetas);

Check warning on line 425 in lib/file_transfer_agent/file_transfer_agent.js

View check run for this annotation

Codecov / codecov/patch

lib/file_transfer_agent/file_transfer_agent.js#L425

Added line #L425 was not covered by tests
}
}

Expand Down Expand Up @@ -514,18 +501,14 @@
const client = SnowflakeRemoteStorageUtil.createClient(stageInfo, false);
const s3location = SnowflakeS3Util.extractBucketNameAndPath(stageInfo['location']);

try {
await client.getBucketAccelerateConfiguration({ Bucket: s3location.bucketName })
.then(function (data) {
useAccelerateEndpoint = data['Status'] === 'Enabled';
}).catch(function (err) {
if (err['code'] === 'AccessDenied') {
return;
}
});
} finally {
SnowflakeRemoteStorageUtil.destroyClient(stageInfo, client);
}
await client.getBucketAccelerateConfiguration({ Bucket: s3location.bucketName })
.then(function (data) {
useAccelerateEndpoint = data['Status'] === 'Enabled';

Check warning on line 506 in lib/file_transfer_agent/file_transfer_agent.js

View check run for this annotation

Codecov / codecov/patch

lib/file_transfer_agent/file_transfer_agent.js#L506

Added line #L506 was not covered by tests
}).catch(function (err) {
if (err['code'] === 'AccessDenied') {
return;

Check warning on line 509 in lib/file_transfer_agent/file_transfer_agent.js

View check run for this annotation

Codecov / codecov/patch

lib/file_transfer_agent/file_transfer_agent.js#L509

Added line #L509 was not covered by tests
}
});
}
}

Expand Down
3 changes: 0 additions & 3 deletions lib/file_transfer_agent/local_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ function LocalUtil() {
return null;
};

this.destroyClient = function () {
};

/**
* Write file to upload.
*
Expand Down
13 changes: 0 additions & 13 deletions lib/file_transfer_agent/remote_storage_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,6 @@ function RemoteStorageUtil() {
return utilClass.createClient(stageInfo, useAccelerateEndpoint);
};

/**
* Destroys a client based on the location type.
*
* @param {Object} stageInfo
* @param {Object} client
*/
this.destroyClient = function (stageInfo, client) {
const utilClass = this.getForStorageType(stageInfo['locationType']);
if (utilClass.destroyClient) {
utilClass.destroyClient(client);
}
};

/**
* Encrypt then upload one file stream.
*
Expand Down
17 changes: 5 additions & 12 deletions lib/file_transfer_agent/s3_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,6 @@ function S3Util(s3, filestream) {
return new AWS.S3(config);
};

/**
* Destroys an AWS S3 client.
*
* @param {AWS.S3} client
*/
this.destroyClient = function (client) {
client.destroy();
};

/**
* Extract the bucket name and path from the metadata's stage location.
*
Expand Down Expand Up @@ -123,7 +114,7 @@ function S3Util(s3, filestream) {
*/
this.getFileHeader = async function (meta, filename) {
const stageInfo = meta['stageInfo'];
const client = meta['client'];
const client = this.createClient(stageInfo);
const s3location = this.extractBucketNameAndPath(stageInfo['location']);

const params = {
Expand Down Expand Up @@ -203,7 +194,8 @@ function S3Util(s3, filestream) {
s3Metadata[AMZ_MATDESC] = encryptionMetadata.matDesc;
}

const client = meta['client'];
const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);

const s3location = this.extractBucketNameAndPath(meta['stageInfo']['location']);

Expand Down Expand Up @@ -243,7 +235,8 @@ function S3Util(s3, filestream) {
* @param {Object} encryptionMetadata
*/
this.nativeDownloadFile = async function (meta, fullDstPath) {
const client = meta['client'];
const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);

const s3location = this.extractBucketNameAndPath(meta['stageInfo']['location']);

Expand Down
28 changes: 8 additions & 20 deletions test/unit/file_transfer_agent/azure_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@ describe('Azure client', function () {
let Azure = null;
let client = null;
let filestream = null;
let meta = null;
const dataFile = mockDataFile;
const meta = {
stageInfo: {
location: mockLocation,
path: mockTable + '/' + mockPath + '/',
creds: {}
},
SHA256_DIGEST: mockDigest,
};
const encryptionMetadata = {
key: mockKey,
iv: mockIv,
Expand Down Expand Up @@ -101,18 +108,6 @@ describe('Azure client', function () {
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
});
beforeEach(function () {
const stageInfo = {
location: mockLocation,
path: mockTable + '/' + mockPath + '/',
creds: {}
};
meta = {
stageInfo,
SHA256_DIGEST: mockDigest,
client: Azure.createClient(stageInfo),
};
});

it('extract bucket name and path', async function () {
verifyNameAndPath('sfc-eng-regression/test_sub_dir/', 'sfc-eng-regression', 'test_sub_dir/');
Expand All @@ -137,7 +132,6 @@ describe('Azure client', function () {

client = require('client');
Azure = new SnowflakeAzureUtil(client);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
Expand All @@ -153,7 +147,6 @@ describe('Azure client', function () {

client = require('client');
const Azure = new SnowflakeAzureUtil(client);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.NOT_FOUND_FILE);
Expand All @@ -169,7 +162,6 @@ describe('Azure client', function () {

client = require('client');
Azure = new SnowflakeAzureUtil(client);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
Expand All @@ -185,7 +177,6 @@ describe('Azure client', function () {

client = require('client');
Azure = new SnowflakeAzureUtil(client);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.ERROR);
Expand All @@ -202,7 +193,6 @@ describe('Azure client', function () {
client = require('client');
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.UPLOADED);
Expand All @@ -223,7 +213,6 @@ describe('Azure client', function () {
client = require('client');
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
Expand All @@ -244,7 +233,6 @@ describe('Azure client', function () {
client = require('client');
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.NEED_RETRY);
Expand Down
Loading
Loading