Skip to content

Commit

Permalink
SNOW-1497358 Support multiple stage for new table format (#812)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang authored Aug 30, 2024
1 parent 66501f9 commit b9d31b6
Show file tree
Hide file tree
Showing 12 changed files with 601 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012-2017 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.connection;
Expand Down Expand Up @@ -42,7 +42,8 @@ public enum ApiName {
STREAMING_DROP_CHANNEL("POST"),
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST");
STREAMING_CLIENT_CONFIGURE("POST"),
STREAMING_CHANNEL_CONFIGURE("POST");
private final String httpMethod;

private ApiName(String httpMethod) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to serialize the channel configure request. */
class ChannelConfigureRequest extends ClientConfigureRequest {
@JsonProperty("database")
private String database;

@JsonProperty("schema")
private String schema;

@JsonProperty("table")
private String table;

/**
* Constructor for channel configure request
*
* @param role Role to be used for the request.
* @param database Database name.
* @param schema Schema name.
* @param table Table name.
*/
ChannelConfigureRequest(String role, String database, String schema, String table) {
super(role);
this.database = database;
this.schema = schema;
this.table = table;
}

String getDatabase() {
return database;
}

String getSchema() {
return schema;
}

String getTable() {
return table;
}

@Override
public String getStringForLogging() {
return String.format(
"ChannelConfigureRequest(role=%s, db=%s, schema=%s, table=%s, file_name=%s)",
getRole(), database, schema, table, getFileName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to deserialize responses from channel configure endpoint */
@JsonIgnoreProperties(ignoreUnknown = true)
class ChannelConfigureResponse extends StreamingIngestResponse {
@JsonProperty("status_code")
private Long statusCode;

@JsonProperty("message")
private String message;

@JsonProperty("stage_location")
private FileLocationInfo stageLocation;

@Override
Long getStatusCode() {
return statusCode;
}

void setStatusCode(Long statusCode) {
this.statusCode = statusCode;
}

String getMessage() {
return message;
}

void setMessage(String message) {
this.message = message;
}

FileLocationInfo getStageLocation() {
return stageLocation;
}

void setStageLocation(FileLocationInfo stageLocation) {
this.stageLocation = stageLocation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import java.io.IOException;
import java.util.Calendar;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.ingest.connection.IngestResponseException;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;

class ExternalVolumeLocation {
public final String dbName;
public final String schemaName;
public final String tableName;

public ExternalVolumeLocation(String dbName, String schemaName, String tableName) {
this.dbName = dbName;
this.schemaName = schemaName;
this.tableName = tableName;
}
}

/** Class to manage multiple external volumes */
class ExternalVolumeManager<T> implements IStorageManager<T, ExternalVolumeLocation> {
// Reference to the external volume per table
private final Map<String, StreamingIngestStorage<T, ExternalVolumeLocation>> externalVolumeMap;

// name of the owning client
private final String clientName;

// role of the owning client
private final String role;

// Reference to the Snowflake service client used for configure calls
private final SnowflakeServiceClient snowflakeServiceClient;

// Client prefix generated by the Snowflake server
private final String clientPrefix;

/**
* Constructor for ExternalVolumeManager
*
* @param isTestMode whether the manager in test mode
* @param role the role of the client
* @param clientName the name of the client
* @param snowflakeServiceClient the Snowflake service client used for configure calls
*/
ExternalVolumeManager(
boolean isTestMode,
String role,
String clientName,
SnowflakeServiceClient snowflakeServiceClient) {
this.role = role;
this.clientName = clientName;
this.snowflakeServiceClient = snowflakeServiceClient;
this.externalVolumeMap = new ConcurrentHashMap<>();
try {
this.clientPrefix =
isTestMode
? "testPrefix"
: this.snowflakeServiceClient
.clientConfigure(new ClientConfigureRequest(role))
.getClientPrefix();
} catch (IngestResponseException | IOException e) {
throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage());
}
}

/**
* Given a fully qualified table name, return the target storage by looking up the table name
*
* @param fullyQualifiedTableName the target fully qualified table name
* @return target storage
*/
@Override
public StreamingIngestStorage<T, ExternalVolumeLocation> getStorage(
String fullyQualifiedTableName) {
// Only one chunk per blob in Iceberg mode.
StreamingIngestStorage<T, ExternalVolumeLocation> stage =
this.externalVolumeMap.get(fullyQualifiedTableName);

if (stage == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format("No external volume found for table %s", fullyQualifiedTableName));
}

return stage;
}

/**
* Add a storage to the manager by looking up the table name from the open channel response
*
* @param dbName the database name
* @param schemaName the schema name
* @param tableName the table name
* @param fileLocationInfo response from open channel
*/
@Override
public void addStorage(
String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) {
String fullyQualifiedTableName =
Utils.getFullyQualifiedTableName(dbName, schemaName, tableName);

try {
this.externalVolumeMap.put(
fullyQualifiedTableName,
new StreamingIngestStorage<T, ExternalVolumeLocation>(
this,
this.clientName,
fileLocationInfo,
new ExternalVolumeLocation(dbName, schemaName, tableName),
DEFAULT_MAX_UPLOAD_RETRIES));
} catch (SnowflakeSQLException | IOException err) {
throw new SFException(
err,
ErrorCode.UNABLE_TO_CONNECT_TO_STAGE,
String.format("fullyQualifiedTableName=%s", fullyQualifiedTableName));
}
}

/**
* Gets the latest file location info (with a renewed short-lived access token) for the specified
* location
*
* @param location A reference to the target location
* @param fileName optional filename for single-file signed URL fetch from server
* @return the new location information
*/
@Override
public FileLocationInfo getRefreshedLocation(
ExternalVolumeLocation location, Optional<String> fileName) {
try {
ChannelConfigureRequest request =
new ChannelConfigureRequest(
this.role, location.dbName, location.schemaName, location.tableName);
fileName.ifPresent(request::setFileName);
ChannelConfigureResponse response = this.snowflakeServiceClient.channelConfigure(request);
return response.getStageLocation();
} catch (IngestResponseException | IOException e) {
throw new SFException(e, ErrorCode.CHANNEL_CONFIGURE_FAILURE, e.getMessage());
}
}

// TODO: SNOW-1502887 Blob path generation for external volume
@Override
public String generateBlobPath() {
return "snow_dummy_file_name.parquet";
}

// TODO: SNOW-1502887 Blob path generation for iceberg table
@Override
public void decrementBlobSequencer() {}

// TODO: SNOW-1502887 Blob path generation for iceberg table
public String getBlobPath(Calendar calendar, String clientPrefix) {
return "";
}

/**
* Get the client prefix from first external volume in the map
*
* @return the client prefix
*/
@Override
public String getClientPrefix() {
return this.clientPrefix;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
Expand All @@ -21,6 +21,7 @@ class OpenChannelResponse extends StreamingIngestResponse {
private List<ColumnMetadata> tableColumns;
private String encryptionKey;
private Long encryptionKeyId;
private FileLocationInfo externalVolumeLocation;

@JsonProperty("status_code")
void setStatusCode(Long statusCode) {
Expand Down Expand Up @@ -130,4 +131,13 @@ void setEncryptionKeyId(Long encryptionKeyId) {
Long getEncryptionKeyId() {
return this.encryptionKeyId;
}

@JsonProperty("iceberg_location")
void setExternalVolumeLocation(FileLocationInfo externalVolumeLocation) {
this.externalVolumeLocation = externalVolumeLocation;
}

FileLocationInfo getExternalVolumeLocation() {
return this.externalVolumeLocation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_CONFIGURE;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_STATUS;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_DROP_CHANNEL;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_OPEN_CHANNEL;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_REGISTER_BLOB;
import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries;
import static net.snowflake.ingest.utils.Constants.CHANNEL_CONFIGURE_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.CHANNEL_STATUS_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.CLIENT_CONFIGURE_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT;
Expand Down Expand Up @@ -74,6 +76,32 @@ ClientConfigureResponse clientConfigure(ClientConfigureRequest request)
return response;
}

/**
* Configures a channel's storage info given a {@link ChannelConfigureRequest}.
*
* @param request the channel configuration request
* @return the response from the configuration request
*/
ChannelConfigureResponse channelConfigure(ChannelConfigureRequest request)
throws IngestResponseException, IOException {
ChannelConfigureResponse response =
executeApiRequestWithRetries(
ChannelConfigureResponse.class,
request,
CHANNEL_CONFIGURE_ENDPOINT,
"channel configure",
STREAMING_CHANNEL_CONFIGURE);

if (response.getStatusCode() != RESPONSE_SUCCESS) {
logger.logDebug(
"Channel configure request failed, request={}, response={}",
request.getStringForLogging(),
response.getMessage());
throw new SFException(ErrorCode.CHANNEL_CONFIGURE_FAILURE, response.getMessage());
}
return response;
}

/**
* Opens a channel given a {@link OpenChannelRequestInternal}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,11 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
this.snowflakeServiceClient = new SnowflakeServiceClient(this.httpClient, this.requestBuilder);

this.storageManager =
new InternalStageManager<T>(isTestMode, this.role, this.name, this.snowflakeServiceClient);
isIcebergMode
? new ExternalVolumeManager<>(
isTestMode, this.role, this.name, this.snowflakeServiceClient)
: new InternalStageManager<>(
isTestMode, this.role, this.name, this.snowflakeServiceClient);

try {
this.flushService =
Expand Down Expand Up @@ -375,6 +379,11 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest

// Add channel to the channel cache
this.channelCache.addChannel(channel);
this.storageManager.addStorage(
response.getDBName(),
response.getSchemaName(),
response.getTableName(),
response.getExternalVolumeLocation());

return channel;
} catch (IOException | IngestResponseException e) {
Expand Down
Loading

0 comments on commit b9d31b6

Please sign in to comment.