Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-psaha committed Feb 21, 2024
1 parent 4ff6e1e commit 1d36fe1
Show file tree
Hide file tree
Showing 14 changed files with 136 additions and 140 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming;
Expand All @@ -20,9 +20,6 @@ public class DropChannelRequest {
// Name of the table that the channel belongs to
private final String tableName;

// Optional client sequencer to verify when dropping the channel.
private final Long clientSequencer;

public static DropChannelRequestBuilder builder(String channelName) {
return new DropChannelRequestBuilder(channelName);
}
Expand All @@ -34,8 +31,6 @@ public static class DropChannelRequestBuilder {
private String schemaName;
private String tableName;

private Long clientSequencer = null;

public DropChannelRequestBuilder(String channelName) {
this.channelName = channelName;
}
Expand All @@ -55,17 +50,12 @@ public DropChannelRequestBuilder setTableName(String tableName) {
return this;
}

public DropChannelRequestBuilder setClientSequencer(Long clientSequencer) {
this.clientSequencer = clientSequencer;
return this;
}

public DropChannelRequest build() {
return new DropChannelRequest(this);
}
}

private DropChannelRequest(DropChannelRequestBuilder builder) {
public DropChannelRequest(DropChannelRequestBuilder builder) {
Utils.assertStringNotNullOrEmpty("channel name", builder.channelName);
Utils.assertStringNotNullOrEmpty("database name", builder.dbName);
Utils.assertStringNotNullOrEmpty("schema name", builder.schemaName);
Expand All @@ -75,7 +65,6 @@ private DropChannelRequest(DropChannelRequestBuilder builder) {
this.dbName = builder.dbName;
this.schemaName = builder.schemaName;
this.tableName = builder.tableName;
this.clientSequencer = builder.clientSequencer;
}

public String getDBName() {
Expand All @@ -97,8 +86,4 @@ public String getChannelName() {
public String getFullyQualifiedTableName() {
return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName);
}

public Long getClientSequencer() {
return this.clientSequencer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ public enum OnErrorOption {
private final String offsetToken;
private final boolean isOffsetTokenProvided;

// If true, the channel will be dropped when it is closed after any pending data is fully
// committed.
private final boolean dropOnClose;

public static OpenChannelRequestBuilder builder(String channelName) {
return new OpenChannelRequestBuilder(channelName);
}
Expand All @@ -63,8 +59,6 @@ public static class OpenChannelRequestBuilder {
private String offsetToken;
private boolean isOffsetTokenProvided = false;

private boolean dropOnClose = false;

public OpenChannelRequestBuilder(String channelName) {
this.channelName = channelName;
this.defaultTimezone = DEFAULT_DEFAULT_TIMEZONE;
Expand Down Expand Up @@ -101,11 +95,6 @@ public OpenChannelRequestBuilder setOffsetToken(String offsetToken) {
return this;
}

public OpenChannelRequestBuilder setDropOnClose(boolean dropOnClose) {
this.dropOnClose = dropOnClose;
return this;
}

public OpenChannelRequest build() {
return new OpenChannelRequest(this);
}
Expand All @@ -127,7 +116,6 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) {
this.defaultTimezone = builder.defaultTimezone;
this.offsetToken = builder.offsetToken;
this.isOffsetTokenProvided = builder.isOffsetTokenProvided;
this.dropOnClose = builder.dropOnClose;
}

public String getDBName() {
Expand Down Expand Up @@ -165,8 +153,4 @@ public String getOffsetToken() {
public boolean isOffsetTokenProvided() {
return this.isOffsetTokenProvided;
}

public boolean getDropOnClose() {
return this.dropOnClose;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ public interface SnowflakeStreamingIngestChannel {
*/
CompletableFuture<Void> close();

/**
* Close the channel, this function will make sure all the data in this channel is committed
*
* @param drop if true, the channel will be dropped after all data is successfully committed.
* @return a completable future which will be completed when the channel is closed
*/
CompletableFuture<Void> close(boolean drop);

/**
* Insert one row into the channel, the row is represented using Map where the key is column name
* and the value is a row of data. The following table summarizes supported value types and their
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ public interface SnowflakeStreamingIngestClient extends AutoCloseable {
/**
* Drop the specified channel on the server using a {@link DropChannelRequest}
*
* <p>Note that {@link DropChannelRequest.DropChannelRequestBuilder#setClientSequencer(Long)}} can
* be used to drop a specific version of the channel and prevent accidentally dropping a channel
* concurrently opened by another client. If it is not specified, this call will blindly drop the
* latest version of the channel and any pending data will be lost. Also see {@link
* OpenChannelRequest.OpenChannelRequestBuilder#setDropOnClose(boolean)} to automatically drop
* channels on close.
* <p>Note that this call will blindly drop the latest version of the channel and any pending data
* will be lost. Also see {@link SnowflakeStreamingIngestChannel#close(boolean)} to drop channels
* on close. That approach will drop the local version of the channel and if the channel has been
* concurrently reopened by another client, that version of the channel won't be affected.
*
* @param request the drop channel request
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.streaming.DropChannelRequest;

/**
* Same as DropChannelRequest but allows specifying a client sequencer to drop a specific version.
*/
class DropChannelVersionRequest extends DropChannelRequest {
private final Long clientSequencer;

public DropChannelVersionRequest(DropChannelRequestBuilder builder, long clientSequencer) {
super(builder);
this.clientSequencer = clientSequencer;
}

Long getClientSequencer() {
return this.clientSequencer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ static class SnowflakeStreamingIngestChannelBuilder<T> {

private ZoneId defaultTimezone;

private boolean dropOnClose;

private SnowflakeStreamingIngestChannelBuilder(String name) {
this.name = name;
}
Expand Down Expand Up @@ -93,11 +91,6 @@ SnowflakeStreamingIngestChannelBuilder<T> setOwningClient(
return this;
}

SnowflakeStreamingIngestChannelBuilder<T> setDropOnClose(boolean dropOnClose) {
this.dropOnClose = dropOnClose;
return this;
}

SnowflakeStreamingIngestChannelInternal<T> build() {
Utils.assertStringNotNullOrEmpty("channel name", this.name);
Utils.assertStringNotNullOrEmpty("table name", this.tableName);
Expand All @@ -123,8 +116,7 @@ SnowflakeStreamingIngestChannelInternal<T> build() {
this.encryptionKeyId,
this.onErrorOption,
this.defaultTimezone,
this.owningClient.getParameterProvider().getBlobFormatVersion(),
this.dropOnClose);
this.owningClient.getParameterProvider().getBlobFormatVersion());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn

// Reference to the row buffer
private final RowBuffer<T> rowBuffer;
private final boolean dropOnClose;

// Indicates whether the channel is closed
private volatile boolean isClosed;
Expand Down Expand Up @@ -82,8 +81,7 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
String encryptionKey,
Long encryptionKeyId,
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneOffset defaultTimezone,
boolean dropOnClose) {
ZoneOffset defaultTimezone) {
this(
name,
dbName,
Expand All @@ -97,8 +95,7 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
encryptionKeyId,
onErrorOption,
defaultTimezone,
client.getParameterProvider().getBlobFormatVersion(),
dropOnClose);
client.getParameterProvider().getBlobFormatVersion());
}

/** Default constructor */
Expand All @@ -115,8 +112,7 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
Long encryptionKeyId,
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneId defaultTimezone,
Constants.BdecVersion bdecVersion,
boolean dropOnClose) {
Constants.BdecVersion bdecVersion) {
this.isClosed = false;
this.owningClient = client;
this.channelFlushContext =
Expand All @@ -133,7 +129,6 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
channelState,
new ClientBufferParameters(owningClient));
this.tableColumns = new HashMap<>();
this.dropOnClose = dropOnClose;
logger.logInfo(
"Channel={} created for table={}",
this.channelFlushContext.getName(),
Expand Down Expand Up @@ -273,6 +268,11 @@ CompletableFuture<Void> flush(boolean closing) {
*/
@Override
public CompletableFuture<Void> close() {
return this.close(false);
}

@Override
public CompletableFuture<Void> close(boolean drop) {
checkValidation();

if (isClosed()) {
Expand All @@ -298,18 +298,14 @@ public CompletableFuture<Void> close() {
.map(SnowflakeStreamingIngestChannelInternal::getFullyQualifiedName)
.collect(Collectors.toList()));
}
if (this.dropOnClose) {
this.owningClient.dropChannel(
if (drop) {
DropChannelRequest.DropChannelRequestBuilder builder =
DropChannelRequest.builder(this.getChannelContext().getName())
.setDBName(this.getDBName())
.setTableName(this.getTableName())
.setSchemaName(this.getSchemaName())
.setClientSequencer(this.getChannelSequencer())
.build());
System.out.println(
"SUCCESSFULLY dropped "
+ this.getChannelContext().getFullyQualifiedName()
+ " channel");
.setSchemaName(this.getSchemaName());
this.owningClient.dropChannel(
new DropChannelVersionRequest(builder, this.getChannelSequencer()));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,6 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest
.setEncryptionKeyId(response.getEncryptionKeyId())
.setOnErrorOption(request.getOnErrorOption())
.setDefaultTimezone(request.getDefaultTimezone())
.setDropOnClose(request.getDropOnClose())
.build();

// Setup the row buffer schema
Expand Down Expand Up @@ -392,8 +391,12 @@ public void dropChannel(DropChannelRequest request) {
payload.put("database", request.getDBName());
payload.put("schema", request.getSchemaName());
payload.put("role", this.role);
if (request.getClientSequencer() != null) {
payload.put("client_sequencer", request.getClientSequencer());
Long clientSequencer = null;
if (request instanceof DropChannelVersionRequest) {
clientSequencer = ((DropChannelVersionRequest) request).getClientSequencer();
if (clientSequencer != null) {
payload.put("client_sequencer", clientSequencer);
}
}

DropChannelResponse response =
Expand Down Expand Up @@ -421,11 +424,11 @@ public void dropChannel(DropChannelRequest request) {
"Drop channel request succeeded, channel={}, table={}, clientSequencer={} client={}",
request.getChannelName(),
request.getFullyQualifiedTableName(),
request.getClientSequencer(),
clientSequencer,
getName());

} catch (IOException | IngestResponseException e) {
throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage());
throw new SFException(e, ErrorCode.DROP_CHANNEL_FAILURE, e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public void setup() {
"key",
1234L,
OpenChannelRequest.OnErrorOption.CONTINUE,
UTC,
false);
UTC);
channel2 =
new SnowflakeStreamingIngestChannelInternal<>(
"channel2",
Expand All @@ -53,8 +52,7 @@ public void setup() {
"key",
1234L,
OpenChannelRequest.OnErrorOption.CONTINUE,
UTC,
false);
UTC);
channel3 =
new SnowflakeStreamingIngestChannelInternal<>(
"channel3",
Expand All @@ -68,8 +66,7 @@ public void setup() {
"key",
1234L,
OpenChannelRequest.OnErrorOption.CONTINUE,
UTC,
false);
UTC);
cache.addChannel(channel1);
cache.addChannel(channel2);
cache.addChannel(channel3);
Expand All @@ -95,8 +92,7 @@ public void testAddChannel() {
"key",
1234L,
OpenChannelRequest.OnErrorOption.CONTINUE,
UTC,
false);
UTC);
cache.addChannel(channel);
Assert.assertEquals(1, cache.getSize());
Assert.assertTrue(channel == cache.iterator().next().getValue().get(channelName));
Expand All @@ -114,8 +110,7 @@ public void testAddChannel() {
"key",
1234L,
OpenChannelRequest.OnErrorOption.CONTINUE,
UTC,
false);
UTC);
cache.addChannel(channelDup);
// The old channel should be invalid now
Assert.assertTrue(!channel.isValid());
Expand Down Expand Up @@ -196,8 +191,7 @@ public void testRemoveChannel() {
"key",
1234L,
OpenChannelRequest.OnErrorOption.CONTINUE,
UTC,
false);
UTC);
cache.removeChannelIfSequencersMatch(channel3Dup);
// Verify that remove the same channel with a different channel sequencer is a no op
Assert.assertEquals(1, cache.getSize());
Expand Down
Loading

0 comments on commit 1d36fe1

Please sign in to comment.