From af3a61ff34174d5065f38a7f442a39f3c16e37e1 Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Tue, 26 Mar 2024 01:10:32 +0000 Subject: [PATCH] minor change --- .../ingest/connection/RequestBuilder.java | 2 +- .../internal/ChannelsStatusResponse.java | 1 + .../streaming/internal/InsertRowsRequest.java | 182 ------------------ .../internal/OpenRowsetChannelResponse.java | 4 +- ...SnowflakeStreamingIngestChannelRowset.java | 13 +- ...nowflakeStreamingIngestClientInternal.java | 5 + 6 files changed, 16 insertions(+), 191 deletions(-) delete mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/InsertRowsRequest.java diff --git a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java index ae6c33a44..6aae1867e 100644 --- a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java +++ b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java @@ -716,7 +716,7 @@ public HttpPost generateStreamingIngestPostRequest( * @param payload POST request payload * @param endPoint REST API endpoint * @param message error message if there are failures during HTTP building - * @param queryParameters + * @param queryParameters POST request query parameters * @return URI for the POST request */ public HttpPost generateStreamingIngestPostRequest( diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusResponse.java index e0dbcc855..042c493db 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusResponse.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusResponse.java @@ -75,6 +75,7 @@ void setStatusCode(Long statusCode) { } @JsonProperty("status_code") + @Override Long getStatusCode() { return this.statusCode; } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InsertRowsRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/InsertRowsRequest.java deleted file mode 100644 index 522bb8727..000000000 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InsertRowsRequest.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. - */ - -package net.snowflake.ingest.streaming.internal; - -import java.time.ZoneId; -import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction; -import net.snowflake.ingest.utils.Utils; - -public class InsertRowsRequest { - - /** - * Default value of the timezone, which will be used for TIMESTAMP_LTZ and TIMESTAMP_TZ column - * types when the user input does not have any timezone information. - */ - private static final ZoneId DEFAULT_DEFAULT_TIMEZONE = ZoneId.of("America/Los_Angeles"); - - // Name of the channel - private final String channelName; - - // Name of the database that the channel belongs to - private final String dbName; - - // Name of the schema that the channel belongs to - private final String schemaName; - - // Name of the table that the channel belongs to - private final String tableName; - - // On_error option on this channel - private final net.snowflake.ingest.streaming.OpenChannelRequest.OnErrorOption onErrorOption; - - // Default timezone for TIMESTAMP_LTZ and TIMESTAMP_TZ columns - private final ZoneId defaultTimezone; - - private final String offsetToken; - private final boolean isOffsetTokenProvided; - - private final OffsetTokenVerificationFunction offsetTokenVerificationFunction; - - private final net.snowflake.ingest.streaming.OpenChannelRequest.ChannelType channelType; - - public static net.snowflake.ingest.streaming.OpenChannelRequest.OpenChannelRequestBuilder builder( - String channelName) { - return new net.snowflake.ingest.streaming.OpenChannelRequest.OpenChannelRequestBuilder( - channelName); - } - - /** A builder class to build a OpenChannelRequest */ - public static class InsertRowsRequestBuilder { - private final String channelName; - private String dbName; - private String schemaName; - private String tableName; - private net.snowflake.ingest.streaming.OpenChannelRequest.OnErrorOption onErrorOption; - private ZoneId defaultTimezone; - private String offsetToken; - private boolean isOffsetTokenProvided = false; - private OffsetTokenVerificationFunction offsetTokenVerificationFunction; - private net.snowflake.ingest.streaming.OpenChannelRequest.ChannelType channelType = - net.snowflake.ingest.streaming.OpenChannelRequest.ChannelType.ROWSET_API; - - public InsertRowsRequestBuilder(String channelName) { - this.channelName = channelName; - this.defaultTimezone = DEFAULT_DEFAULT_TIMEZONE; - } - - public InsertRowsRequestBuilder setDBName(String dbName) { - this.dbName = dbName; - return this; - } - - public InsertRowsRequestBuilder setSchemaName(String schemaName) { - this.schemaName = schemaName; - return this; - } - - public InsertRowsRequestBuilder setTableName(String tableName) { - this.tableName = tableName; - return this; - } - - public InsertRowsRequestBuilder setOnErrorOption( - net.snowflake.ingest.streaming.OpenChannelRequest.OnErrorOption onErrorOption) { - this.onErrorOption = onErrorOption; - return this; - } - - public InsertRowsRequestBuilder setDefaultTimezone(ZoneId defaultTimezone) { - this.defaultTimezone = defaultTimezone; - return this; - } - - public InsertRowsRequestBuilder setOffsetToken(String offsetToken) { - this.offsetToken = offsetToken; - this.isOffsetTokenProvided = true; - return this; - } - - public InsertRowsRequestBuilder setOffsetTokenVerificationFunction( - OffsetTokenVerificationFunction function) { - this.offsetTokenVerificationFunction = function; - return this; - } - - public InsertRowsRequestBuilder setChannelType( - net.snowflake.ingest.streaming.OpenChannelRequest.ChannelType type) { - this.channelType = type; - return this; - } - - public InsertRowsRequest build() { - return new InsertRowsRequest(this); - } - } - - private InsertRowsRequest(InsertRowsRequestBuilder builder) { - Utils.assertStringNotNullOrEmpty("channel name", builder.channelName); - Utils.assertStringNotNullOrEmpty("database name", builder.dbName); - Utils.assertStringNotNullOrEmpty("schema name", builder.schemaName); - Utils.assertStringNotNullOrEmpty("table name", builder.tableName); - Utils.assertNotNull("on_error option", builder.onErrorOption); - Utils.assertNotNull("default_timezone", builder.defaultTimezone); - Utils.assertNotNull("channel_type", builder.channelType); - - this.channelName = builder.channelName; - this.dbName = builder.dbName; - this.schemaName = builder.schemaName; - this.tableName = builder.tableName; - this.onErrorOption = builder.onErrorOption; - this.defaultTimezone = builder.defaultTimezone; - this.offsetToken = builder.offsetToken; - this.isOffsetTokenProvided = builder.isOffsetTokenProvided; - this.offsetTokenVerificationFunction = builder.offsetTokenVerificationFunction; - this.channelType = builder.channelType; - } - - public String getDBName() { - return this.dbName; - } - - public String getSchemaName() { - return this.schemaName; - } - - public String getTableName() { - return this.tableName; - } - - public String getChannelName() { - return this.channelName; - } - - public ZoneId getDefaultTimezone() { - return this.defaultTimezone; - } - - public String getFullyQualifiedTableName() { - return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName); - } - - public net.snowflake.ingest.streaming.OpenChannelRequest.OnErrorOption getOnErrorOption() { - return this.onErrorOption; - } - - public String getOffsetToken() { - return this.offsetToken; - } - - public boolean isOffsetTokenProvided() { - return this.isOffsetTokenProvided; - } - - public OffsetTokenVerificationFunction getOffsetTokenVerificationFunction() { - return this.offsetTokenVerificationFunction; - } - - public net.snowflake.ingest.streaming.OpenChannelRequest.ChannelType getChannelType() { - return this.channelType; - } -} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/OpenRowsetChannelResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/OpenRowsetChannelResponse.java index 0400f6c82..8443f13af 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/OpenRowsetChannelResponse.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/OpenRowsetChannelResponse.java @@ -1,12 +1,12 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; import com.fasterxml.jackson.annotation.JsonProperty; -/** Response to the OpenChannelRequest */ +/** Response to the open channel request to open a rowset api channel */ class OpenRowsetChannelResponse extends StreamingIngestResponse { private String message; private String continuationToken; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelRowset.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelRowset.java index dde611931..37e65c6ba 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelRowset.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelRowset.java @@ -12,21 +12,22 @@ import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; import net.snowflake.ingest.utils.Logging; +/** The implementation for Rowset API channel */ public class SnowflakeStreamingIngestChannelRowset implements SnowflakeStreamingIngestChannel { private static final Logging logger = new Logging(SnowflakeStreamingIngestChannelRowset.class); - private String dbName; + private final String dbName; - private String schemaName; + private final String schemaName; - private String pipeName; + private final String pipeName; - private String tableName; + private final String tableName; - private String name; + private final String name; - private String fullyQualifiedName; + private final String fullyQualifiedName; private boolean isClosed; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index d89e37731..a3a837fab 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -384,6 +384,11 @@ public SnowflakeStreamingIngestChannel openChannel(OpenChannelRequest request) { } } + /** + * This function builds and sends an open channel request that opens a rowset api channel against + * a pipe or table. It will return a {@link SnowflakeStreamingIngestChannel} object if succeeded, + * otherwise it will throw an exception with failure HTTP response code. + */ private SnowflakeStreamingIngestChannel openRowsetApiChannel(OpenChannelRequest request) throws IngestResponseException, IOException { Map payload = new HashMap<>();