From d8513d113dd6cf43039fc5f23e42b7fc2d9d222a Mon Sep 17 00:00:00 2001 From: Tyler Jones <59716821+sfc-gh-tjones@users.noreply.github.com> Date: Thu, 21 Sep 2023 11:03:51 -0700 Subject: [PATCH] SNOW-919423 Surfaces Table Schema for a Channel (#589) We've had requests to surface the table schema for a channel so that it is possible to reason about what columns need to be supplied to a given call of `insertRow` or `insertRows`. This surfaces a map of Column Name to Column Properties that are normally surfaced in the output of `SHOW COLUMNS`. @test adds test to `SnowflakeStreamingIngestChannelTest.java` --- .../SnowflakeStreamingIngestChannel.java | 10 ++++ .../streaming/internal/ColumnProperties.java | 60 +++++++++++++++++++ ...owflakeStreamingIngestChannelInternal.java | 12 ++++ .../SnowflakeStreamingIngestChannelTest.java | 16 +++++ 4 files changed, 98 insertions(+) create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java index 5fdadc90b..909fcefa1 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java @@ -8,6 +8,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import javax.annotation.Nullable; +import net.snowflake.ingest.streaming.internal.ColumnProperties; /** * A logical partition that represents a connection to a single Snowflake table, data will be @@ -253,4 +254,13 @@ InsertValidationResponse insertRows( */ @Nullable String getLatestCommittedOffsetToken(); + + /** + * Gets the table schema associated with this channel. Note that this is the table schema at the + * time of a channel open event. The schema may be changed on the Snowflake side in which case + * this will continue to show an old schema version until the channel is re-opened. + * + * @return map representing Column Name -> Column Properties + */ + Map getTableSchema(); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java new file mode 100644 index 000000000..856170c57 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java @@ -0,0 +1,60 @@ +package net.snowflake.ingest.streaming.internal; + +/** + * Class that encapsulates column properties. These are the same properties showed in the output of + * SHOW COLUMNS. Note + * that this is slightly different than the internal column metadata used elsewhere in this SDK. + */ +public class ColumnProperties { + private String type; + + private String logicalType; + + private Integer precision; + + private Integer scale; + + private Integer byteLength; + + private Integer length; + + private boolean nullable; + + ColumnProperties(ColumnMetadata columnMetadata) { + this.type = columnMetadata.getType(); + this.logicalType = columnMetadata.getLogicalType(); + this.precision = columnMetadata.getPrecision(); + this.scale = columnMetadata.getScale(); + this.byteLength = columnMetadata.getByteLength(); + this.length = columnMetadata.getLength(); + this.nullable = columnMetadata.getNullable(); + } + + public String getType() { + return type; + } + + public String getLogicalType() { + return logicalType; + } + + public Integer getPrecision() { + return precision; + } + + public Integer getScale() { + return scale; + } + + public Integer getByteLength() { + return byteLength; + } + + public Integer getLength() { + return length; + } + + public boolean isNullable() { + return nullable; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index 4a27aba5d..756f9b5e8 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -12,6 +12,7 @@ import java.time.ZoneId; import java.time.ZoneOffset; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -51,6 +52,9 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn // State of the channel that will be shared with its underlying buffer private final ChannelRuntimeState channelState; + // Internal map of column name -> column properties + private final Map tableColumns; + /** * Constructor for TESTING ONLY which allows us to set the test mode * @@ -122,6 +126,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn this::collectRowSize, channelState, new ClientBufferParameters(owningClient)); + this.tableColumns = new HashMap<>(); logger.logInfo( "Channel={} created for table={}", this.channelFlushContext.getName(), @@ -298,6 +303,7 @@ public CompletableFuture close() { void setupSchema(List columns) { logger.logDebug("Setup schema for channel={}, schema={}", getFullyQualifiedName(), columns); this.rowBuffer.setupSchema(columns); + columns.forEach(c -> tableColumns.putIfAbsent(c.getName(), new ColumnProperties(c))); } /** @@ -391,6 +397,12 @@ public String getLatestCommittedOffsetToken() { return response.getPersistedOffsetToken(); } + /** Returns a map of column name -> datatype for the table the channel is bound to */ + @Override + public Map getTableSchema() { + return this.tableColumns; + } + /** Check whether we need to throttle the insertRows API */ void throttleInsertIfNeeded(MemoryInfoProvider memoryInfoProvider) { int retry = 0; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 62f2efdce..e9dc87e94 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -495,6 +495,22 @@ public void testOpenChannelSuccessResponse() throws Exception { Assert.assertEquals(dbName, channel.getDBName()); Assert.assertEquals(schemaName, channel.getSchemaName()); Assert.assertEquals(tableName, channel.getTableName()); + + Assert.assertTrue(channel.getTableSchema().containsKey("C1")); + Assert.assertEquals("NUMBER(38,0)", channel.getTableSchema().get("C1").getType()); + Assert.assertEquals("FIXED", channel.getTableSchema().get("C1").getLogicalType()); + Assert.assertEquals(38, (int) channel.getTableSchema().get("C1").getPrecision()); + Assert.assertEquals(0, (int) channel.getTableSchema().get("C1").getScale()); + Assert.assertNull(channel.getTableSchema().get("C1").getByteLength()); + Assert.assertTrue(channel.getTableSchema().get("C1").isNullable()); + + Assert.assertTrue(channel.getTableSchema().containsKey("C2")); + Assert.assertEquals("NUMBER(38,0)", channel.getTableSchema().get("C2").getType()); + Assert.assertEquals("FIXED", channel.getTableSchema().get("C2").getLogicalType()); + Assert.assertEquals(38, (int) channel.getTableSchema().get("C2").getPrecision()); + Assert.assertEquals(0, (int) channel.getTableSchema().get("C2").getScale()); + Assert.assertNull(channel.getTableSchema().get("C2").getByteLength()); + Assert.assertTrue(channel.getTableSchema().get("C2").isNullable()); } @Test