eliminate iceberg logic
sfc-gh-alhuang committed Jul 11, 2024
1 parent 6eb0fff commit 1657d14
Showing 27 changed files with 176 additions and 796 deletions.
@@ -42,8 +42,7 @@ public enum ApiName {
   STREAMING_DROP_CHANNEL("POST"),
   STREAMING_CHANNEL_STATUS("POST"),
   STREAMING_REGISTER_BLOB("POST"),
-  STREAMING_CLIENT_CONFIGURE("POST"),
-  STREAMING_CHANNEL_CONFIGURE("POST");
+  STREAMING_CLIENT_CONFIGURE("POST");
   private final String httpMethod;

   private ApiName(String httpMethod) {
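The enum this hunk edits simply pairs each streaming REST endpoint with its HTTP verb; dropping STREAMING_CHANNEL_CONFIGURE removes the per-channel configure endpoint along with the handlers deleted below. A minimal reconstruction of the pattern, for context (only the members visible in the diff are confirmed; the accessor is assumed):

public enum ApiName {
  STREAMING_DROP_CHANNEL("POST"),
  STREAMING_CHANNEL_STATUS("POST"),
  STREAMING_REGISTER_BLOB("POST"),
  STREAMING_CLIENT_CONFIGURE("POST");

  private final String httpMethod;

  private ApiName(String httpMethod) {
    this.httpMethod = httpMethod;
  }

  // Assumed accessor; the diff shows only the field and constructor.
  public String getHttpMethod() {
    return httpMethod;
  }
}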
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
  */

 package net.snowflake.ingest.streaming;
@@ -49,11 +49,6 @@ public Builder setParameterOverrides(Map<String, Object> parameterOverrides) {
       return this;
     }

-    public Builder setIsIceberg(boolean isIcebergMode) {
-      this.isIcebergMode = isIcebergMode;
-      return this;
-    }
-
     public Builder setIsTestMode(boolean isTestMode) {
       this.isTestMode = isTestMode;
       return this;
@@ -67,12 +62,7 @@ public SnowflakeStreamingIngestClient build() {
       SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));

       return new SnowflakeStreamingIngestClientInternal<>(
-          this.name,
-          accountURL,
-          prop,
-          this.parameterOverrides,
-          this.isIcebergMode,
-          this.isTestMode);
+          this.name, accountURL, prop, this.parameterOverrides, this.isTestMode);
     }
   }
 }
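With setIsIceberg gone, iceberg mode can no longer be toggled through the public builder. A usage sketch of client construction after this change, assuming the factory entry point documented for the SDK (property values are placeholders):

import java.util.Properties;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;

public class BuildClientSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("url", "https://<account>.snowflakecomputing.com:443"); // placeholder
    props.put("user", "<user>");                                      // placeholder
    props.put("private_key", "<pkcs8_private_key>");                  // placeholder

    // The builder now carries only name, properties, parameter overrides, and test mode.
    SnowflakeStreamingIngestClient client =
        SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT")
            .setProperties(props)
            .build();
    client.close();
  }
}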
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
  */

 package net.snowflake.ingest.streaming.internal;
@@ -61,14 +61,10 @@ class BlobBuilder {
    * @param blobData All the data for one blob. Assumes that all ChannelData in the inner List
    *     belongs to the same table. Will error if this is not the case
    * @param bdecVersion version of blob
-   * @param encrypt If the output chunk is encrypted or not
    * @return {@link Blob} data
    */
   static <T> Blob constructBlobAndMetadata(
-      String filePath,
-      List<List<ChannelData<T>>> blobData,
-      Constants.BdecVersion bdecVersion,
-      boolean encrypt)
+      String filePath, List<List<ChannelData<T>>> blobData, Constants.BdecVersion bdecVersion)
       throws IOException, NoSuchPaddingException, NoSuchAlgorithmException,
           InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException,
           BadPaddingException {
@@ -87,32 +83,24 @@ static <T> Blob constructBlobAndMetadata(
         flusher.serialize(channelsDataPerTable, filePath);

         if (!serializedChunk.channelsMetadataList.isEmpty()) {
-          byte[] compressedChunkData;
-          int chunkLength;
-
-          if (encrypt) {
-            Pair<byte[], Integer> paddedChunk =
-                padChunk(serializedChunk.chunkData, Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES);
-            byte[] paddedChunkData = paddedChunk.getFirst();
-            chunkLength = paddedChunk.getSecond();
-
-            // Encrypt the compressed chunk data, the encryption key is derived using the key from
-            // server with the full blob path.
-            // We need to maintain IV as a block counter for the whole file, even interleaved,
-            // to align with decryption on the Snowflake query path.
-            // TODO: address alignment for the header SNOW-557866
-            long iv = curDataSize / Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES;
-            compressedChunkData =
-                Cryptor.encrypt(
-                    paddedChunkData, firstChannelFlushContext.getEncryptionKey(), filePath, iv);
-          } else {
-            compressedChunkData = serializedChunk.chunkData.toByteArray();
-            chunkLength = compressedChunkData.length;
-          }
+          ByteArrayOutputStream chunkData = serializedChunk.chunkData;
+          Pair<byte[], Integer> paddedChunk =
+              padChunk(chunkData, Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES);
+          byte[] paddedChunkData = paddedChunk.getFirst();
+          int paddedChunkLength = paddedChunk.getSecond();
+          // Encrypt the compressed chunk data, the encryption key is derived using the key from
+          // server with the full blob path.
+          // We need to maintain IV as a block counter for the whole file, even interleaved,
+          // to align with decryption on the Snowflake query path.
+          // TODO: address alignment for the header SNOW-557866
+          long iv = curDataSize / Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES;
+          byte[] encryptedCompressedChunkData =
+              Cryptor.encrypt(
+                  paddedChunkData, firstChannelFlushContext.getEncryptionKey(), filePath, iv);

           // Compute the md5 of the chunk data
-          String md5 = computeMD5(compressedChunkData, chunkLength);
-          int compressedChunkDataSize = compressedChunkData.length;
+          String md5 = computeMD5(encryptedCompressedChunkData, paddedChunkLength);
+          int encryptedCompressedChunkDataSize = encryptedCompressedChunkData.length;

           // Create chunk metadata
           long startOffset = curDataSize;
@@ -122,9 +110,9 @@ static <T> Blob constructBlobAndMetadata(
               // The start offset will be updated later in BlobBuilder#build to include the blob
               // header
               .setChunkStartOffset(startOffset)
-              // The chunkLength is used because it is the actual data size used for
+              // The paddedChunkLength is used because it is the actual data size used for
               // decompression and md5 calculation on server side.
-              .setChunkLength(chunkLength)
+              .setChunkLength(paddedChunkLength)
               .setUncompressedChunkLength((int) serializedChunk.chunkEstimatedUncompressedSize)
               .setChannelList(serializedChunk.channelsMetadataList)
               .setChunkMD5(md5)
@@ -138,22 +126,21 @@ static <T> Blob constructBlobAndMetadata(

           // Add chunk metadata and data to the list
           chunksMetadataList.add(chunkMetadata);
-          chunksDataList.add(compressedChunkData);
-          curDataSize += compressedChunkDataSize;
-          crc.update(compressedChunkData, 0, compressedChunkDataSize);
+          chunksDataList.add(encryptedCompressedChunkData);
+          curDataSize += encryptedCompressedChunkDataSize;
+          crc.update(encryptedCompressedChunkData, 0, encryptedCompressedChunkDataSize);

           logger.logInfo(
               "Finish building chunk in blob={}, table={}, rowCount={}, startOffset={},"
-                  + " estimatedUncompressedSize={}, chunkLength={}, compressedSize={},"
-                  + " encryption={}, bdecVersion={}",
+                  + " estimatedUncompressedSize={}, paddedChunkLength={}, encryptedCompressedSize={},"
+                  + " bdecVersion={}",
               filePath,
               firstChannelFlushContext.getFullyQualifiedTableName(),
               serializedChunk.rowCount,
               startOffset,
               serializedChunk.chunkEstimatedUncompressedSize,
-              chunkLength,
-              compressedChunkDataSize,
-              encrypt,
+              paddedChunkLength,
+              encryptedCompressedChunkDataSize,
               bdecVersion);
         }
       }
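The previously conditional encryption path is now unconditional: the serialized chunk is zero-padded to the cipher block size and encrypted with AES in CTR mode, seeding the counter IV with the chunk's block offset inside the blob so interleaved chunks still line up with server-side decryption. A self-contained sketch of that pattern using the JDK cipher APIs (Cryptor.encrypt additionally diversifies the key with the blob path, which is simplified away here):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

public class CtrChunkEncryptSketch {
  // AES block size; stands in for Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES.
  static final int BLOCK_SIZE = 16;

  // Zero-pad the chunk up to a multiple of the block size, mirroring padChunk.
  static byte[] pad(byte[] chunk) {
    int paddedLength = ((chunk.length + BLOCK_SIZE - 1) / BLOCK_SIZE) * BLOCK_SIZE;
    return Arrays.copyOf(chunk, paddedLength);
  }

  // Encrypt with AES/CTR, putting the chunk's block offset (curDataSize / BLOCK_SIZE
  // in the diff) into the low-order bytes of the 16-byte counter block.
  static byte[] encrypt(byte[] paddedChunk, byte[] aesKey, long blockOffset) throws Exception {
    byte[] iv = ByteBuffer.allocate(BLOCK_SIZE).putLong(BLOCK_SIZE - Long.BYTES, blockOffset).array();
    Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
    cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(aesKey, "AES"), new IvParameterSpec(iv));
    return cipher.doFinal(paddedChunk);
  }

  public static void main(String[] args) throws Exception {
    byte[] key = new byte[16]; // all-zero demo key; the real key comes from the server
    byte[] chunk = "compressed chunk bytes".getBytes(StandardCharsets.UTF_8);
    byte[] encrypted = encrypt(pad(chunk), key, 0L);
    System.out.println("padded=" + pad(chunk).length + " encrypted=" + encrypted.length);
  }
}

Note that CTR mode preserves length, which is why the diff can use paddedChunkLength for both the server-side MD5 check and setChunkLength.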

This file was deleted.

This file was deleted.

@@ -29,9 +29,6 @@ class DropChannelRequestInternal implements StreamingIngestRequest {
   @JsonProperty("schema")
   private String schema;

-  @JsonProperty("is_iceberg")
-  private boolean isIceberg;
-
   @JsonInclude(JsonInclude.Include.NON_NULL)
   @JsonProperty("client_sequencer")
   Long clientSequencer;
@@ -43,16 +40,14 @@ class DropChannelRequestInternal implements StreamingIngestRequest {
       String schema,
       String table,
       String channel,
-      Long clientSequencer,
-      boolean isIceberg) {
+      Long clientSequencer) {
     this.requestId = requestId;
     this.role = role;
     this.database = database;
     this.schema = schema;
     this.table = table;
     this.channel = channel;
     this.clientSequencer = clientSequencer;
-    this.isIceberg = isIceberg;
   }

   String getRequestId() {
@@ -79,10 +74,6 @@ String getSchema() {
     return schema;
   }

-  boolean getIsIceberg() {
-    return isIceberg;
-  }
-
   Long getClientSequencer() {
     return clientSequencer;
   }
@@ -95,7 +86,7 @@ String getFullyQualifiedTableName() {
   public String getStringForLogging() {
     return String.format(
         "DropChannelRequest(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s,"
-            + " isIceberg=%s, clientSequencer=%s)",
-        requestId, role, database, schema, table, channel, isIceberg, clientSequencer);
+            + " clientSequencer=%s)",
+        requestId, role, database, schema, table, channel, clientSequencer);
   }
 }
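Since is_iceberg no longer appears as a @JsonProperty, the serialized drop-channel payload shrinks accordingly. A sketch of the resulting shape with Jackson (property names other than schema, client_sequencer, and the removed is_iceberg are assumed from the field names; values are made up):

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DropChannelPayloadSketch {
  @JsonProperty("request_id") private String requestId = "req-1"; // assumed property name
  @JsonProperty("role") private String role = "INGEST_ROLE";      // assumed property name
  @JsonProperty("database") private String database = "DB";       // assumed property name
  @JsonProperty("schema") private String schema = "PUBLIC";
  @JsonProperty("table") private String table = "T";              // assumed property name
  @JsonProperty("channel") private String channel = "CH1";        // assumed property name

  @JsonInclude(JsonInclude.Include.NON_NULL)
  @JsonProperty("client_sequencer")
  private Long clientSequencer = null; // NON_NULL: dropped from the JSON when null

  public static void main(String[] args) throws Exception {
    // Prints, with no is_iceberg and the null sequencer omitted:
    // {"request_id":"req-1","role":"INGEST_ROLE","database":"DB","schema":"PUBLIC","table":"T","channel":"CH1"}
    System.out.println(new ObjectMapper().writeValueAsString(new DropChannelPayloadSketch()));
  }
}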
