Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin' into bmikaili-SNOW-1492090-snow…
Browse files Browse the repository at this point in the history
…pipe-streaming-file-master-key-id-rotation
  • Loading branch information
sfc-gh-bmikaili committed Oct 1, 2024
2 parents edcf313 + d283df1 commit 3ff93ca
Show file tree
Hide file tree
Showing 22 changed files with 1,532 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ static <T> Blob constructBlobAndMetadata(
chunkMetadataBuilder
.setMajorVersion(Constants.PARQUET_MAJOR_VERSION)
.setMinorVersion(Constants.PARQUET_MINOR_VERSION)
.setCreatedOn(0L)
// set createdOn in seconds
.setCreatedOn(System.currentTimeMillis() / 1000)
.setExtendedMetadataSize(-1L);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

/** Channel's buffer relevant parameters that are set at the owning client level. */
public class ClientBufferParameters {
private static final String BDEC_PARQUET_MESSAGE_TYPE_NAME = "bdec";
private static final String PARQUET_MESSAGE_TYPE_NAME = "schema";

private long maxChunkSizeInBytes;

Expand Down Expand Up @@ -118,4 +120,8 @@ public boolean getIsIcebergMode() {
public Optional<Integer> getMaxRowGroups() {
return maxRowGroups;
}

public String getParquetMessageTypeName() {
return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,83 @@ static int validateAndParseBoolean(String columnName, Object input, long insertR
insertRowIndex);
}

/**
* Validate and cast Iceberg struct column to Map<String, Object>. Allowed Java type:
*
* <ul>
* <li>Map<String, Object>
* </ul>
*
* @param columnName Column name, used in validation error messages
* @param input Object to validate and parse
* @param insertRowIndex Row index for error reporting
* @return Object cast to Map
*/
static Map<String, ?> validateAndParseIcebergStruct(
String columnName, Object input, long insertRowIndex) {
if (!(input instanceof Map)) {
throw typeNotAllowedException(
columnName,
input.getClass(),
"STRUCT",
new String[] {"Map<String, Object>"},
insertRowIndex);
}
for (Object key : ((Map<?, ?>) input).keySet()) {
if (!(key instanceof String)) {
throw new SFException(
ErrorCode.INVALID_FORMAT_ROW,
String.format(
"Field name of struct %s must be of type String, rowIndex:%d",
columnName, insertRowIndex));
}
}

return (Map<String, ?>) input;
}

/**
* Validate and parse Iceberg list column to an Iterable. Allowed Java type:
*
* <ul>
* <li>Iterable
* </ul>
*
* @param columnName Column name, used in validation error messages
* @param input Object to validate and parse
* @param insertRowIndex Row index for error reporting
* @return Object cast to Iterable
*/
static Iterable<?> validateAndParseIcebergList(
String columnName, Object input, long insertRowIndex) {
if (!(input instanceof Iterable)) {
throw typeNotAllowedException(
columnName, input.getClass(), "LIST", new String[] {"Iterable"}, insertRowIndex);
}
return (Iterable<?>) input;
}

/**
* Validate and parse Iceberg map column to a map. Allowed Java type:
*
* <ul>
* <li>Map<Object, Object>
* </ul>
*
* @param columnName Column name, used in validation error messages
* @param input Object to validate and parse
* @param insertRowIndex Row index for error reporting
* @return Object cast to Map
*/
static Map<?, ?> validateAndParseIcebergMap(
String columnName, Object input, long insertRowIndex) {
if (!(input instanceof Map)) {
throw typeNotAllowedException(
columnName, input.getClass(), "MAP", new String[] {"Map"}, insertRowIndex);
}
return (Map<?, ?>) input;
}

static void checkValueInRange(
BigDecimal bigDecimalValue, int scale, int precision, final long insertRowIndex) {
BigDecimal comparand =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,25 @@ class DropChannelRequestInternal implements IStreamingIngestRequest {
@JsonProperty("client_sequencer")
Long clientSequencer;

@JsonProperty("is_iceberg")
boolean isIceberg;

DropChannelRequestInternal(
String requestId,
String role,
String database,
String schema,
String table,
String channel,
boolean isIceberg,
Long clientSequencer) {
this.requestId = requestId;
this.role = role;
this.database = database;
this.schema = schema;
this.table = table;
this.channel = channel;
this.isIceberg = isIceberg;
this.clientSequencer = clientSequencer;
}

Expand Down Expand Up @@ -74,6 +79,10 @@ String getSchema() {
return schema;
}

boolean isIceberg() {
return isIceberg;
}

Long getClientSequencer() {
return clientSequencer;
}
Expand All @@ -86,7 +95,7 @@ String getFullyQualifiedTableName() {
public String getStringForLogging() {
return String.format(
"DropChannelRequest(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s,"
+ " clientSequencer=%s)",
requestId, role, database, schema, table, channel, clientSequencer);
+ " isIceberg=%s, clientSequencer=%s)",
requestId, role, database, schema, table, channel, isIceberg, clientSequencer);
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.streaming.internal.BinaryStringUtils.truncateBytesAsHex;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigInteger;
import java.util.Objects;

/** Audit register endpoint/FileColumnPropertyDTO property list. */
class FileColumnProperties {
private int columnOrdinal;
private Integer fieldId;
private String minStrValue;

private String maxStrValue;
Expand Down Expand Up @@ -46,6 +52,7 @@ class FileColumnProperties {

FileColumnProperties(RowBufferStats stats, boolean setDefaultValues) {
this.setColumnOrdinal(stats.getOrdinal());
this.setFieldId(stats.getFieldId());
this.setCollation(stats.getCollationDefinitionString());
this.setMaxIntValue(
stats.getCurrentMaxIntValue() == null
Expand Down Expand Up @@ -93,6 +100,16 @@ public void setColumnOrdinal(int columnOrdinal) {
this.columnOrdinal = columnOrdinal;
}

@JsonProperty("fieldId")
@JsonInclude(JsonInclude.Include.NON_NULL)
public Integer getFieldId() {
return fieldId;
}

public void setFieldId(Integer fieldId) {
this.fieldId = fieldId;
}

// Annotation required in order to have package private fields serialized
@JsonProperty("minStrValue")
String getMinStrValue() {
Expand Down Expand Up @@ -206,6 +223,7 @@ void setMaxStrNonCollated(String maxStrNonCollated) {
public String toString() {
final StringBuilder sb = new StringBuilder("{");
sb.append("\"columnOrdinal\": ").append(columnOrdinal);
sb.append(", \"fieldId\": ").append(fieldId);
if (minIntValue != null) {
sb.append(", \"minIntValue\": ").append(minIntValue);
sb.append(", \"maxIntValue\": ").append(maxIntValue);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
Expand Down
Loading

0 comments on commit 3ff93ca

Please sign in to comment.