Skip to content

Commit

Permalink
Done with code changes - need to add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-psaha committed Dec 13, 2023
1 parent 72c4004 commit 06dc34d
Show file tree
Hide file tree
Showing 15 changed files with 328 additions and 59 deletions.
14 changes: 7 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,13 @@
<artifactId>protobuf-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Change the 'test' scope to 'runtime' to enable console logging in examples -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
Expand Down Expand Up @@ -524,13 +531,6 @@
<version>2.0.2</version>
<scope>test</scope>
</dependency>
<!-- Change the 'test' scope to 'runtime' to enable console logging in examples -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public enum ApiName {
INSERT_REPORT("GET"),
LOAD_HISTORY_SCAN("GET"),
STREAMING_OPEN_CHANNEL("POST"),
STREAMING_DROP_CHANNEL("POST"),
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST");
Expand Down
103 changes: 103 additions & 0 deletions src/main/java/net/snowflake/ingest/streaming/DropChannelRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming;

import net.snowflake.ingest.utils.Utils;

/** A class that is used to open/create a {@link SnowflakeStreamingIngestChannel} */
public class DropChannelRequest {
// Name of the channel
private final String channelName;

// Name of the database that the channel belongs to
private final String dbName;

// Name of the schema that the channel belongs to
private final String schemaName;

// Name of the table that the channel belongs to
private final String tableName;

private final Long clientSequencer;

public static DropChannelRequestBuilder builder(String channelName) {
return new DropChannelRequestBuilder(channelName);
}

/** A builder class to build a OpenChannelRequest */
public static class DropChannelRequestBuilder {
private final String channelName;
private String dbName;
private String schemaName;
private String tableName;

private Long clientSequencer = null;

public DropChannelRequestBuilder(String channelName) {
this.channelName = channelName;
}

public DropChannelRequestBuilder setDBName(String dbName) {
this.dbName = dbName;
return this;
}

public DropChannelRequestBuilder setSchemaName(String schemaName) {
this.schemaName = schemaName;
return this;
}

public DropChannelRequestBuilder setTableName(String tableName) {
this.tableName = tableName;
return this;
}

public DropChannelRequestBuilder setClientSequencer(Long clientSequencer) {
this.clientSequencer = clientSequencer;
return this;
}

public DropChannelRequest build() {
return new DropChannelRequest(this);
}
}

private DropChannelRequest(DropChannelRequestBuilder builder) {
Utils.assertStringNotNullOrEmpty("channel name", builder.channelName);
Utils.assertStringNotNullOrEmpty("database name", builder.dbName);
Utils.assertStringNotNullOrEmpty("schema name", builder.schemaName);
Utils.assertStringNotNullOrEmpty("table name", builder.tableName);

this.channelName = builder.channelName;
this.dbName = builder.dbName;
this.schemaName = builder.schemaName;
this.tableName = builder.tableName;
this.clientSequencer = builder.clientSequencer;
}

public String getDBName() {
return this.dbName;
}

public String getSchemaName() {
return this.schemaName;
}

public String getTableName() {
return this.tableName;
}

public String getChannelName() {
return this.channelName;
}

public String getFullyQualifiedTableName() {
return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName);
}

public Long getClientSequencer() {
return this.clientSequencer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public enum OnErrorOption {
private final String offsetToken;
private final boolean isOffsetTokenProvided;

// If true, the channel will be dropped when it is closed after any pending data is fully
// committed.
private final boolean dropOnClose;

public static OpenChannelRequestBuilder builder(String channelName) {
return new OpenChannelRequestBuilder(channelName);
}
Expand All @@ -59,6 +63,8 @@ public static class OpenChannelRequestBuilder {
private String offsetToken;
private boolean isOffsetTokenProvided = false;

private boolean dropOnClose = false;

public OpenChannelRequestBuilder(String channelName) {
this.channelName = channelName;
this.defaultTimezone = DEFAULT_DEFAULT_TIMEZONE;
Expand Down Expand Up @@ -89,10 +95,15 @@ public OpenChannelRequestBuilder setDefaultTimezone(ZoneId defaultTimezone) {
return this;
}

public OpenChannelRequestBuilder setOffsetToken(String offsetToken){
public OpenChannelRequestBuilder setOffsetToken(String offsetToken) {
this.offsetToken = offsetToken;
this.isOffsetTokenProvided = true;
return this;
return this;
}

public OpenChannelRequestBuilder setDropOnClose(boolean dropOnClose) {
this.dropOnClose = dropOnClose;
return this;
}

public OpenChannelRequest build() {
Expand All @@ -116,6 +127,7 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) {
this.defaultTimezone = builder.defaultTimezone;
this.offsetToken = builder.offsetToken;
this.isOffsetTokenProvided = builder.isOffsetTokenProvided;
this.dropOnClose = builder.dropOnClose;
}

public String getDBName() {
Expand Down Expand Up @@ -153,4 +165,8 @@ public String getOffsetToken() {
public boolean isOffsetTokenProvided() {
return this.isOffsetTokenProvided;
}

public boolean getDropOnClose() {
return this.dropOnClose;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ public interface SnowflakeStreamingIngestClient extends AutoCloseable {
*/
SnowflakeStreamingIngestChannel openChannel(OpenChannelRequest request);

/**
* Drop the specified channel on the server using a {@link DropChannelRequest}
*
* <p>Note that {@link DropChannelRequest.DropChannelRequestBuilder#setClientSequencer(Long)}} can
* be used to drop a specific version of the channel and prevent accidentally dropping a channel
* concurrently opened by another client. If it is not specified, this call will blindly drop the
* latest version of the channel and any pending data will be lost. Also see {@link
* OpenChannelRequest.OpenChannelRequestBuilder#setDropOnClose(boolean)} to automatically drop
* channels on close.
*
* @param request the drop channel request
*/
void dropChannel(DropChannelRequest request);

/**
* Get the client name
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;

public class DropChannelResponse extends StreamingIngestResponse {
private Long statusCode;
private String message;
private String dbName;
private String schemaName;
private String tableName;
private String channelName;

@JsonProperty("status_code")
void setStatusCode(Long statusCode) {
this.statusCode = statusCode;
}

@Override
Long getStatusCode() {
return this.statusCode;
}

@JsonProperty("message")
void setMessage(String message) {
this.message = message;
}

String getMessage() {
return this.message;
}

@JsonProperty("database")
void setDBName(String dbName) {
this.dbName = dbName;
}

String getDBName() {
return this.dbName;
}

@JsonProperty("schema")
void setSchemaName(String schemaName) {
this.schemaName = schemaName;
}

String getSchemaName() {
return this.schemaName;
}

@JsonProperty("table")
void setTableName(String tableName) {
this.tableName = tableName;
}

String getTableName() {
return this.tableName;
}

@JsonProperty("channel")
void setChannelName(String channelName) {
this.channelName = channelName;
}

String getChannelName() {
return this.channelName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ static class SnowflakeStreamingIngestChannelBuilder<T> {

private ZoneId defaultTimezone;

private boolean dropOnClose;

private SnowflakeStreamingIngestChannelBuilder(String name) {
this.name = name;
}
Expand Down Expand Up @@ -91,6 +93,11 @@ SnowflakeStreamingIngestChannelBuilder<T> setOwningClient(
return this;
}

SnowflakeStreamingIngestChannelBuilder<T> setDropOnClose(boolean dropOnClose) {
this.dropOnClose = dropOnClose;
return this;
}

SnowflakeStreamingIngestChannelInternal<T> build() {
Utils.assertStringNotNullOrEmpty("channel name", this.name);
Utils.assertStringNotNullOrEmpty("table name", this.tableName);
Expand All @@ -116,7 +123,8 @@ SnowflakeStreamingIngestChannelInternal<T> build() {
this.encryptionKeyId,
this.onErrorOption,
this.defaultTimezone,
this.owningClient.getParameterProvider().getBlobFormatVersion());
this.owningClient.getParameterProvider().getBlobFormatVersion(),
this.dropOnClose);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import net.snowflake.ingest.streaming.DropChannelRequest;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
Expand All @@ -42,6 +43,7 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn

// Reference to the row buffer
private final RowBuffer<T> rowBuffer;
private final boolean dropOnClose;

// Indicates whether the channel is closed
private volatile boolean isClosed;
Expand Down Expand Up @@ -79,7 +81,8 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
String encryptionKey,
Long encryptionKeyId,
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneOffset defaultTimezone) {
ZoneOffset defaultTimezone,
boolean dropOnClose) {
this(
name,
dbName,
Expand All @@ -93,7 +96,8 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
encryptionKeyId,
onErrorOption,
defaultTimezone,
client.getParameterProvider().getBlobFormatVersion());
client.getParameterProvider().getBlobFormatVersion(),
dropOnClose);
}

/** Default constructor */
Expand All @@ -110,7 +114,8 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
Long encryptionKeyId,
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneId defaultTimezone,
Constants.BdecVersion bdecVersion) {
Constants.BdecVersion bdecVersion,
boolean dropOnClose) {
this.isClosed = false;
this.owningClient = client;
this.channelFlushContext =
Expand All @@ -127,6 +132,7 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
channelState,
new ClientBufferParameters(owningClient));
this.tableColumns = new HashMap<>();
this.dropOnClose = dropOnClose;
logger.logInfo(
"Channel={} created for table={}",
this.channelFlushContext.getName(),
Expand Down Expand Up @@ -291,6 +297,19 @@ public CompletableFuture<Void> close() {
.map(SnowflakeStreamingIngestChannelInternal::getFullyQualifiedName)
.collect(Collectors.toList()));
}
if (this.dropOnClose) {
this.owningClient.dropChannel(
DropChannelRequest.builder(this.getChannelContext().getName())
.setDBName(this.getDBName())
.setTableName(this.getTableName())
.setSchemaName(this.getSchemaName())
.setClientSequencer(this.getChannelSequencer())
.build());
System.out.println(
"SUCCESSFULLY dropped "
+ this.getChannelContext().getFullyQualifiedName()
+ " channel");
}
});
}

Expand Down
Loading

0 comments on commit 06dc34d

Please sign in to comment.