diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java index 3422b4e81..09821dbe9 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java @@ -71,7 +71,7 @@ public void setFileName(String fileName) { @Override public String getStringForLogging() { return String.format( - "ChannelConfigureResponse(role=%s, table=%s, file_name=%s)", - role, Utils.getFullyQualifiedTableName(database, schema, table), fileName); + "ChannelConfigureRequest(role=%s, db=%s, schema=%s, table=%s, file_name=%s)", + role, database, schema, table, fileName); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureResponse.java new file mode 100644 index 000000000..d44505daa --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureResponse.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Class used to deserialize responses from channel configure endpoint + */ +@JsonIgnoreProperties(ignoreUnknown = true) +class ChannelConfigureResponse extends StreamingIngestResponse { + @JsonProperty("status_code") + private Long statusCode; + + @JsonProperty("message") + private String message; + + @JsonProperty("stage_location") + private FileLocationInfo stageLocation; + + @Override + Long getStatusCode() { + return statusCode; + } + + void setStatusCode(Long statusCode) { + this.statusCode = statusCode; + } + + String getMessage() { + return message; + } + + void setMessage(String message) { + this.message = message; + } + + FileLocationInfo getStageLocation() { + return stageLocation; + } + + void setStageLocation(FileLocationInfo stageLocation) { + this.stageLocation = stageLocation; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureResponse.java similarity index 95% rename from src/main/java/net/snowflake/ingest/streaming/internal/ConfigureResponse.java rename to src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureResponse.java index 6fa2394a3..03a1d3576 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ConfigureResponse.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureResponse.java @@ -9,7 +9,7 @@ /** Class used to deserialize responses from configure endpoint */ @JsonIgnoreProperties(ignoreUnknown = true) -class ConfigureResponse extends StreamingIngestResponse { +class ClientConfigureResponse extends StreamingIngestResponse { @JsonProperty("prefix") private String prefix; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java index 4daf2c50a..c8faa6777 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java @@ -93,13 +93,16 @@ String getFullyQualifiedTableName() { @Override public String getStringForLogging() { - return String.format( - "DropChannelRequestInternal(requestId=%s, role=%s, channel=%s, isIceberg=%s," + return String.format( + "DropChannelRequest(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s, isIceberg=%s," + " clientSequencer=%s)", - requestId, - role, - Utils.getFullyQualifiedChannelName(database, schema, table, channel), - isIceberg, - clientSequencer); + requestId, + role, + database, + schema, + table, + channel, + isIceberg, + clientSequencer); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java index 7879db47a..40ba8ed22 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java @@ -4,20 +4,33 @@ package net.snowflake.ingest.streaming.internal; -import java.io.IOException; -import java.util.Calendar; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import net.snowflake.client.jdbc.SnowflakeSQLException; import net.snowflake.ingest.connection.IngestResponseException; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; -/** Class to manage multiple external volumes */ -class ExternalVolumeManager implements StorageManager { +import java.io.IOException; +import java.util.Calendar; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +class ExternalVolumeLocation { + public final String dbName; + public final String schemaName; + public final String tableName; + + public ExternalVolumeLocation(String dbName, String schemaName, String tableName) { + this.dbName = dbName; + this.schemaName = schemaName; + this.tableName = tableName; + } +} + +/** Class to manage multiple external volumes */ +class ExternalVolumeManager implements StorageManager { // Reference to the external volume per table - private final Map> externalVolumeMap; + private final Map> externalVolumeMap; // name of the owning client private final String clientName; @@ -67,9 +80,9 @@ class ExternalVolumeManager implements StorageManager { * @return target storage */ @Override - public StreamingIngestStorage getStorage(ChannelFlushContext channelFlushContext) { + public StreamingIngestStorage getStorage(ChannelFlushContext channelFlushContext) { // Only one chunk per blob in Iceberg mode. - StreamingIngestStorage stage = + StreamingIngestStorage stage = this.externalVolumeMap.get(channelFlushContext.getFullyQualifiedTableName()); if (stage == null) { @@ -98,11 +111,11 @@ public void addStorage( try { this.externalVolumeMap.putIfAbsent( fullyQualifiedTableName, - new StreamingIngestStorage<>( + new StreamingIngestStorage( this, this.clientName, fileLocationInfo, - new ChannelConfigureRequest(this.role, dbName, schemaName, tableName), + new ExternalVolumeLocation(dbName, schemaName, tableName), DEFAULT_MAX_UPLOAD_RETRIES)); } catch (SnowflakeSQLException | IOException err) { throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STORAGE); @@ -110,15 +123,20 @@ public void addStorage( } /** - * Configure method for storage + * Gets the latest file location info (with a renewed short-lived access token) for the specified location * - * @param request the configure request - * @return the configure response + * @param location A reference to the target location + * @param fileName optional filename for single-file signed URL fetch from server + * @return the new location information */ @Override - public ConfigureResponse configure(ConfigureRequest request) { + public FileLocationInfo refreshLocation(ExternalVolumeLocation location, Optional fileName) { try { - return this.snowflakeServiceClient.channelConfigure((ChannelConfigureRequest) request); + ChannelConfigureRequest request = new ChannelConfigureRequest( + this.role, location.dbName, location.schemaName, location.tableName); + fileName.ifPresent(request::setFileName); + ChannelConfigureResponse response = this.snowflakeServiceClient.channelConfigure(request); + return response.getStageLocation(); } catch (IngestResponseException | IOException e) { throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index a87fccfa4..c63d57fde 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -95,7 +95,7 @@ List>> getData() { private final ChannelCache channelCache; // Reference to the Stream Ingest storage manager - private final StorageManager storageManager; + private final StorageManager storageManager; // Reference to register service private final RegisterService registerService; @@ -126,7 +126,7 @@ List>> getData() { FlushService( SnowflakeStreamingIngestClientInternal client, ChannelCache cache, - StorageManager storageManager, + StorageManager storageManager, boolean isTestMode) { this.owningClient = client; this.channelCache = cache; @@ -545,7 +545,7 @@ BlobMetadata buildAndUpload( * @return BlobMetadata object used to create the register blob request */ BlobMetadata upload( - StreamingIngestStorage storage, + StreamingIngestStorage storage, String blobPath, byte[] blob, List metadata, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java index 1acb382f8..868afe29e 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java @@ -4,24 +4,31 @@ package net.snowflake.ingest.streaming.internal; -import static net.snowflake.ingest.utils.Constants.BLOB_EXTENSION_TYPE; - import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import java.util.Calendar; -import java.util.TimeZone; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import net.snowflake.client.jdbc.SnowflakeSQLException; import net.snowflake.ingest.connection.IngestResponseException; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.Utils; +import java.io.IOException; +import java.util.Calendar; +import java.util.Optional; +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static net.snowflake.ingest.utils.Constants.BLOB_EXTENSION_TYPE; + +class InternalStageLocation { + public InternalStageLocation() { + } +} + /** Class to manage single Snowflake internal stage */ -class InternalStageManager implements StorageManager { +class InternalStageManager implements StorageManager { // Target stage for the client - private final StreamingIngestStorage targetStage; + private final StreamingIngestStorage targetStage; // Increasing counter to generate a unique blob name per client private final AtomicLong counter; @@ -32,6 +39,9 @@ class InternalStageManager implements StorageManager { // Snowflake service client used for configure calls private final SnowflakeServiceClient snowflakeServiceClient; + // The role of the client + private final String role; + // Client prefix generated by the Snowflake server private final String clientPrefix; @@ -50,23 +60,23 @@ class InternalStageManager implements StorageManager { SnowflakeServiceClient snowflakeServiceClient) { this.snowflakeServiceClient = snowflakeServiceClient; this.isTestMode = isTestMode; + this.role = role; this.counter = new AtomicLong(0); try { if (!isTestMode) { - ClientConfigureRequest request = new ClientConfigureRequest(role); - ConfigureResponse response = this.snowflakeServiceClient.clientConfigure(request); + ClientConfigureResponse response = this.snowflakeServiceClient.clientConfigure(new ClientConfigureRequest(role)); this.clientPrefix = response.getClientPrefix(); this.targetStage = - new StreamingIngestStorage<>( - this, clientName, response.getStageLocation(), request, DEFAULT_MAX_UPLOAD_RETRIES); + new StreamingIngestStorage( + this, clientName, response.getStageLocation(), new InternalStageLocation(), DEFAULT_MAX_UPLOAD_RETRIES); } else { this.clientPrefix = "testPrefix"; this.targetStage = - new StreamingIngestStorage<>( + new StreamingIngestStorage( this, "testClient", (StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge) null, - new ClientConfigureRequest(role), + new InternalStageLocation(), DEFAULT_MAX_UPLOAD_RETRIES); } } catch (IngestResponseException | IOException e) { @@ -85,33 +95,32 @@ class InternalStageManager implements StorageManager { */ @Override @SuppressWarnings("unused") - public StreamingIngestStorage getStorage(ChannelFlushContext channelFlushContext) { + public StreamingIngestStorage getStorage(ChannelFlushContext channelFlushContext) { // There's always only one stage for the client in non-iceberg mode return targetStage; } /** * Add storage to the manager. Do nothing as there's only one stage in non-Iceberg mode. - * - * @param dbName - * @param schemaName - * @param tableName - * @param fileLocationInfo */ @Override public void addStorage( String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) {} /** - * Configure method for storage + * Gets the latest file location info (with a renewed short-lived access token) for the specified location * - * @param request the configure request - * @return the configure response + * @param location A reference to the target location + * @param fileName optional filename for single-file signed URL fetch from server + * @return the new location information */ @Override - public ConfigureResponse configure(ConfigureRequest request) { + public FileLocationInfo refreshLocation(InternalStageLocation location, Optional fileName) { try { - return snowflakeServiceClient.clientConfigure((ClientConfigureRequest) request); + ClientConfigureRequest request = new ClientConfigureRequest(this.role); + fileName.ifPresent(request::setFileName); + ClientConfigureResponse response = snowflakeServiceClient.clientConfigure(request); + return response.getStageLocation(); } catch (IngestResponseException | IOException e) { throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java index 8e0e9c76c..30ab2c83a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java @@ -104,13 +104,16 @@ String getFullyQualifiedTableName() { @Override public String getStringForLogging() { return String.format( - "OpenChannelRequestInternal(requestId=%s, role=%s, channel=%s, writeMode=%s, isIceberg=%s," + "OpenChannelRequestInternal(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s, writeMode=%s, isIceberg=%s," + " offsetToken=%s)", - requestId, - role, - Utils.getFullyQualifiedChannelName(database, schema, table, channel), - writeMode, - isIceberg, - offsetToken); + requestId, + role, + database, + schema, + table, + channel, + writeMode, + isIceberg, + offsetToken); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java index 7be2053c2..c79e38cb9 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java @@ -58,11 +58,11 @@ class SnowflakeServiceClient { * @param request the client configuration request * @return the response from the configuration request */ - ConfigureResponse clientConfigure(ClientConfigureRequest request) + ClientConfigureResponse clientConfigure(ClientConfigureRequest request) throws IngestResponseException, IOException { - ConfigureResponse response = + ClientConfigureResponse response = executeApiRequestWithRetries( - ConfigureResponse.class, + ClientConfigureResponse.class, request, CLIENT_CONFIGURE_ENDPOINT, "client configure", @@ -83,11 +83,11 @@ ConfigureResponse clientConfigure(ClientConfigureRequest request) * @param request the channel configuration request * @return the response from the configuration request */ - ConfigureResponse channelConfigure(ChannelConfigureRequest request) + ChannelConfigureResponse channelConfigure(ChannelConfigureRequest request) throws IngestResponseException, IOException { - ConfigureResponse response = + ChannelConfigureResponse response = executeApiRequestWithRetries( - ConfigureResponse.class, + ChannelConfigureResponse.class, request, CHANNEL_CONFIGURE_ENDPOINT, "channel configure", diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 6292ee860..3e4d5224d 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -113,7 +113,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea private final FlushService flushService; // Reference to storage manager - private final StorageManager storageManager; + private final StorageManager storageManager; // Indicates whether the client has closed private volatile boolean isClosed; @@ -239,9 +239,9 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea this.storageManager = isIcebergMode - ? new ExternalVolumeManager<>( + ? new ExternalVolumeManager( isTestMode, this.role, this.name, this.snowflakeServiceClient) - : new InternalStageManager<>( + : new InternalStageManager( isTestMode, this.role, this.name, this.snowflakeServiceClient); try { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java index 3206d0bf0..e6d4ca0c8 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java @@ -4,8 +4,15 @@ package net.snowflake.ingest.streaming.internal; -/** Interface to manage {@link StreamingIngestStorage} for {@link FlushService} */ -interface StorageManager { +import java.util.Optional; + +/** + * Interface to manage {@link StreamingIngestStorage} for {@link FlushService} + * + * @param The type of chunk data + * @param the type of location that's being managed (internal stage / external volume) + */ +interface StorageManager { // Default max upload retries for streaming ingest storage int DEFAULT_MAX_UPLOAD_RETRIES = 5; @@ -15,7 +22,7 @@ interface StorageManager { * @param channelFlushContext the blob to upload * @return target stage */ - StreamingIngestStorage getStorage(ChannelFlushContext channelFlushContext); + StreamingIngestStorage getStorage(ChannelFlushContext channelFlushContext); /** * Add a storage to the manager @@ -29,12 +36,14 @@ void addStorage( String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo); /** - * Configure method for storage + * Gets the latest file location info (with a renewed short-lived access token) for the specified location + * + * @param location A reference to the target location + * @param fileName optional filename for single-file signed URL fetch from server * - * @param request the configure request - * @return the configure response + * @return the new location information */ - ConfigureResponse configure(ConfigureRequest request); + FileLocationInfo refreshLocation(TLocation location, Optional fileName); /** * Generate a unique blob path and increment the blob sequencer diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java index 78e5d9efc..0bce3207c 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java @@ -33,7 +33,7 @@ import net.snowflake.ingest.utils.Utils; /** Handles uploading files to the Snowflake Streaming Ingest Storage */ -class StreamingIngestStorage { +class StreamingIngestStorage { private static final ObjectMapper mapper = new ObjectMapper(); // Object mapper for parsing the client/configure response to Jackson version the same as @@ -77,8 +77,8 @@ state to record unknown age. } private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge; - private final StorageManager owningManager; - private final ConfigureRequest configureRequest; + private final StorageManager owningManager; + private final TLocation location; private final String clientName; private final int maxUploadRetries; @@ -92,21 +92,21 @@ state to record unknown age. * @param owningManager the storage manager owning this storage * @param clientName The client name * @param fileLocationInfo The file location information from open channel response - * @param configureRequest The configure request for configure call + * @param location A reference to the target location * @param maxUploadRetries The maximum number of retries to attempt */ StreamingIngestStorage( - StorageManager owningManager, + StorageManager owningManager, String clientName, FileLocationInfo fileLocationInfo, - ConfigureRequest configureRequest, + TLocation location, int maxUploadRetries) throws SnowflakeSQLException, IOException { this( owningManager, clientName, (SnowflakeFileTransferMetadataWithAge) null, - configureRequest, + location, maxUploadRetries); createFileTransferMetadataWithAge(fileLocationInfo); } @@ -117,21 +117,21 @@ state to record unknown age. * @param owningManager the storage manager owning this storage * @param clientName the client name * @param testMetadata SnowflakeFileTransferMetadataWithAge to test with - * @param configureRequest the configure request for configure call + * @param location A reference to the target location * @param maxUploadRetries the maximum number of retries to attempt */ StreamingIngestStorage( - StorageManager owningManager, + StorageManager owningManager, String clientName, SnowflakeFileTransferMetadataWithAge testMetadata, - ConfigureRequest configureRequest, + TLocation location, int maxUploadRetries) throws SnowflakeSQLException, IOException { this.owningManager = owningManager; this.clientName = clientName; this.maxUploadRetries = maxUploadRetries; this.proxyProperties = generateProxyPropertiesForJDBC(); - this.configureRequest = configureRequest; + this.location = location; this.fileTransferMetadataWithAge = testMetadata; } @@ -243,8 +243,8 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole return fileTransferMetadataWithAge; } - ConfigureResponse response = this.owningManager.configure(this.configureRequest); - return createFileTransferMetadataWithAge(response.getStageLocation()); + FileLocationInfo location = this.owningManager.refreshLocation(this.location, Optional.empty()); + return createFileTransferMetadataWithAge(location); } private SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge( @@ -288,7 +288,7 @@ private SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge( * @param response the client/configure response from the server * @return the client prefix. */ - private String createClientPrefix(final ConfigureResponse response) { + private String createClientPrefix(final ClientConfigureResponse response) { final String prefix = response.getPrefix() == null ? "" : response.getPrefix(); final String deploymentId = response.getDeploymentId() != null ? "_" + response.getDeploymentId() : ""; @@ -304,13 +304,12 @@ private String createClientPrefix(final ConfigureResponse response) { SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName) throws SnowflakeSQLException, IOException { - this.configureRequest.setFileName(fileName); - ConfigureResponse response = this.owningManager.configure(this.configureRequest); + FileLocationInfo location = this.owningManager.refreshLocation(this.location, Optional.of(fileName)); SnowflakeFileTransferMetadataV1 metadata = (SnowflakeFileTransferMetadataV1) SnowflakeFileTransferAgent.getFileTransferMetadatas( - parseFileLocationInfo(response.getStageLocation())) + parseFileLocationInfo(location)) .get(0); // Transfer agent trims path for fileName metadata.setPresignedUrlFileName(fileName); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 2c189563b..08c668e55 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -93,7 +93,7 @@ private abstract static class TestContext implements AutoCloseable { ChannelCache channelCache; final Map> channels = new HashMap<>(); FlushService flushService; - StorageManager storageManager; + StorageManager storageManager; StreamingIngestStorage storage; ParameterProvider parameterProvider; InternalParameterProvider internalParameterProvider; @@ -412,7 +412,7 @@ private static ColumnMetadata createLargeTestTextColumn(String name) { @Test public void testGetFilePath() { TestContext testContext = testContextFactory.create(); - StorageManager storageManager = testContext.storageManager; + StorageManager storageManager = testContext.storageManager; Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); String clientPrefix = "honk"; if (isIcebergMode) { @@ -949,7 +949,7 @@ public void testInvalidateChannels() { innerData.add(channel1Data); innerData.add(channel2Data); - StorageManager storageManager = + StorageManager storageManager = Mockito.spy(new InternalStageManager<>(true, "role", "client", null)); FlushService flushService = new FlushService<>(client, channelCache, storageManager, false); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java index 621cc9ab6..0ef2cff1f 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java @@ -119,10 +119,10 @@ public void testPutRemote() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StorageManager storageManager = Mockito.mock(StorageManager.class); + StorageManager storageManager = Mockito.mock(StorageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); - StreamingIngestStorage stage = + StreamingIngestStorage stage = new StreamingIngestStorage( storageManager, "clientName", @@ -162,7 +162,7 @@ public void testPutLocal() throws Exception { String fullFilePath = "testOutput"; String fileName = "putLocalOutput"; - StreamingIngestStorage stage = + StreamingIngestStorage stage = Mockito.spy( new StreamingIngestStorage( null, @@ -190,10 +190,10 @@ public void doTestPutRemoteRefreshes() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StorageManager storageManager = Mockito.mock(StorageManager.class); + StorageManager storageManager = Mockito.mock(StorageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); - StreamingIngestStorage stage = + StreamingIngestStorage stage = new StreamingIngestStorage( storageManager, "clientName", @@ -245,10 +245,10 @@ public void testPutRemoteGCS() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StorageManager storageManager = Mockito.mock(StorageManager.class); + StorageManager storageManager = Mockito.mock(StorageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); - StreamingIngestStorage stage = + StreamingIngestStorage stage = Mockito.spy( new StreamingIngestStorage( storageManager, @@ -283,11 +283,11 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { SnowflakeServiceClient snowflakeServiceClient = new SnowflakeServiceClient(mockClient, mockBuilder); - StorageManager storageManager = + StorageManager storageManager = new InternalStageManager(true, "role", "client", snowflakeServiceClient); ParameterProvider parameterProvider = new ParameterProvider(false); - StreamingIngestStorage stage = + StreamingIngestStorage stage = new StreamingIngestStorage( storageManager, "clientName", @@ -336,7 +336,7 @@ public void testFetchSignedURL() throws Exception { Mockito.when(mockResponse.getEntity()).thenReturn(createHttpEntity(exampleRemoteMetaResponse)); Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); - StreamingIngestStorage stage = + StreamingIngestStorage stage = new StreamingIngestStorage( storageManager, "clientName", @@ -516,7 +516,7 @@ public void testRefreshMetadataOnFirstPutException() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StorageManager storageManager = Mockito.mock(StorageManager.class); + StorageManager storageManager = Mockito.mock(StorageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); StreamingIngestStorage stage =