diff --git a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java
index f6c7a6c1b..bf20afda8 100644
--- a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java
+++ b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java
@@ -39,6 +39,7 @@ public enum ApiName {
INSERT_REPORT("GET"),
LOAD_HISTORY_SCAN("GET"),
STREAMING_OPEN_CHANNEL("POST"),
+ STREAMING_DROP_CHANNEL("POST"),
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST");
diff --git a/src/main/java/net/snowflake/ingest/streaming/DropChannelRequest.java b/src/main/java/net/snowflake/ingest/streaming/DropChannelRequest.java
new file mode 100644
index 000000000..4fe476edf
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/DropChannelRequest.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming;
+
+import net.snowflake.ingest.utils.Utils;
+
+/** A class that is used to drop a {@link SnowflakeStreamingIngestChannel} */
+public class DropChannelRequest {
+ // Name of the channel
+ private final String channelName;
+
+ // Name of the database that the channel belongs to
+ private final String dbName;
+
+ // Name of the schema that the channel belongs to
+ private final String schemaName;
+
+ // Name of the table that the channel belongs to
+ private final String tableName;
+
+ // Optional client sequencer to verify when dropping the channel.
+ private final Long clientSequencer;
+
+ public static DropChannelRequestBuilder builder(String channelName) {
+ return new DropChannelRequestBuilder(channelName);
+ }
+
+ /** A builder class to build a DropChannelRequest */
+ public static class DropChannelRequestBuilder {
+ private final String channelName;
+ private String dbName;
+ private String schemaName;
+ private String tableName;
+
+ private Long clientSequencer = null;
+
+ public DropChannelRequestBuilder(String channelName) {
+ this.channelName = channelName;
+ }
+
+ public DropChannelRequestBuilder setDBName(String dbName) {
+ this.dbName = dbName;
+ return this;
+ }
+
+ public DropChannelRequestBuilder setSchemaName(String schemaName) {
+ this.schemaName = schemaName;
+ return this;
+ }
+
+ public DropChannelRequestBuilder setTableName(String tableName) {
+ this.tableName = tableName;
+ return this;
+ }
+
+ public DropChannelRequestBuilder setClientSequencer(Long clientSequencer) {
+ this.clientSequencer = clientSequencer;
+ return this;
+ }
+
+ public DropChannelRequest build() {
+ return new DropChannelRequest(this);
+ }
+ }
+
+ private DropChannelRequest(DropChannelRequestBuilder builder) {
+ Utils.assertStringNotNullOrEmpty("channel name", builder.channelName);
+ Utils.assertStringNotNullOrEmpty("database name", builder.dbName);
+ Utils.assertStringNotNullOrEmpty("schema name", builder.schemaName);
+ Utils.assertStringNotNullOrEmpty("table name", builder.tableName);
+
+ this.channelName = builder.channelName;
+ this.dbName = builder.dbName;
+ this.schemaName = builder.schemaName;
+ this.tableName = builder.tableName;
+ this.clientSequencer = builder.clientSequencer;
+ }
+
+ public String getDBName() {
+ return this.dbName;
+ }
+
+ public String getSchemaName() {
+ return this.schemaName;
+ }
+
+ public String getTableName() {
+ return this.tableName;
+ }
+
+ public String getChannelName() {
+ return this.channelName;
+ }
+
+ public String getFullyQualifiedTableName() {
+ return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName);
+ }
+
+ public Long getClientSequencer() {
+ return this.clientSequencer;
+ }
+}
diff --git a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java
index d30472b63..b103d046c 100644
--- a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java
+++ b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java
@@ -43,6 +43,10 @@ public enum OnErrorOption {
private final String offsetToken;
private final boolean isOffsetTokenProvided;
+ // If true, the channel will be dropped when it is closed after any pending data is fully
+ // committed.
+ private final boolean dropOnClose;
+
public static OpenChannelRequestBuilder builder(String channelName) {
return new OpenChannelRequestBuilder(channelName);
}
@@ -59,6 +63,8 @@ public static class OpenChannelRequestBuilder {
private String offsetToken;
private boolean isOffsetTokenProvided = false;
+ private boolean dropOnClose = false;
+
public OpenChannelRequestBuilder(String channelName) {
this.channelName = channelName;
this.defaultTimezone = DEFAULT_DEFAULT_TIMEZONE;
@@ -89,10 +95,15 @@ public OpenChannelRequestBuilder setDefaultTimezone(ZoneId defaultTimezone) {
return this;
}
- public OpenChannelRequestBuilder setOffsetToken(String offsetToken){
+ public OpenChannelRequestBuilder setOffsetToken(String offsetToken) {
this.offsetToken = offsetToken;
this.isOffsetTokenProvided = true;
- return this;
+ return this;
+ }
+
+ public OpenChannelRequestBuilder setDropOnClose(boolean dropOnClose) {
+ this.dropOnClose = dropOnClose;
+ return this;
}
public OpenChannelRequest build() {
@@ -116,6 +127,7 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) {
this.defaultTimezone = builder.defaultTimezone;
this.offsetToken = builder.offsetToken;
this.isOffsetTokenProvided = builder.isOffsetTokenProvided;
+ this.dropOnClose = builder.dropOnClose;
}
public String getDBName() {
@@ -153,4 +165,8 @@ public String getOffsetToken() {
public boolean isOffsetTokenProvided() {
return this.isOffsetTokenProvided;
}
+
+ public boolean getDropOnClose() {
+ return this.dropOnClose;
+ }
}
diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClient.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClient.java
index 3ae4a83d1..3054d9564 100644
--- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClient.java
+++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClient.java
@@ -25,6 +25,20 @@ public interface SnowflakeStreamingIngestClient extends AutoCloseable {
*/
SnowflakeStreamingIngestChannel openChannel(OpenChannelRequest request);
+ /**
+ * Drop the specified channel on the server using a {@link DropChannelRequest}
+ *
+ *
Note that {@link DropChannelRequest.DropChannelRequestBuilder#setClientSequencer(Long)}} can
+ * be used to drop a specific version of the channel and prevent accidentally dropping a channel
+ * concurrently opened by another client. If it is not specified, this call will blindly drop the
+ * latest version of the channel and any pending data will be lost. Also see {@link
+ * OpenChannelRequest.OpenChannelRequestBuilder#setDropOnClose(boolean)} to automatically drop
+ * channels on close.
+ *
+ * @param request the drop channel request
+ */
+ void dropChannel(DropChannelRequest request);
+
/**
* Get the client name
*
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelResponse.java
new file mode 100644
index 000000000..e36692915
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelResponse.java
@@ -0,0 +1,68 @@
+package net.snowflake.ingest.streaming.internal;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Response for a {@link net.snowflake.ingest.streaming.DropChannelRequest}. */
+class DropChannelResponse extends StreamingIngestResponse {
+ private Long statusCode;
+ private String message;
+ private String dbName;
+ private String schemaName;
+ private String tableName;
+ private String channelName;
+
+ @JsonProperty("status_code")
+ void setStatusCode(Long statusCode) {
+ this.statusCode = statusCode;
+ }
+
+ @Override
+ Long getStatusCode() {
+ return this.statusCode;
+ }
+
+ @JsonProperty("message")
+ void setMessage(String message) {
+ this.message = message;
+ }
+
+ String getMessage() {
+ return this.message;
+ }
+
+ @JsonProperty("database")
+ void setDBName(String dbName) {
+ this.dbName = dbName;
+ }
+
+ String getDBName() {
+ return this.dbName;
+ }
+
+ @JsonProperty("schema")
+ void setSchemaName(String schemaName) {
+ this.schemaName = schemaName;
+ }
+
+ String getSchemaName() {
+ return this.schemaName;
+ }
+
+ @JsonProperty("table")
+ void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ String getTableName() {
+ return this.tableName;
+ }
+
+ @JsonProperty("channel")
+ void setChannelName(String channelName) {
+ this.channelName = channelName;
+ }
+
+ String getChannelName() {
+ return this.channelName;
+ }
+}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java
index 3e442d43b..ace1c80c8 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java
@@ -30,6 +30,8 @@ static class SnowflakeStreamingIngestChannelBuilder {
private ZoneId defaultTimezone;
+ private boolean dropOnClose;
+
private SnowflakeStreamingIngestChannelBuilder(String name) {
this.name = name;
}
@@ -91,6 +93,11 @@ SnowflakeStreamingIngestChannelBuilder setOwningClient(
return this;
}
+ SnowflakeStreamingIngestChannelBuilder setDropOnClose(boolean dropOnClose) {
+ this.dropOnClose = dropOnClose;
+ return this;
+ }
+
SnowflakeStreamingIngestChannelInternal build() {
Utils.assertStringNotNullOrEmpty("channel name", this.name);
Utils.assertStringNotNullOrEmpty("table name", this.tableName);
@@ -116,7 +123,8 @@ SnowflakeStreamingIngestChannelInternal build() {
this.encryptionKeyId,
this.onErrorOption,
this.defaultTimezone,
- this.owningClient.getParameterProvider().getBlobFormatVersion());
+ this.owningClient.getParameterProvider().getBlobFormatVersion(),
+ this.dropOnClose);
}
}
}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
index 756f9b5e8..08e30892a 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
@@ -19,6 +19,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import net.snowflake.ingest.streaming.DropChannelRequest;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
@@ -42,6 +43,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn
// Reference to the row buffer
private final RowBuffer rowBuffer;
+ private final boolean dropOnClose;
// Indicates whether the channel is closed
private volatile boolean isClosed;
@@ -79,7 +81,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn
String encryptionKey,
Long encryptionKeyId,
OpenChannelRequest.OnErrorOption onErrorOption,
- ZoneOffset defaultTimezone) {
+ ZoneOffset defaultTimezone,
+ boolean dropOnClose) {
this(
name,
dbName,
@@ -93,7 +96,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn
encryptionKeyId,
onErrorOption,
defaultTimezone,
- client.getParameterProvider().getBlobFormatVersion());
+ client.getParameterProvider().getBlobFormatVersion(),
+ dropOnClose);
}
/** Default constructor */
@@ -110,7 +114,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn
Long encryptionKeyId,
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneId defaultTimezone,
- Constants.BdecVersion bdecVersion) {
+ Constants.BdecVersion bdecVersion,
+ boolean dropOnClose) {
this.isClosed = false;
this.owningClient = client;
this.channelFlushContext =
@@ -127,6 +132,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn
channelState,
new ClientBufferParameters(owningClient));
this.tableColumns = new HashMap<>();
+ this.dropOnClose = dropOnClose;
logger.logInfo(
"Channel={} created for table={}",
this.channelFlushContext.getName(),
@@ -291,6 +297,19 @@ public CompletableFuture close() {
.map(SnowflakeStreamingIngestChannelInternal::getFullyQualifiedName)
.collect(Collectors.toList()));
}
+ if (this.dropOnClose) {
+ this.owningClient.dropChannel(
+ DropChannelRequest.builder(this.getChannelContext().getName())
+ .setDBName(this.getDBName())
+ .setTableName(this.getTableName())
+ .setSchemaName(this.getSchemaName())
+ .setClientSequencer(this.getChannelSequencer())
+ .build());
+ System.out.println(
+ "SUCCESSFULLY dropped "
+ + this.getChannelContext().getFullyQualifiedName()
+ + " channel");
+ }
});
}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
index 88510a8d2..dd9d24e36 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
@@ -5,6 +5,7 @@
package net.snowflake.ingest.streaming.internal;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_STATUS;
+import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_DROP_CHANNEL;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_OPEN_CHANNEL;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_REGISTER_BLOB;
import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries;
@@ -12,6 +13,7 @@
import static net.snowflake.ingest.utils.Constants.CHANNEL_STATUS_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.COMMIT_MAX_RETRY_COUNT;
import static net.snowflake.ingest.utils.Constants.COMMIT_RETRY_INTERVAL_IN_MS;
+import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.ENABLE_TELEMETRY_TO_SF;
import static net.snowflake.ingest.utils.Constants.MAX_STREAMING_INGEST_API_CHANNEL_RETRY;
import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT;
@@ -63,6 +65,7 @@
import net.snowflake.ingest.connection.OAuthCredential;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.connection.TelemetryService;
+import net.snowflake.ingest.streaming.DropChannelRequest;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
@@ -353,7 +356,7 @@ public SnowflakeStreamingIngestChannelInternal> openChannel(OpenChannelRequest
payload.put("schema", request.getSchemaName());
payload.put("write_mode", Constants.WriteMode.CLOUD_STORAGE.name());
payload.put("role", this.role);
- if (request.isOffsetTokenProvided()){
+ if (request.isOffsetTokenProvided()) {
payload.put("offset_token", request.getOffsetToken());
}
@@ -401,6 +404,7 @@ public SnowflakeStreamingIngestChannelInternal> openChannel(OpenChannelRequest
.setEncryptionKeyId(response.getEncryptionKeyId())
.setOnErrorOption(request.getOnErrorOption())
.setDefaultTimezone(request.getDefaultTimezone())
+ .setDropOnClose(request.getDropOnClose())
.build();
// Setup the row buffer schema
@@ -415,6 +419,64 @@ public SnowflakeStreamingIngestChannelInternal> openChannel(OpenChannelRequest
}
}
+ @Override
+ public void dropChannel(DropChannelRequest request) {
+ if (isClosed) {
+ throw new SFException(ErrorCode.CLOSED_CLIENT);
+ }
+
+ logger.logDebug(
+ "Drop channel request start, channel={}, table={}, client={}",
+ request.getChannelName(),
+ request.getFullyQualifiedTableName(),
+ getName());
+
+ try {
+ Map