diff --git a/pom.xml b/pom.xml index fa478ee08..db391aebc 100644 --- a/pom.xml +++ b/pom.xml @@ -478,6 +478,13 @@ protobuf-java runtime + + + org.slf4j + slf4j-simple + ${slf4j.version} + runtime + org.xerial.snappy snappy-java @@ -524,13 +531,6 @@ 2.0.2 test - - - org.slf4j - slf4j-simple - ${slf4j.version} - test - diff --git a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java index f6c7a6c1b..bf20afda8 100644 --- a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java +++ b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java @@ -39,6 +39,7 @@ public enum ApiName { INSERT_REPORT("GET"), LOAD_HISTORY_SCAN("GET"), STREAMING_OPEN_CHANNEL("POST"), + STREAMING_DROP_CHANNEL("POST"), STREAMING_CHANNEL_STATUS("POST"), STREAMING_REGISTER_BLOB("POST"), STREAMING_CLIENT_CONFIGURE("POST"); diff --git a/src/main/java/net/snowflake/ingest/streaming/DropChannelRequest.java b/src/main/java/net/snowflake/ingest/streaming/DropChannelRequest.java new file mode 100644 index 000000000..5d0de5f85 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/DropChannelRequest.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming; + +import net.snowflake.ingest.utils.Utils; + +/** A class that is used to open/create a {@link SnowflakeStreamingIngestChannel} */ +public class DropChannelRequest { + // Name of the channel + private final String channelName; + + // Name of the database that the channel belongs to + private final String dbName; + + // Name of the schema that the channel belongs to + private final String schemaName; + + // Name of the table that the channel belongs to + private final String tableName; + + private final Long clientSequencer; + + public static DropChannelRequestBuilder builder(String channelName) { + return new DropChannelRequestBuilder(channelName); + } + + /** A builder class to build a OpenChannelRequest */ + public static class DropChannelRequestBuilder { + private final String channelName; + private String dbName; + private String schemaName; + private String tableName; + + private Long clientSequencer = null; + + public DropChannelRequestBuilder(String channelName) { + this.channelName = channelName; + } + + public DropChannelRequestBuilder setDBName(String dbName) { + this.dbName = dbName; + return this; + } + + public DropChannelRequestBuilder setSchemaName(String schemaName) { + this.schemaName = schemaName; + return this; + } + + public DropChannelRequestBuilder setTableName(String tableName) { + this.tableName = tableName; + return this; + } + + public DropChannelRequestBuilder setClientSequencer(Long clientSequencer) { + this.clientSequencer = clientSequencer; + return this; + } + + public DropChannelRequest build() { + return new DropChannelRequest(this); + } + } + + private DropChannelRequest(DropChannelRequestBuilder builder) { + Utils.assertStringNotNullOrEmpty("channel name", builder.channelName); + Utils.assertStringNotNullOrEmpty("database name", builder.dbName); + Utils.assertStringNotNullOrEmpty("schema name", builder.schemaName); + Utils.assertStringNotNullOrEmpty("table name", builder.tableName); + + this.channelName = builder.channelName; + this.dbName = builder.dbName; + this.schemaName = builder.schemaName; + this.tableName = builder.tableName; + this.clientSequencer = builder.clientSequencer; + } + + public String getDBName() { + return this.dbName; + } + + public String getSchemaName() { + return this.schemaName; + } + + public String getTableName() { + return this.tableName; + } + + public String getChannelName() { + return this.channelName; + } + + public String getFullyQualifiedTableName() { + return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName); + } + + public Long getClientSequencer() { + return this.clientSequencer; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java index d30472b63..b103d046c 100644 --- a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java @@ -43,6 +43,10 @@ public enum OnErrorOption { private final String offsetToken; private final boolean isOffsetTokenProvided; + // If true, the channel will be dropped when it is closed after any pending data is fully + // committed. + private final boolean dropOnClose; + public static OpenChannelRequestBuilder builder(String channelName) { return new OpenChannelRequestBuilder(channelName); } @@ -59,6 +63,8 @@ public static class OpenChannelRequestBuilder { private String offsetToken; private boolean isOffsetTokenProvided = false; + private boolean dropOnClose = false; + public OpenChannelRequestBuilder(String channelName) { this.channelName = channelName; this.defaultTimezone = DEFAULT_DEFAULT_TIMEZONE; @@ -89,10 +95,15 @@ public OpenChannelRequestBuilder setDefaultTimezone(ZoneId defaultTimezone) { return this; } - public OpenChannelRequestBuilder setOffsetToken(String offsetToken){ + public OpenChannelRequestBuilder setOffsetToken(String offsetToken) { this.offsetToken = offsetToken; this.isOffsetTokenProvided = true; - return this; + return this; + } + + public OpenChannelRequestBuilder setDropOnClose(boolean dropOnClose) { + this.dropOnClose = dropOnClose; + return this; } public OpenChannelRequest build() { @@ -116,6 +127,7 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) { this.defaultTimezone = builder.defaultTimezone; this.offsetToken = builder.offsetToken; this.isOffsetTokenProvided = builder.isOffsetTokenProvided; + this.dropOnClose = builder.dropOnClose; } public String getDBName() { @@ -153,4 +165,8 @@ public String getOffsetToken() { public boolean isOffsetTokenProvided() { return this.isOffsetTokenProvided; } + + public boolean getDropOnClose() { + return this.dropOnClose; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClient.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClient.java index 3ae4a83d1..3054d9564 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClient.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClient.java @@ -25,6 +25,20 @@ public interface SnowflakeStreamingIngestClient extends AutoCloseable { */ SnowflakeStreamingIngestChannel openChannel(OpenChannelRequest request); + /** + * Drop the specified channel on the server using a {@link DropChannelRequest} + * + *

Note that {@link DropChannelRequest.DropChannelRequestBuilder#setClientSequencer(Long)}} can + * be used to drop a specific version of the channel and prevent accidentally dropping a channel + * concurrently opened by another client. If it is not specified, this call will blindly drop the + * latest version of the channel and any pending data will be lost. Also see {@link + * OpenChannelRequest.OpenChannelRequestBuilder#setDropOnClose(boolean)} to automatically drop + * channels on close. + * + * @param request the drop channel request + */ + void dropChannel(DropChannelRequest request); + /** * Get the client name * diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelResponse.java new file mode 100644 index 000000000..7428bb8c2 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelResponse.java @@ -0,0 +1,67 @@ +package net.snowflake.ingest.streaming.internal; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class DropChannelResponse extends StreamingIngestResponse { + private Long statusCode; + private String message; + private String dbName; + private String schemaName; + private String tableName; + private String channelName; + + @JsonProperty("status_code") + void setStatusCode(Long statusCode) { + this.statusCode = statusCode; + } + + @Override + Long getStatusCode() { + return this.statusCode; + } + + @JsonProperty("message") + void setMessage(String message) { + this.message = message; + } + + String getMessage() { + return this.message; + } + + @JsonProperty("database") + void setDBName(String dbName) { + this.dbName = dbName; + } + + String getDBName() { + return this.dbName; + } + + @JsonProperty("schema") + void setSchemaName(String schemaName) { + this.schemaName = schemaName; + } + + String getSchemaName() { + return this.schemaName; + } + + @JsonProperty("table") + void setTableName(String tableName) { + this.tableName = tableName; + } + + String getTableName() { + return this.tableName; + } + + @JsonProperty("channel") + void setChannelName(String channelName) { + this.channelName = channelName; + } + + String getChannelName() { + return this.channelName; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java index 3e442d43b..ace1c80c8 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java @@ -30,6 +30,8 @@ static class SnowflakeStreamingIngestChannelBuilder { private ZoneId defaultTimezone; + private boolean dropOnClose; + private SnowflakeStreamingIngestChannelBuilder(String name) { this.name = name; } @@ -91,6 +93,11 @@ SnowflakeStreamingIngestChannelBuilder setOwningClient( return this; } + SnowflakeStreamingIngestChannelBuilder setDropOnClose(boolean dropOnClose) { + this.dropOnClose = dropOnClose; + return this; + } + SnowflakeStreamingIngestChannelInternal build() { Utils.assertStringNotNullOrEmpty("channel name", this.name); Utils.assertStringNotNullOrEmpty("table name", this.tableName); @@ -116,7 +123,8 @@ SnowflakeStreamingIngestChannelInternal build() { this.encryptionKeyId, this.onErrorOption, this.defaultTimezone, - this.owningClient.getParameterProvider().getBlobFormatVersion()); + this.owningClient.getParameterProvider().getBlobFormatVersion(), + this.dropOnClose); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index 756f9b5e8..08e30892a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import net.snowflake.ingest.streaming.DropChannelRequest; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; @@ -42,6 +43,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn // Reference to the row buffer private final RowBuffer rowBuffer; + private final boolean dropOnClose; // Indicates whether the channel is closed private volatile boolean isClosed; @@ -79,7 +81,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn String encryptionKey, Long encryptionKeyId, OpenChannelRequest.OnErrorOption onErrorOption, - ZoneOffset defaultTimezone) { + ZoneOffset defaultTimezone, + boolean dropOnClose) { this( name, dbName, @@ -93,7 +96,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn encryptionKeyId, onErrorOption, defaultTimezone, - client.getParameterProvider().getBlobFormatVersion()); + client.getParameterProvider().getBlobFormatVersion(), + dropOnClose); } /** Default constructor */ @@ -110,7 +114,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn Long encryptionKeyId, OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, - Constants.BdecVersion bdecVersion) { + Constants.BdecVersion bdecVersion, + boolean dropOnClose) { this.isClosed = false; this.owningClient = client; this.channelFlushContext = @@ -127,6 +132,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn channelState, new ClientBufferParameters(owningClient)); this.tableColumns = new HashMap<>(); + this.dropOnClose = dropOnClose; logger.logInfo( "Channel={} created for table={}", this.channelFlushContext.getName(), @@ -291,6 +297,19 @@ public CompletableFuture close() { .map(SnowflakeStreamingIngestChannelInternal::getFullyQualifiedName) .collect(Collectors.toList())); } + if (this.dropOnClose) { + this.owningClient.dropChannel( + DropChannelRequest.builder(this.getChannelContext().getName()) + .setDBName(this.getDBName()) + .setTableName(this.getTableName()) + .setSchemaName(this.getSchemaName()) + .setClientSequencer(this.getChannelSequencer()) + .build()); + System.out.println( + "SUCCESSFULLY dropped " + + this.getChannelContext().getFullyQualifiedName() + + " channel"); + } }); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index a52149331..e80c979fc 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -4,26 +4,10 @@ package net.snowflake.ingest.streaming.internal; -import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_STATUS; -import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_OPEN_CHANNEL; -import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_REGISTER_BLOB; +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.*; import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries; import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.sleepForRetry; -import static net.snowflake.ingest.utils.Constants.CHANNEL_STATUS_ENDPOINT; -import static net.snowflake.ingest.utils.Constants.COMMIT_MAX_RETRY_COUNT; -import static net.snowflake.ingest.utils.Constants.COMMIT_RETRY_INTERVAL_IN_MS; -import static net.snowflake.ingest.utils.Constants.ENABLE_TELEMETRY_TO_SF; -import static net.snowflake.ingest.utils.Constants.MAX_STREAMING_INGEST_API_CHANNEL_RETRY; -import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT; -import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT; -import static net.snowflake.ingest.utils.Constants.RESPONSE_ERR_ENQUEUE_TABLE_CHUNK_QUEUE_FULL; -import static net.snowflake.ingest.utils.Constants.RESPONSE_ERR_GENERAL_EXCEPTION_RETRY_REQUEST; -import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; -import static net.snowflake.ingest.utils.Constants.SNOWPIPE_STREAMING_JMX_METRIC_PREFIX; -import static net.snowflake.ingest.utils.Constants.SNOWPIPE_STREAMING_JVM_MEMORY_AND_THREAD_METRICS_REGISTRY; -import static net.snowflake.ingest.utils.Constants.SNOWPIPE_STREAMING_SHARED_METRICS_REGISTRY; -import static net.snowflake.ingest.utils.Constants.STREAMING_INGEST_TELEMETRY_UPLOAD_INTERVAL_IN_SEC; -import static net.snowflake.ingest.utils.Constants.USER; +import static net.snowflake.ingest.utils.Constants.*; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; @@ -63,6 +47,7 @@ import net.snowflake.ingest.connection.OAuthCredential; import net.snowflake.ingest.connection.RequestBuilder; import net.snowflake.ingest.connection.TelemetryService; +import net.snowflake.ingest.streaming.DropChannelRequest; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient; @@ -304,7 +289,7 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest payload.put("schema", request.getSchemaName()); payload.put("write_mode", Constants.WriteMode.CLOUD_STORAGE.name()); payload.put("role", this.role); - if (request.isOffsetTokenProvided()){ + if (request.isOffsetTokenProvided()) { payload.put("offset_token", request.getOffsetToken()); } @@ -352,6 +337,7 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest .setEncryptionKeyId(response.getEncryptionKeyId()) .setOnErrorOption(request.getOnErrorOption()) .setDefaultTimezone(request.getDefaultTimezone()) + .setDropOnClose(request.getDropOnClose()) .build(); // Setup the row buffer schema @@ -366,6 +352,64 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest } } + @Override + public void dropChannel(DropChannelRequest request) { + if (isClosed) { + throw new SFException(ErrorCode.CLOSED_CLIENT); + } + + logger.logDebug( + "Drop channel request start, channel={}, table={}, client={}", + request.getChannelName(), + request.getFullyQualifiedTableName(), + getName()); + + try { + Map payload = new HashMap<>(); + payload.put( + "request_id", this.flushService.getClientPrefix() + "_" + counter.getAndIncrement()); + payload.put("channel", request.getChannelName()); + payload.put("table", request.getTableName()); + payload.put("database", request.getDBName()); + payload.put("schema", request.getSchemaName()); + payload.put("role", this.role); + if (request.getClientSequencer() != null) { + payload.put("client_sequencer", request.getClientSequencer()); + } + + DropChannelResponse response = + executeWithRetries( + DropChannelResponse.class, + DROP_CHANNEL_ENDPOINT, + payload, + "drop channel", + STREAMING_DROP_CHANNEL, + httpClient, + requestBuilder); + + // Check for Snowflake specific response code + if (response.getStatusCode() != RESPONSE_SUCCESS) { + logger.logDebug( + "Drop channel request failed, channel={}, table={}, client={}, message={}", + request.getChannelName(), + request.getFullyQualifiedTableName(), + getName(), + response.getMessage()); + throw new SFException(ErrorCode.DROP_CHANNEL_FAILURE, response.getMessage()); + } + + logger.logInfo( + "Drop channel request succeeded, channel={}, table={}, clientSequencer={} client={}", + request.getChannelName(), + request.getFullyQualifiedTableName(), + request.getClientSequencer(), + getName()); + + } catch (IOException | IngestResponseException e) { + throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage()); + } + } + /** * Return the latest committed/persisted offset token for all channels * diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index a22bc9f21..83b3e63f2 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -62,6 +62,7 @@ public class Constants { // Channel level constants public static final String CHANNEL_STATUS_ENDPOINT = "/v1/streaming/channels/status/"; public static final String OPEN_CHANNEL_ENDPOINT = "/v1/streaming/channels/open/"; + public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/"; public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/"; public enum WriteMode { diff --git a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java index b212d7244..46c11a485 100644 --- a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java +++ b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java @@ -41,7 +41,8 @@ public enum ErrorCode { OAUTH_REFRESH_TOKEN_ERROR("0033"), INVALID_CONFIG_PARAMETER("0034"), MAX_BATCH_SIZE_EXCEEDED("0035"), - CRYPTO_PROVIDER_ERROR("0036"); + CRYPTO_PROVIDER_ERROR("0036"), + DROP_CHANNEL_FAILURE("0037"); public static final String errorMessageResource = "net.snowflake.ingest.ingest_error_messages"; diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 82dd1164b..7518fac13 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -411,8 +411,8 @@ public int getMaxChunksInBlobAndRegistrationRequest() { /** @return BDEC compression algorithm */ public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() { Object val = - this.parameterMap.getOrDefault( - BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT); + this.parameterMap.getOrDefault( + BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT); if (val instanceof Constants.BdecParquetCompression) { return (Constants.BdecParquetCompression) val; } diff --git a/src/test/java/net/snowflake/ingest/SimpleIngestIT.java b/src/test/java/net/snowflake/ingest/SimpleIngestIT.java index ee73a2046..20a27eafb 100644 --- a/src/test/java/net/snowflake/ingest/SimpleIngestIT.java +++ b/src/test/java/net/snowflake/ingest/SimpleIngestIT.java @@ -178,8 +178,7 @@ public void testSimpleIngestWithPattern() throws Exception { } private void getHistoryAndAssertLoad(SimpleIngestManager manager, String test_file_name_2) - throws InterruptedException, - java.util.concurrent.ExecutionException, + throws InterruptedException, java.util.concurrent.ExecutionException, java.util.concurrent.TimeoutException { // keeps track of whether we've loaded the file boolean loaded = false; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 037028086..528f9e46d 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -266,20 +266,19 @@ public void testOpenChannelRequestCreationSuccess() { Assert.assertFalse(request.isOffsetTokenProvided()); } - @Test public void testOpenChannelRequesCreationtWithOffsetToken() { OpenChannelRequest request = - OpenChannelRequest.builder("CHANNEL") - .setDBName("STREAMINGINGEST_TEST") - .setSchemaName("PUBLIC") - .setTableName("T_STREAMINGINGEST") - .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE) - .setOffsetToken("TEST_TOKEN") - .build(); + OpenChannelRequest.builder("CHANNEL") + .setDBName("STREAMINGINGEST_TEST") + .setSchemaName("PUBLIC") + .setTableName("T_STREAMINGINGEST") + .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE) + .setOffsetToken("TEST_TOKEN") + .build(); Assert.assertEquals( - "STREAMINGINGEST_TEST.PUBLIC.T_STREAMINGINGEST", request.getFullyQualifiedTableName()); + "STREAMINGINGEST_TEST.PUBLIC.T_STREAMINGINGEST", request.getFullyQualifiedTableName()); Assert.assertEquals("TEST_TOKEN", request.getOffsetToken()); Assert.assertTrue(request.isOffsetTokenProvided()); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java index eadbe81ee..2318a7162 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java @@ -536,19 +536,16 @@ public void testMultiColumnIngest() throws Exception { public void testOpenChannelOffsetToken() throws Exception { String tableName = "offsetTokenTest"; jdbcConnection - .createStatement() - .execute( - String.format( - "create or replace table %s (s text);", - tableName)); + .createStatement() + .execute(String.format("create or replace table %s (s text);", tableName)); OpenChannelRequest request1 = - OpenChannelRequest.builder("TEST_CHANNEL") - .setDBName(testDb) - .setSchemaName(TEST_SCHEMA) - .setTableName(tableName) - .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE) - .setOffsetToken("TEST_OFFSET") - .build(); + OpenChannelRequest.builder("TEST_CHANNEL") + .setDBName(testDb) + .setSchemaName(TEST_SCHEMA) + .setTableName(tableName) + .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE) + .setOffsetToken("TEST_OFFSET") + .build(); // Open a streaming ingest channel from the given client SnowflakeStreamingIngestChannel channel1 = client.openChannel(request1); @@ -558,7 +555,7 @@ public void testOpenChannelOffsetToken() throws Exception { for (int i = 1; i < 15; i++) { if (channel1.getLatestCommittedOffsetToken() != null - && channel1.getLatestCommittedOffsetToken().equals("TEST_OFFSET")) { + && channel1.getLatestCommittedOffsetToken().equals("TEST_OFFSET")) { return; } else { Thread.sleep(2000);