Skip to content

Commit

Permalink
put with GCP to skip existing file
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-ext-simba-hx authored and Harry Xi committed Oct 26, 2023
1 parent 4097f15 commit 327ab4d
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 8 deletions.
31 changes: 26 additions & 5 deletions cpp/SnowflakeGCSClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ RemoteStorageRequestOutcome SnowflakeGCSClient::upload(FileMetadata *fileMetadat
if (!m_gcsAccessToken.empty())
{
std::string filePathFull = m_stageInfo->location + fileMetadata->destFileName;

//check if file exists if overwrite is not specified.
if (!fileMetadata->overWrite)
{
RemoteStorageRequestOutcome outcome = GetRemoteFileMetadata(&filePathFull, fileMetadata);
if (RemoteStorageRequestOutcome::SUCCESS == outcome)
{
CXX_LOG_DEBUG("File already exists skipping the file upload %s",
fileMetadata->srcFileToUpload.c_str());
return RemoteStorageRequestOutcome::SKIP_UPLOAD_FILE;
}
}
buildGcsRequest(filePathFull, url, reqHeaders);
}
else
Expand Down Expand Up @@ -134,12 +146,21 @@ RemoteStorageRequestOutcome SnowflakeGCSClient::GetRemoteFileMetadata(
url = fileMetadata->presignedUrl;
}

if (!m_statement->http_get(url,
reqHeaders,
NULL,
headerString,
true))
try
{
if (!m_statement->http_get(url,
reqHeaders,
NULL,
headerString,
true))
{
return RemoteStorageRequestOutcome::FAILED;
}
}
catch (...)
{
// It's expected to fail when file doesn't exist, while http_get()
// could throw excpetion on that. Catch that and return FAILED
return RemoteStorageRequestOutcome::FAILED;
}

Expand Down
49 changes: 46 additions & 3 deletions tests/test_unit_put_get_gcs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ class MockedGCSStatement : public Snowflake::Client::IStatementPutGet
{
public:
MockedGCSStatement(bool useGcsToken)
: m_useGcsToken(useGcsToken), Snowflake::Client::IStatementPutGet()
: m_useGcsToken(useGcsToken),
m_firstTime(true),
m_isPut(false),
Snowflake::Client::IStatementPutGet()
{
m_stageInfo.stageType = Snowflake::Client::StageType::GCS;
if (m_useGcsToken)
Expand Down Expand Up @@ -63,9 +66,18 @@ class MockedGCSStatement : public Snowflake::Client::IStatementPutGet
}
else
{
m_isPut = true;
putGetParseResponse->command = CommandType::UPLOAD;
putGetParseResponse->srcLocations = m_srcLocationsPut;
putGetParseResponse->threshold = DEFAULT_UPLOAD_DATA_SIZE_THRESHOLD;
if (sql->find("OVERWRITE=true") != std::string::npos)
{
putGetParseResponse->overwrite = true;
}
else
{
putGetParseResponse->overwrite = false;
}
}
if (!m_useGcsToken)
{
Expand Down Expand Up @@ -106,8 +118,17 @@ class MockedGCSStatement : public Snowflake::Client::IStatementPutGet
std::string& responseHeaders,
bool headerOnly) override
{
if (headerOnly)
if (headerOnly) // try getting metadata
{
if (m_isPut)
{
if ((strcasecmp(url.c_str(), m_expectedUrl.c_str()) == 0) && m_firstTime)
{
// first time retry false to mock the case of file doesn't exist
m_firstTime = false;
return false;
}
}
responseHeaders = "HTTP/1.1 200 OK\n"
"x-goog-meta-encryptiondata: {\"WrappedContentKey\":{\"KeyId\":\"symmKey1\",\"EncryptedKey\":\"MyZBZNLcndKTeR+xC8Msle5IcSYKsx/nYNn93OONSqs=\",\"Algorithm\":\"AES_CBC_256\"},\"ContentEncryptionIV\":\"30fmhKrf1aKyWidrv06NNA==\"}\n"
"Content-Type: application/octet-stream\n"
Expand Down Expand Up @@ -143,12 +164,16 @@ class MockedGCSStatement : public Snowflake::Client::IStatementPutGet
bool m_useGcsToken;

std::string m_expectedUrl;

bool m_firstTime;

bool m_isPut;
};

// test helper for put
void put_gcs_test_core(bool useGcsToken)
{
std::string cmd = std::string("put file://small_file.csv.gz @odbctestStage AUTO_COMPRESS=false OVERWRITE=true");
std::string cmd = std::string("put file://small_file.csv.gz @odbctestStage AUTO_COMPRESS=false");

MockedGCSStatement mockedStatementPut(useGcsToken);

Expand All @@ -162,6 +187,24 @@ void put_gcs_test_core(bool useGcsToken)
result->getColumnAsString(6, put_status);
assert_string_equal("UPLOADED", put_status.c_str());
}

if (useGcsToken)
{
result = agent.execute(&cmd);
while (result->next())
{
result->getColumnAsString(6, put_status);
assert_string_equal("SKIPPED", put_status.c_str());
}
}

cmd += " OVERWRITE=true";
result = agent.execute(&cmd);
while (result->next())
{
result->getColumnAsString(6, put_status);
assert_string_equal("UPLOADED", put_status.c_str());
}
}

// test helper for get
Expand Down

0 comments on commit 327ab4d

Please sign in to comment.