diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java index 5a17b6e91..025647f14 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java @@ -10,7 +10,7 @@ import net.snowflake.ingest.utils.Utils; /** Class to deserialize a request from a channel status request */ -class ChannelsStatusRequest implements StreamingIngestRequest { +class ChannelsStatusRequest implements IStreamingIngestRequest { // Used to deserialize a channel request static class ChannelStatusRequestDTO { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureRequest.java index 62c7a3578..79b282079 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureRequest.java @@ -8,7 +8,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; /** Class used to serialize client configure request */ -class ClientConfigureRequest implements StreamingIngestRequest { +class ClientConfigureRequest implements IStreamingIngestRequest { /** * Constructor for client configure request * diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java index a6e1e1746..322b53acf 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java @@ -10,7 +10,7 @@ import net.snowflake.ingest.utils.Utils; /** Class used to serialize the {@link DropChannelRequest} */ -class DropChannelRequestInternal implements StreamingIngestRequest { +class DropChannelRequestInternal implements IStreamingIngestRequest { @JsonProperty("request_id") private String requestId; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index b34335194..954abfc4a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -95,7 +95,7 @@ List>> getData() { private final ChannelCache channelCache; // Reference to the Streaming Ingest storage manager - private final StorageManager storageManager; + private final IStorageManager storageManager; // Reference to register service private final RegisterService registerService; @@ -127,7 +127,7 @@ List>> getData() { FlushService( SnowflakeStreamingIngestClientInternal client, ChannelCache cache, - StorageManager storageManager, + IStorageManager storageManager, boolean isTestMode) { this.owningClient = client; this.channelCache = cache; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java similarity index 94% rename from src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java rename to src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java index 798d33ade..51f4a82de 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StorageManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java @@ -12,7 +12,7 @@ * @param The type of chunk data * @param the type of location that's being managed (internal stage / external volume) */ -interface StorageManager { +interface IStorageManager { /** Default max upload retries for streaming ingest storage */ int DEFAULT_MAX_UPLOAD_RETRIES = 5; @@ -54,7 +54,7 @@ void addStorage( /** * Decrement the blob sequencer, this method is needed to prevent gap between file name sequencer. - * See {@link StorageManager#generateBlobPath()} for more details. + * See {@link IStorageManager#generateBlobPath()} for more details. */ void decrementBlobSequencer(); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/IStreamingIngestRequest.java similarity index 90% rename from src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestRequest.java rename to src/main/java/net/snowflake/ingest/streaming/internal/IStreamingIngestRequest.java index 3dd30b5e2..a4b5e29d1 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/IStreamingIngestRequest.java @@ -8,6 +8,6 @@ * The StreamingIngestRequest interface is a marker interface used for type safety in the {@link * SnowflakeServiceClient} for streaming ingest API request. */ -interface StreamingIngestRequest { +interface IStreamingIngestRequest { String getStringForLogging(); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java index 144bdc662..d33a80738 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java @@ -24,7 +24,7 @@ public InternalStageLocation() {} } /** Class to manage single Snowflake internal stage */ -class InternalStageManager implements StorageManager { +class InternalStageManager implements IStorageManager { /** Target stage for the client */ private final StreamingIngestStorage targetStage; @@ -94,7 +94,7 @@ class InternalStageManager implements StorageManager T executeApiRequestWithRetries( Class responseClass, - StreamingIngestRequest request, + IStreamingIngestRequest request, String endpoint, String operation, ServiceResponseHandler.ApiName apiName) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 762e84659..6331a4045 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -106,7 +106,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea private final FlushService flushService; // Reference to storage manager - private final StorageManager storageManager; + private final IStorageManager storageManager; // Indicates whether the client has closed private volatile boolean isClosed; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java index 82fee8a66..242b5cc43 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java @@ -89,7 +89,7 @@ state to record unknown age. } private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge; - private final StorageManager owningManager; + private final IStorageManager owningManager; private final TLocation location; private final String clientName; @@ -108,7 +108,7 @@ state to record unknown age. * @param maxUploadRetries The maximum number of retries to attempt */ StreamingIngestStorage( - StorageManager owningManager, + IStorageManager owningManager, String clientName, FileLocationInfo fileLocationInfo, TLocation location, @@ -133,7 +133,7 @@ state to record unknown age. * @param maxUploadRetries the maximum number of retries to attempt */ StreamingIngestStorage( - StorageManager owningManager, + IStorageManager owningManager, String clientName, SnowflakeFileTransferMetadataWithAge testMetadata, TLocation location, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtils.java b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtils.java index ebfffa234..538283b4e 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtils.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtils.java @@ -80,7 +80,7 @@ public static void sleepForRetry(int executionCount) { static T executeWithRetries( Class targetClass, String endpoint, - StreamingIngestRequest payload, + IStreamingIngestRequest payload, String message, ServiceResponseHandler.ApiName apiName, CloseableHttpClient httpClient, diff --git a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java index bd5c36c7f..a9aab9c3b 100644 --- a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java +++ b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java @@ -26,7 +26,7 @@ public enum ErrorCode { INVALID_ENCRYPTED_KEY("0018"), INVALID_DATA_IN_CHUNK("0019"), IO_ERROR("0020"), - UNABLE_TO_CONNECT_TO_STORAGE("0021"), + UNABLE_TO_CONNECT_TO_STAGE("0021"), KEYPAIR_CREATION_FAILURE("0022"), MD5_HASHING_NOT_AVAILABLE("0023"), CHANNEL_STATUS_FAILURE("0024"), diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index afb2be40c..8ac1d2b85 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -77,7 +77,7 @@ private abstract static class TestContext implements AutoCloseable { ChannelCache channelCache; final Map> channels = new HashMap<>(); FlushService flushService; - StorageManager storageManager; + IStorageManager storageManager; StreamingIngestStorage storage; ParameterProvider parameterProvider; RegisterService registerService; @@ -395,7 +395,7 @@ private static ColumnMetadata createLargeTestTextColumn(String name) { @Test public void testGetFilePath() { TestContext testContext = testContextFactory.create(); - StorageManager storageManager = testContext.storageManager; + IStorageManager storageManager = testContext.storageManager; Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); String clientPrefix = "honk"; String outputString = @@ -925,7 +925,7 @@ public void testInvalidateChannels() { innerData.add(channel1Data); innerData.add(channel2Data); - StorageManager storageManager = + IStorageManager storageManager = Mockito.spy(new InternalStageManager<>(true, "role", "client", null)); FlushService flushService = new FlushService<>(client, channelCache, storageManager, false); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java index 7b1efb2ef..d4c3f0374 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java @@ -130,7 +130,7 @@ public void testPutRemote() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StorageManager storageManager = Mockito.mock(StorageManager.class); + IStorageManager storageManager = Mockito.mock(IStorageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); StreamingIngestStorage stage = @@ -201,7 +201,7 @@ public void doTestPutRemoteRefreshes() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StorageManager storageManager = Mockito.mock(StorageManager.class); + IStorageManager storageManager = Mockito.mock(IStorageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); StreamingIngestStorage stage = @@ -256,7 +256,7 @@ public void testPutRemoteGCS() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StorageManager storageManager = Mockito.mock(StorageManager.class); + IStorageManager storageManager = Mockito.mock(IStorageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); StreamingIngestStorage stage = @@ -294,7 +294,7 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { SnowflakeServiceClient snowflakeServiceClient = new SnowflakeServiceClient(mockClient, mockBuilder); - StorageManager storageManager = + IStorageManager storageManager = new InternalStageManager<>(true, "role", "client", snowflakeServiceClient); StreamingIngestStorage stage = @@ -353,7 +353,7 @@ public void testRefreshSnowflakeMetadataDeploymentIdMismatch() throws Exception SnowflakeServiceClient snowflakeServiceClient = new SnowflakeServiceClient(mockClient, mockBuilder); - StorageManager storageManager = + IStorageManager storageManager = new InternalStageManager<>(true, "role", "clientName", snowflakeServiceClient); StreamingIngestStorage storage = storageManager.getStorage(""); @@ -385,7 +385,7 @@ public void testFetchSignedURL() throws Exception { Mockito.when(mockClientInternal.getRole()).thenReturn("role"); SnowflakeServiceClient snowflakeServiceClient = new SnowflakeServiceClient(mockClient, mockBuilder); - StorageManager storageManager = + IStorageManager storageManager = new InternalStageManager<>(true, "role", "client", snowflakeServiceClient); StatusLine mockStatusLine = Mockito.mock(StatusLine.class); Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200); @@ -431,7 +431,7 @@ public void testRefreshSnowflakeMetadataSynchronized() throws Exception { Mockito.when(mockClientInternal.getRole()).thenReturn("role"); SnowflakeServiceClient snowflakeServiceClient = new SnowflakeServiceClient(mockClient, mockBuilder); - StorageManager storageManager = + IStorageManager storageManager = new InternalStageManager<>(true, "role", "client", snowflakeServiceClient); StatusLine mockStatusLine = Mockito.mock(StatusLine.class); Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200); @@ -574,7 +574,7 @@ public void testRefreshMetadataOnFirstPutException() throws Exception { byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8); - StorageManager storageManager = Mockito.mock(StorageManager.class); + IStorageManager storageManager = Mockito.mock(IStorageManager.class); Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix"); StreamingIngestStorage stage =