Skip to content

Commit

Permalink
SNOW-723810: Fix parallelization
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-ext-simba-lf committed Sep 22, 2023
1 parent 30cbff8 commit ac125f5
Showing 1 changed file with 47 additions and 48 deletions.
95 changes: 47 additions & 48 deletions Snowflake.Data.Tests/UnitTests/SFFileTransferAgentTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,14 @@ class SFFileTransferAgentTest : SFBaseTest
const string LocationTables = "tables";
const string LocationKey = "mock-key";
const string LocationPath = LocationTables + "/" + LocationKey + "/";
[ThreadStatic] private static string t_location;
string _location;

// Connection string for mock session
const string ConnectionStringMock = "user=user;password=password;account=account;";

// File name for the mock file
[ThreadStatic] private static string t_realSourceFilePath;

// Temp directory for the files to be uploaded
[ThreadStatic] private static string t_mockUploadRootDirectory;

// File size of the mock file
long _sourceFileSize;

Expand All @@ -51,7 +48,7 @@ class SFFileTransferAgentTest : SFBaseTest

// Mock response data properties
[ThreadStatic] private static string t_localLocation;
[ThreadStatic] private static List<string> t_srcLocations;
List<string> _srcLocations;
const string AutoDetect = "auto_detect";
const int Parallel = 1;

Expand All @@ -64,7 +61,7 @@ class SFFileTransferAgentTest : SFBaseTest
SFSession _session;

// Mock PUT/GET queries
[ThreadStatic] private static string t_putQuery;
string _putQuery;
const string GetQuery = "GET @DB.SCHEMA.%MOCKTABLE file://;";

// Mock file content
Expand All @@ -73,16 +70,21 @@ class SFFileTransferAgentTest : SFBaseTest
[SetUp]
public void BeforeTest()
{
// Base object's names on on worker thread id
var threadSuffix = TestContext.CurrentContext.WorkerId?.Replace('#', '_');

// Set values for thread variables
t_realSourceFilePath = TestNameWithWorker + "_realSrcFilePath.txt";
t_localLocation = TestNameWithWorker + "mockLocalLocation";
t_locationStage = TestNameWithWorker + "mock-customer-stage";
t_srcLocations = new List<string>()
t_realSourceFilePath = $"realSrcFilePath_{threadSuffix}.txt";
t_localLocation = $"mockLocalLocation_{threadSuffix}";
t_locationStage = $"mock-customer-stage_{threadSuffix}";

// Set values for members that depend on thread variables
_srcLocations = new List<string>()
{
t_realSourceFilePath
};
t_putQuery = "PUT file://" + t_realSourceFilePath + " @DB.SCHEMA.%MOCKTABLE;";
t_location = t_locationStage + "/" + LocationId + "/" + LocationPath;
_putQuery = "PUT file://" + t_realSourceFilePath + " @DB.SCHEMA.%MOCKTABLE;";
_location = Path.GetFullPath(t_locationStage + "/" + LocationId + "/" + LocationPath);

_responseData = new PutGetResponseData()
{
Expand All @@ -98,10 +100,10 @@ public void BeforeTest()
rowType = null,
sourceCompression = AutoDetect,
sqlState = null,
src_locations = t_srcLocations,
src_locations = _srcLocations,
stageInfo = new PutGetStageInfo()
{
location = t_location,
location = _location,
locationType = SFRemoteStorageUtil.LOCAL_FS, // Use local storage for testing
path = LocationPath,
presignedUrl = null,
Expand All @@ -117,24 +119,24 @@ public void BeforeTest()

// Upload setup
// Write mock file to upload
File.WriteAllText(t_srcLocations[0], FileContent);
_sourceFileSize = new FileInfo(t_srcLocations[0]).Length;
File.WriteAllText(_srcLocations[0], FileContent);
_sourceFileSize = new FileInfo(_srcLocations[0]).Length;

// Download setup
// Write mock file in the local location to download
if (!Directory.Exists(t_location))
if (!Directory.Exists(_location))
{
Directory.CreateDirectory(t_location);
Directory.CreateDirectory(_location);
}
File.WriteAllText(t_location + t_realSourceFilePath, FileContent);
File.WriteAllText(_location + t_realSourceFilePath, FileContent);
}

[TearDown]
public void AfterTest()
{
// Upload teardown
// Delete mock files
foreach (string location in t_srcLocations)
foreach (string location in _srcLocations)
{
File.Delete(location);
}
Expand Down Expand Up @@ -169,7 +171,7 @@ public void TestUploadUsingFilepath()
// Arrange
// Set command to upload
_responseData.command = CommandTypes.UPLOAD.ToString();
_fileTransferAgent = new SFFileTransferAgent(t_putQuery,
_fileTransferAgent = new SFFileTransferAgent(_putQuery,
_session,
_responseData,
_cancellationToken);
Expand All @@ -195,7 +197,7 @@ public async Task TestUploadAsyncUsingFilepath()
// Arrange
// Set command to upload
_responseData.command = CommandTypes.UPLOAD.ToString();
_fileTransferAgent = new SFFileTransferAgent(t_putQuery,
_fileTransferAgent = new SFFileTransferAgent(_putQuery,
_session,
_responseData,
_cancellationToken);
Expand Down Expand Up @@ -223,7 +225,7 @@ public void TestUploadUsingMemoryStream()
_responseData.command = CommandTypes.UPLOAD.ToString();
MemoryStream memoryStream = new MemoryStream(Encoding.UTF8.GetBytes(FileContent));

_fileTransferAgent = new SFFileTransferAgent(t_putQuery,
_fileTransferAgent = new SFFileTransferAgent(_putQuery,
_session,
_responseData,
ref memoryStream,
Expand Down Expand Up @@ -254,7 +256,7 @@ public async Task TestUploadAsyncUsingMemoryStream()
_responseData.command = CommandTypes.UPLOAD.ToString();
MemoryStream memoryStream = new MemoryStream(Encoding.UTF8.GetBytes(FileContent));

_fileTransferAgent = new SFFileTransferAgent(t_putQuery,
_fileTransferAgent = new SFFileTransferAgent(_putQuery,
_session,
_responseData,
ref memoryStream,
Expand Down Expand Up @@ -285,7 +287,7 @@ public void TestUploadWithGZIPCompression()
_responseData.autoCompress = true;
// Set command to upload
_responseData.command = CommandTypes.UPLOAD.ToString();
_fileTransferAgent = new SFFileTransferAgent(t_putQuery,
_fileTransferAgent = new SFFileTransferAgent(_putQuery,
_session,
_responseData,
_cancellationToken);
Expand All @@ -312,27 +314,24 @@ public void TestUploadWithWilcardInTheFilename()
// The file name used for creating multiple files
string mockFileName = "testUploadWithMultipleFiles";
string extension = "txt";
t_mockUploadRootDirectory = TestNameWithWorker + "t_mockUploadRootDirectory";

Directory.CreateDirectory($"{t_mockUploadRootDirectory}");

// Create source location with wildcard in its filename
_responseData.src_locations = new List<string>()
{
// Add wildcard in the source location
$"{t_mockUploadRootDirectory}/{mockFileName}*.{extension}",
$"{mockFileName}*.{extension}",
};

// Write the mock files
int numberOfFiles = 3;
for (int i = 0; i < numberOfFiles; i++)
{
File.WriteAllText($"{t_mockUploadRootDirectory}/{mockFileName}{i}.{extension}", FileContent);
File.WriteAllText($"{mockFileName}{i}.{extension}", FileContent);
}

// Set command to upload
_responseData.command = CommandTypes.UPLOAD.ToString();
_fileTransferAgent = new SFFileTransferAgent(t_putQuery,
_fileTransferAgent = new SFFileTransferAgent(_putQuery,
_session,
_responseData,
_cancellationToken);
Expand All @@ -351,9 +350,9 @@ public void TestUploadWithWilcardInTheFilename()
// Check the name of the source file and destination file are the same
Assert.AreEqual($"{mockFileName}{i}.{extension}", GetResultValue(result, SFResultSet.PutGetResponseRowTypeInfo.SourceFileName));
Assert.AreEqual($"{mockFileName}{i}.{extension}", GetResultValue(result, SFResultSet.PutGetResponseRowTypeInfo.DestinationFileName));
}

Directory.Delete($"{t_mockUploadRootDirectory}", true);
File.Delete($"{mockFileName}{i}.{extension}");
}
}

[Test]
Expand All @@ -362,25 +361,25 @@ public void TestUploadWithWildcardInTheRootDirectory()
// Arrange
// Create the mock directory and files
string mockFileName = "testUploadWithMultipleDirectory.txt";
t_mockUploadRootDirectory = TestNameWithWorker + "mockDirectoryWithWildcardInRootDirectory";
string tempUploadRootDirectory = "mockDirectoryWithWildcardInRootDirectory";
int numberOfDirectories = 3;

for (int i = 0; i < numberOfDirectories; i++)
{
Directory.CreateDirectory($"{t_mockUploadRootDirectory}{i}");
File.WriteAllText($"{t_mockUploadRootDirectory}{i}/{mockFileName}", FileContent);
Directory.CreateDirectory($"{tempUploadRootDirectory}{i}");
File.WriteAllText($"{tempUploadRootDirectory}{i}/{mockFileName}", FileContent);
}

// Create source location with wildcard in its filename
_responseData.src_locations = new List<string>()
{
// Add wildcard in the source location
$"{t_mockUploadRootDirectory}*/{mockFileName}",
$"{tempUploadRootDirectory}*/{mockFileName}",
};

// Set command to upload
_responseData.command = CommandTypes.UPLOAD.ToString();
_fileTransferAgent = new SFFileTransferAgent(t_putQuery,
_fileTransferAgent = new SFFileTransferAgent(_putQuery,
_session,
_responseData,
_cancellationToken);
Expand All @@ -400,7 +399,7 @@ public void TestUploadWithWildcardInTheRootDirectory()
Assert.AreEqual(mockFileName, GetResultValue(result, SFResultSet.PutGetResponseRowTypeInfo.SourceFileName));
Assert.AreEqual(mockFileName, GetResultValue(result, SFResultSet.PutGetResponseRowTypeInfo.DestinationFileName));

Directory.Delete($"{t_mockUploadRootDirectory}{i}", true);
Directory.Delete($"{tempUploadRootDirectory}{i}", true);
}
}

Expand All @@ -410,26 +409,26 @@ public void TestUploadWithWildcardInTheDirectoryPath()
// Arrange
// Create the mock directory and files
string mockFileName = "testUploadWithMultipleDirectory.txt";
t_mockUploadRootDirectory = TestNameWithWorker + "mockDirectoryWithWildcardInDirectoryPath";
string mockDirectory = "mockDirectory";
string tempUploadRootDirectory = "mockDirectoryWithWildcardInDirectoryPath";
string mockPath = $"{tempUploadRootDirectory}/mockDirectory";
int numberOfDirectories = 3;

for (int i = 0; i < numberOfDirectories; i++)
{
Directory.CreateDirectory($"{t_mockUploadRootDirectory}/{mockDirectory}{i}");
File.WriteAllText($"{t_mockUploadRootDirectory}/{mockDirectory}{i}/{mockFileName}", FileContent);
Directory.CreateDirectory($"{mockPath}{i}");
File.WriteAllText($"{mockPath}{i}/{mockFileName}", FileContent);
}

// Create source location with wildcard in its filename
_responseData.src_locations = new List<string>()
{
// Add wildcard in the source location
$"{t_mockUploadRootDirectory}/{mockDirectory}*/{mockFileName}",
$"{mockPath}*/{mockFileName}",
};

// Set command to upload
_responseData.command = CommandTypes.UPLOAD.ToString();
_fileTransferAgent = new SFFileTransferAgent(t_putQuery,
_fileTransferAgent = new SFFileTransferAgent(_putQuery,
_session,
_responseData,
_cancellationToken);
Expand All @@ -449,7 +448,7 @@ public void TestUploadWithWildcardInTheDirectoryPath()
Assert.AreEqual(mockFileName, GetResultValue(result, SFResultSet.PutGetResponseRowTypeInfo.DestinationFileName));
}

Directory.Delete(t_mockUploadRootDirectory, true);
Directory.Delete(tempUploadRootDirectory, true);
}

[Test]
Expand Down Expand Up @@ -528,9 +527,9 @@ public void TestDownloadThrowsErrorDirectoryNotFound()
{
// Arrange
// Delete the directory to trigger the directory error
if (Directory.Exists(t_location))
if (Directory.Exists(_location))
{
Directory.Delete(t_location, true);
Directory.Delete(_location, true);
}

// Set command to download
Expand Down

0 comments on commit ac125f5

Please sign in to comment.