Skip to content

Commit

Permalink
Add remove storage method
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Jul 9, 2024
1 parent a95626c commit 58b0776
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 89 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
Expand Down Expand Up @@ -101,4 +101,11 @@ void invalidateChannelIfSequencersMatch(
int getSize() {
return cache.size();
}

/**
 * Count the channels currently cached for one table.
 *
 * @param fullyQualifiedTableName fully qualified name of the table to look up
 * @return the size of the channel map stored under that table, or 0 when the cache has no entry
 *     for it
 */
int getSizePerTable(String fullyQualifiedTableName) {
  ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>> tableChannels =
      cache.get(fullyQualifiedTableName);
  if (tableChannels == null) {
    // No entry for this table in the cache
    return 0;
  }
  return tableChannels.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,10 @@

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to serialize the client / channel configure request. */
class ChannelConfigureRequest implements ConfigureRequest {
@JsonProperty("role")
private String role;

/** Class used to serialize the channel configure request. */
class ChannelConfigureRequest extends ConfigureRequest {
@JsonProperty("database")
private String database;

Expand All @@ -21,10 +17,6 @@ class ChannelConfigureRequest implements ConfigureRequest {
@JsonProperty("table")
private String table;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("file_name")
private String fileName;

/**
* Constructor for channel configure request
*
Expand All @@ -34,17 +26,12 @@ class ChannelConfigureRequest implements ConfigureRequest {
* @param table Table name.
*/
ChannelConfigureRequest(String role, String database, String schema, String table) {
this.role = role;
setRole(role);
this.database = database;
this.schema = schema;
this.table = table;
}

@Override
public String getRole() {
return role;
}

String getDatabase() {
return database;
}
Expand All @@ -57,20 +44,10 @@ String getTable() {
return table;
}

String getFileName() {
return fileName;
}

/** Set the file name for the GCS signed url request. */
@Override
public void setFileName(String fileName) {
this.fileName = fileName;
}

@Override
public String getStringForLogging() {
return String.format(
"ChannelConfigureRequest(role=%s, db=%s, schema=%s, table=%s, file_name=%s)",
role, database, schema, table, fileName);
getRole(), database, schema, table, getFileName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,19 @@

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to serialize client configure request */
class ClientConfigureRequest implements ConfigureRequest {
@JsonProperty("role")
private String role;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("file_name")
private String fileName;

class ClientConfigureRequest extends ConfigureRequest {
/**
* Constructor for client configure request
*
* @param role Role to be used for the request.
*/
ClientConfigureRequest(String role) {
this.role = role;
}

@Override
public String getRole() {
return role;
}

String getFileName() {
return fileName;
}

/** Set the file name for the GCS signed url request. */
@Override
public void setFileName(String fileName) {
this.fileName = fileName;
setRole(role);
}

@Override
public String getStringForLogging() {
return String.format("ClientConfigureRequest(role=%s, file_name=%s)", role, fileName);
return String.format("ClientConfigureRequest(role=%s, file_name=%s)", getRole(), getFileName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,35 @@

package net.snowflake.ingest.streaming.internal;

/** Interface for {@link ChannelConfigureRequest} and {@link ClientConfigureRequest} */
interface ConfigureRequest extends StreamingIngestRequest {
String getRole();
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

void setFileName(String fileName);
/**
 * Abstract class for {@link ChannelConfigureRequest} and {@link ClientConfigureRequest}.
 *
 * <p>Holds the two fields shared by both request types, {@code role} and {@code file_name}, and
 * leaves the string used for logging to each subclass.
 */
abstract class ConfigureRequest implements StreamingIngestRequest {
// Role to be used for the request, serialized as "role"
@JsonProperty("role")
private String role;

// File name for the GCS signed url request
// Serialized as "file_name"; NON_NULL leaves the property out when no file name is set
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("file_name")
private String fileName;

/** @return the role stored on this request */
String getRole() {
return role;
}

/**
 * Store the role on this request.
 *
 * @param role role to be used for the request
 */
void setRole(String role) {
this.role = role;
}

/** @return the file name stored on this request, or null when none has been set */
String getFileName() {
return fileName;
}

/**
 * Set the file name for the GCS signed url request.
 *
 * @param fileName file name to include in the request
 */
void setFileName(String fileName) {
this.fileName = fileName;
}

/** Each concrete request type supplies its own string for logging. */
@Override
public abstract String getStringForLogging();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import net.snowflake.ingest.connection.IngestResponseException;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;

class ExternalVolumeLocation {
public final String dbName;
Expand Down Expand Up @@ -73,23 +74,22 @@ class ExternalVolumeManager<T> implements StorageManager<T, ExternalVolumeLocati
}

/**
* Given a channel context, return the target storage by looking up the table name
* Given a fully qualified table name, return the target storage by looking up the table name
*
* @param channelFlushContext the channel flush context containing the table name
* @param fullyQualifiedTableName the target fully qualified table name
* @return target storage
*/
@Override
public StreamingIngestStorage<T, ExternalVolumeLocation> getStorage(
ChannelFlushContext channelFlushContext) {
String fullyQualifiedTableName) {
// Only one chunk per blob in Iceberg mode.
StreamingIngestStorage<T, ExternalVolumeLocation> stage =
this.externalVolumeMap.get(channelFlushContext.getFullyQualifiedTableName());
this.externalVolumeMap.get(fullyQualifiedTableName);

if (stage == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format(
"No storage found for table %s", channelFlushContext.getFullyQualifiedTableName()));
String.format("No external volume found for table %s", fullyQualifiedTableName));
}

return stage;
Expand All @@ -106,7 +106,8 @@ public StreamingIngestStorage<T, ExternalVolumeLocation> getStorage(
@Override
public void addStorage(
String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) {
String fullyQualifiedTableName = String.format("%s.%s.%s", dbName, schemaName, tableName);
String fullyQualifiedTableName =
Utils.getFullyQualifiedTableName(dbName, schemaName, tableName);

try {
this.externalVolumeMap.putIfAbsent(
Expand All @@ -122,6 +123,18 @@ public void addStorage(
}
}

/**
 * Remove the entry keyed by the target table's fully qualified name from the external volume map.
 *
 * @param dbName the database name of the table
 * @param schemaName the schema name of the table
 * @param tableName the table name
 */
@Override
public void removeStorage(String dbName, String schemaName, String tableName) {
  String fullyQualifiedTableName =
      Utils.getFullyQualifiedTableName(dbName, schemaName, tableName);
  this.externalVolumeMap.remove(fullyQualifiedTableName);
}

/**
* Gets the latest file location info (with a renewed short-lived access token) for the specified
* location
Expand All @@ -131,7 +144,7 @@ public void addStorage(
* @return the new location information
*/
@Override
public FileLocationInfo refreshLocation(
public FileLocationInfo getRefreshedLocation(
ExternalVolumeLocation location, Optional<String> fileName) {
try {
ChannelConfigureRequest request =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,13 +417,13 @@ && shouldStopProcessing(
CompletableFuture.supplyAsync(
() -> {
try {
// Get the channel flush context from the first channel in the blob. This
// only matters when the client is in Iceberg mode. In Iceberg mode, all
// channels in the blob belong to the same table.
ChannelFlushContext channelFlushContext =
blobData.get(0).get(0).getChannelContext();
// Get the fully qualified table name from the first channel in the blob.
// This only matters when the client is in Iceberg mode. In Iceberg mode,
// all channels in the blob belong to the same table.
String fullyQualifiedTableName =
blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName();
BlobMetadata blobMetadata =
buildAndUpload(blobPath, blobData, channelFlushContext);
buildAndUpload(blobPath, blobData, fullyQualifiedTableName);
blobMetadata.getBlobStats().setFlushStartMs(flushStartMs);
return blobMetadata;
} catch (Throwable e) {
Expand Down Expand Up @@ -506,11 +506,12 @@ private boolean shouldStopProcessing(
* @param blobPath Path of the destination blob in cloud storage
* @param blobData All the data for one blob. Assumes that all ChannelData in the inner List
* belongs to the same table. Will error if this is not the case
* @param channelFlushContext the channel flush context
* @param fullyQualifiedTableName the table name of the first channel in the blob, only matters in
* Iceberg mode
* @return BlobMetadata for FlushService.upload
*/
BlobMetadata buildAndUpload(
String blobPath, List<List<ChannelData<T>>> blobData, ChannelFlushContext channelFlushContext)
String blobPath, List<List<ChannelData<T>>> blobData, String fullyQualifiedTableName)
throws IOException, NoSuchAlgorithmException, InvalidAlgorithmParameterException,
NoSuchPaddingException, IllegalBlockSizeException, BadPaddingException,
InvalidKeyException {
Expand All @@ -527,7 +528,7 @@ BlobMetadata buildAndUpload(
blob.blobStats.setBuildDurationMs(buildContext);

return upload(
this.storageManager.getStorage(channelFlushContext),
this.storageManager.getStorage(fullyQualifiedTableName),
blobPath,
blob.blobBytes,
blob.chunksMetadataList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ class InternalStageManager<T> implements StorageManager<T, InternalStageLocation
* Get the storage. In this case, the storage is always the target stage as there's only one stage
* in non-iceberg mode.
*
* @param channelFlushContext this parameter does not affect the method outcome
* @param fullyQualifiedTableName the target fully qualified table name
* @return the target storage
*/
@Override
@SuppressWarnings("unused")
public StreamingIngestStorage<T, InternalStageLocation> getStorage(
ChannelFlushContext channelFlushContext) {
String fullyQualifiedTableName) {
// There's always only one stage for the client in non-iceberg mode
return targetStage;
}
Expand All @@ -109,6 +109,10 @@ public StreamingIngestStorage<T, InternalStageLocation> getStorage(
public void addStorage(
String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) {}

/**
 * Remove storage from the manager. This is a no-op: there's only one stage in non-Iceberg mode,
 * so there is no per-table storage to drop.
 */
@Override
public void removeStorage(String dbName, String schemaName, String tableName) {
  // Intentionally empty
}

/**
* Gets the latest file location info (with a renewed short-lived access token) for the specified
* location
Expand All @@ -118,7 +122,7 @@ public void addStorage(
* @return the new location information
*/
@Override
public FileLocationInfo refreshLocation(
public FileLocationInfo getRefreshedLocation(
InternalStageLocation location, Optional<String> fileName) {
try {
ClientConfigureRequest request = new ClientConfigureRequest(this.role);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class OpenChannelRequestInternal implements StreamingIngestRequest {

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("offset_token")
String offsetToken;
private String offsetToken;

OpenChannelRequestInternal(
String requestId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,9 +774,16 @@ void setNeedFlush() {
this.flushService.setNeedFlush();
}

/** Remove the channel in the channel cache if the channel sequencer matches */
/**
 * Remove the channel in the channel cache if the channel sequencer matches. If the cache then
 * reports no channels for the channel's table, also remove that table's storage from the storage
 * manager.
 *
 * @param channel the channel to remove from the cache
 */
void removeChannelIfSequencersMatch(SnowflakeStreamingIngestChannelInternal<T> channel) {
  this.channelCache.removeChannelIfSequencersMatch(channel);
  // The per-table count is looked up by fully qualified TABLE name, so the table name must be
  // passed here. The channel's own fully qualified name is a different key; with it the lookup
  // would report 0 and the storage would be removed while other channels still use the table.
  if (this.channelCache.getSizePerTable(channel.getFullyQualifiedTableName()) == 0) {
    this.storageManager.removeStorage(
        channel.getDBName(), channel.getSchemaName(), channel.getTableName());
  }
}

/** Get whether we're running under test mode */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ interface StorageManager<T, TLocation> {
int DEFAULT_MAX_UPLOAD_RETRIES = 5;

/**
* Given a blob, return the target storage
* Given a fully qualified table name, return the target storage
*
* @param channelFlushContext the blob to upload
* @param fullyQualifiedTableName the target fully qualified table name
* @return target stage
*/
StreamingIngestStorage<T, TLocation> getStorage(ChannelFlushContext channelFlushContext);
StreamingIngestStorage<T, TLocation> getStorage(String fullyQualifiedTableName);

/**
* Add a storage to the manager
Expand All @@ -35,6 +35,15 @@ interface StorageManager<T, TLocation> {
void addStorage(
String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo);

/**
* Remove the storage of a target table
*
* @param dbName the database name
* @param schemaName the schema name
* @param tableName the table name
*/
void removeStorage(String dbName, String schemaName, String tableName);

/**
* Gets the latest file location info (with a renewed short-lived access token) for the specified
* location
Expand All @@ -43,7 +52,7 @@ void addStorage(
* @param fileName optional filename for single-file signed URL fetch from server
* @return the new location information
*/
FileLocationInfo refreshLocation(TLocation location, Optional<String> fileName);
FileLocationInfo getRefreshedLocation(TLocation location, Optional<String> fileName);

/**
* Generate a unique blob path and increment the blob sequencer
Expand Down
Loading

0 comments on commit 58b0776

Please sign in to comment.