Skip to content

Commit

Permalink
SNOW-919423 Surfaces Table Schema for a Channel
Browse files Browse the repository at this point in the history
We've had requests to surface the table schema for a channel so that it
is possible to reason about what columns need to be supplied to a given
call of `insertRow` or `insertRows`. This surfaces a map of Column
Name to Column Properties that are normally surfaced in the output of
`SHOW COLUMNS`.

@test adds test to `SnowflakeStreamingIngestChannelTest.java`
  • Loading branch information
sfc-gh-tjones committed Sep 20, 2023
1 parent f57f92b commit e38a7e5
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import net.snowflake.ingest.streaming.internal.ColumnProperties;

/**
* A logical partition that represents a connection to a single Snowflake table, data will be
Expand Down Expand Up @@ -253,4 +254,11 @@ InsertValidationResponse insertRows(
*/
@Nullable
String getLatestCommittedOffsetToken();

/**
* Gets the table schema associated with this channel
*
* @return map representing Column Name -> Column Properties
*/
Map<String, ColumnProperties> getTableSchema();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package net.snowflake.ingest.streaming.internal;

/**
* Class that encapsulates column properties. These are the same properties showed in the output of
* <a href="https://docs.snowflake.com/en/sql-reference/sql/show-columns">SHOW COLUMNS</a>
*/
public class ColumnProperties {
private String type;

private String logicalType;

private Integer precision;

private Integer scale;

private Integer byteLength;

private Integer length;

private boolean nullable;

ColumnProperties(ColumnMetadata columnMetadata) {
this.type = columnMetadata.getType();
this.logicalType = columnMetadata.getLogicalType();
this.precision = columnMetadata.getPrecision();
this.scale = columnMetadata.getScale();
this.byteLength = columnMetadata.getByteLength();
this.length = columnMetadata.getLength();
this.nullable = columnMetadata.getNullable();
}

public String getType() {
return type;
}

public String getLogicalType() {
return logicalType;
}

public Integer getPrecision() {
return precision;
}

public Integer getScale() {
return scale;
}

public Integer getByteLength() {
return byteLength;
}

public Integer getLength() {
return length;
}

public boolean isNullable() {
return nullable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -51,6 +52,9 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
// State of the channel that will be shared with its underlying buffer
private final ChannelRuntimeState channelState;

// Internal map of column name -> column properties
private final Map<String, ColumnProperties> tableColumns;

/**
* Constructor for TESTING ONLY which allows us to set the test mode
*
Expand Down Expand Up @@ -126,6 +130,7 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
"Channel={} created for table={}",
this.channelFlushContext.getName(),
this.channelFlushContext.getTableName());
this.tableColumns = new HashMap<>();
}

/**
Expand Down Expand Up @@ -298,6 +303,7 @@ public CompletableFuture<Void> close() {
void setupSchema(List<ColumnMetadata> columns) {
logger.logDebug("Setup schema for channel={}, schema={}", getFullyQualifiedName(), columns);
this.rowBuffer.setupSchema(columns);
columns.forEach(c -> tableColumns.putIfAbsent(c.getName(), new ColumnProperties(c)));
}

/**
Expand Down Expand Up @@ -391,6 +397,12 @@ public String getLatestCommittedOffsetToken() {
return response.getPersistedOffsetToken();
}

/** Returns a map of column name -> datatype for the table the channel is bound to */
@Override
public Map<String, ColumnProperties> getTableSchema() {
return this.tableColumns;
}

/** Check whether we need to throttle the insertRows API */
void throttleInsertIfNeeded(MemoryInfoProvider memoryInfoProvider) {
int retry = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,22 @@ public void testOpenChannelSuccessResponse() throws Exception {
Assert.assertEquals(dbName, channel.getDBName());
Assert.assertEquals(schemaName, channel.getSchemaName());
Assert.assertEquals(tableName, channel.getTableName());

Assert.assertTrue(channel.getTableSchema().containsKey("C1"));
Assert.assertEquals("NUMBER(38,0)", channel.getTableSchema().get("C1").getType());
Assert.assertEquals("FIXED", channel.getTableSchema().get("C1").getLogicalType());
Assert.assertEquals(38, (int) channel.getTableSchema().get("C1").getPrecision());
Assert.assertEquals(0, (int) channel.getTableSchema().get("C1").getScale());
Assert.assertNull(channel.getTableSchema().get("C1").getByteLength());
Assert.assertTrue(channel.getTableSchema().get("C1").isNullable());

Assert.assertTrue(channel.getTableSchema().containsKey("C2"));
Assert.assertEquals("NUMBER(38,0)", channel.getTableSchema().get("C2").getType());
Assert.assertEquals("FIXED", channel.getTableSchema().get("C2").getLogicalType());
Assert.assertEquals(38, (int) channel.getTableSchema().get("C2").getPrecision());
Assert.assertEquals(0, (int) channel.getTableSchema().get("C2").getScale());
Assert.assertNull(channel.getTableSchema().get("C2").getByteLength());
Assert.assertTrue(channel.getTableSchema().get("C2").isNullable());
}

@Test
Expand Down

0 comments on commit e38a7e5

Please sign in to comment.