From b51174368896cc2738282443a555ff04f0719280 Mon Sep 17 00:00:00 2001 From: Hitesh Madan Date: Sun, 20 Oct 2024 18:37:43 +0000 Subject: [PATCH] introduce subscopedtoken ext vol manager that reuses InternalStage.java for uploads via jdbc --- .../streaming/internal/IStorageManager.java | 11 ++ .../streaming/internal/InternalStage.java | 49 +++-- .../internal/InternalStageManager.java | 30 ++- .../PresignedUrlExternalVolumeManager.java | 6 + ...nowflakeStreamingIngestClientInternal.java | 4 +- .../SubscopedTokenExternalVolumeManager.java | 187 ++++++++++++++++++ .../net/snowflake/ingest/utils/Constants.java | 2 +- .../streaming/internal/FlushServiceTest.java | 4 +- .../streaming/internal/InternalStageTest.java | 75 ++++--- 9 files changed, 320 insertions(+), 48 deletions(-) create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java index 9a173365b..cb5affd8e 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java @@ -4,6 +4,8 @@ package net.snowflake.ingest.streaming.internal; +import java.util.Optional; + /** * Interface to manage {@link InternalStage} and {@link PresignedUrlExternalVolume} for {@link * FlushService} @@ -42,4 +44,13 @@ interface IStorageManager { * @return the client prefix */ String getClientPrefix(); + + /** + * Get the updated subscoped tokens and location info for this table + * + * @param tableRef + * @param fileName + * @return + */ + FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional fileName); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java index adf7a0256..e1192ba6c 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java @@ -35,7 +35,7 @@ import net.snowflake.ingest.utils.Utils; /** Handles uploading files to the Snowflake Streaming Ingest Storage */ -class InternalStage implements IStorage { +class InternalStage implements IStorage { private static final ObjectMapper mapper = new ObjectMapper(); /** @@ -59,31 +59,45 @@ class InternalStage implements IStorage { private static final Logging logger = new Logging(InternalStage.class); - private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge; - private final InternalStageManager owningManager; + private final IStorageManager owningManager; private final String clientName; - + private final String clientPrefix; + private final TableRef tableRef; private final int maxUploadRetries; // Proxy parameters that we set while calling the Snowflake JDBC to upload the streams private final Properties proxyProperties; + private FileLocationInfo fileLocationInfo; + private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge; + /** * Default constructor * * @param owningManager the storage manager owning this storage * @param clientName The client name + * @param clientPrefix client prefix + * @param tableRef * @param fileLocationInfo The file location information from open channel response * @param maxUploadRetries The maximum number of retries to attempt */ InternalStage( - InternalStageManager owningManager, + IStorageManager owningManager, String clientName, + String clientPrefix, + TableRef tableRef, FileLocationInfo fileLocationInfo, int maxUploadRetries) throws SnowflakeSQLException, IOException { - this(owningManager, clientName, (SnowflakeFileTransferMetadataWithAge) null, maxUploadRetries); - Utils.assertStringNotNullOrEmpty("client prefix", this.owningManager.getClientPrefix()); + this( + owningManager, + clientName, + clientPrefix, + tableRef, + (SnowflakeFileTransferMetadataWithAge) null, + maxUploadRetries); + Utils.assertStringNotNullOrEmpty("client prefix", clientPrefix); + this.fileLocationInfo = fileLocationInfo; this.fileTransferMetadataWithAge = createFileTransferMetadataWithAge(fileLocationInfo); } @@ -92,17 +106,23 @@ class InternalStage implements IStorage { * * @param owningManager the storage manager owning this storage * @param clientName the client name + * @param clientPrefix + * @param tableRef * @param testMetadata SnowflakeFileTransferMetadataWithAge to test with * @param maxUploadRetries the maximum number of retries to attempt */ InternalStage( - InternalStageManager owningManager, + IStorageManager owningManager, String clientName, + String clientPrefix, + TableRef tableRef, SnowflakeFileTransferMetadataWithAge testMetadata, int maxUploadRetries) throws SnowflakeSQLException, IOException { this.owningManager = owningManager; this.clientName = clientName; + this.clientPrefix = clientPrefix; + this.tableRef = tableRef; this.maxUploadRetries = maxUploadRetries; this.proxyProperties = generateProxyPropertiesForJDBC(); this.fileTransferMetadataWithAge = testMetadata; @@ -153,7 +173,7 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount) .setUploadStream(inStream) .setRequireCompress(false) .setOcspMode(OCSPMode.FAIL_OPEN) - .setStreamingIngestClientKey(this.owningManager.getClientPrefix()) + .setStreamingIngestClientKey(this.clientPrefix) .setStreamingIngestClientName(this.clientName) .setProxyProperties(this.proxyProperties) .setDestFileName(fullFilePath) @@ -204,8 +224,10 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole return fileTransferMetadataWithAge; } - FileLocationInfo location = this.owningManager.getRefreshedLocation(Optional.empty()); + FileLocationInfo location = + this.owningManager.getRefreshedLocation(this.tableRef, Optional.empty()); SnowflakeFileTransferMetadataWithAge metadata = createFileTransferMetadataWithAge(location); + this.fileLocationInfo = location; this.fileTransferMetadataWithAge = metadata; return metadata; } @@ -265,7 +287,8 @@ static SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge( SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName) throws SnowflakeSQLException, IOException { - FileLocationInfo location = this.owningManager.getRefreshedLocation(Optional.of(fileName)); + FileLocationInfo location = + this.owningManager.getRefreshedLocation(this.tableRef, Optional.of(fileName)); SnowflakeFileTransferMetadataV1 metadata = (SnowflakeFileTransferMetadataV1) @@ -336,4 +359,8 @@ static void putLocal(String stageLocation, String fullFilePath, byte[] data) { throw new SFException(ex, ErrorCode.BLOB_UPLOAD_FAILURE); } } + + FileLocationInfo getFileLocationInfo() { + return this.fileLocationInfo; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java index aa770c92f..e7014274f 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java @@ -20,9 +20,10 @@ import net.snowflake.ingest.utils.Utils; /** Class to manage single Snowflake internal stage */ -class InternalStageManager implements IStorageManager { +class InternalStageManager implements IStorageManager { + public static final TableRef NO_TABLE_REF = new TableRef("$NO_DB$", "$NO_SCH$", "$NO_TABLE$"); /** Target stage for the client */ - private final InternalStage targetStage; + private final InternalStage targetStage; /** Increasing counter to generate a unique blob name per client */ private final AtomicLong counter; @@ -70,15 +71,22 @@ class InternalStageManager implements IStorageManager { this.clientPrefix = response.getClientPrefix(); this.deploymentId = response.getDeploymentId(); this.targetStage = - new InternalStage( - this, clientName, response.getStageLocation(), DEFAULT_MAX_UPLOAD_RETRIES); + new InternalStage( + this, + clientName, + clientPrefix, + NO_TABLE_REF, + response.getStageLocation(), + DEFAULT_MAX_UPLOAD_RETRIES); } else { this.clientPrefix = null; this.deploymentId = null; this.targetStage = - new InternalStage( + new InternalStage( this, "testClient", + null /* clientPrefix */, + NO_TABLE_REF, (SnowflakeFileTransferMetadataWithAge) null, DEFAULT_MAX_UPLOAD_RETRIES); } @@ -98,7 +106,7 @@ class InternalStageManager implements IStorageManager { */ @Override @SuppressWarnings("unused") - public InternalStage getStorage(String fullyQualifiedTableName) { + public InternalStage getStorage(String fullyQualifiedTableName) { // There's always only one stage for the client in non-iceberg mode return targetStage; } @@ -117,7 +125,15 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {} * @param fileName optional filename for single-file signed URL fetch from server * @return the new location information */ - FileLocationInfo getRefreshedLocation(Optional fileName) { + @Override + public FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional fileName) { + if (!tableRef.equals(NO_TABLE_REF)) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + String.format( + "getRefreshedLocation received TableRef=%s and expected=%s", tableRef, NO_TABLE_REF)); + } + try { ClientConfigureRequest request = new ClientConfigureRequest(this.role); fileName.ifPresent(request::setFileName); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolumeManager.java index 4147430bc..6ce3cb555 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolumeManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolumeManager.java @@ -8,6 +8,7 @@ import java.io.IOException; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import net.snowflake.ingest.connection.IngestResponseException; import net.snowflake.ingest.utils.ErrorCode; @@ -158,4 +159,9 @@ private PresignedUrlExternalVolume getVolumeSafe(String fullyQualifiedTableName) return volume; } + + @Override + public FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional fileName) { + throw new SFException(ErrorCode.INTERNAL_ERROR, "Unsupported"); + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index f553fb7ff..5729b9baf 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -238,9 +238,9 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea this.storageManager = isIcebergMode - ? new PresignedUrlExternalVolumeManager( + ? new SubscopedTokenExternalVolumeManager( isTestMode, this.role, this.name, this.snowflakeServiceClient) - : new InternalStageManager( + : new InternalStageManager( isTestMode, this.role, this.name, this.snowflakeServiceClient); try { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java new file mode 100644 index 000000000..2317a59f8 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import java.io.IOException; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import net.snowflake.ingest.connection.IngestResponseException; +import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.Logging; +import net.snowflake.ingest.utils.SFException; + +/** Class to manage multiple external volumes */ +class SubscopedTokenExternalVolumeManager implements IStorageManager { + private static final Logging logger = new Logging(SubscopedTokenExternalVolumeManager.class); + // Reference to the external volume per table + private final Map externalVolumeMap; + + /** Increasing counter to generate a unique blob name */ + private final AtomicLong counter; + + // name of the owning client + private final String clientName; + + private final String role; + + // Reference to the Snowflake service client used for configure calls + private final SnowflakeServiceClient serviceClient; + + // Client prefix generated by the Snowflake server + private final String clientPrefix; + + // Deployment ID returned by the Snowflake server + private final Long deploymentId; + + // concurrency control to avoid creating multiple ExternalVolume objects for the same table (if + // openChannel is called + // multiple times concurrently) + private final Object registerTableLock = new Object(); + + /** + * Constructor for ExternalVolumeManager + * + * @param isTestMode whether the manager in test mode + * @param role the role of the client + * @param clientName the name of the client + * @param snowflakeServiceClient the Snowflake service client used for configure calls + */ + SubscopedTokenExternalVolumeManager( + boolean isTestMode, + String role, + String clientName, + SnowflakeServiceClient snowflakeServiceClient) { + this.clientName = clientName; + this.role = role; + this.counter = new AtomicLong(0); + this.serviceClient = snowflakeServiceClient; + this.externalVolumeMap = new ConcurrentHashMap<>(); + try { + ClientConfigureResponse response = + this.serviceClient.clientConfigure(new ClientConfigureRequest(role)); + this.clientPrefix = isTestMode ? "testPrefix" : response.getClientPrefix(); + this.deploymentId = response.getDeploymentId(); + } catch (IngestResponseException | IOException e) { + throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); + } + logger.logDebug( + "Created SubscopedTokenExternalVolumeManager with clientName=%s and clientPrefix=%s", + clientName, clientPrefix); + } + + /** + * Given a fully qualified table name, return the target storage by looking up the table name + * + * @param fullyQualifiedTableName the target fully qualified table name + * @return target storage + */ + @Override + public InternalStage getStorage(String fullyQualifiedTableName) { + // Only one chunk per blob in Iceberg mode. + return getVolumeSafe(fullyQualifiedTableName); + } + + /** Informs the storage manager about a new table that's being ingested into by the client. */ + @Override + public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) { + if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) { + logger.logInfo( + "Skip registering table since its already been registered with the VolumeManager." + + " tableRef=%s", + tableRef); + return; + } + + // future enhancement - per table locks instead of per-client lock + synchronized (registerTableLock) { + if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) { + logger.logInfo( + "Skip registering table since its already been registered with the VolumeManager." + + " tableRef=%s", + tableRef); + return; + } + + try { + InternalStage externalVolume = + new InternalStage( + this, + clientName, + getClientPrefix(), + tableRef, + locationInfo, + DEFAULT_MAX_UPLOAD_RETRIES); + this.externalVolumeMap.put(tableRef.fullyQualifiedName, externalVolume); + } catch (SFException ex) { + logger.logError( + "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, ex); + // allow external volume ctor's SFExceptions to bubble up directly + throw ex; + } catch (Exception err) { + logger.logError( + "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, err); + throw new SFException( + err, + ErrorCode.UNABLE_TO_CONNECT_TO_STAGE, + String.format("fullyQualifiedTableName=%s", tableRef)); + } + } + } + + @Override + public BlobPath generateBlobPath(String fullyQualifiedTableName) { + InternalStage volume = getVolumeSafe(fullyQualifiedTableName); + + // {nullableTableBasePath}/data/streaming_ingest/{figsId}/{twoHexChars}/snow_{volumeHash}_{figsId}_{workerRank}_1_ + String filePathRelativeToVolume = volume.getFileLocationInfo().getPath(); + String[] parts = filePathRelativeToVolume.split("/"); + if (parts.length < 6) { + throw new SFException(ErrorCode.INTERNAL_ERROR, "File path returned by server is invalid."); + } + + String suffix = this.counter.getAndIncrement() + ".parquet"; + + String fileNameRelativeToCredentialedPath = parts[parts.length - 1]; + return new BlobPath( + fileNameRelativeToCredentialedPath + suffix /* uploadPath */, + filePathRelativeToVolume + suffix /* fileRegistrationPath */); + } + + /** + * Get the client prefix from first external volume in the map + * + * @return the client prefix + */ + @Override + public String getClientPrefix() { + return this.clientPrefix; + } + + @Override + public FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional fileName) { + try { + RefreshTableInformationResponse response = + this.serviceClient.refreshTableInformation( + new RefreshTableInformationRequest(tableRef, this.role, true)); + return response.getIcebergLocationInfo(); + } catch (IngestResponseException | IOException e) { + throw new SFException(e, ErrorCode.REFRESH_TABLE_INFORMATION_FAILURE, e.getMessage()); + } + } + + private InternalStage getVolumeSafe(String fullyQualifiedTableName) { + InternalStage volume = this.externalVolumeMap.get(fullyQualifiedTableName); + + if (volume == null) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + String.format("No external volume found for tableRef=%s", fullyQualifiedTableName)); + } + + return volume; + } +} diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 754c81cff..0a5fc673c 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -49,7 +49,7 @@ public class Constants { public static final int BLOB_FILE_SIZE_SIZE_IN_BYTES = 8; public static final int BLOB_CHECKSUM_SIZE_IN_BYTES = 8; public static final int BLOB_CHUNK_METADATA_LENGTH_SIZE_IN_BYTES = 4; - public static final long THREAD_SHUTDOWN_TIMEOUT_IN_SEC = 300L; + public static final long THREAD_SHUTDOWN_TIMEOUT_IN_SEC = 30L; public static final String BLOB_EXTENSION_TYPE = "bdec"; public static final int MAX_THREAD_COUNT = Integer.MAX_VALUE; public static final String CLIENT_CONFIGURE_ENDPOINT = "/v1/streaming/client/configure/"; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 7b8ba7605..9df3f1b11 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -445,7 +445,7 @@ public void testGetFilePath() { Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); String clientPrefix = "honk"; String outputString = - ((InternalStageManager) storageManager).getNextFileName(calendar, clientPrefix); + ((InternalStageManager) storageManager).getNextFileName(calendar, clientPrefix); Path outputPath = Paths.get(outputString); Assert.assertTrue(outputPath.getFileName().toString().contains(clientPrefix)); Assert.assertTrue( @@ -1116,7 +1116,7 @@ public void testInvalidateChannels() { innerData.add(channel2Data); IStorageManager storageManager = - Mockito.spy(new InternalStageManager<>(true, "role", "client", null)); + Mockito.spy(new InternalStageManager(true, "role", "client", null)); FlushService flushService = new FlushService<>(client, channelCache, storageManager, false); flushService.invalidateAllChannelsInBlob(blobData, "Invalidated by test"); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java index 0cfadb92e..150ec566a 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java @@ -134,13 +134,15 @@ public void testPutRemote() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - InternalStageManager storageManager = Mockito.mock(InternalStageManager.class); + InternalStageManager storageManager = Mockito.mock(InternalStageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); - InternalStage stage = + InternalStage stage = new InternalStage( storageManager, "clientName", + "testPrefix", + InternalStageManager.NO_TABLE_REF, new SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), 1); @@ -178,11 +180,13 @@ public void testPutLocal() throws Exception { String fullFilePath = "testOutput"; String fileName = "putLocalOutput"; - InternalStage stage = + InternalStage stage = Mockito.spy( new InternalStage( null, "clientName", + "testPrefix", + InternalStageManager.NO_TABLE_REF, new SnowflakeFileTransferMetadataWithAge( fullFilePath, Optional.of(System.currentTimeMillis())), 1)); @@ -209,10 +213,12 @@ public void doTestPutRemoteRefreshes() throws Exception { InternalStageManager storageManager = Mockito.mock(InternalStageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); - InternalStage stage = - new InternalStage<>( + InternalStage stage = + new InternalStage( storageManager, "clientName", + "testPrefix", + InternalStageManager.NO_TABLE_REF, new SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), maxUploadRetryCount); @@ -265,11 +271,13 @@ public void testPutRemoteGCS() throws Exception { InternalStageManager storageManager = Mockito.mock(InternalStageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); - InternalStage stage = + InternalStage stage = Mockito.spy( - new InternalStage<>( + new InternalStage( storageManager, "clientName", + "testPrefix", + InternalStageManager.NO_TABLE_REF, new SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), 1)); @@ -302,11 +310,16 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { SnowflakeServiceClient snowflakeServiceClient = new SnowflakeServiceClient(mockClient, mockBuilder); InternalStageManager storageManager = - new InternalStageManager<>(true, "role", "client", snowflakeServiceClient); + new InternalStageManager(true, "role", "client", snowflakeServiceClient); - InternalStage stage = - new InternalStage<>( - storageManager, "clientName", (SnowflakeFileTransferMetadataWithAge) null, 1); + InternalStage stage = + new InternalStage( + storageManager, + "clientName", + "testPrefix", + InternalStageManager.NO_TABLE_REF, + (SnowflakeFileTransferMetadataWithAge) null, + 1); SnowflakeFileTransferMetadataWithAge metadataWithAge = stage.refreshSnowflakeMetadata(true); @@ -355,10 +368,10 @@ public void testRefreshSnowflakeMetadataDeploymentIdMismatch() throws Exception SnowflakeServiceClient snowflakeServiceClient = new SnowflakeServiceClient(mockClient, mockBuilder); - InternalStageManager storageManager = - new InternalStageManager<>(true, "role", "clientName", snowflakeServiceClient); + InternalStageManager storageManager = + new InternalStageManager(true, "role", "clientName", snowflakeServiceClient); - InternalStage storage = storageManager.getStorage(""); + InternalStage storage = storageManager.getStorage(""); storage.refreshSnowflakeMetadata(true); Assert.assertEquals(prefix + "_" + deploymentId, storageManager.getClientPrefix()); @@ -387,8 +400,8 @@ public void testFetchSignedURL() throws Exception { Mockito.when(mockClientInternal.getRole()).thenReturn("role"); SnowflakeServiceClient snowflakeServiceClient = new SnowflakeServiceClient(mockClient, mockBuilder); - InternalStageManager storageManager = - new InternalStageManager<>(true, "role", "client", snowflakeServiceClient); + InternalStageManager storageManager = + new InternalStageManager(true, "role", "client", snowflakeServiceClient); StatusLine mockStatusLine = Mockito.mock(StatusLine.class); Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200); @@ -396,9 +409,14 @@ public void testFetchSignedURL() throws Exception { Mockito.when(mockResponse.getEntity()).thenReturn(createHttpEntity(exampleRemoteMetaResponse)); Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); - InternalStage stage = + InternalStage stage = new InternalStage( - storageManager, "clientName", (SnowflakeFileTransferMetadataWithAge) null, 1); + storageManager, + "clientName", + "testPrefix", + InternalStageManager.NO_TABLE_REF, + (SnowflakeFileTransferMetadataWithAge) null, + 1); SnowflakeFileTransferMetadataV1 metadata = stage.fetchSignedURL("path/fileName"); @@ -429,8 +447,8 @@ public void testRefreshSnowflakeMetadataSynchronized() throws Exception { Mockito.when(mockClientInternal.getRole()).thenReturn("role"); SnowflakeServiceClient snowflakeServiceClient = new SnowflakeServiceClient(mockClient, mockBuilder); - InternalStageManager storageManager = - new InternalStageManager<>(true, "role", "client", snowflakeServiceClient); + InternalStageManager storageManager = + new InternalStageManager(true, "role", "client", snowflakeServiceClient); StatusLine mockStatusLine = Mockito.mock(StatusLine.class); Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200); @@ -438,9 +456,14 @@ public void testRefreshSnowflakeMetadataSynchronized() throws Exception { Mockito.when(mockResponse.getEntity()).thenReturn(createHttpEntity(exampleRemoteMetaResponse)); Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); - InternalStage stage = - new InternalStage<>( - storageManager, "clientName", (SnowflakeFileTransferMetadataWithAge) null, 1); + InternalStage stage = + new InternalStage( + storageManager, + "clientName", + "testPrefix", + InternalStageManager.NO_TABLE_REF, + (SnowflakeFileTransferMetadataWithAge) null, + 1); ThreadFactory buildUploadThreadFactory = new ThreadFactoryBuilder().setNameFormat("ingest-build-upload-thread-%d").build(); @@ -571,10 +594,12 @@ public void testRefreshMetadataOnFirstPutException() throws Exception { InternalStageManager storageManager = Mockito.mock(InternalStageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); - InternalStage stage = - new InternalStage<>( + InternalStage stage = + new InternalStage( storageManager, "clientName", + "testPrefix", + InternalStageManager.NO_TABLE_REF, new SnowflakeFileTransferMetadataWithAge( originalMetadata, Optional.of(System.currentTimeMillis())), maxUploadRetryCount);