Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow clients to drop channel on close or blindly #657

Merged
merged 3 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public enum ApiName {
INSERT_REPORT("GET"),
LOAD_HISTORY_SCAN("GET"),
STREAMING_OPEN_CHANNEL("POST"),
STREAMING_DROP_CHANNEL("POST"),
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming;

import net.snowflake.ingest.utils.Utils;

/** A class that is used to drop a {@link SnowflakeStreamingIngestChannel} */
public class DropChannelRequest {
// Name of the channel
private final String channelName;

// Name of the database that the channel belongs to
private final String dbName;

// Name of the schema that the channel belongs to
private final String schemaName;

// Name of the table that the channel belongs to
private final String tableName;

public static DropChannelRequestBuilder builder(String channelName) {
return new DropChannelRequestBuilder(channelName);
}

/** A builder class to build a DropChannelRequest */
public static class DropChannelRequestBuilder {
private final String channelName;
private String dbName;
private String schemaName;
private String tableName;

public DropChannelRequestBuilder(String channelName) {
this.channelName = channelName;
}

public DropChannelRequestBuilder setDBName(String dbName) {
this.dbName = dbName;
return this;
}

public DropChannelRequestBuilder setSchemaName(String schemaName) {
this.schemaName = schemaName;
return this;
}

public DropChannelRequestBuilder setTableName(String tableName) {
this.tableName = tableName;
return this;
}

public DropChannelRequest build() {
return new DropChannelRequest(this);
}
}

public DropChannelRequest(DropChannelRequestBuilder builder) {
Utils.assertStringNotNullOrEmpty("channel name", builder.channelName);
Utils.assertStringNotNullOrEmpty("database name", builder.dbName);
Utils.assertStringNotNullOrEmpty("schema name", builder.schemaName);
Utils.assertStringNotNullOrEmpty("table name", builder.tableName);

this.channelName = builder.channelName;
this.dbName = builder.dbName;
this.schemaName = builder.schemaName;
this.tableName = builder.tableName;
}

public String getDBName() {
return this.dbName;
}

public String getSchemaName() {
return this.schemaName;
}

public String getTableName() {
return this.tableName;
}

public String getChannelName() {
return this.channelName;
}

public String getFullyQualifiedTableName() {
return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ public interface SnowflakeStreamingIngestChannel {
*/
CompletableFuture<Void> close();

/**
* Close the channel, this function will make sure all the data in this channel is committed
*
* @param drop if true, the channel will be dropped after all data is successfully committed.
* @return a completable future which will be completed when the channel is closed
*/
CompletableFuture<Void> close(boolean drop);

/**
* Insert one row into the channel, the row is represented using Map where the key is column name
* and the value is a row of data. The following table summarizes supported value types and their
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ public interface SnowflakeStreamingIngestClient extends AutoCloseable {
*/
SnowflakeStreamingIngestChannel openChannel(OpenChannelRequest request);

/**
* Drop the specified channel on the server using a {@link DropChannelRequest}
*
* <p>Note that this call will blindly drop the latest version of the channel and any pending data
* will be lost. Also see {@link SnowflakeStreamingIngestChannel#close(boolean)} to drop channels
* on close. That approach will drop the local version of the channel and if the channel has been
* concurrently reopened by another client, that version of the channel won't be affected.
*
* @param request the drop channel request
*/
void dropChannel(DropChannelRequest request);
sfc-gh-psaha marked this conversation as resolved.
Show resolved Hide resolved

/**
* Get the client name
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;
sfc-gh-psaha marked this conversation as resolved.
Show resolved Hide resolved

/** Response for a {@link net.snowflake.ingest.streaming.DropChannelRequest}. */
class DropChannelResponse extends StreamingIngestResponse {
private Long statusCode;
private String message;
private String dbName;
private String schemaName;
private String tableName;
private String channelName;

@JsonProperty("status_code")
void setStatusCode(Long statusCode) {
this.statusCode = statusCode;
}

@Override
Long getStatusCode() {
return this.statusCode;
}

@JsonProperty("message")
void setMessage(String message) {
this.message = message;
}

String getMessage() {
return this.message;
}

@JsonProperty("database")
void setDBName(String dbName) {
this.dbName = dbName;
}

String getDBName() {
return this.dbName;
}

@JsonProperty("schema")
void setSchemaName(String schemaName) {
this.schemaName = schemaName;
}

String getSchemaName() {
return this.schemaName;
}

@JsonProperty("table")
void setTableName(String tableName) {
this.tableName = tableName;
}

String getTableName() {
return this.tableName;
}

@JsonProperty("channel")
void setChannelName(String channelName) {
this.channelName = channelName;
}

String getChannelName() {
return this.channelName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.streaming.DropChannelRequest;

/**
* Same as DropChannelRequest but allows specifying a client sequencer to drop a specific version.
*/
class DropChannelVersionRequest extends DropChannelRequest {
private final Long clientSequencer;

public DropChannelVersionRequest(DropChannelRequestBuilder builder, long clientSequencer) {
super(builder);
this.clientSequencer = clientSequencer;
}

Long getClientSequencer() {
return this.clientSequencer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import net.snowflake.ingest.streaming.DropChannelRequest;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
Expand Down Expand Up @@ -267,6 +268,11 @@ CompletableFuture<Void> flush(boolean closing) {
*/
@Override
public CompletableFuture<Void> close() {
return this.close(false);
}

@Override
public CompletableFuture<Void> close(boolean drop) {
checkValidation();

if (isClosed()) {
Expand All @@ -292,6 +298,15 @@ public CompletableFuture<Void> close() {
.map(SnowflakeStreamingIngestChannelInternal::getFullyQualifiedName)
.collect(Collectors.toList()));
}
if (drop) {
DropChannelRequest.DropChannelRequestBuilder builder =
DropChannelRequest.builder(this.getChannelContext().getName())
.setDBName(this.getDBName())
.setTableName(this.getTableName())
.setSchemaName(this.getSchemaName());
this.owningClient.dropChannel(
new DropChannelVersionRequest(builder, this.getChannelSequencer()));
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_STATUS;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_DROP_CHANNEL;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_OPEN_CHANNEL;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_REGISTER_BLOB;
import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries;
import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.sleepForRetry;
import static net.snowflake.ingest.utils.Constants.CHANNEL_STATUS_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.COMMIT_MAX_RETRY_COUNT;
import static net.snowflake.ingest.utils.Constants.COMMIT_RETRY_INTERVAL_IN_MS;
import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.ENABLE_TELEMETRY_TO_SF;
import static net.snowflake.ingest.utils.Constants.MAX_STREAMING_INGEST_API_CHANNEL_RETRY;
import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT;
Expand Down Expand Up @@ -63,6 +65,7 @@
import net.snowflake.ingest.connection.OAuthCredential;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.connection.TelemetryService;
import net.snowflake.ingest.streaming.DropChannelRequest;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
Expand Down Expand Up @@ -367,6 +370,68 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest
}
}

@Override
public void dropChannel(DropChannelRequest request) {
if (isClosed) {
throw new SFException(ErrorCode.CLOSED_CLIENT);
}

logger.logDebug(
"Drop channel request start, channel={}, table={}, client={}",
request.getChannelName(),
request.getFullyQualifiedTableName(),
getName());

try {
Map<Object, Object> payload = new HashMap<>();
payload.put(
"request_id", this.flushService.getClientPrefix() + "_" + counter.getAndIncrement());
payload.put("channel", request.getChannelName());
payload.put("table", request.getTableName());
payload.put("database", request.getDBName());
payload.put("schema", request.getSchemaName());
payload.put("role", this.role);
Long clientSequencer = null;
if (request instanceof DropChannelVersionRequest) {
clientSequencer = ((DropChannelVersionRequest) request).getClientSequencer();
if (clientSequencer != null) {
payload.put("client_sequencer", clientSequencer);
}
}

DropChannelResponse response =
executeWithRetries(
DropChannelResponse.class,
DROP_CHANNEL_ENDPOINT,
payload,
"drop channel",
STREAMING_DROP_CHANNEL,
httpClient,
requestBuilder);

// Check for Snowflake specific response code
if (response.getStatusCode() != RESPONSE_SUCCESS) {
logger.logDebug(
"Drop channel request failed, channel={}, table={}, client={}, message={}",
request.getChannelName(),
request.getFullyQualifiedTableName(),
getName(),
response.getMessage());
throw new SFException(ErrorCode.DROP_CHANNEL_FAILURE, response.getMessage());
}

logger.logInfo(
"Drop channel request succeeded, channel={}, table={}, clientSequencer={} client={}",
request.getChannelName(),
request.getFullyQualifiedTableName(),
clientSequencer,
getName());

} catch (IOException | IngestResponseException e) {
throw new SFException(e, ErrorCode.DROP_CHANNEL_FAILURE, e.getMessage());
}
}

/**
* Return the latest committed/persisted offset token for all channels
*
Expand Down
1 change: 1 addition & 0 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class Constants {
// Channel level constants
public static final String CHANNEL_STATUS_ENDPOINT = "/v1/streaming/channels/status/";
public static final String OPEN_CHANNEL_ENDPOINT = "/v1/streaming/channels/open/";
public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/";
public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/";

public enum WriteMode {
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/net/snowflake/ingest/utils/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public enum ErrorCode {
OAUTH_REFRESH_TOKEN_ERROR("0033"),
INVALID_CONFIG_PARAMETER("0034"),
MAX_BATCH_SIZE_EXCEEDED("0035"),
CRYPTO_PROVIDER_ERROR("0036");
CRYPTO_PROVIDER_ERROR("0036"),
DROP_CHANNEL_FAILURE("0037");

public static final String errorMessageResource = "net.snowflake.ingest.ingest_error_messages";

Expand Down
Loading
Loading