From e38a7e5737774434d770dd7db198a04926a1248e Mon Sep 17 00:00:00 2001 From: Tyler Jones Date: Wed, 20 Sep 2023 13:35:43 -0700 Subject: [PATCH 1/2] SNOW-919423 Surfaces Table Schema for a Channel We've had requests to surface the table schema for a channel so that it is possible to reason about what columns need to be supplied to a given call of `insertRow` or `insertRows`. This surfaces a map of Column Name to Column Properties that are normally surfaced in the output of `SHOW COLUMNS`. @test adds test to `SnowflakeStreamingIngestChannelTest.java` --- .../SnowflakeStreamingIngestChannel.java | 8 +++ .../streaming/internal/ColumnProperties.java | 59 +++++++++++++++++++ ...owflakeStreamingIngestChannelInternal.java | 12 ++++ .../SnowflakeStreamingIngestChannelTest.java | 16 +++++ 4 files changed, 95 insertions(+) create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java index 5fdadc90b..6324baa54 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java @@ -8,6 +8,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import javax.annotation.Nullable; +import net.snowflake.ingest.streaming.internal.ColumnProperties; /** * A logical partition that represents a connection to a single Snowflake table, data will be @@ -253,4 +254,11 @@ InsertValidationResponse insertRows( */ @Nullable String getLatestCommittedOffsetToken(); + + /** + * Gets the table schema associated with this channel + * + * @return map representing Column Name -> Column Properties + */ + Map getTableSchema(); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java new file mode 100644 index 000000000..bd2b48a30 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java @@ -0,0 +1,59 @@ +package net.snowflake.ingest.streaming.internal; + +/** + * Class that encapsulates column properties. These are the same properties showed in the output of + * SHOW COLUMNS + */ +public class ColumnProperties { + private String type; + + private String logicalType; + + private Integer precision; + + private Integer scale; + + private Integer byteLength; + + private Integer length; + + private boolean nullable; + + ColumnProperties(ColumnMetadata columnMetadata) { + this.type = columnMetadata.getType(); + this.logicalType = columnMetadata.getLogicalType(); + this.precision = columnMetadata.getPrecision(); + this.scale = columnMetadata.getScale(); + this.byteLength = columnMetadata.getByteLength(); + this.length = columnMetadata.getLength(); + this.nullable = columnMetadata.getNullable(); + } + + public String getType() { + return type; + } + + public String getLogicalType() { + return logicalType; + } + + public Integer getPrecision() { + return precision; + } + + public Integer getScale() { + return scale; + } + + public Integer getByteLength() { + return byteLength; + } + + public Integer getLength() { + return length; + } + + public boolean isNullable() { + return nullable; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index 4a27aba5d..46089dca6 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -12,6 +12,7 @@ import java.time.ZoneId; import java.time.ZoneOffset; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -51,6 +52,9 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn // State of the channel that will be shared with its underlying buffer private final ChannelRuntimeState channelState; + // Internal map of column name -> column properties + private final Map tableColumns; + /** * Constructor for TESTING ONLY which allows us to set the test mode * @@ -126,6 +130,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn "Channel={} created for table={}", this.channelFlushContext.getName(), this.channelFlushContext.getTableName()); + this.tableColumns = new HashMap<>(); } /** @@ -298,6 +303,7 @@ public CompletableFuture close() { void setupSchema(List columns) { logger.logDebug("Setup schema for channel={}, schema={}", getFullyQualifiedName(), columns); this.rowBuffer.setupSchema(columns); + columns.forEach(c -> tableColumns.putIfAbsent(c.getName(), new ColumnProperties(c))); } /** @@ -391,6 +397,12 @@ public String getLatestCommittedOffsetToken() { return response.getPersistedOffsetToken(); } + /** Returns a map of column name -> datatype for the table the channel is bound to */ + @Override + public Map getTableSchema() { + return this.tableColumns; + } + /** Check whether we need to throttle the insertRows API */ void throttleInsertIfNeeded(MemoryInfoProvider memoryInfoProvider) { int retry = 0; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 62f2efdce..e9dc87e94 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -495,6 +495,22 @@ public void testOpenChannelSuccessResponse() throws Exception { Assert.assertEquals(dbName, channel.getDBName()); Assert.assertEquals(schemaName, channel.getSchemaName()); Assert.assertEquals(tableName, channel.getTableName()); + + Assert.assertTrue(channel.getTableSchema().containsKey("C1")); + Assert.assertEquals("NUMBER(38,0)", channel.getTableSchema().get("C1").getType()); + Assert.assertEquals("FIXED", channel.getTableSchema().get("C1").getLogicalType()); + Assert.assertEquals(38, (int) channel.getTableSchema().get("C1").getPrecision()); + Assert.assertEquals(0, (int) channel.getTableSchema().get("C1").getScale()); + Assert.assertNull(channel.getTableSchema().get("C1").getByteLength()); + Assert.assertTrue(channel.getTableSchema().get("C1").isNullable()); + + Assert.assertTrue(channel.getTableSchema().containsKey("C2")); + Assert.assertEquals("NUMBER(38,0)", channel.getTableSchema().get("C2").getType()); + Assert.assertEquals("FIXED", channel.getTableSchema().get("C2").getLogicalType()); + Assert.assertEquals(38, (int) channel.getTableSchema().get("C2").getPrecision()); + Assert.assertEquals(0, (int) channel.getTableSchema().get("C2").getScale()); + Assert.assertNull(channel.getTableSchema().get("C2").getByteLength()); + Assert.assertTrue(channel.getTableSchema().get("C2").isNullable()); } @Test From 9ea17bd484e676f3c0e2a1915412e0e83121959c Mon Sep 17 00:00:00 2001 From: Tyler Jones Date: Wed, 20 Sep 2023 16:41:11 -0700 Subject: [PATCH 2/2] Iteration 2: Addresses Tobys comments --- .../ingest/streaming/SnowflakeStreamingIngestChannel.java | 4 +++- .../snowflake/ingest/streaming/internal/ColumnProperties.java | 3 ++- .../internal/SnowflakeStreamingIngestChannelInternal.java | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java index 6324baa54..909fcefa1 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java @@ -256,7 +256,9 @@ InsertValidationResponse insertRows( String getLatestCommittedOffsetToken(); /** - * Gets the table schema associated with this channel + * Gets the table schema associated with this channel. Note that this is the table schema at the + * time of a channel open event. The schema may be changed on the Snowflake side in which case + * this will continue to show an old schema version until the channel is re-opened. * * @return map representing Column Name -> Column Properties */ diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java index bd2b48a30..856170c57 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java @@ -2,7 +2,8 @@ /** * Class that encapsulates column properties. These are the same properties showed in the output of - * SHOW COLUMNS + * SHOW COLUMNS. Note + * that this is slightly different than the internal column metadata used elsewhere in this SDK. */ public class ColumnProperties { private String type; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index 46089dca6..756f9b5e8 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -126,11 +126,11 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn this::collectRowSize, channelState, new ClientBufferParameters(owningClient)); + this.tableColumns = new HashMap<>(); logger.logInfo( "Channel={} created for table={}", this.channelFlushContext.getName(), this.channelFlushContext.getTableName()); - this.tableColumns = new HashMap<>(); } /**