Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-psaha committed Jan 3, 2024
1 parent 632f6a9 commit 609795d
Show file tree
Hide file tree
Showing 16 changed files with 144 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ public RequestBuilder(
SecurityManager securityManager,
CloseableHttpClient httpClient,
String clientName) {
this(accountName,
this(
accountName,
userName,
credential,
schemeName,
Expand Down Expand Up @@ -648,7 +649,7 @@ private static void addUserAgent(HttpUriRequest request, String userAgentSuffix)
public void addToken(HttpUriRequest request) {
request.setHeader(HttpHeaders.AUTHORIZATION, BEARER_PARAMETER + securityManager.getToken());
request.setHeader(SF_HEADER_AUTHORIZATION_TOKEN_TYPE, this.securityManager.getTokenType());
if(addAccountNameInRequest) {
if (addAccountNameInRequest) {
request.setHeader(SF_HEADER_ACCOUNT_NAME, accountName);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming;
Expand All @@ -20,9 +20,6 @@ public class DropChannelRequest {
// Name of the table that the channel belongs to
private final String tableName;

// Optional client sequencer to verify when dropping the channel.
private final Long clientSequencer;

public static DropChannelRequestBuilder builder(String channelName) {
return new DropChannelRequestBuilder(channelName);
}
Expand All @@ -34,8 +31,6 @@ public static class DropChannelRequestBuilder {
private String schemaName;
private String tableName;

private Long clientSequencer = null;

public DropChannelRequestBuilder(String channelName) {
this.channelName = channelName;
}
Expand All @@ -55,17 +50,12 @@ public DropChannelRequestBuilder setTableName(String tableName) {
return this;
}

public DropChannelRequestBuilder setClientSequencer(Long clientSequencer) {
this.clientSequencer = clientSequencer;
return this;
}

public DropChannelRequest build() {
return new DropChannelRequest(this);
}
}

private DropChannelRequest(DropChannelRequestBuilder builder) {
public DropChannelRequest(DropChannelRequestBuilder builder) {
Utils.assertStringNotNullOrEmpty("channel name", builder.channelName);
Utils.assertStringNotNullOrEmpty("database name", builder.dbName);
Utils.assertStringNotNullOrEmpty("schema name", builder.schemaName);
Expand All @@ -75,7 +65,6 @@ private DropChannelRequest(DropChannelRequestBuilder builder) {
this.dbName = builder.dbName;
this.schemaName = builder.schemaName;
this.tableName = builder.tableName;
this.clientSequencer = builder.clientSequencer;
}

public String getDBName() {
Expand All @@ -97,8 +86,4 @@ public String getChannelName() {
public String getFullyQualifiedTableName() {
return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName);
}

public Long getClientSequencer() {
return this.clientSequencer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ public enum OnErrorOption {
private final String offsetToken;
private final boolean isOffsetTokenProvided;

// If true, the channel will be dropped when it is closed after any pending data is fully
// committed.
private final boolean dropOnClose;

public static OpenChannelRequestBuilder builder(String channelName) {
return new OpenChannelRequestBuilder(channelName);
}
Expand All @@ -63,8 +59,6 @@ public static class OpenChannelRequestBuilder {
private String offsetToken;
private boolean isOffsetTokenProvided = false;

private boolean dropOnClose = false;

public OpenChannelRequestBuilder(String channelName) {
this.channelName = channelName;
this.defaultTimezone = DEFAULT_DEFAULT_TIMEZONE;
Expand Down Expand Up @@ -101,11 +95,6 @@ public OpenChannelRequestBuilder setOffsetToken(String offsetToken) {
return this;
}

public OpenChannelRequestBuilder setDropOnClose(boolean dropOnClose) {
this.dropOnClose = dropOnClose;
return this;
}

public OpenChannelRequest build() {
return new OpenChannelRequest(this);
}
Expand All @@ -127,7 +116,6 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) {
this.defaultTimezone = builder.defaultTimezone;
this.offsetToken = builder.offsetToken;
this.isOffsetTokenProvided = builder.isOffsetTokenProvided;
this.dropOnClose = builder.dropOnClose;
}

public String getDBName() {
Expand Down Expand Up @@ -165,8 +153,4 @@ public String getOffsetToken() {
public boolean isOffsetTokenProvided() {
return this.isOffsetTokenProvided;
}

public boolean getDropOnClose() {
return this.dropOnClose;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ public interface SnowflakeStreamingIngestChannel {
*/
CompletableFuture<Void> close();

/**
* Close the channel, this function will make sure all the data in this channel is committed
*
* @param drop if true, the channel will be dropped after all data is successfully committed.
* @return a completable future which will be completed when the channel is closed
*/
CompletableFuture<Void> close(boolean drop);

/**
* Insert one row into the channel, the row is represented using Map where the key is column name
* and the value is a row of data. The following table summarizes supported value types and their
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ public interface SnowflakeStreamingIngestClient extends AutoCloseable {
/**
* Drop the specified channel on the server using a {@link DropChannelRequest}
*
* <p>Note that {@link DropChannelRequest.DropChannelRequestBuilder#setClientSequencer(Long)}} can
* be used to drop a specific version of the channel and prevent accidentally dropping a channel
* concurrently opened by another client. If it is not specified, this call will blindly drop the
* latest version of the channel and any pending data will be lost. Also see {@link
* OpenChannelRequest.OpenChannelRequestBuilder#setDropOnClose(boolean)} to automatically drop
* channels on close.
* <p>Note that this call will blindly drop the latest version of the channel and any pending data
* will be lost. Also see {@link SnowflakeStreamingIngestChannel#close(boolean)} to drop channels
* on close. That approach will drop the local version of the channel and if the channel has been
* concurrently reopened by another client, that version of the channel won't be affected.
*
* @param request the drop channel request
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import java.util.Map;
import java.util.Properties;

import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.SnowflakeURL;
Expand Down Expand Up @@ -71,10 +70,10 @@ public SnowflakeStreamingIngestClient build() {

if (addAccountNameInRequest) {
return new SnowflakeStreamingIngestClientInternal<>(
this.name, accountURL, prop, this.parameterOverrides, addAccountNameInRequest);
this.name, accountURL, prop, this.parameterOverrides, addAccountNameInRequest);
}
return new SnowflakeStreamingIngestClientInternal<>(
this.name, accountURL, prop, this.parameterOverrides);
this.name, accountURL, prop, this.parameterOverrides);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.streaming.DropChannelRequest;

/**
* Same as DropChannelRequest but allows specifying a client sequencer to drop a specific version.
*/
class DropChannelVersionRequest extends DropChannelRequest {
private final Long clientSequencer;

public DropChannelVersionRequest(DropChannelRequestBuilder builder, long clientSequencer) {
super(builder);
this.clientSequencer = clientSequencer;
}

Long getClientSequencer() {
return this.clientSequencer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ static class SnowflakeStreamingIngestChannelBuilder<T> {

private ZoneId defaultTimezone;

private boolean dropOnClose;

private SnowflakeStreamingIngestChannelBuilder(String name) {
this.name = name;
}
Expand Down Expand Up @@ -93,11 +91,6 @@ SnowflakeStreamingIngestChannelBuilder<T> setOwningClient(
return this;
}

SnowflakeStreamingIngestChannelBuilder<T> setDropOnClose(boolean dropOnClose) {
this.dropOnClose = dropOnClose;
return this;
}

SnowflakeStreamingIngestChannelInternal<T> build() {
Utils.assertStringNotNullOrEmpty("channel name", this.name);
Utils.assertStringNotNullOrEmpty("table name", this.tableName);
Expand All @@ -123,8 +116,7 @@ SnowflakeStreamingIngestChannelInternal<T> build() {
this.encryptionKeyId,
this.onErrorOption,
this.defaultTimezone,
this.owningClient.getParameterProvider().getBlobFormatVersion(),
this.dropOnClose);
this.owningClient.getParameterProvider().getBlobFormatVersion());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn

// Reference to the row buffer
private final RowBuffer<T> rowBuffer;
private final boolean dropOnClose;

// Indicates whether the channel is closed
private volatile boolean isClosed;
Expand Down Expand Up @@ -81,8 +80,7 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
String encryptionKey,
Long encryptionKeyId,
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneOffset defaultTimezone,
boolean dropOnClose) {
ZoneOffset defaultTimezone) {
this(
name,
dbName,
Expand All @@ -96,8 +94,7 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
encryptionKeyId,
onErrorOption,
defaultTimezone,
client.getParameterProvider().getBlobFormatVersion(),
dropOnClose);
client.getParameterProvider().getBlobFormatVersion());
}

/** Default constructor */
Expand All @@ -114,8 +111,7 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
Long encryptionKeyId,
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneId defaultTimezone,
Constants.BdecVersion bdecVersion,
boolean dropOnClose) {
Constants.BdecVersion bdecVersion) {
this.isClosed = false;
this.owningClient = client;
this.channelFlushContext =
Expand All @@ -132,7 +128,6 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
channelState,
new ClientBufferParameters(owningClient));
this.tableColumns = new HashMap<>();
this.dropOnClose = dropOnClose;
logger.logInfo(
"Channel={} created for table={}",
this.channelFlushContext.getName(),
Expand Down Expand Up @@ -272,6 +267,11 @@ CompletableFuture<Void> flush(boolean closing) {
*/
@Override
public CompletableFuture<Void> close() {
return this.close(false);
}

@Override
public CompletableFuture<Void> close(boolean drop) {
checkValidation();

if (isClosed()) {
Expand All @@ -297,18 +297,14 @@ public CompletableFuture<Void> close() {
.map(SnowflakeStreamingIngestChannelInternal::getFullyQualifiedName)
.collect(Collectors.toList()));
}
if (this.dropOnClose) {
this.owningClient.dropChannel(
if (drop) {
DropChannelRequest.DropChannelRequestBuilder builder =
DropChannelRequest.builder(this.getChannelContext().getName())
.setDBName(this.getDBName())
.setTableName(this.getTableName())
.setSchemaName(this.getSchemaName())
.setClientSequencer(this.getChannelSequencer())
.build());
System.out.println(
"SUCCESSFULLY dropped "
+ this.getChannelContext().getFullyQualifiedName()
+ " channel");
.setSchemaName(this.getSchemaName());
this.owningClient.dropChannel(
new DropChannelVersionRequest(builder, this.getChannelSequencer()));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
prop.getProperty(Constants.OAUTH_CLIENT_SECRET),
prop.getProperty(Constants.OAUTH_REFRESH_TOKEN));
}
this.requestBuilder = new RequestBuilder(
this.requestBuilder =
new RequestBuilder(
accountURL,
prop.get(USER).toString(),
credential,
Expand Down Expand Up @@ -278,14 +279,7 @@ public SnowflakeStreamingIngestClientInternal(
Properties prop,
Map<String, Object> parameterOverrides,
boolean addAccountNameInRequest) {
this(name,
accountURL,
prop,
null,
false,
null,
parameterOverrides,
addAccountNameInRequest);
this(name, accountURL, prop, null, false, null, parameterOverrides, addAccountNameInRequest);
}

/**
Expand Down Expand Up @@ -404,7 +398,6 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest
.setEncryptionKeyId(response.getEncryptionKeyId())
.setOnErrorOption(request.getOnErrorOption())
.setDefaultTimezone(request.getDefaultTimezone())
.setDropOnClose(request.getDropOnClose())
.build();

// Setup the row buffer schema
Expand Down Expand Up @@ -440,8 +433,12 @@ public void dropChannel(DropChannelRequest request) {
payload.put("database", request.getDBName());
payload.put("schema", request.getSchemaName());
payload.put("role", this.role);
if (request.getClientSequencer() != null) {
payload.put("client_sequencer", request.getClientSequencer());
Long clientSequencer = null;
if (request instanceof DropChannelVersionRequest) {
clientSequencer = ((DropChannelVersionRequest) request).getClientSequencer();
if (clientSequencer != null) {
payload.put("client_sequencer", clientSequencer);
}
}

DropChannelResponse response =
Expand Down Expand Up @@ -469,11 +466,11 @@ public void dropChannel(DropChannelRequest request) {
"Drop channel request succeeded, channel={}, table={}, clientSequencer={} client={}",
request.getChannelName(),
request.getFullyQualifiedTableName(),
request.getClientSequencer(),
clientSequencer,
getName());

} catch (IOException | IngestResponseException e) {
throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage());
throw new SFException(e, ErrorCode.DROP_CHANNEL_FAILURE, e.getMessage());
}
}

Expand Down
Loading

0 comments on commit 609795d

Please sign in to comment.