diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java index 989be0fa1..86175613b 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -101,4 +101,11 @@ void invalidateChannelIfSequencersMatch( int getSize() { return cache.size(); } + + /** Get the number of channels for a given table */ + int getSizePerTable(String fullyQualifiedTableName) { + ConcurrentHashMap> channelsMapPerTable = + cache.get(fullyQualifiedTableName); + return channelsMapPerTable == null ? 0 : channelsMapPerTable.size(); + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java index 7832ae44b..31bca95ac 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java @@ -4,14 +4,10 @@ package net.snowflake.ingest.streaming.internal; -import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; -/** Class used to serialize the client / channel configure request. */ -class ChannelConfigureRequest implements ConfigureRequest { - @JsonProperty("role") - private String role; - +/** Class used to serialize the channel configure request. */ +class ChannelConfigureRequest extends ConfigureRequest { @JsonProperty("database") private String database; @@ -21,10 +17,6 @@ class ChannelConfigureRequest implements ConfigureRequest { @JsonProperty("table") private String table; - @JsonInclude(JsonInclude.Include.NON_NULL) - @JsonProperty("file_name") - private String fileName; - /** * Constructor for channel configure request * @@ -34,17 +26,12 @@ class ChannelConfigureRequest implements ConfigureRequest { * @param table Table name. */ ChannelConfigureRequest(String role, String database, String schema, String table) { - this.role = role; + setRole(role); this.database = database; this.schema = schema; this.table = table; } - @Override - public String getRole() { - return role; - } - String getDatabase() { return database; } @@ -57,20 +44,10 @@ String getTable() { return table; } - String getFileName() { - return fileName; - } - - /** Set the file name for the GCS signed url request. */ - @Override - public void setFileName(String fileName) { - this.fileName = fileName; - } - @Override public String getStringForLogging() { return String.format( "ChannelConfigureRequest(role=%s, db=%s, schema=%s, table=%s, file_name=%s)", - role, database, schema, table, fileName); + getRole(), database, schema, table, getFileName()); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureRequest.java index 17626967f..ca6d72ab3 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureRequest.java @@ -4,44 +4,19 @@ package net.snowflake.ingest.streaming.internal; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; - /** Class used to serialize client configure request */ -class ClientConfigureRequest implements ConfigureRequest { - @JsonProperty("role") - private String role; - - @JsonInclude(JsonInclude.Include.NON_NULL) - @JsonProperty("file_name") - private String fileName; - +class ClientConfigureRequest extends ConfigureRequest { /** * Constructor for client configure request * * @param role Role to be used for the request. */ ClientConfigureRequest(String role) { - this.role = role; - } - - @Override - public String getRole() { - return role; - } - - String getFileName() { - return fileName; - } - - /** Set the file name for the GCS signed url request. */ - @Override - public void setFileName(String fileName) { - this.fileName = fileName; + setRole(role); } @Override public String getStringForLogging() { - return String.format("ClientConfigureRequest(role=%s, file_name=%s)", role, fileName); + return String.format("ClientConfigureRequest(role=%s, file_name=%s)", getRole(), getFileName()); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureRequest.java index e783f0eda..6ca10f52a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureRequest.java @@ -4,9 +4,35 @@ package net.snowflake.ingest.streaming.internal; -/** Interface for {@link ChannelConfigureRequest} and {@link ClientConfigureRequest} */ -interface ConfigureRequest extends StreamingIngestRequest { - String getRole(); +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; - void setFileName(String fileName); +/** Abstract class for {@link ChannelConfigureRequest} and {@link ClientConfigureRequest} */ +abstract class ConfigureRequest implements StreamingIngestRequest { + @JsonProperty("role") + private String role; + + // File name for the GCS signed url request + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty("file_name") + private String fileName; + + String getRole() { + return role; + } + + void setRole(String role) { + this.role = role; + } + + String getFileName() { + return fileName; + } + + void setFileName(String fileName) { + this.fileName = fileName; + } + + @Override + public abstract String getStringForLogging(); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java index de96bbc16..8dad97d23 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java @@ -13,6 +13,7 @@ import net.snowflake.ingest.connection.IngestResponseException; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; +import net.snowflake.ingest.utils.Utils; class ExternalVolumeLocation { public final String dbName; @@ -73,23 +74,22 @@ class ExternalVolumeManager implements StorageManager getStorage( - ChannelFlushContext channelFlushContext) { + String fullyQualifiedTableName) { // Only one chunk per blob in Iceberg mode. StreamingIngestStorage stage = - this.externalVolumeMap.get(channelFlushContext.getFullyQualifiedTableName()); + this.externalVolumeMap.get(fullyQualifiedTableName); if (stage == null) { throw new SFException( ErrorCode.INTERNAL_ERROR, - String.format( - "No storage found for table %s", channelFlushContext.getFullyQualifiedTableName())); + String.format("No external volume found for table %s", fullyQualifiedTableName)); } return stage; @@ -106,7 +106,8 @@ public StreamingIngestStorage getStorage( @Override public void addStorage( String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) { - String fullyQualifiedTableName = String.format("%s.%s.%s", dbName, schemaName, tableName); + String fullyQualifiedTableName = + Utils.getFullyQualifiedTableName(dbName, schemaName, tableName); try { this.externalVolumeMap.putIfAbsent( @@ -122,6 +123,18 @@ public void addStorage( } } + /** + * Remove the storage of a target table + * + * @param dbName the database name + * @param schemaName the schema name + * @param tableName the table name + */ + @Override + public void removeStorage(String dbName, String schemaName, String tableName) { + this.externalVolumeMap.remove(Utils.getFullyQualifiedTableName(dbName, schemaName, tableName)); + } + /** * Gets the latest file location info (with a renewed short-lived access token) for the specified * location @@ -131,7 +144,7 @@ public void addStorage( * @return the new location information */ @Override - public FileLocationInfo refreshLocation( + public FileLocationInfo getRefreshedLocation( ExternalVolumeLocation location, Optional fileName) { try { ChannelConfigureRequest request = diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 0e2eb3deb..19b592319 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -417,13 +417,13 @@ && shouldStopProcessing( CompletableFuture.supplyAsync( () -> { try { - // Get the channel flush context from the first channel in the blob. This - // only matters when the client is in Iceberg mode. In Iceberg mode, all - // channels in the blob belong to the same table. - ChannelFlushContext channelFlushContext = - blobData.get(0).get(0).getChannelContext(); + // Get the fully qualified table name from the first channel in the blob. + // This only matters when the client is in Iceberg mode. In Iceberg mode, + // all channels in the blob belong to the same table. + String fullyQualifiedTableName = + blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName(); BlobMetadata blobMetadata = - buildAndUpload(blobPath, blobData, channelFlushContext); + buildAndUpload(blobPath, blobData, fullyQualifiedTableName); blobMetadata.getBlobStats().setFlushStartMs(flushStartMs); return blobMetadata; } catch (Throwable e) { @@ -506,11 +506,12 @@ private boolean shouldStopProcessing( * @param blobPath Path of the destination blob in cloud storage * @param blobData All the data for one blob. Assumes that all ChannelData in the inner List * belongs to the same table. Will error if this is not the case - * @param channelFlushContext the channel flush context + * @param fullyQualifiedTableName the table name of the first channel in the blob, only matters in + * Iceberg mode * @return BlobMetadata for FlushService.upload */ BlobMetadata buildAndUpload( - String blobPath, List>> blobData, ChannelFlushContext channelFlushContext) + String blobPath, List>> blobData, String fullyQualifiedTableName) throws IOException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, NoSuchPaddingException, IllegalBlockSizeException, BadPaddingException, InvalidKeyException { @@ -527,7 +528,7 @@ BlobMetadata buildAndUpload( blob.blobStats.setBuildDurationMs(buildContext); return upload( - this.storageManager.getStorage(channelFlushContext), + this.storageManager.getStorage(fullyQualifiedTableName), blobPath, blob.blobBytes, blob.chunksMetadataList, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java index 31c4a0b39..86425f0d7 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java @@ -93,13 +93,13 @@ class InternalStageManager implements StorageManager getStorage( - ChannelFlushContext channelFlushContext) { + String fullyQualifiedTableName) { // There's always only one stage for the client in non-iceberg mode return targetStage; } @@ -109,6 +109,10 @@ public StreamingIngestStorage getStorage( public void addStorage( String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) {} + /** Remove storage from the manager. Do nothing as there's only one stage in non-Iceberg mode. */ + @Override + public void removeStorage(String dbName, String schemaName, String tableName) {} + /** * Gets the latest file location info (with a renewed short-lived access token) for the specified * location @@ -118,7 +122,7 @@ public void addStorage( * @return the new location information */ @Override - public FileLocationInfo refreshLocation( + public FileLocationInfo getRefreshedLocation( InternalStageLocation location, Optional fileName) { try { ClientConfigureRequest request = new ClientConfigureRequest(this.role); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java index b73f3c151..be98cba6f 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java @@ -38,7 +38,7 @@ class OpenChannelRequestInternal implements StreamingIngestRequest { @JsonInclude(JsonInclude.Include.NON_NULL) @JsonProperty("offset_token") - String offsetToken; + private String offsetToken; OpenChannelRequestInternal( String requestId, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 65b8b070e..99052dd83 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -774,9 +774,16 @@ void setNeedFlush() { this.flushService.setNeedFlush(); } - /** Remove the channel in the channel cache if the channel sequencer matches */ + /** + * Remove the channel in the channel cache if the channel sequencer matches. Update storage + * manager if needed. + */ void removeChannelIfSequencersMatch(SnowflakeStreamingIngestChannelInternal channel) { this.channelCache.removeChannelIfSequencersMatch(channel); + if (this.channelCache.getSizePerTable(channel.getFullyQualifiedName()) == 0) { + this.storageManager.removeStorage( + channel.getDBName(), channel.getSchemaName(), channel.getTableName()); + } } /** Get whether we're running under test mode */ diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java index 7d414a66a..eda22671a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java @@ -17,12 +17,12 @@ interface StorageManager { int DEFAULT_MAX_UPLOAD_RETRIES = 5; /** - * Given a blob, return the target storage + * Given a fully qualified table name, return the target storage * - * @param channelFlushContext the blob to upload + * @param fullyQualifiedTableName the target fully qualified table name * @return target stage */ - StreamingIngestStorage getStorage(ChannelFlushContext channelFlushContext); + StreamingIngestStorage getStorage(String fullyQualifiedTableName); /** * Add a storage to the manager @@ -35,6 +35,15 @@ interface StorageManager { void addStorage( String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo); + /** + * Remove the storage of a target table + * + * @param dbName the database name + * @param schemaName the schema name + * @param tableName the table name + */ + void removeStorage(String dbName, String schemaName, String tableName); + /** * Gets the latest file location info (with a renewed short-lived access token) for the specified * location @@ -43,7 +52,7 @@ void addStorage( * @param fileName optional filename for single-file signed URL fetch from server * @return the new location information */ - FileLocationInfo refreshLocation(TLocation location, Optional fileName); + FileLocationInfo getRefreshedLocation(TLocation location, Optional fileName); /** * Generate a unique blob path and increment the blob sequencer diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java index 7e2facc01..eb9f10826 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java @@ -243,7 +243,8 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole return fileTransferMetadataWithAge; } - FileLocationInfo location = this.owningManager.refreshLocation(this.location, Optional.empty()); + FileLocationInfo location = + this.owningManager.getRefreshedLocation(this.location, Optional.empty()); return createFileTransferMetadataWithAge(location); } @@ -305,7 +306,7 @@ SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName) throws SnowflakeSQLException, IOException { FileLocationInfo location = - this.owningManager.refreshLocation(this.location, Optional.of(fileName)); + this.owningManager.getRefreshedLocation(this.location, Optional.of(fileName)); SnowflakeFileTransferMetadataV1 metadata = (SnowflakeFileTransferMetadataV1) diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 08c668e55..c8f04aad1 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -128,7 +128,9 @@ ChannelData flushChannel(String name) { BlobMetadata buildAndUpload() throws Exception { List>> blobData = Collections.singletonList(channelData); return flushService.buildAndUpload( - "file_name", blobData, blobData.get(0).get(0).getChannelContext()); + "file_name", + blobData, + blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName()); } abstract SnowflakeStreamingIngestChannelInternal createChannel(