Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-919423 Surfaces Table Schema for a Channel #589

Merged
merged 2 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import net.snowflake.ingest.streaming.internal.ColumnProperties;

/**
* A logical partition that represents a connection to a single Snowflake table, data will be
Expand Down Expand Up @@ -253,4 +254,13 @@ InsertValidationResponse insertRows(
*/
@Nullable
String getLatestCommittedOffsetToken();

/**
* Gets the table schema associated with this channel. Note that this is the table schema at the
* time of a channel open event. The schema may be changed on the Snowflake side in which case
* this will continue to show an old schema version until the channel is re-opened.
*
* @return map representing Column Name -> Column Properties
*/
Map<String, ColumnProperties> getTableSchema();
sfc-gh-tjones marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package net.snowflake.ingest.streaming.internal;

/**
sfc-gh-tjones marked this conversation as resolved.
Show resolved Hide resolved
* Class that encapsulates column properties. These are the same properties showed in the output of
* <a href="https://docs.snowflake.com/en/sql-reference/sql/show-columns">SHOW COLUMNS</a>. Note
* that this is slightly different than the internal column metadata used elsewhere in this SDK.
*/
public class ColumnProperties {
private String type;

private String logicalType;

private Integer precision;

private Integer scale;

private Integer byteLength;

private Integer length;

private boolean nullable;

ColumnProperties(ColumnMetadata columnMetadata) {
this.type = columnMetadata.getType();
this.logicalType = columnMetadata.getLogicalType();
this.precision = columnMetadata.getPrecision();
this.scale = columnMetadata.getScale();
this.byteLength = columnMetadata.getByteLength();
this.length = columnMetadata.getLength();
this.nullable = columnMetadata.getNullable();
}

public String getType() {
return type;
}

public String getLogicalType() {
return logicalType;
}

public Integer getPrecision() {
return precision;
}

public Integer getScale() {
return scale;
}

public Integer getByteLength() {
return byteLength;
}

public Integer getLength() {
return length;
}

public boolean isNullable() {
return nullable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -51,6 +52,9 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
// State of the channel that will be shared with its underlying buffer
private final ChannelRuntimeState channelState;

// Internal map of column name -> column properties
private final Map<String, ColumnProperties> tableColumns;

/**
* Constructor for TESTING ONLY which allows us to set the test mode
*
Expand Down Expand Up @@ -122,6 +126,7 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
this::collectRowSize,
channelState,
new ClientBufferParameters(owningClient));
this.tableColumns = new HashMap<>();
logger.logInfo(
"Channel={} created for table={}",
this.channelFlushContext.getName(),
Expand Down Expand Up @@ -298,6 +303,7 @@ public CompletableFuture<Void> close() {
void setupSchema(List<ColumnMetadata> columns) {
logger.logDebug("Setup schema for channel={}, schema={}", getFullyQualifiedName(), columns);
this.rowBuffer.setupSchema(columns);
columns.forEach(c -> tableColumns.putIfAbsent(c.getName(), new ColumnProperties(c)));
}

/**
Expand Down Expand Up @@ -391,6 +397,12 @@ public String getLatestCommittedOffsetToken() {
return response.getPersistedOffsetToken();
}

/** Returns a map of column name -> datatype for the table the channel is bound to */
@Override
public Map<String, ColumnProperties> getTableSchema() {
return this.tableColumns;
}

/** Check whether we need to throttle the insertRows API */
void throttleInsertIfNeeded(MemoryInfoProvider memoryInfoProvider) {
int retry = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,22 @@ public void testOpenChannelSuccessResponse() throws Exception {
Assert.assertEquals(dbName, channel.getDBName());
Assert.assertEquals(schemaName, channel.getSchemaName());
Assert.assertEquals(tableName, channel.getTableName());

Assert.assertTrue(channel.getTableSchema().containsKey("C1"));
Assert.assertEquals("NUMBER(38,0)", channel.getTableSchema().get("C1").getType());
Assert.assertEquals("FIXED", channel.getTableSchema().get("C1").getLogicalType());
Assert.assertEquals(38, (int) channel.getTableSchema().get("C1").getPrecision());
Assert.assertEquals(0, (int) channel.getTableSchema().get("C1").getScale());
Assert.assertNull(channel.getTableSchema().get("C1").getByteLength());
Assert.assertTrue(channel.getTableSchema().get("C1").isNullable());

Assert.assertTrue(channel.getTableSchema().containsKey("C2"));
Assert.assertEquals("NUMBER(38,0)", channel.getTableSchema().get("C2").getType());
Assert.assertEquals("FIXED", channel.getTableSchema().get("C2").getLogicalType());
Assert.assertEquals(38, (int) channel.getTableSchema().get("C2").getPrecision());
Assert.assertEquals(0, (int) channel.getTableSchema().get("C2").getScale());
Assert.assertNull(channel.getTableSchema().get("C2").getByteLength());
Assert.assertTrue(channel.getTableSchema().get("C2").isNullable());
}

@Test
Expand Down