Skip to content

Commit

Permalink
Add client side support for upcoming drop channel API
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-psaha committed Feb 21, 2024
1 parent f8c8e3f commit 4ff6e1e
Show file tree
Hide file tree
Showing 15 changed files with 486 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public enum ApiName {
INSERT_REPORT("GET"),
LOAD_HISTORY_SCAN("GET"),
STREAMING_OPEN_CHANNEL("POST"),
STREAMING_DROP_CHANNEL("POST"),
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST");
Expand Down
104 changes: 104 additions & 0 deletions src/main/java/net/snowflake/ingest/streaming/DropChannelRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming;

import net.snowflake.ingest.utils.Utils;

/** A class that is used to drop a {@link SnowflakeStreamingIngestChannel} */
public class DropChannelRequest {
// Name of the channel
private final String channelName;

// Name of the database that the channel belongs to
private final String dbName;

// Name of the schema that the channel belongs to
private final String schemaName;

// Name of the table that the channel belongs to
private final String tableName;

// Optional client sequencer to verify when dropping the channel.
private final Long clientSequencer;

public static DropChannelRequestBuilder builder(String channelName) {
return new DropChannelRequestBuilder(channelName);
}

/** A builder class to build a DropChannelRequest */
public static class DropChannelRequestBuilder {
private final String channelName;
private String dbName;
private String schemaName;
private String tableName;

private Long clientSequencer = null;

public DropChannelRequestBuilder(String channelName) {
this.channelName = channelName;
}

public DropChannelRequestBuilder setDBName(String dbName) {
this.dbName = dbName;
return this;
}

public DropChannelRequestBuilder setSchemaName(String schemaName) {
this.schemaName = schemaName;
return this;
}

public DropChannelRequestBuilder setTableName(String tableName) {
this.tableName = tableName;
return this;
}

public DropChannelRequestBuilder setClientSequencer(Long clientSequencer) {
this.clientSequencer = clientSequencer;
return this;
}

public DropChannelRequest build() {
return new DropChannelRequest(this);
}
}

private DropChannelRequest(DropChannelRequestBuilder builder) {
Utils.assertStringNotNullOrEmpty("channel name", builder.channelName);
Utils.assertStringNotNullOrEmpty("database name", builder.dbName);
Utils.assertStringNotNullOrEmpty("schema name", builder.schemaName);
Utils.assertStringNotNullOrEmpty("table name", builder.tableName);

this.channelName = builder.channelName;
this.dbName = builder.dbName;
this.schemaName = builder.schemaName;
this.tableName = builder.tableName;
this.clientSequencer = builder.clientSequencer;
}

public String getDBName() {
return this.dbName;
}

public String getSchemaName() {
return this.schemaName;
}

public String getTableName() {
return this.tableName;
}

public String getChannelName() {
return this.channelName;
}

public String getFullyQualifiedTableName() {
return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName);
}

public Long getClientSequencer() {
return this.clientSequencer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public enum OnErrorOption {
private final String offsetToken;
private final boolean isOffsetTokenProvided;

// If true, the channel will be dropped when it is closed after any pending data is fully
// committed.
private final boolean dropOnClose;

public static OpenChannelRequestBuilder builder(String channelName) {
return new OpenChannelRequestBuilder(channelName);
}
Expand All @@ -59,6 +63,8 @@ public static class OpenChannelRequestBuilder {
private String offsetToken;
private boolean isOffsetTokenProvided = false;

private boolean dropOnClose = false;

public OpenChannelRequestBuilder(String channelName) {
this.channelName = channelName;
this.defaultTimezone = DEFAULT_DEFAULT_TIMEZONE;
Expand Down Expand Up @@ -95,6 +101,11 @@ public OpenChannelRequestBuilder setOffsetToken(String offsetToken) {
return this;
}

public OpenChannelRequestBuilder setDropOnClose(boolean dropOnClose) {
this.dropOnClose = dropOnClose;
return this;
}

public OpenChannelRequest build() {
return new OpenChannelRequest(this);
}
Expand All @@ -116,6 +127,7 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) {
this.defaultTimezone = builder.defaultTimezone;
this.offsetToken = builder.offsetToken;
this.isOffsetTokenProvided = builder.isOffsetTokenProvided;
this.dropOnClose = builder.dropOnClose;
}

public String getDBName() {
Expand Down Expand Up @@ -153,4 +165,8 @@ public String getOffsetToken() {
public boolean isOffsetTokenProvided() {
return this.isOffsetTokenProvided;
}

public boolean getDropOnClose() {
return this.dropOnClose;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ public interface SnowflakeStreamingIngestClient extends AutoCloseable {
*/
SnowflakeStreamingIngestChannel openChannel(OpenChannelRequest request);

/**
* Drop the specified channel on the server using a {@link DropChannelRequest}
*
* <p>Note that {@link DropChannelRequest.DropChannelRequestBuilder#setClientSequencer(Long)}} can
* be used to drop a specific version of the channel and prevent accidentally dropping a channel
* concurrently opened by another client. If it is not specified, this call will blindly drop the
* latest version of the channel and any pending data will be lost. Also see {@link
* OpenChannelRequest.OpenChannelRequestBuilder#setDropOnClose(boolean)} to automatically drop
* channels on close.
*
* @param request the drop channel request
*/
void dropChannel(DropChannelRequest request);

/**
* Get the client name
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;

/** Response for a {@link net.snowflake.ingest.streaming.DropChannelRequest}. */
class DropChannelResponse extends StreamingIngestResponse {
private Long statusCode;
private String message;
private String dbName;
private String schemaName;
private String tableName;
private String channelName;

@JsonProperty("status_code")
void setStatusCode(Long statusCode) {
this.statusCode = statusCode;
}

@Override
Long getStatusCode() {
return this.statusCode;
}

@JsonProperty("message")
void setMessage(String message) {
this.message = message;
}

String getMessage() {
return this.message;
}

@JsonProperty("database")
void setDBName(String dbName) {
this.dbName = dbName;
}

String getDBName() {
return this.dbName;
}

@JsonProperty("schema")
void setSchemaName(String schemaName) {
this.schemaName = schemaName;
}

String getSchemaName() {
return this.schemaName;
}

@JsonProperty("table")
void setTableName(String tableName) {
this.tableName = tableName;
}

String getTableName() {
return this.tableName;
}

@JsonProperty("channel")
void setChannelName(String channelName) {
this.channelName = channelName;
}

String getChannelName() {
return this.channelName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ static class SnowflakeStreamingIngestChannelBuilder<T> {

private ZoneId defaultTimezone;

private boolean dropOnClose;

private SnowflakeStreamingIngestChannelBuilder(String name) {
this.name = name;
}
Expand Down Expand Up @@ -91,6 +93,11 @@ SnowflakeStreamingIngestChannelBuilder<T> setOwningClient(
return this;
}

SnowflakeStreamingIngestChannelBuilder<T> setDropOnClose(boolean dropOnClose) {
this.dropOnClose = dropOnClose;
return this;
}

SnowflakeStreamingIngestChannelInternal<T> build() {
Utils.assertStringNotNullOrEmpty("channel name", this.name);
Utils.assertStringNotNullOrEmpty("table name", this.tableName);
Expand All @@ -116,7 +123,8 @@ SnowflakeStreamingIngestChannelInternal<T> build() {
this.encryptionKeyId,
this.onErrorOption,
this.defaultTimezone,
this.owningClient.getParameterProvider().getBlobFormatVersion());
this.owningClient.getParameterProvider().getBlobFormatVersion(),
this.dropOnClose);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import net.snowflake.ingest.streaming.DropChannelRequest;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
Expand All @@ -43,6 +44,7 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn

// Reference to the row buffer
private final RowBuffer<T> rowBuffer;
private final boolean dropOnClose;

// Indicates whether the channel is closed
private volatile boolean isClosed;
Expand Down Expand Up @@ -80,7 +82,8 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
String encryptionKey,
Long encryptionKeyId,
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneOffset defaultTimezone) {
ZoneOffset defaultTimezone,
boolean dropOnClose) {
this(
name,
dbName,
Expand All @@ -94,7 +97,8 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
encryptionKeyId,
onErrorOption,
defaultTimezone,
client.getParameterProvider().getBlobFormatVersion());
client.getParameterProvider().getBlobFormatVersion(),
dropOnClose);
}

/** Default constructor */
Expand All @@ -111,7 +115,8 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
Long encryptionKeyId,
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneId defaultTimezone,
Constants.BdecVersion bdecVersion) {
Constants.BdecVersion bdecVersion,
boolean dropOnClose) {
this.isClosed = false;
this.owningClient = client;
this.channelFlushContext =
Expand All @@ -128,6 +133,7 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
channelState,
new ClientBufferParameters(owningClient));
this.tableColumns = new HashMap<>();
this.dropOnClose = dropOnClose;
logger.logInfo(
"Channel={} created for table={}",
this.channelFlushContext.getName(),
Expand Down Expand Up @@ -292,6 +298,19 @@ public CompletableFuture<Void> close() {
.map(SnowflakeStreamingIngestChannelInternal::getFullyQualifiedName)
.collect(Collectors.toList()));
}
if (this.dropOnClose) {
this.owningClient.dropChannel(
DropChannelRequest.builder(this.getChannelContext().getName())
.setDBName(this.getDBName())
.setTableName(this.getTableName())
.setSchemaName(this.getSchemaName())
.setClientSequencer(this.getChannelSequencer())
.build());
System.out.println(
"SUCCESSFULLY dropped "
+ this.getChannelContext().getFullyQualifiedName()
+ " channel");
}
});
}

Expand Down
Loading

0 comments on commit 4ff6e1e

Please sign in to comment.