From 4ff6e1e87ac52efdada32264100086d116e516fa Mon Sep 17 00:00:00 2001 From: Purujit Saha Date: Thu, 14 Dec 2023 22:06:19 +0000 Subject: [PATCH 1/3] Add client side support for upcoming drop channel API --- .../connection/ServiceResponseHandler.java | 1 + .../ingest/streaming/DropChannelRequest.java | 104 ++++++++++++++++++ .../ingest/streaming/OpenChannelRequest.java | 16 +++ .../SnowflakeStreamingIngestClient.java | 14 +++ .../internal/DropChannelResponse.java | 68 ++++++++++++ ...nowflakeStreamingIngestChannelFactory.java | 10 +- ...owflakeStreamingIngestChannelInternal.java | 25 ++++- ...nowflakeStreamingIngestClientInternal.java | 62 +++++++++++ .../net/snowflake/ingest/utils/Constants.java | 1 + .../net/snowflake/ingest/utils/ErrorCode.java | 3 +- .../streaming/internal/ChannelCacheTest.java | 18 ++- .../streaming/internal/FlushServiceTest.java | 9 +- .../SnowflakeStreamingIngestChannelTest.java | 72 ++++++++++-- .../SnowflakeStreamingIngestClientTest.java | 91 +++++++++++---- .../streaming/internal/StreamingIngestIT.java | 36 ++++++ 15 files changed, 486 insertions(+), 44 deletions(-) create mode 100644 src/main/java/net/snowflake/ingest/streaming/DropChannelRequest.java create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/DropChannelResponse.java diff --git a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java index f6c7a6c1b..bf20afda8 100644 --- a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java +++ b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java @@ -39,6 +39,7 @@ public enum ApiName { INSERT_REPORT("GET"), LOAD_HISTORY_SCAN("GET"), STREAMING_OPEN_CHANNEL("POST"), + STREAMING_DROP_CHANNEL("POST"), STREAMING_CHANNEL_STATUS("POST"), STREAMING_REGISTER_BLOB("POST"), STREAMING_CLIENT_CONFIGURE("POST"); diff --git a/src/main/java/net/snowflake/ingest/streaming/DropChannelRequest.java b/src/main/java/net/snowflake/ingest/streaming/DropChannelRequest.java new file mode 100644 index 000000000..4fe476edf --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/DropChannelRequest.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming; + +import net.snowflake.ingest.utils.Utils; + +/** A class that is used to drop a {@link SnowflakeStreamingIngestChannel} */ +public class DropChannelRequest { + // Name of the channel + private final String channelName; + + // Name of the database that the channel belongs to + private final String dbName; + + // Name of the schema that the channel belongs to + private final String schemaName; + + // Name of the table that the channel belongs to + private final String tableName; + + // Optional client sequencer to verify when dropping the channel. + private final Long clientSequencer; + + public static DropChannelRequestBuilder builder(String channelName) { + return new DropChannelRequestBuilder(channelName); + } + + /** A builder class to build a DropChannelRequest */ + public static class DropChannelRequestBuilder { + private final String channelName; + private String dbName; + private String schemaName; + private String tableName; + + private Long clientSequencer = null; + + public DropChannelRequestBuilder(String channelName) { + this.channelName = channelName; + } + + public DropChannelRequestBuilder setDBName(String dbName) { + this.dbName = dbName; + return this; + } + + public DropChannelRequestBuilder setSchemaName(String schemaName) { + this.schemaName = schemaName; + return this; + } + + public DropChannelRequestBuilder setTableName(String tableName) { + this.tableName = tableName; + return this; + } + + public DropChannelRequestBuilder setClientSequencer(Long clientSequencer) { + this.clientSequencer = clientSequencer; + return this; + } + + public DropChannelRequest build() { + return new DropChannelRequest(this); + } + } + + private DropChannelRequest(DropChannelRequestBuilder builder) { + Utils.assertStringNotNullOrEmpty("channel name", builder.channelName); + Utils.assertStringNotNullOrEmpty("database name", builder.dbName); + Utils.assertStringNotNullOrEmpty("schema name", builder.schemaName); + Utils.assertStringNotNullOrEmpty("table name", builder.tableName); + + this.channelName = builder.channelName; + this.dbName = builder.dbName; + this.schemaName = builder.schemaName; + this.tableName = builder.tableName; + this.clientSequencer = builder.clientSequencer; + } + + public String getDBName() { + return this.dbName; + } + + public String getSchemaName() { + return this.schemaName; + } + + public String getTableName() { + return this.tableName; + } + + public String getChannelName() { + return this.channelName; + } + + public String getFullyQualifiedTableName() { + return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName); + } + + public Long getClientSequencer() { + return this.clientSequencer; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java index 0cf16af2e..b103d046c 100644 --- a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java @@ -43,6 +43,10 @@ public enum OnErrorOption { private final String offsetToken; private final boolean isOffsetTokenProvided; + // If true, the channel will be dropped when it is closed after any pending data is fully + // committed. + private final boolean dropOnClose; + public static OpenChannelRequestBuilder builder(String channelName) { return new OpenChannelRequestBuilder(channelName); } @@ -59,6 +63,8 @@ public static class OpenChannelRequestBuilder { private String offsetToken; private boolean isOffsetTokenProvided = false; + private boolean dropOnClose = false; + public OpenChannelRequestBuilder(String channelName) { this.channelName = channelName; this.defaultTimezone = DEFAULT_DEFAULT_TIMEZONE; @@ -95,6 +101,11 @@ public OpenChannelRequestBuilder setOffsetToken(String offsetToken) { return this; } + public OpenChannelRequestBuilder setDropOnClose(boolean dropOnClose) { + this.dropOnClose = dropOnClose; + return this; + } + public OpenChannelRequest build() { return new OpenChannelRequest(this); } @@ -116,6 +127,7 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) { this.defaultTimezone = builder.defaultTimezone; this.offsetToken = builder.offsetToken; this.isOffsetTokenProvided = builder.isOffsetTokenProvided; + this.dropOnClose = builder.dropOnClose; } public String getDBName() { @@ -153,4 +165,8 @@ public String getOffsetToken() { public boolean isOffsetTokenProvided() { return this.isOffsetTokenProvided; } + + public boolean getDropOnClose() { + return this.dropOnClose; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClient.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClient.java index 3ae4a83d1..3054d9564 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClient.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClient.java @@ -25,6 +25,20 @@ public interface SnowflakeStreamingIngestClient extends AutoCloseable { */ SnowflakeStreamingIngestChannel openChannel(OpenChannelRequest request); + /** + * Drop the specified channel on the server using a {@link DropChannelRequest} + * + *

Note that {@link DropChannelRequest.DropChannelRequestBuilder#setClientSequencer(Long)}} can + * be used to drop a specific version of the channel and prevent accidentally dropping a channel + * concurrently opened by another client. If it is not specified, this call will blindly drop the + * latest version of the channel and any pending data will be lost. Also see {@link + * OpenChannelRequest.OpenChannelRequestBuilder#setDropOnClose(boolean)} to automatically drop + * channels on close. + * + * @param request the drop channel request + */ + void dropChannel(DropChannelRequest request); + /** * Get the client name * diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelResponse.java new file mode 100644 index 000000000..e36692915 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelResponse.java @@ -0,0 +1,68 @@ +package net.snowflake.ingest.streaming.internal; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** Response for a {@link net.snowflake.ingest.streaming.DropChannelRequest}. */ +class DropChannelResponse extends StreamingIngestResponse { + private Long statusCode; + private String message; + private String dbName; + private String schemaName; + private String tableName; + private String channelName; + + @JsonProperty("status_code") + void setStatusCode(Long statusCode) { + this.statusCode = statusCode; + } + + @Override + Long getStatusCode() { + return this.statusCode; + } + + @JsonProperty("message") + void setMessage(String message) { + this.message = message; + } + + String getMessage() { + return this.message; + } + + @JsonProperty("database") + void setDBName(String dbName) { + this.dbName = dbName; + } + + String getDBName() { + return this.dbName; + } + + @JsonProperty("schema") + void setSchemaName(String schemaName) { + this.schemaName = schemaName; + } + + String getSchemaName() { + return this.schemaName; + } + + @JsonProperty("table") + void setTableName(String tableName) { + this.tableName = tableName; + } + + String getTableName() { + return this.tableName; + } + + @JsonProperty("channel") + void setChannelName(String channelName) { + this.channelName = channelName; + } + + String getChannelName() { + return this.channelName; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java index 3e442d43b..ace1c80c8 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java @@ -30,6 +30,8 @@ static class SnowflakeStreamingIngestChannelBuilder { private ZoneId defaultTimezone; + private boolean dropOnClose; + private SnowflakeStreamingIngestChannelBuilder(String name) { this.name = name; } @@ -91,6 +93,11 @@ SnowflakeStreamingIngestChannelBuilder setOwningClient( return this; } + SnowflakeStreamingIngestChannelBuilder setDropOnClose(boolean dropOnClose) { + this.dropOnClose = dropOnClose; + return this; + } + SnowflakeStreamingIngestChannelInternal build() { Utils.assertStringNotNullOrEmpty("channel name", this.name); Utils.assertStringNotNullOrEmpty("table name", this.tableName); @@ -116,7 +123,8 @@ SnowflakeStreamingIngestChannelInternal build() { this.encryptionKeyId, this.onErrorOption, this.defaultTimezone, - this.owningClient.getParameterProvider().getBlobFormatVersion()); + this.owningClient.getParameterProvider().getBlobFormatVersion(), + this.dropOnClose); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index 695f5632b..07acfbcb7 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -20,6 +20,7 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import javax.annotation.Nullable; +import net.snowflake.ingest.streaming.DropChannelRequest; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; @@ -43,6 +44,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn // Reference to the row buffer private final RowBuffer rowBuffer; + private final boolean dropOnClose; // Indicates whether the channel is closed private volatile boolean isClosed; @@ -80,7 +82,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn String encryptionKey, Long encryptionKeyId, OpenChannelRequest.OnErrorOption onErrorOption, - ZoneOffset defaultTimezone) { + ZoneOffset defaultTimezone, + boolean dropOnClose) { this( name, dbName, @@ -94,7 +97,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn encryptionKeyId, onErrorOption, defaultTimezone, - client.getParameterProvider().getBlobFormatVersion()); + client.getParameterProvider().getBlobFormatVersion(), + dropOnClose); } /** Default constructor */ @@ -111,7 +115,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn Long encryptionKeyId, OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, - Constants.BdecVersion bdecVersion) { + Constants.BdecVersion bdecVersion, + boolean dropOnClose) { this.isClosed = false; this.owningClient = client; this.channelFlushContext = @@ -128,6 +133,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn channelState, new ClientBufferParameters(owningClient)); this.tableColumns = new HashMap<>(); + this.dropOnClose = dropOnClose; logger.logInfo( "Channel={} created for table={}", this.channelFlushContext.getName(), @@ -292,6 +298,19 @@ public CompletableFuture close() { .map(SnowflakeStreamingIngestChannelInternal::getFullyQualifiedName) .collect(Collectors.toList())); } + if (this.dropOnClose) { + this.owningClient.dropChannel( + DropChannelRequest.builder(this.getChannelContext().getName()) + .setDBName(this.getDBName()) + .setTableName(this.getTableName()) + .setSchemaName(this.getSchemaName()) + .setClientSequencer(this.getChannelSequencer()) + .build()); + System.out.println( + "SUCCESSFULLY dropped " + + this.getChannelContext().getFullyQualifiedName() + + " channel"); + } }); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 898642fd1..25b359949 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -5,6 +5,7 @@ package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_STATUS; +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_DROP_CHANNEL; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_OPEN_CHANNEL; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_REGISTER_BLOB; import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries; @@ -12,6 +13,7 @@ import static net.snowflake.ingest.utils.Constants.CHANNEL_STATUS_ENDPOINT; import static net.snowflake.ingest.utils.Constants.COMMIT_MAX_RETRY_COUNT; import static net.snowflake.ingest.utils.Constants.COMMIT_RETRY_INTERVAL_IN_MS; +import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT; import static net.snowflake.ingest.utils.Constants.ENABLE_TELEMETRY_TO_SF; import static net.snowflake.ingest.utils.Constants.MAX_STREAMING_INGEST_API_CHANNEL_RETRY; import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT; @@ -63,6 +65,7 @@ import net.snowflake.ingest.connection.OAuthCredential; import net.snowflake.ingest.connection.RequestBuilder; import net.snowflake.ingest.connection.TelemetryService; +import net.snowflake.ingest.streaming.DropChannelRequest; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient; @@ -353,6 +356,7 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest .setEncryptionKeyId(response.getEncryptionKeyId()) .setOnErrorOption(request.getOnErrorOption()) .setDefaultTimezone(request.getDefaultTimezone()) + .setDropOnClose(request.getDropOnClose()) .build(); // Setup the row buffer schema @@ -367,6 +371,64 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest } } + @Override + public void dropChannel(DropChannelRequest request) { + if (isClosed) { + throw new SFException(ErrorCode.CLOSED_CLIENT); + } + + logger.logDebug( + "Drop channel request start, channel={}, table={}, client={}", + request.getChannelName(), + request.getFullyQualifiedTableName(), + getName()); + + try { + Map payload = new HashMap<>(); + payload.put( + "request_id", this.flushService.getClientPrefix() + "_" + counter.getAndIncrement()); + payload.put("channel", request.getChannelName()); + payload.put("table", request.getTableName()); + payload.put("database", request.getDBName()); + payload.put("schema", request.getSchemaName()); + payload.put("role", this.role); + if (request.getClientSequencer() != null) { + payload.put("client_sequencer", request.getClientSequencer()); + } + + DropChannelResponse response = + executeWithRetries( + DropChannelResponse.class, + DROP_CHANNEL_ENDPOINT, + payload, + "drop channel", + STREAMING_DROP_CHANNEL, + httpClient, + requestBuilder); + + // Check for Snowflake specific response code + if (response.getStatusCode() != RESPONSE_SUCCESS) { + logger.logDebug( + "Drop channel request failed, channel={}, table={}, client={}, message={}", + request.getChannelName(), + request.getFullyQualifiedTableName(), + getName(), + response.getMessage()); + throw new SFException(ErrorCode.DROP_CHANNEL_FAILURE, response.getMessage()); + } + + logger.logInfo( + "Drop channel request succeeded, channel={}, table={}, clientSequencer={} client={}", + request.getChannelName(), + request.getFullyQualifiedTableName(), + request.getClientSequencer(), + getName()); + + } catch (IOException | IngestResponseException e) { + throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage()); + } + } + /** * Return the latest committed/persisted offset token for all channels * diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 404ec3851..134328668 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -62,6 +62,7 @@ public class Constants { // Channel level constants public static final String CHANNEL_STATUS_ENDPOINT = "/v1/streaming/channels/status/"; public static final String OPEN_CHANNEL_ENDPOINT = "/v1/streaming/channels/open/"; + public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/"; public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/"; public enum WriteMode { diff --git a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java index b212d7244..46c11a485 100644 --- a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java +++ b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java @@ -41,7 +41,8 @@ public enum ErrorCode { OAUTH_REFRESH_TOKEN_ERROR("0033"), INVALID_CONFIG_PARAMETER("0034"), MAX_BATCH_SIZE_EXCEEDED("0035"), - CRYPTO_PROVIDER_ERROR("0036"); + CRYPTO_PROVIDER_ERROR("0036"), + DROP_CHANNEL_FAILURE("0037"); public static final String errorMessageResource = "net.snowflake.ingest.ingest_error_messages"; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java index ea88d618c..089e66437 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java @@ -38,7 +38,8 @@ public void setup() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + false); channel2 = new SnowflakeStreamingIngestChannelInternal<>( "channel2", @@ -52,7 +53,8 @@ public void setup() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + false); channel3 = new SnowflakeStreamingIngestChannelInternal<>( "channel3", @@ -66,7 +68,8 @@ public void setup() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + false); cache.addChannel(channel1); cache.addChannel(channel2); cache.addChannel(channel3); @@ -92,7 +95,8 @@ public void testAddChannel() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + false); cache.addChannel(channel); Assert.assertEquals(1, cache.getSize()); Assert.assertTrue(channel == cache.iterator().next().getValue().get(channelName)); @@ -110,7 +114,8 @@ public void testAddChannel() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + false); cache.addChannel(channelDup); // The old channel should be invalid now Assert.assertTrue(!channel.isValid()); @@ -191,7 +196,8 @@ public void testRemoveChannel() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + false); cache.removeChannelIfSequencersMatch(channel3Dup); // Verify that remove the same channel with a different channel sequencer is a no op Assert.assertEquals(1, cache.getSize()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index a25fd416e..c927e6b4e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -258,7 +258,8 @@ SnowflakeStreamingIngestChannelInternal>> createChannel( encryptionKeyId, onErrorOption, defaultTimezone, - Constants.BdecVersion.THREE); + Constants.BdecVersion.THREE, + false); } @Override @@ -877,7 +878,8 @@ public void testInvalidateChannels() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - ZoneOffset.UTC); + ZoneOffset.UTC, + false); SnowflakeStreamingIngestChannelInternal channel2 = new SnowflakeStreamingIngestChannelInternal<>( @@ -892,7 +894,8 @@ public void testInvalidateChannels() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - ZoneOffset.UTC); + ZoneOffset.UTC, + false); channelCache.addChannel(channel1); channelCache.addChannel(channel2); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 814ce77c1..fa767bfd4 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -7,6 +7,7 @@ import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; import static net.snowflake.ingest.utils.Constants.ROLE; import static net.snowflake.ingest.utils.Constants.USER; +import static org.mockito.ArgumentMatchers.argThat; import java.security.KeyPair; import java.security.PrivateKey; @@ -32,6 +33,7 @@ import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; import net.snowflake.ingest.TestUtils; import net.snowflake.ingest.connection.RequestBuilder; +import net.snowflake.ingest.streaming.DropChannelRequest; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; @@ -171,7 +173,8 @@ public void testChannelValid() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + false); Assert.assertTrue(channel.isValid()); channel.invalidate("from testChannelValid"); @@ -221,7 +224,8 @@ public void testChannelClose() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + false); Assert.assertFalse(channel.isClosed()); channel.markClosed(); @@ -550,7 +554,8 @@ public void testInsertRow() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + false); ColumnMetadata col = new ColumnMetadata(); col.setOrdinal(1); @@ -636,7 +641,8 @@ public void testInsertTooLargeRow() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + false); channel.setupSchema(schema); InsertValidationResponse insertValidationResponse = channel.insertRow(row, "token-1"); @@ -660,7 +666,8 @@ public void testInsertTooLargeRow() { "key", 1234L, OpenChannelRequest.OnErrorOption.ABORT, - UTC); + UTC, + false); channel.setupSchema(schema); try { @@ -685,7 +692,8 @@ public void testInsertTooLargeRow() { "key", 1234L, OpenChannelRequest.OnErrorOption.SKIP_BATCH, - UTC); + UTC, + false); channel.setupSchema(schema); insertValidationResponse = channel.insertRow(row, "token-1"); @@ -718,7 +726,8 @@ public void testInsertRowThrottling() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + false); ParameterProvider parameterProvider = new ParameterProvider(); memoryInfoProvider.freeMemory = @@ -764,7 +773,8 @@ public void testFlush() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + false); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -800,7 +810,8 @@ public void testClose() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + false); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -814,6 +825,46 @@ public void testClose() throws Exception { // Calling close again on closed channel shouldn't fail channel.close().get(); + Mockito.verify(client, Mockito.times(0)).dropChannel(Mockito.any()); + } + + @Test + public void testDropOnClose() throws Exception { + SnowflakeStreamingIngestClientInternal client = + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + SnowflakeStreamingIngestChannelInternal channel = + new SnowflakeStreamingIngestChannelInternal<>( + "channel", + "db", + "schema", + "table", + "0", + 1L, + 0L, + client, + "key", + 1234L, + OpenChannelRequest.OnErrorOption.CONTINUE, + UTC, + true); + ChannelsStatusResponse response = new ChannelsStatusResponse(); + response.setStatusCode(0L); + response.setMessage("Success"); + response.setChannels(new ArrayList<>()); + + Mockito.doReturn(response).when(client).getChannelsStatus(Mockito.any()); + + Assert.assertFalse(channel.isClosed()); + DropChannelResponse dropChannelResponse = new DropChannelResponse(); + Mockito.doNothing().when(client).dropChannel(Mockito.any()); + channel.close().get(); + Assert.assertTrue(channel.isClosed()); + Mockito.verify(client, Mockito.times(1)) + .dropChannel( + argThat( + (DropChannelRequest req) -> + req.getChannelName().equals(channel.getName()) + && req.getClientSequencer().equals(channel.getChannelSequencer()))); } @Test @@ -834,7 +885,8 @@ public void testGetLatestCommittedOffsetToken() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + false); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 1107aad89..7332aa738 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -1,15 +1,7 @@ package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; -import static net.snowflake.ingest.utils.Constants.ACCOUNT_URL; -import static net.snowflake.ingest.utils.Constants.CHANNEL_STATUS_ENDPOINT; -import static net.snowflake.ingest.utils.Constants.MAX_STREAMING_INGEST_API_CHANNEL_RETRY; -import static net.snowflake.ingest.utils.Constants.PRIVATE_KEY; -import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT; -import static net.snowflake.ingest.utils.Constants.RESPONSE_ERR_ENQUEUE_TABLE_CHUNK_QUEUE_FULL; -import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; -import static net.snowflake.ingest.utils.Constants.ROLE; -import static net.snowflake.ingest.utils.Constants.USER; +import static net.snowflake.ingest.utils.Constants.*; import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_SNOWPIPE_STREAMING_METRICS; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.when; @@ -19,6 +11,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.io.StringWriter; +import java.nio.charset.Charset; import java.security.KeyPair; import java.security.PrivateKey; import java.time.ZoneOffset; @@ -43,6 +36,7 @@ import net.snowflake.client.jdbc.internal.google.common.collect.Sets; import net.snowflake.ingest.TestUtils; import net.snowflake.ingest.connection.RequestBuilder; +import net.snowflake.ingest.streaming.DropChannelRequest; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory; @@ -63,6 +57,7 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.mockito.ArgumentMatchers; import org.mockito.Mockito; public class SnowflakeStreamingIngestClientTest { @@ -93,7 +88,8 @@ public void setup() { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION); + BDEC_VERSION, + false); channel2 = new SnowflakeStreamingIngestChannelInternal<>( "channel2", @@ -108,7 +104,8 @@ public void setup() { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION); + BDEC_VERSION, + false); channel3 = new SnowflakeStreamingIngestChannelInternal<>( "channel3", @@ -123,7 +120,8 @@ public void setup() { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION); + BDEC_VERSION, + false); channel4 = new SnowflakeStreamingIngestChannelInternal<>( "channel4", @@ -138,7 +136,8 @@ public void setup() { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION); + BDEC_VERSION, + false); } @Test @@ -354,7 +353,8 @@ public void testGetChannelsStatusWithRequest() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION); + BDEC_VERSION, + false); ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); @@ -368,6 +368,51 @@ public void testGetChannelsStatusWithRequest() throws Exception { objectMapper.writeValueAsString(request), CHANNEL_STATUS_ENDPOINT, "channel status"); } + @Test + public void testDropChannel() throws Exception { + DropChannelResponse response = new DropChannelResponse(); + response.setStatusCode(RESPONSE_SUCCESS); + response.setMessage("dropped"); + String responseString = objectMapper.writeValueAsString(response); + + CloseableHttpClient httpClient = Mockito.mock(CloseableHttpClient.class); + CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); + StatusLine statusLine = Mockito.mock(StatusLine.class); + HttpEntity httpEntity = Mockito.mock(HttpEntity.class); + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()) + .thenReturn(IOUtils.toInputStream(responseString, Charset.defaultCharset())); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + + RequestBuilder requestBuilder = + Mockito.spy( + new RequestBuilder(TestUtils.getHost(), TestUtils.getUser(), TestUtils.getKeyPair())); + SnowflakeStreamingIngestClientInternal client = + new SnowflakeStreamingIngestClientInternal<>( + "client", + new SnowflakeURL("snowflake.dev.local:8082"), + null, + httpClient, + true, + requestBuilder, + null); + + DropChannelRequest request = + DropChannelRequest.builder("channel") + .setDBName("db") + .setTableName("table") + .setSchemaName("schema") + .build(); + client.dropChannel(request); + Mockito.verify(requestBuilder) + .generateStreamingIngestPostRequest( + ArgumentMatchers.contains("channel"), + ArgumentMatchers.refEq(DROP_CHANNEL_ENDPOINT), + ArgumentMatchers.refEq("drop channel")); + } + @Test public void testGetChannelsStatusWithRequestError() throws Exception { ChannelsStatusResponse response = new ChannelsStatusResponse(); @@ -413,7 +458,8 @@ public void testGetChannelsStatusWithRequestError() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION); + BDEC_VERSION, + false); try { client.getChannelsStatus(Collections.singletonList(channel)); @@ -454,7 +500,8 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION); + BDEC_VERSION, + false); ChannelMetadata channelMetadata = ChannelMetadata.builder() @@ -1141,7 +1188,8 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + false); SnowflakeStreamingIngestChannelInternal channel2 = new SnowflakeStreamingIngestChannelInternal<>( channel2Name, @@ -1155,7 +1203,8 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + false); client.getChannelCache().addChannel(channel1); client.getChannelCache().addChannel(channel2); @@ -1282,7 +1331,8 @@ public void testVerifyChannelsAreFullyCommittedSuccess() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + false); client.getChannelCache().addChannel(channel); ChannelsStatusResponse response = new ChannelsStatusResponse(); @@ -1370,7 +1420,8 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION); + BDEC_VERSION, + false); ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java index f3ee8dfe6..112d90a51 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java @@ -2,6 +2,7 @@ import static net.snowflake.ingest.utils.Constants.BLOB_NO_HEADER; import static net.snowflake.ingest.utils.Constants.COMPRESS_BLOB_TWICE; +import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT; import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT; import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM; import static org.hamcrest.MatcherAssert.assertThat; @@ -202,6 +203,41 @@ public void testSimpleIngest() throws Exception { Assert.fail("Row sequencer not updated before timeout"); } + @Test + public void testDropChannel() throws Exception { + SnowflakeURL url = new SnowflakeURL(TestUtils.getAccountURL()); + RequestBuilder requestBuilder = + Mockito.spy( + new RequestBuilder( + url, + TestUtils.getUser(), + TestUtils.getKeyPair(), + HttpUtil.getHttpClient(url.getAccount()), + "testrequestbuilder")); + client.injectRequestBuilder(requestBuilder); + + OpenChannelRequest request1 = + OpenChannelRequest.builder("CHANNEL") + .setDBName(testDb) + .setSchemaName(TEST_SCHEMA) + .setTableName(TEST_TABLE) + .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE) + .setDropOnClose(true) + .build(); + + // Open a streaming ingest channel from the given client + SnowflakeStreamingIngestChannel channel1 = client.openChannel(request1); + // Close the channel after insertion + channel1.close().get(); + + // verify expected request sent to server + Mockito.verify(requestBuilder) + .generateStreamingIngestPostRequest( + ArgumentMatchers.contains("channel"), + ArgumentMatchers.refEq(DROP_CHANNEL_ENDPOINT), + ArgumentMatchers.refEq("drop channel")); + } + @Test public void testParameterOverrides() throws Exception { Map parameterMap = new HashMap<>(); From 1d36fe19242effcbf517227b7a14df8dd52b6889 Mon Sep 17 00:00:00 2001 From: Purujit Saha Date: Wed, 3 Jan 2024 20:36:59 +0000 Subject: [PATCH 2/3] Address review comments --- .../ingest/streaming/DropChannelRequest.java | 19 +---- .../ingest/streaming/OpenChannelRequest.java | 16 ----- .../SnowflakeStreamingIngestChannel.java | 8 +++ .../SnowflakeStreamingIngestClient.java | 10 ++- .../internal/DropChannelResponse.java | 4 ++ .../internal/DropChannelVersionRequest.java | 19 +++++ ...nowflakeStreamingIngestChannelFactory.java | 10 +-- ...owflakeStreamingIngestChannelInternal.java | 30 ++++---- ...nowflakeStreamingIngestClientInternal.java | 13 ++-- .../streaming/internal/ChannelCacheTest.java | 18 ++--- .../streaming/internal/FlushServiceTest.java | 9 +-- .../SnowflakeStreamingIngestChannelTest.java | 71 ++++++++++++------- .../SnowflakeStreamingIngestClientTest.java | 44 ++++++------ .../streaming/internal/StreamingIngestIT.java | 5 +- 14 files changed, 136 insertions(+), 140 deletions(-) create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/DropChannelVersionRequest.java diff --git a/src/main/java/net/snowflake/ingest/streaming/DropChannelRequest.java b/src/main/java/net/snowflake/ingest/streaming/DropChannelRequest.java index 4fe476edf..3bc096c16 100644 --- a/src/main/java/net/snowflake/ingest/streaming/DropChannelRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/DropChannelRequest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming; @@ -20,9 +20,6 @@ public class DropChannelRequest { // Name of the table that the channel belongs to private final String tableName; - // Optional client sequencer to verify when dropping the channel. - private final Long clientSequencer; - public static DropChannelRequestBuilder builder(String channelName) { return new DropChannelRequestBuilder(channelName); } @@ -34,8 +31,6 @@ public static class DropChannelRequestBuilder { private String schemaName; private String tableName; - private Long clientSequencer = null; - public DropChannelRequestBuilder(String channelName) { this.channelName = channelName; } @@ -55,17 +50,12 @@ public DropChannelRequestBuilder setTableName(String tableName) { return this; } - public DropChannelRequestBuilder setClientSequencer(Long clientSequencer) { - this.clientSequencer = clientSequencer; - return this; - } - public DropChannelRequest build() { return new DropChannelRequest(this); } } - private DropChannelRequest(DropChannelRequestBuilder builder) { + public DropChannelRequest(DropChannelRequestBuilder builder) { Utils.assertStringNotNullOrEmpty("channel name", builder.channelName); Utils.assertStringNotNullOrEmpty("database name", builder.dbName); Utils.assertStringNotNullOrEmpty("schema name", builder.schemaName); @@ -75,7 +65,6 @@ private DropChannelRequest(DropChannelRequestBuilder builder) { this.dbName = builder.dbName; this.schemaName = builder.schemaName; this.tableName = builder.tableName; - this.clientSequencer = builder.clientSequencer; } public String getDBName() { @@ -97,8 +86,4 @@ public String getChannelName() { public String getFullyQualifiedTableName() { return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName); } - - public Long getClientSequencer() { - return this.clientSequencer; - } } diff --git a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java index b103d046c..0cf16af2e 100644 --- a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java @@ -43,10 +43,6 @@ public enum OnErrorOption { private final String offsetToken; private final boolean isOffsetTokenProvided; - // If true, the channel will be dropped when it is closed after any pending data is fully - // committed. - private final boolean dropOnClose; - public static OpenChannelRequestBuilder builder(String channelName) { return new OpenChannelRequestBuilder(channelName); } @@ -63,8 +59,6 @@ public static class OpenChannelRequestBuilder { private String offsetToken; private boolean isOffsetTokenProvided = false; - private boolean dropOnClose = false; - public OpenChannelRequestBuilder(String channelName) { this.channelName = channelName; this.defaultTimezone = DEFAULT_DEFAULT_TIMEZONE; @@ -101,11 +95,6 @@ public OpenChannelRequestBuilder setOffsetToken(String offsetToken) { return this; } - public OpenChannelRequestBuilder setDropOnClose(boolean dropOnClose) { - this.dropOnClose = dropOnClose; - return this; - } - public OpenChannelRequest build() { return new OpenChannelRequest(this); } @@ -127,7 +116,6 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) { this.defaultTimezone = builder.defaultTimezone; this.offsetToken = builder.offsetToken; this.isOffsetTokenProvided = builder.isOffsetTokenProvided; - this.dropOnClose = builder.dropOnClose; } public String getDBName() { @@ -165,8 +153,4 @@ public String getOffsetToken() { public boolean isOffsetTokenProvided() { return this.isOffsetTokenProvided; } - - public boolean getDropOnClose() { - return this.dropOnClose; - } } diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java index 2ab48b53f..b8687358e 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java @@ -79,6 +79,14 @@ public interface SnowflakeStreamingIngestChannel { */ CompletableFuture close(); + /** + * Close the channel, this function will make sure all the data in this channel is committed + * + * @param drop if true, the channel will be dropped after all data is successfully committed. + * @return a completable future which will be completed when the channel is closed + */ + CompletableFuture close(boolean drop); + /** * Insert one row into the channel, the row is represented using Map where the key is column name * and the value is a row of data. The following table summarizes supported value types and their diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClient.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClient.java index 3054d9564..8b65b5cae 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClient.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClient.java @@ -28,12 +28,10 @@ public interface SnowflakeStreamingIngestClient extends AutoCloseable { /** * Drop the specified channel on the server using a {@link DropChannelRequest} * - *

Note that {@link DropChannelRequest.DropChannelRequestBuilder#setClientSequencer(Long)}} can - * be used to drop a specific version of the channel and prevent accidentally dropping a channel - * concurrently opened by another client. If it is not specified, this call will blindly drop the - * latest version of the channel and any pending data will be lost. Also see {@link - * OpenChannelRequest.OpenChannelRequestBuilder#setDropOnClose(boolean)} to automatically drop - * channels on close. + *

Note that this call will blindly drop the latest version of the channel and any pending data + * will be lost. Also see {@link SnowflakeStreamingIngestChannel#close(boolean)} to drop channels + * on close. That approach will drop the local version of the channel and if the channel has been + * concurrently reopened by another client, that version of the channel won't be affected. * * @param request the drop channel request */ diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelResponse.java index e36692915..fce91f1d2 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelResponse.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelResponse.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelVersionRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelVersionRequest.java new file mode 100644 index 000000000..9afdbcbb3 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelVersionRequest.java @@ -0,0 +1,19 @@ +package net.snowflake.ingest.streaming.internal; + +import net.snowflake.ingest.streaming.DropChannelRequest; + +/** + * Same as DropChannelRequest but allows specifying a client sequencer to drop a specific version. + */ +class DropChannelVersionRequest extends DropChannelRequest { + private final Long clientSequencer; + + public DropChannelVersionRequest(DropChannelRequestBuilder builder, long clientSequencer) { + super(builder); + this.clientSequencer = clientSequencer; + } + + Long getClientSequencer() { + return this.clientSequencer; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java index ace1c80c8..3e442d43b 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java @@ -30,8 +30,6 @@ static class SnowflakeStreamingIngestChannelBuilder { private ZoneId defaultTimezone; - private boolean dropOnClose; - private SnowflakeStreamingIngestChannelBuilder(String name) { this.name = name; } @@ -93,11 +91,6 @@ SnowflakeStreamingIngestChannelBuilder setOwningClient( return this; } - SnowflakeStreamingIngestChannelBuilder setDropOnClose(boolean dropOnClose) { - this.dropOnClose = dropOnClose; - return this; - } - SnowflakeStreamingIngestChannelInternal build() { Utils.assertStringNotNullOrEmpty("channel name", this.name); Utils.assertStringNotNullOrEmpty("table name", this.tableName); @@ -123,8 +116,7 @@ SnowflakeStreamingIngestChannelInternal build() { this.encryptionKeyId, this.onErrorOption, this.defaultTimezone, - this.owningClient.getParameterProvider().getBlobFormatVersion(), - this.dropOnClose); + this.owningClient.getParameterProvider().getBlobFormatVersion()); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index 07acfbcb7..2f4a7678d 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -44,7 +44,6 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn // Reference to the row buffer private final RowBuffer rowBuffer; - private final boolean dropOnClose; // Indicates whether the channel is closed private volatile boolean isClosed; @@ -82,8 +81,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn String encryptionKey, Long encryptionKeyId, OpenChannelRequest.OnErrorOption onErrorOption, - ZoneOffset defaultTimezone, - boolean dropOnClose) { + ZoneOffset defaultTimezone) { this( name, dbName, @@ -97,8 +95,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn encryptionKeyId, onErrorOption, defaultTimezone, - client.getParameterProvider().getBlobFormatVersion(), - dropOnClose); + client.getParameterProvider().getBlobFormatVersion()); } /** Default constructor */ @@ -115,8 +112,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn Long encryptionKeyId, OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, - Constants.BdecVersion bdecVersion, - boolean dropOnClose) { + Constants.BdecVersion bdecVersion) { this.isClosed = false; this.owningClient = client; this.channelFlushContext = @@ -133,7 +129,6 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn channelState, new ClientBufferParameters(owningClient)); this.tableColumns = new HashMap<>(); - this.dropOnClose = dropOnClose; logger.logInfo( "Channel={} created for table={}", this.channelFlushContext.getName(), @@ -273,6 +268,11 @@ CompletableFuture flush(boolean closing) { */ @Override public CompletableFuture close() { + return this.close(false); + } + + @Override + public CompletableFuture close(boolean drop) { checkValidation(); if (isClosed()) { @@ -298,18 +298,14 @@ public CompletableFuture close() { .map(SnowflakeStreamingIngestChannelInternal::getFullyQualifiedName) .collect(Collectors.toList())); } - if (this.dropOnClose) { - this.owningClient.dropChannel( + if (drop) { + DropChannelRequest.DropChannelRequestBuilder builder = DropChannelRequest.builder(this.getChannelContext().getName()) .setDBName(this.getDBName()) .setTableName(this.getTableName()) - .setSchemaName(this.getSchemaName()) - .setClientSequencer(this.getChannelSequencer()) - .build()); - System.out.println( - "SUCCESSFULLY dropped " - + this.getChannelContext().getFullyQualifiedName() - + " channel"); + .setSchemaName(this.getSchemaName()); + this.owningClient.dropChannel( + new DropChannelVersionRequest(builder, this.getChannelSequencer())); } }); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 25b359949..0dde314f4 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -356,7 +356,6 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest .setEncryptionKeyId(response.getEncryptionKeyId()) .setOnErrorOption(request.getOnErrorOption()) .setDefaultTimezone(request.getDefaultTimezone()) - .setDropOnClose(request.getDropOnClose()) .build(); // Setup the row buffer schema @@ -392,8 +391,12 @@ public void dropChannel(DropChannelRequest request) { payload.put("database", request.getDBName()); payload.put("schema", request.getSchemaName()); payload.put("role", this.role); - if (request.getClientSequencer() != null) { - payload.put("client_sequencer", request.getClientSequencer()); + Long clientSequencer = null; + if (request instanceof DropChannelVersionRequest) { + clientSequencer = ((DropChannelVersionRequest) request).getClientSequencer(); + if (clientSequencer != null) { + payload.put("client_sequencer", clientSequencer); + } } DropChannelResponse response = @@ -421,11 +424,11 @@ public void dropChannel(DropChannelRequest request) { "Drop channel request succeeded, channel={}, table={}, clientSequencer={} client={}", request.getChannelName(), request.getFullyQualifiedTableName(), - request.getClientSequencer(), + clientSequencer, getName()); } catch (IOException | IngestResponseException e) { - throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage()); + throw new SFException(e, ErrorCode.DROP_CHANNEL_FAILURE, e.getMessage()); } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java index 089e66437..ea88d618c 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java @@ -38,8 +38,7 @@ public void setup() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - false); + UTC); channel2 = new SnowflakeStreamingIngestChannelInternal<>( "channel2", @@ -53,8 +52,7 @@ public void setup() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - false); + UTC); channel3 = new SnowflakeStreamingIngestChannelInternal<>( "channel3", @@ -68,8 +66,7 @@ public void setup() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - false); + UTC); cache.addChannel(channel1); cache.addChannel(channel2); cache.addChannel(channel3); @@ -95,8 +92,7 @@ public void testAddChannel() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - false); + UTC); cache.addChannel(channel); Assert.assertEquals(1, cache.getSize()); Assert.assertTrue(channel == cache.iterator().next().getValue().get(channelName)); @@ -114,8 +110,7 @@ public void testAddChannel() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - false); + UTC); cache.addChannel(channelDup); // The old channel should be invalid now Assert.assertTrue(!channel.isValid()); @@ -196,8 +191,7 @@ public void testRemoveChannel() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - false); + UTC); cache.removeChannelIfSequencersMatch(channel3Dup); // Verify that remove the same channel with a different channel sequencer is a no op Assert.assertEquals(1, cache.getSize()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index c927e6b4e..a25fd416e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -258,8 +258,7 @@ SnowflakeStreamingIngestChannelInternal>> createChannel( encryptionKeyId, onErrorOption, defaultTimezone, - Constants.BdecVersion.THREE, - false); + Constants.BdecVersion.THREE); } @Override @@ -878,8 +877,7 @@ public void testInvalidateChannels() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - ZoneOffset.UTC, - false); + ZoneOffset.UTC); SnowflakeStreamingIngestChannelInternal channel2 = new SnowflakeStreamingIngestChannelInternal<>( @@ -894,8 +892,7 @@ public void testInvalidateChannels() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - ZoneOffset.UTC, - false); + ZoneOffset.UTC); channelCache.addChannel(channel1); channelCache.addChannel(channel2); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index fa767bfd4..780563d28 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -33,7 +33,6 @@ import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; import net.snowflake.ingest.TestUtils; import net.snowflake.ingest.connection.RequestBuilder; -import net.snowflake.ingest.streaming.DropChannelRequest; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; @@ -173,8 +172,7 @@ public void testChannelValid() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - false); + UTC); Assert.assertTrue(channel.isValid()); channel.invalidate("from testChannelValid"); @@ -224,8 +222,7 @@ public void testChannelClose() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - false); + UTC); Assert.assertFalse(channel.isClosed()); channel.markClosed(); @@ -554,8 +551,7 @@ public void testInsertRow() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - false); + UTC); ColumnMetadata col = new ColumnMetadata(); col.setOrdinal(1); @@ -641,8 +637,7 @@ public void testInsertTooLargeRow() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - false); + UTC); channel.setupSchema(schema); InsertValidationResponse insertValidationResponse = channel.insertRow(row, "token-1"); @@ -666,8 +661,7 @@ public void testInsertTooLargeRow() { "key", 1234L, OpenChannelRequest.OnErrorOption.ABORT, - UTC, - false); + UTC); channel.setupSchema(schema); try { @@ -692,8 +686,7 @@ public void testInsertTooLargeRow() { "key", 1234L, OpenChannelRequest.OnErrorOption.SKIP_BATCH, - UTC, - false); + UTC); channel.setupSchema(schema); insertValidationResponse = channel.insertRow(row, "token-1"); @@ -726,8 +719,7 @@ public void testInsertRowThrottling() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - false); + UTC); ParameterProvider parameterProvider = new ParameterProvider(); memoryInfoProvider.freeMemory = @@ -773,8 +765,7 @@ public void testFlush() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - false); + UTC); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -810,8 +801,7 @@ public void testClose() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - false); + UTC); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -845,8 +835,7 @@ public void testDropOnClose() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - true); + UTC); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -855,18 +844,49 @@ public void testDropOnClose() throws Exception { Mockito.doReturn(response).when(client).getChannelsStatus(Mockito.any()); Assert.assertFalse(channel.isClosed()); - DropChannelResponse dropChannelResponse = new DropChannelResponse(); Mockito.doNothing().when(client).dropChannel(Mockito.any()); - channel.close().get(); + channel.close(true).get(); Assert.assertTrue(channel.isClosed()); Mockito.verify(client, Mockito.times(1)) .dropChannel( argThat( - (DropChannelRequest req) -> + (DropChannelVersionRequest req) -> req.getChannelName().equals(channel.getName()) && req.getClientSequencer().equals(channel.getChannelSequencer()))); } + @Test + public void testDropOnCloseInvalidChannel() throws Exception { + SnowflakeStreamingIngestClientInternal client = + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + SnowflakeStreamingIngestChannelInternal channel = + new SnowflakeStreamingIngestChannelInternal<>( + "channel", + "db", + "schema", + "table", + "0", + 1L, + 0L, + client, + "key", + 1234L, + OpenChannelRequest.OnErrorOption.CONTINUE, + UTC); + ChannelsStatusResponse response = new ChannelsStatusResponse(); + response.setStatusCode(0L); + response.setMessage("Success"); + response.setChannels(new ArrayList<>()); + + Mockito.doReturn(response).when(client).getChannelsStatus(Mockito.any()); + + Assert.assertFalse(channel.isClosed()); + channel.invalidate("test"); + Mockito.doNothing().when(client).dropChannel(Mockito.any()); + Assert.assertThrows(SFException.class, () -> channel.close(true).get()); + Mockito.verify(client, Mockito.never()).dropChannel(Mockito.any()); + } + @Test public void testGetLatestCommittedOffsetToken() { String offsetToken = "10"; @@ -885,8 +905,7 @@ public void testGetLatestCommittedOffsetToken() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - false); + UTC); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 7332aa738..dda34d83d 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -1,7 +1,16 @@ package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; -import static net.snowflake.ingest.utils.Constants.*; +import static net.snowflake.ingest.utils.Constants.ACCOUNT_URL; +import static net.snowflake.ingest.utils.Constants.CHANNEL_STATUS_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.MAX_STREAMING_INGEST_API_CHANNEL_RETRY; +import static net.snowflake.ingest.utils.Constants.PRIVATE_KEY; +import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.RESPONSE_ERR_ENQUEUE_TABLE_CHUNK_QUEUE_FULL; +import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; +import static net.snowflake.ingest.utils.Constants.ROLE; +import static net.snowflake.ingest.utils.Constants.USER; import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_SNOWPIPE_STREAMING_METRICS; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.when; @@ -88,8 +97,7 @@ public void setup() { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, - false); + BDEC_VERSION); channel2 = new SnowflakeStreamingIngestChannelInternal<>( "channel2", @@ -104,8 +112,7 @@ public void setup() { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, - false); + BDEC_VERSION); channel3 = new SnowflakeStreamingIngestChannelInternal<>( "channel3", @@ -120,8 +127,7 @@ public void setup() { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, - false); + BDEC_VERSION); channel4 = new SnowflakeStreamingIngestChannelInternal<>( "channel4", @@ -136,8 +142,7 @@ public void setup() { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, - false); + BDEC_VERSION); } @Test @@ -353,8 +358,7 @@ public void testGetChannelsStatusWithRequest() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, - false); + BDEC_VERSION); ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); @@ -458,8 +462,7 @@ public void testGetChannelsStatusWithRequestError() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, - false); + BDEC_VERSION); try { client.getChannelsStatus(Collections.singletonList(channel)); @@ -500,8 +503,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, - false); + BDEC_VERSION); ChannelMetadata channelMetadata = ChannelMetadata.builder() @@ -1188,8 +1190,7 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - false); + UTC); SnowflakeStreamingIngestChannelInternal channel2 = new SnowflakeStreamingIngestChannelInternal<>( channel2Name, @@ -1203,8 +1204,7 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - false); + UTC); client.getChannelCache().addChannel(channel1); client.getChannelCache().addChannel(channel2); @@ -1331,8 +1331,7 @@ public void testVerifyChannelsAreFullyCommittedSuccess() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC, - false); + UTC); client.getChannelCache().addChannel(channel); ChannelsStatusResponse response = new ChannelsStatusResponse(); @@ -1420,8 +1419,7 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, - false); + BDEC_VERSION); ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java index 112d90a51..03c647b4b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java @@ -222,18 +222,17 @@ public void testDropChannel() throws Exception { .setSchemaName(TEST_SCHEMA) .setTableName(TEST_TABLE) .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE) - .setDropOnClose(true) .build(); // Open a streaming ingest channel from the given client SnowflakeStreamingIngestChannel channel1 = client.openChannel(request1); // Close the channel after insertion - channel1.close().get(); + channel1.close(true).get(); // verify expected request sent to server Mockito.verify(requestBuilder) .generateStreamingIngestPostRequest( - ArgumentMatchers.contains("channel"), + ArgumentMatchers.contains("client_sequencer"), ArgumentMatchers.refEq(DROP_CHANNEL_ENDPOINT), ArgumentMatchers.refEq("drop channel")); } From d3b6ae14145cd7eeba3a68c4cc4b2f23179f9e89 Mon Sep 17 00:00:00 2001 From: Purujit Saha Date: Wed, 3 Jan 2024 21:37:29 +0000 Subject: [PATCH 3/3] Add copyright blurb --- .../ingest/streaming/internal/DropChannelVersionRequest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelVersionRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelVersionRequest.java index 9afdbcbb3..abc6b8067 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelVersionRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelVersionRequest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import net.snowflake.ingest.streaming.DropChannelRequest;