Skip to content

Commit

Permalink
Allow clients to drop channel on close or blindly (#657)
Browse files Browse the repository at this point in the history
* Add client side support for upcoming drop channel API

* Address review comments

* Add copyright blurb
  • Loading branch information
sfc-gh-psaha authored Feb 21, 2024
1 parent f8c8e3f commit ec5cf9b
Show file tree
Hide file tree
Showing 13 changed files with 443 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public enum ApiName {
INSERT_REPORT("GET"),
LOAD_HISTORY_SCAN("GET"),
STREAMING_OPEN_CHANNEL("POST"),
STREAMING_DROP_CHANNEL("POST"),
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming;

import net.snowflake.ingest.utils.Utils;

/** A class that is used to drop a {@link SnowflakeStreamingIngestChannel} */
public class DropChannelRequest {
// Name of the channel
private final String channelName;

// Name of the database that the channel belongs to
private final String dbName;

// Name of the schema that the channel belongs to
private final String schemaName;

// Name of the table that the channel belongs to
private final String tableName;

public static DropChannelRequestBuilder builder(String channelName) {
return new DropChannelRequestBuilder(channelName);
}

/** A builder class to build a DropChannelRequest */
public static class DropChannelRequestBuilder {
private final String channelName;
private String dbName;
private String schemaName;
private String tableName;

public DropChannelRequestBuilder(String channelName) {
this.channelName = channelName;
}

public DropChannelRequestBuilder setDBName(String dbName) {
this.dbName = dbName;
return this;
}

public DropChannelRequestBuilder setSchemaName(String schemaName) {
this.schemaName = schemaName;
return this;
}

public DropChannelRequestBuilder setTableName(String tableName) {
this.tableName = tableName;
return this;
}

public DropChannelRequest build() {
return new DropChannelRequest(this);
}
}

public DropChannelRequest(DropChannelRequestBuilder builder) {
Utils.assertStringNotNullOrEmpty("channel name", builder.channelName);
Utils.assertStringNotNullOrEmpty("database name", builder.dbName);
Utils.assertStringNotNullOrEmpty("schema name", builder.schemaName);
Utils.assertStringNotNullOrEmpty("table name", builder.tableName);

this.channelName = builder.channelName;
this.dbName = builder.dbName;
this.schemaName = builder.schemaName;
this.tableName = builder.tableName;
}

public String getDBName() {
return this.dbName;
}

public String getSchemaName() {
return this.schemaName;
}

public String getTableName() {
return this.tableName;
}

public String getChannelName() {
return this.channelName;
}

public String getFullyQualifiedTableName() {
return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ public interface SnowflakeStreamingIngestChannel {
*/
CompletableFuture<Void> close();

/**
* Close the channel, this function will make sure all the data in this channel is committed
*
* @param drop if true, the channel will be dropped after all data is successfully committed.
* @return a completable future which will be completed when the channel is closed
*/
CompletableFuture<Void> close(boolean drop);

/**
* Insert one row into the channel, the row is represented using Map where the key is column name
* and the value is a row of data. The following table summarizes supported value types and their
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ public interface SnowflakeStreamingIngestClient extends AutoCloseable {
*/
SnowflakeStreamingIngestChannel openChannel(OpenChannelRequest request);

/**
* Drop the specified channel on the server using a {@link DropChannelRequest}
*
* <p>Note that this call will blindly drop the latest version of the channel and any pending data
* will be lost. Also see {@link SnowflakeStreamingIngestChannel#close(boolean)} to drop channels
* on close. That approach will drop the local version of the channel and if the channel has been
* concurrently reopened by another client, that version of the channel won't be affected.
*
* @param request the drop channel request
*/
void dropChannel(DropChannelRequest request);

/**
* Get the client name
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;

/** Response for a {@link net.snowflake.ingest.streaming.DropChannelRequest}. */
class DropChannelResponse extends StreamingIngestResponse {
private Long statusCode;
private String message;
private String dbName;
private String schemaName;
private String tableName;
private String channelName;

@JsonProperty("status_code")
void setStatusCode(Long statusCode) {
this.statusCode = statusCode;
}

@Override
Long getStatusCode() {
return this.statusCode;
}

@JsonProperty("message")
void setMessage(String message) {
this.message = message;
}

String getMessage() {
return this.message;
}

@JsonProperty("database")
void setDBName(String dbName) {
this.dbName = dbName;
}

String getDBName() {
return this.dbName;
}

@JsonProperty("schema")
void setSchemaName(String schemaName) {
this.schemaName = schemaName;
}

String getSchemaName() {
return this.schemaName;
}

@JsonProperty("table")
void setTableName(String tableName) {
this.tableName = tableName;
}

String getTableName() {
return this.tableName;
}

@JsonProperty("channel")
void setChannelName(String channelName) {
this.channelName = channelName;
}

String getChannelName() {
return this.channelName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.streaming.DropChannelRequest;

/**
* Same as DropChannelRequest but allows specifying a client sequencer to drop a specific version.
*/
class DropChannelVersionRequest extends DropChannelRequest {
private final Long clientSequencer;

public DropChannelVersionRequest(DropChannelRequestBuilder builder, long clientSequencer) {
super(builder);
this.clientSequencer = clientSequencer;
}

Long getClientSequencer() {
return this.clientSequencer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import net.snowflake.ingest.streaming.DropChannelRequest;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
Expand Down Expand Up @@ -267,6 +268,11 @@ CompletableFuture<Void> flush(boolean closing) {
*/
@Override
public CompletableFuture<Void> close() {
return this.close(false);
}

@Override
public CompletableFuture<Void> close(boolean drop) {
checkValidation();

if (isClosed()) {
Expand All @@ -292,6 +298,15 @@ public CompletableFuture<Void> close() {
.map(SnowflakeStreamingIngestChannelInternal::getFullyQualifiedName)
.collect(Collectors.toList()));
}
if (drop) {
DropChannelRequest.DropChannelRequestBuilder builder =
DropChannelRequest.builder(this.getChannelContext().getName())
.setDBName(this.getDBName())
.setTableName(this.getTableName())
.setSchemaName(this.getSchemaName());
this.owningClient.dropChannel(
new DropChannelVersionRequest(builder, this.getChannelSequencer()));
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_STATUS;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_DROP_CHANNEL;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_OPEN_CHANNEL;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_REGISTER_BLOB;
import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries;
import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.sleepForRetry;
import static net.snowflake.ingest.utils.Constants.CHANNEL_STATUS_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.COMMIT_MAX_RETRY_COUNT;
import static net.snowflake.ingest.utils.Constants.COMMIT_RETRY_INTERVAL_IN_MS;
import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.ENABLE_TELEMETRY_TO_SF;
import static net.snowflake.ingest.utils.Constants.MAX_STREAMING_INGEST_API_CHANNEL_RETRY;
import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT;
Expand Down Expand Up @@ -63,6 +65,7 @@
import net.snowflake.ingest.connection.OAuthCredential;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.connection.TelemetryService;
import net.snowflake.ingest.streaming.DropChannelRequest;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
Expand Down Expand Up @@ -367,6 +370,68 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest
}
}

@Override
public void dropChannel(DropChannelRequest request) {
if (isClosed) {
throw new SFException(ErrorCode.CLOSED_CLIENT);
}

logger.logDebug(
"Drop channel request start, channel={}, table={}, client={}",
request.getChannelName(),
request.getFullyQualifiedTableName(),
getName());

try {
Map<Object, Object> payload = new HashMap<>();
payload.put(
"request_id", this.flushService.getClientPrefix() + "_" + counter.getAndIncrement());
payload.put("channel", request.getChannelName());
payload.put("table", request.getTableName());
payload.put("database", request.getDBName());
payload.put("schema", request.getSchemaName());
payload.put("role", this.role);
Long clientSequencer = null;
if (request instanceof DropChannelVersionRequest) {
clientSequencer = ((DropChannelVersionRequest) request).getClientSequencer();
if (clientSequencer != null) {
payload.put("client_sequencer", clientSequencer);
}
}

DropChannelResponse response =
executeWithRetries(
DropChannelResponse.class,
DROP_CHANNEL_ENDPOINT,
payload,
"drop channel",
STREAMING_DROP_CHANNEL,
httpClient,
requestBuilder);

// Check for Snowflake specific response code
if (response.getStatusCode() != RESPONSE_SUCCESS) {
logger.logDebug(
"Drop channel request failed, channel={}, table={}, client={}, message={}",
request.getChannelName(),
request.getFullyQualifiedTableName(),
getName(),
response.getMessage());
throw new SFException(ErrorCode.DROP_CHANNEL_FAILURE, response.getMessage());
}

logger.logInfo(
"Drop channel request succeeded, channel={}, table={}, clientSequencer={} client={}",
request.getChannelName(),
request.getFullyQualifiedTableName(),
clientSequencer,
getName());

} catch (IOException | IngestResponseException e) {
throw new SFException(e, ErrorCode.DROP_CHANNEL_FAILURE, e.getMessage());
}
}

/**
* Return the latest committed/persisted offset token for all channels
*
Expand Down
1 change: 1 addition & 0 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class Constants {
// Channel level constants
public static final String CHANNEL_STATUS_ENDPOINT = "/v1/streaming/channels/status/";
public static final String OPEN_CHANNEL_ENDPOINT = "/v1/streaming/channels/open/";
public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/";
public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/";

public enum WriteMode {
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/net/snowflake/ingest/utils/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public enum ErrorCode {
OAUTH_REFRESH_TOKEN_ERROR("0033"),
INVALID_CONFIG_PARAMETER("0034"),
MAX_BATCH_SIZE_EXCEEDED("0035"),
CRYPTO_PROVIDER_ERROR("0036");
CRYPTO_PROVIDER_ERROR("0036"),
DROP_CHANNEL_FAILURE("0037");

public static final String errorMessageResource = "net.snowflake.ingest.ingest_error_messages";

Expand Down
Loading

0 comments on commit ec5cf9b

Please sign in to comment.