Skip to content

Commit

Permalink
SNOW-1483230 Disable blob encryption / Add class for storage metadata…
Browse files Browse the repository at this point in the history
… & configure response (#773)
  • Loading branch information
sfc-gh-alhuang committed Jul 10, 2024
1 parent 2b26107 commit 1678a6d
Show file tree
Hide file tree
Showing 12 changed files with 442 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,14 @@ class BlobBuilder {
* @param blobData All the data for one blob. Assumes that all ChannelData in the inner List
* belongs to the same table. Will error if this is not the case
* @param bdecVersion version of blob
* @param encrypt If the output chunk is encrypted or not
* @return {@link Blob} data
*/
static <T> Blob constructBlobAndMetadata(
String filePath, List<List<ChannelData<T>>> blobData, Constants.BdecVersion bdecVersion)
String filePath,
List<List<ChannelData<T>>> blobData,
Constants.BdecVersion bdecVersion,
boolean encrypt)
throws IOException, NoSuchPaddingException, NoSuchAlgorithmException,
InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException,
BadPaddingException {
Expand All @@ -83,25 +87,32 @@ static <T> Blob constructBlobAndMetadata(
flusher.serialize(channelsDataPerTable, filePath);

if (!serializedChunk.channelsMetadataList.isEmpty()) {
ByteArrayOutputStream chunkData = serializedChunk.chunkData;
Pair<byte[], Integer> paddedChunk =
padChunk(chunkData, Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES);
byte[] paddedChunkData = paddedChunk.getFirst();
int paddedChunkLength = paddedChunk.getSecond();
byte[] compressedChunkData;
int chunkLength;

// Encrypt the compressed chunk data, the encryption key is derived using the key from
// server with the full blob path.
// We need to maintain IV as a block counter for the whole file, even interleaved,
// to align with decryption on the Snowflake query path.
// TODO: address alignment for the header SNOW-557866
long iv = curDataSize / Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES;
byte[] encryptedCompressedChunkData =
Cryptor.encrypt(
paddedChunkData, firstChannelFlushContext.getEncryptionKey(), filePath, iv);
if (encrypt) {
Pair<byte[], Integer> paddedChunk =
padChunk(serializedChunk.chunkData, Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES);
byte[] paddedChunkData = paddedChunk.getFirst();
chunkLength = paddedChunk.getSecond();

// Encrypt the compressed chunk data, the encryption key is derived using the key from
// server with the full blob path.
// We need to maintain IV as a block counter for the whole file, even interleaved,
// to align with decryption on the Snowflake query path.
// TODO: address alignment for the header SNOW-557866
long iv = curDataSize / Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES;
compressedChunkData =
Cryptor.encrypt(
paddedChunkData, firstChannelFlushContext.getEncryptionKey(), filePath, iv);
} else {
compressedChunkData = serializedChunk.chunkData.toByteArray();
chunkLength = compressedChunkData.length;
}

// Compute the md5 of the chunk data
String md5 = computeMD5(encryptedCompressedChunkData, paddedChunkLength);
int encryptedCompressedChunkDataSize = encryptedCompressedChunkData.length;
String md5 = computeMD5(compressedChunkData, chunkLength);
int compressedChunkDataSize = compressedChunkData.length;

// Create chunk metadata
long startOffset = curDataSize;
Expand All @@ -111,9 +122,9 @@ static <T> Blob constructBlobAndMetadata(
// The start offset will be updated later in BlobBuilder#build to include the blob
// header
.setChunkStartOffset(startOffset)
// The paddedChunkLength is used because it is the actual data size used for
// The chunkLength is used because it is the actual data size used for
// decompression and md5 calculation on server side.
.setChunkLength(paddedChunkLength)
.setChunkLength(chunkLength)
.setUncompressedChunkLength((int) serializedChunk.chunkEstimatedUncompressedSize)
.setChannelList(serializedChunk.channelsMetadataList)
.setChunkMD5(md5)
Expand All @@ -127,21 +138,22 @@ static <T> Blob constructBlobAndMetadata(

// Add chunk metadata and data to the list
chunksMetadataList.add(chunkMetadata);
chunksDataList.add(encryptedCompressedChunkData);
curDataSize += encryptedCompressedChunkDataSize;
crc.update(encryptedCompressedChunkData, 0, encryptedCompressedChunkDataSize);
chunksDataList.add(compressedChunkData);
curDataSize += compressedChunkDataSize;
crc.update(compressedChunkData, 0, compressedChunkDataSize);

logger.logInfo(
"Finish building chunk in blob={}, table={}, rowCount={}, startOffset={},"
+ " estimatedUncompressedSize={}, paddedChunkLength={}, encryptedCompressedSize={},"
+ " bdecVersion={}",
+ " estimatedUncompressedSize={}, chunkLength={}, compressedSize={},"
+ " encryption={}, bdecVersion={}",
filePath,
firstChannelFlushContext.getFullyQualifiedTableName(),
serializedChunk.rowCount,
startOffset,
serializedChunk.chunkEstimatedUncompressedSize,
paddedChunkLength,
encryptedCompressedChunkDataSize,
chunkLength,
compressedChunkDataSize,
encrypt,
bdecVersion);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * Jackson-mapped model of the JSON body returned by the configure endpoint.
 *
 * <p>Every field is bound to its JSON key through {@link JsonProperty}. Accessors are
 * package-private, matching the rest of this internal package.
 */
class ConfigureResponse extends StreamingIngestResponse {

  /** Value of the {@code prefix} key. */
  @JsonProperty("prefix")
  private String prefix;

  /** Value of the {@code status_code} key. */
  @JsonProperty("status_code")
  private Long statusCode;

  /** Value of the {@code message} key. */
  @JsonProperty("message")
  private String message;

  /** Value of the {@code stage_location} key, mapped to a {@link FileLocationInfo}. */
  @JsonProperty("stage_location")
  private FileLocationInfo stageLocation;

  /** Value of the {@code deployment_id} key. */
  @JsonProperty("deployment_id")
  private Long deploymentId;

  // ---- prefix ----

  /** @return the prefix carried by the response, as last set or deserialized */
  String getPrefix() {
    return this.prefix;
  }

  /** @param prefix the prefix to store */
  void setPrefix(String prefix) {
    this.prefix = prefix;
  }

  // ---- status code ----

  /** @return the status code carried by the response */
  @Override
  Long getStatusCode() {
    return this.statusCode;
  }

  /** @param statusCode the status code to store */
  void setStatusCode(Long statusCode) {
    this.statusCode = statusCode;
  }

  // ---- message ----

  /** @return the message carried by the response */
  String getMessage() {
    return this.message;
  }

  /** @param message the message to store */
  void setMessage(String message) {
    this.message = message;
  }

  // ---- stage location ----

  /** @return the stage location carried by the response */
  FileLocationInfo getStageLocation() {
    return this.stageLocation;
  }

  /** @param stageLocation the stage location to store */
  void setStageLocation(FileLocationInfo stageLocation) {
    this.stageLocation = stageLocation;
  }

  // ---- deployment id ----

  /** @return the deployment id carried by the response */
  Long getDeploymentId() {
    return this.deploymentId;
  }

  /** @param deploymentId the deployment id to store */
  void setDeploymentId(Long deploymentId) {
    this.deploymentId = deploymentId;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;

/**
 * Class used to deserialize the volume information returned by the server.
 *
 * <p>Every field is bound to its JSON key through {@link JsonProperty}. Accessors are
 * package-private, matching the rest of this internal package.
 */
class FileLocationInfo {

  /** The stage type. */
  @JsonProperty("locationType")
  private String locationType;

  /** The container or bucket. */
  @JsonProperty("location")
  private String location;

  /** Path of the target file. */
  @JsonProperty("path")
  private String path;

  /** The credentials required for the stage. */
  @JsonProperty("creds")
  private Map<String, String> credentials;

  /** AWS/S3/GCS region (S3/GCS only). */
  @JsonProperty("region")
  private String region;

  /** The Azure Storage endpoint (Azure only). */
  @JsonProperty("endPoint")
  private String endPoint;

  /** The Azure Storage account (Azure only). */
  @JsonProperty("storageAccount")
  private String storageAccount;

  /** GCS gives us back a presigned URL instead of a credential. */
  @JsonProperty("presignedUrl")
  private String presignedUrl;

  /** Whether to encrypt/decrypt files on the stage. */
  @JsonProperty("isClientSideEncrypted")
  private boolean isClientSideEncrypted;

  /** Whether to use the S3 regional URL (AWS only). */
  @JsonProperty("useS3RegionalUrl")
  private boolean useS3RegionalUrl;

  /** A unique id for the volume, assigned by the server. */
  @JsonProperty("volumeHash")
  private String volumeHash;

  // ---- location type ----

  /** @return the stage type */
  String getLocationType() {
    return this.locationType;
  }

  /** @param locationType the stage type to store */
  void setLocationType(String locationType) {
    this.locationType = locationType;
  }

  // ---- location ----

  /** @return the container or bucket */
  String getLocation() {
    return this.location;
  }

  /** @param location the container or bucket to store */
  void setLocation(String location) {
    this.location = location;
  }

  // ---- path ----

  /** @return the path of the target file */
  String getPath() {
    return this.path;
  }

  /** @param path the target file path to store */
  void setPath(String path) {
    this.path = path;
  }

  // ---- credentials ----

  /** @return the stored credentials map itself (no copy is made) */
  Map<String, String> getCredentials() {
    return this.credentials;
  }

  /** @param credentials the credentials map to store (kept by reference) */
  void setCredentials(Map<String, String> credentials) {
    this.credentials = credentials;
  }

  // ---- region ----

  /** @return the region */
  String getRegion() {
    return this.region;
  }

  /** @param region the region to store */
  void setRegion(String region) {
    this.region = region;
  }

  // ---- Azure endpoint ----

  /** @return the Azure Storage endpoint */
  String getEndPoint() {
    return this.endPoint;
  }

  /** @param endPoint the Azure Storage endpoint to store */
  void setEndPoint(String endPoint) {
    this.endPoint = endPoint;
  }

  // ---- Azure storage account ----

  /** @return the Azure Storage account */
  String getStorageAccount() {
    return this.storageAccount;
  }

  /** @param storageAccount the Azure Storage account to store */
  void setStorageAccount(String storageAccount) {
    this.storageAccount = storageAccount;
  }

  // ---- presigned URL ----

  /** @return the presigned URL */
  String getPresignedUrl() {
    return this.presignedUrl;
  }

  /** @param presignedUrl the presigned URL to store */
  void setPresignedUrl(String presignedUrl) {
    this.presignedUrl = presignedUrl;
  }

  // ---- client-side encryption flag ----

  /** @return whether files on the stage are encrypted/decrypted on the client side */
  boolean getIsClientSideEncrypted() {
    return isClientSideEncrypted;
  }

  /** @param isClientSideEncrypted the client-side encryption flag to store */
  void setIsClientSideEncrypted(boolean isClientSideEncrypted) {
    this.isClientSideEncrypted = isClientSideEncrypted;
  }

  // ---- S3 regional URL flag ----

  /** @return whether the S3 regional URL should be used */
  boolean getUseS3RegionalUrl() {
    return useS3RegionalUrl;
  }

  /** @param useS3RegionalUrl the S3 regional URL flag to store */
  void setUseS3RegionalUrl(boolean useS3RegionalUrl) {
    this.useS3RegionalUrl = useS3RegionalUrl;
  }

  // ---- volume hash ----

  /** @return the server-assigned unique id of the volume */
  String getVolumeHash() {
    return volumeHash;
  }

  /** @param volumeHash the volume id to store */
  void setVolumeHash(String volumeHash) {
    this.volumeHash = volumeHash;
  }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
Expand Down Expand Up @@ -555,7 +555,12 @@ BlobMetadata buildAndUpload(String blobPath, List<List<ChannelData<T>>> blobData
Timer.Context buildContext = Utils.createTimerContext(this.owningClient.buildLatency);

// Construct the blob along with the metadata of the blob
BlobBuilder.Blob blob = BlobBuilder.constructBlobAndMetadata(blobPath, blobData, bdecVersion);
BlobBuilder.Blob blob =
BlobBuilder.constructBlobAndMetadata(
blobPath,
blobData,
bdecVersion,
this.owningClient.getInternalParameterProvider().getEnableChunkEncryption());

blob.blobStats.setBuildDurationMs(buildContext);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

/**
 * Provides non-configurable parameters whose values depend on whether the client runs in
 * Iceberg mode or non-Iceberg mode.
 */
class InternalParameterProvider {

  /** {@code true} when this provider was created for Iceberg mode. */
  private final boolean isIcebergMode;

  /**
   * Creates a provider bound to the given mode.
   *
   * @param isIcebergMode whether parameters should be resolved for Iceberg mode
   */
  InternalParameterProvider(boolean isIcebergMode) {
    this.isIcebergMode = isIcebergMode;
  }

  /**
   * Tells whether chunk encryption is enabled for the current mode.
   *
   * @return {@code false} in Iceberg mode, {@code true} otherwise
   */
  boolean getEnableChunkEncryption() {
    // Iceberg mode does not need client-side encryption, so it is the only mode in which
    // chunk encryption is disabled.
    if (this.isIcebergMode) {
      return false;
    }
    return true;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class OpenChannelResponse extends StreamingIngestResponse {
private List<ColumnMetadata> tableColumns;
private String encryptionKey;
private Long encryptionKeyId;
private FileLocationInfo stageLocation;

@JsonProperty("status_code")
void setStatusCode(Long statusCode) {
Expand Down Expand Up @@ -130,4 +131,13 @@ void setEncryptionKeyId(Long encryptionKeyId) {
Long getEncryptionKeyId() {
return this.encryptionKeyId;
}

@JsonProperty("stage_location")
void setStageLocation(FileLocationInfo stageLocation) {
this.stageLocation = stageLocation;
}

FileLocationInfo getStageLocation() {
return this.stageLocation;
}
}
Loading

0 comments on commit 1678a6d

Please sign in to comment.