
Commit

Merge branch 'master' into tzhang-si-random
sfc-gh-tzhang committed Oct 4, 2023
2 parents 3e2408c + 02d06c9 commit 654afeb
Showing 12 changed files with 159 additions and 54 deletions.
4 changes: 2 additions & 2 deletions pom.xml
@@ -6,7 +6,7 @@
<!-- Arifact name and version information -->
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>2.0.3</version>
<version>2.0.4-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Snowflake Ingest SDK</name>
<description>Snowflake Ingest SDK</description>
@@ -63,7 +63,7 @@
<protobuf.version>3.19.6</protobuf.version>
<shadeBase>net.snowflake.ingest.internal</shadeBase>
<slf4j.version>1.7.36</slf4j.version>
<snappy.version>1.1.10.1</snappy.version>
<snappy.version>1.1.10.4</snappy.version>
<snowjdbc.version>3.13.30</snowjdbc.version>
<yetus.version>0.13.0</yetus.version>
</properties>
@@ -110,7 +110,7 @@ public class RequestBuilder {
// Don't change!
public static final String CLIENT_NAME = "SnowpipeJavaSDK";

public static final String DEFAULT_VERSION = "2.0.3";
public static final String DEFAULT_VERSION = "2.0.4-SNAPSHOT";

public static final String JAVA_USER_AGENT = "JAVA";

@@ -8,6 +8,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import net.snowflake.ingest.streaming.internal.ColumnProperties;

/**
* A logical partition that represents a connection to a single Snowflake table, data will be
@@ -17,7 +18,7 @@
* at the same time. When a new channel is opened, all previously opened channels with the same name
* are invalidated (this applies for the table globally. not just in a single JVM). In order to
* ingest data from multiple threads/clients/applications, we recommend opening multiple channels,
* each with a different name. There is no limit on the number of channels that can be opened.
* each with a different name.
*
* <p>Thread safety note: Implementations of this interface are required to be thread safe.
*/
@@ -253,4 +254,13 @@ InsertValidationResponse insertRows(
*/
@Nullable
String getLatestCommittedOffsetToken();

/**
* Gets the table schema associated with this channel. Note that this is the table schema at the
* time of a channel open event. The schema may be changed on the Snowflake side in which case
* this will continue to show an old schema version until the channel is re-opened.
*
* @return map representing Column Name -> Column Properties
*/
Map<String, ColumnProperties> getTableSchema();
}
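For illustration, a minimal caller-side sketch of the new getTableSchema() API combined with the ColumnProperties getters added later in this commit. This is not part of the diff; the channel variable, import paths, and printed fields are assumptions based on the SDK's public API.

import java.util.Map;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.internal.ColumnProperties;

class TableSchemaSketch {
  // Prints the schema snapshot captured when the channel was opened. If the table
  // schema later changes server-side, this map stays stale until the channel is
  // re-opened, as noted in the javadoc above.
  static void printSchema(SnowflakeStreamingIngestChannel channel) {
    Map<String, ColumnProperties> schema = channel.getTableSchema();
    for (Map.Entry<String, ColumnProperties> entry : schema.entrySet()) {
      ColumnProperties col = entry.getValue();
      System.out.printf(
          "column=%s type=%s logicalType=%s precision=%s scale=%s nullable=%s%n",
          entry.getKey(),
          col.getType(),
          col.getLogicalType(),
          col.getPrecision(),
          col.getScale(),
          col.isNullable());
    }
  }
}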
@@ -236,10 +236,11 @@ public float getSize() {
*
* @param row the input row
* @param error the insert error that we return to the customer
* @param rowIndex the index of the current row in the input batch
* @return the set of input column names
*/
Set<String> verifyInputColumns(
Map<String, Object> row, InsertValidationResponse.InsertError error) {
Map<String, Object> row, InsertValidationResponse.InsertError error, int rowIndex) {
// Map of unquoted column name -> original column name
Map<String, String> inputColNamesMap =
row.keySet().stream()
@@ -260,7 +261,8 @@ Set<String> verifyInputColumns(
throw new SFException(
ErrorCode.INVALID_FORMAT_ROW,
"Extra columns: " + extraCols,
"Columns not present in the table shouldn't be specified.");
String.format(
"Columns not present in the table shouldn't be specified, rowIndex:%d", rowIndex));
}

// Check for missing columns in the row
@@ -278,7 +280,8 @@ Set<String> verifyInputColumns(
throw new SFException(
ErrorCode.INVALID_FORMAT_ROW,
"Missing columns: " + missingCols,
"Values for all non-nullable columns must be specified.");
String.format(
"Values for all non-nullable columns must be specified, rowIndex:%d", rowIndex));
}

return inputColNamesMap.keySet();
@@ -305,12 +308,12 @@ public InsertValidationResponse insertRows(
if (onErrorOption == OpenChannelRequest.OnErrorOption.CONTINUE
|| onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_AT_FIRST_ERROR) {
// Used to map incoming row(nth row) to InsertError(for nth row) in response
long rowIndex = 0;
int rowIndex = 0;
for (Map<String, Object> row : rows) {
InsertValidationResponse.InsertError error =
new InsertValidationResponse.InsertError(row, rowIndex);
try {
Set<String> inputColumnNames = verifyInputColumns(row, error);
Set<String> inputColumnNames = verifyInputColumns(row, error, rowIndex);
rowsSizeInBytes +=
addRow(row, this.bufferedRowCount, this.statsMap, inputColumnNames, rowIndex);
this.bufferedRowCount++;
@@ -338,7 +341,7 @@ public InsertValidationResponse insertRows(
float tempRowsSizeInBytes = 0F;
int tempRowCount = 0;
for (Map<String, Object> row : rows) {
Set<String> inputColumnNames = verifyInputColumns(row, null);
Set<String> inputColumnNames = verifyInputColumns(row, null, tempRowCount);
tempRowsSizeInBytes +=
addTempRow(row, tempRowCount, this.tempStatsMap, inputColumnNames, tempRowCount);
checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes);
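For context on the rowIndex plumbing above: with OnErrorOption.CONTINUE, validation failures are collected per row instead of aborting the whole batch, and the new rowIndex points both the thrown messages and the returned InsertError entries at the offending row. Below is a hypothetical caller-side sketch; the column names and offset token are made up, and it assumes InsertValidationResponse exposes hasErrors()/getInsertErrors() and that each InsertError exposes its row index and exception.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;

class InsertRowsSketch {
  // Assumes the channel was opened with OnErrorOption.CONTINUE
  static void insertBatch(SnowflakeStreamingIngestChannel channel) {
    List<Map<String, Object>> rows = new ArrayList<>();
    Map<String, Object> good = new HashMap<>();
    good.put("C1", 42); // assumed column name
    Map<String, Object> bad = new HashMap<>();
    bad.put("NOT_A_COLUMN", 1); // extra column, rejected with "rowIndex:1" in the message
    rows.add(good);
    rows.add(bad);

    InsertValidationResponse response = channel.insertRows(rows, "offset-1");
    if (response.hasErrors()) {
      for (InsertValidationResponse.InsertError error : response.getInsertErrors()) {
        // The index identifies which element of "rows" failed validation
        System.out.println("row " + error.getRowIndex() + " failed: " + error.getException());
      }
    }
  }
}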
@@ -0,0 +1,60 @@
package net.snowflake.ingest.streaming.internal;

/**
* Class that encapsulates column properties. These are the same properties showed in the output of
* <a href="https://docs.snowflake.com/en/sql-reference/sql/show-columns">SHOW COLUMNS</a>. Note
* that this is slightly different than the internal column metadata used elsewhere in this SDK.
*/
public class ColumnProperties {
private String type;

private String logicalType;

private Integer precision;

private Integer scale;

private Integer byteLength;

private Integer length;

private boolean nullable;

ColumnProperties(ColumnMetadata columnMetadata) {
this.type = columnMetadata.getType();
this.logicalType = columnMetadata.getLogicalType();
this.precision = columnMetadata.getPrecision();
this.scale = columnMetadata.getScale();
this.byteLength = columnMetadata.getByteLength();
this.length = columnMetadata.getLength();
this.nullable = columnMetadata.getNullable();
}

public String getType() {
return type;
}

public String getLogicalType() {
return logicalType;
}

public Integer getPrecision() {
return precision;
}

public Integer getScale() {
return scale;
}

public Integer getByteLength() {
return byteLength;
}

public Integer getLength() {
return length;
}

public boolean isNullable() {
return nullable;
}
}
@@ -457,7 +457,10 @@ static TimestampWrapper validateAndParseTimestamp(
if (offsetDateTime.getYear() < 1 || offsetDateTime.getYear() > 9999) {
throw new SFException(
ErrorCode.INVALID_VALUE_ROW,
"Timestamp out of representable inclusive range of years between 1 and 9999");
String.format(
"Timestamp out of representable inclusive range of years between 1 and 9999,"
+ " rowIndex:%d",
insertRowIndex));
}
return new TimestampWrapper(offsetDateTime, scale);
}
@@ -588,7 +591,10 @@ static int validateAndParseDate(String columnName, Object input, long insertRowI
if (offsetDateTime.getYear() < -9999 || offsetDateTime.getYear() > 9999) {
throw new SFException(
ErrorCode.INVALID_VALUE_ROW,
"Date out of representable inclusive range of years between -9999 and 9999");
String.format(
"Date out of representable inclusive range of years between -9999 and 9999,"
+ " rowIndex:%d",
insertRowIndex));
}

return Math.toIntExact(offsetDateTime.toLocalDate().toEpochDay());
@@ -814,7 +820,7 @@ static void checkValueInRange(
throw new SFException(
ErrorCode.INVALID_FORMAT_ROW,
String.format(
"Number out of representable exclusive range of (-1e%s..1e%s), Row Index:%s",
"Number out of representable exclusive range of (-1e%s..1e%s), rowIndex:%d",
precision - scale, precision - scale, insertRowIndex));
}
}
@@ -858,8 +864,7 @@ private static SFException typeNotAllowedException(
return new SFException(
ErrorCode.INVALID_FORMAT_ROW,
String.format(
"Object of type %s cannot be ingested into Snowflake column %s of type %s, Row"
+ " Index:%s",
"Object of type %s cannot be ingested into Snowflake column %s of type %s, rowIndex:%d",
javaType.getName(), columnName, snowflakeType, insertRowIndex),
String.format(
String.format("Allowed Java types: %s", String.join(", ", allowedJavaTypes))));
@@ -882,7 +887,7 @@ private static SFException valueFormatNotAllowedException(
return new SFException(
ErrorCode.INVALID_VALUE_ROW,
String.format(
"Value cannot be ingested into Snowflake column %s of type %s, Row Index: %s, reason:"
"Value cannot be ingested into Snowflake column %s of type %s, rowIndex:%d, reason:"
+ " %s",
columnName, snowflakeType, rowIndex, reason));
}
@@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -174,7 +173,7 @@ List<List<ChannelData<T>>> getData() {
this.isNeedFlush = false;
this.lastFlushTime = System.currentTimeMillis();
this.isTestMode = isTestMode;
this.latencyTimerContextMap = new HashMap<>();
this.latencyTimerContextMap = new ConcurrentHashMap<>();
this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion();
createWorkers();
}
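A note on the HashMap to ConcurrentHashMap swap above: the timer-context map is presumably read and mutated from more than one thread (the code registering latency timers and the asynchronous flush/upload workers created in createWorkers()), and a plain HashMap is unsafe under concurrent modification. A generic illustration of the pattern, not the SDK's internal code:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LatencyTimerSketch {
  // Safe for concurrent put/remove from different threads; a plain HashMap could
  // lose updates or corrupt its internal structure under the same access pattern.
  private final Map<String, Long> startTimes = new ConcurrentHashMap<>();

  void start(String key) {
    startTimes.put(key, System.nanoTime()); // e.g. called on the flushing thread
  }

  Long stop(String key) {
    return startTimes.remove(key); // e.g. called from an asynchronous upload callback
  }
}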
@@ -210,8 +210,8 @@ private float addRow(
throw new SFException(
ErrorCode.MAX_ROW_SIZE_EXCEEDED,
String.format(
"rowSizeInBytes=%.3f maxAllowedRowSizeInBytes=%d",
size, clientBufferParameters.getMaxAllowedRowSizeInBytes()));
"rowSizeInBytes:%.3f, maxAllowedRowSizeInBytes:%d, rowIndex:%d",
size, clientBufferParameters.getMaxAllowedRowSizeInBytes(), insertRowsCurrIndex));
}

out.accept(Arrays.asList(indexedRow));
@@ -12,6 +12,7 @@
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -51,6 +52,9 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
// State of the channel that will be shared with its underlying buffer
private final ChannelRuntimeState channelState;

// Internal map of column name -> column properties
private final Map<String, ColumnProperties> tableColumns;

/**
* Constructor for TESTING ONLY which allows us to set the test mode
*
@@ -122,6 +126,7 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
this::collectRowSize,
channelState,
new ClientBufferParameters(owningClient));
this.tableColumns = new HashMap<>();
logger.logInfo(
"Channel={} created for table={}",
this.channelFlushContext.getName(),
@@ -298,6 +303,7 @@ public CompletableFuture<Void> close() {
void setupSchema(List<ColumnMetadata> columns) {
logger.logDebug("Setup schema for channel={}, schema={}", getFullyQualifiedName(), columns);
this.rowBuffer.setupSchema(columns);
columns.forEach(c -> tableColumns.putIfAbsent(c.getName(), new ColumnProperties(c)));
}

/**
Expand Down Expand Up @@ -391,6 +397,12 @@ public String getLatestCommittedOffsetToken() {
return response.getPersistedOffsetToken();
}

/** Returns a map of column name -> datatype for the table the channel is bound to */
@Override
public Map<String, ColumnProperties> getTableSchema() {
return this.tableColumns;
}

/** Check whether we need to throttle the insertRows API */
void throttleInsertIfNeeded(MemoryInfoProvider memoryInfoProvider) {
int retry = 0;