Skip to content

Commit

Permalink
rename interface / undo error code change
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Jul 16, 2024
1 parent 796a533 commit ca1d767
Show file tree
Hide file tree
Showing 16 changed files with 30 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import net.snowflake.ingest.utils.Utils;

/** Class to deserialize a request from a channel status request */
class ChannelsStatusRequest implements StreamingIngestRequest {
class ChannelsStatusRequest implements IStreamingIngestRequest {

// Used to deserialize a channel request
static class ChannelStatusRequestDTO {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to serialize client configure request */
class ClientConfigureRequest implements StreamingIngestRequest {
class ClientConfigureRequest implements IStreamingIngestRequest {
/**
* Constructor for client configure request
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import net.snowflake.ingest.utils.Utils;

/** Class used to serialize the {@link DropChannelRequest} */
class DropChannelRequestInternal implements StreamingIngestRequest {
class DropChannelRequestInternal implements IStreamingIngestRequest {
@JsonProperty("request_id")
private String requestId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ List<List<ChannelData<T>>> getData() {
private final ChannelCache<T> channelCache;

// Reference to the Streaming Ingest storage manager
private final StorageManager<T, ?> storageManager;
private final IStorageManager<T, ?> storageManager;

// Reference to register service
private final RegisterService<T> registerService;
Expand Down Expand Up @@ -127,7 +127,7 @@ List<List<ChannelData<T>>> getData() {
FlushService(
SnowflakeStreamingIngestClientInternal<T> client,
ChannelCache<T> cache,
StorageManager<T, ?> storageManager,
IStorageManager<T, ?> storageManager,
boolean isTestMode) {
this.owningClient = client;
this.channelCache = cache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* @param <T> The type of chunk data
* @param <TLocation> the type of location that's being managed (internal stage / external volume)
*/
interface StorageManager<T, TLocation> {
interface IStorageManager<T, TLocation> {
/** Default max upload retries for streaming ingest storage */
int DEFAULT_MAX_UPLOAD_RETRIES = 5;

Expand Down Expand Up @@ -54,7 +54,7 @@ void addStorage(

/**
* Decrement the blob sequencer, this method is needed to prevent gap between file name sequencer.
* See {@link StorageManager#generateBlobPath()} for more details.
* See {@link IStorageManager#generateBlobPath()} for more details.
*/
void decrementBlobSequencer();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
* The StreamingIngestRequest interface is a marker interface used for type safety in the {@link
* SnowflakeServiceClient} for streaming ingest API request.
*/
interface StreamingIngestRequest {
interface IStreamingIngestRequest {
String getStringForLogging();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public InternalStageLocation() {}
}

/** Class to manage single Snowflake internal stage */
class InternalStageManager<T> implements StorageManager<T, InternalStageLocation> {
class InternalStageManager<T> implements IStorageManager<T, InternalStageLocation> {
/** Target stage for the client */
private final StreamingIngestStorage<T, InternalStageLocation> targetStage;

Expand Down Expand Up @@ -94,7 +94,7 @@ class InternalStageManager<T> implements StorageManager<T, InternalStageLocation
} catch (IngestResponseException | IOException e) {
throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage());
} catch (SnowflakeSQLException e) {
throw new SFException(e, ErrorCode.UNABLE_TO_CONNECT_TO_STORAGE, e.getMessage());
throw new SFException(e, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE, e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import net.snowflake.ingest.utils.Constants;

/** Class used to serialize the {@link OpenChannelRequest} */
class OpenChannelRequestInternal implements StreamingIngestRequest {
class OpenChannelRequestInternal implements IStreamingIngestRequest {
@JsonProperty("request_id")
private String requestId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import java.util.stream.Collectors;

/** Class used to serialize the blob register request */
class RegisterBlobRequest implements StreamingIngestRequest {
class RegisterBlobRequest implements IStreamingIngestRequest {
@JsonProperty("request_id")
private String requestId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ RegisterBlobResponse registerBlob(RegisterBlobRequest request, final int executi

private <T extends StreamingIngestResponse> T executeApiRequestWithRetries(
Class<T> responseClass,
StreamingIngestRequest request,
IStreamingIngestRequest request,
String endpoint,
String operation,
ServiceResponseHandler.ApiName apiName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
private final FlushService<T> flushService;

// Reference to storage manager
private final StorageManager<T, ?> storageManager;
private final IStorageManager<T, ?> storageManager;

// Indicates whether the client has closed
private volatile boolean isClosed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ state to record unknown age.
}

private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge;
private final StorageManager<T, TLocation> owningManager;
private final IStorageManager<T, TLocation> owningManager;
private final TLocation location;
private final String clientName;

Expand All @@ -108,7 +108,7 @@ state to record unknown age.
* @param maxUploadRetries The maximum number of retries to attempt
*/
StreamingIngestStorage(
StorageManager<T, TLocation> owningManager,
IStorageManager<T, TLocation> owningManager,
String clientName,
FileLocationInfo fileLocationInfo,
TLocation location,
Expand All @@ -133,7 +133,7 @@ state to record unknown age.
* @param maxUploadRetries the maximum number of retries to attempt
*/
StreamingIngestStorage(
StorageManager<T, TLocation> owningManager,
IStorageManager<T, TLocation> owningManager,
String clientName,
SnowflakeFileTransferMetadataWithAge testMetadata,
TLocation location,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public static void sleepForRetry(int executionCount) {
static <T extends StreamingIngestResponse> T executeWithRetries(
Class<T> targetClass,
String endpoint,
StreamingIngestRequest payload,
IStreamingIngestRequest payload,
String message,
ServiceResponseHandler.ApiName apiName,
CloseableHttpClient httpClient,
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/net/snowflake/ingest/utils/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public enum ErrorCode {
INVALID_ENCRYPTED_KEY("0018"),
INVALID_DATA_IN_CHUNK("0019"),
IO_ERROR("0020"),
UNABLE_TO_CONNECT_TO_STORAGE("0021"),
UNABLE_TO_CONNECT_TO_STAGE("0021"),
KEYPAIR_CREATION_FAILURE("0022"),
MD5_HASHING_NOT_AVAILABLE("0023"),
CHANNEL_STATUS_FAILURE("0024"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private abstract static class TestContext<T> implements AutoCloseable {
ChannelCache<T> channelCache;
final Map<String, SnowflakeStreamingIngestChannelInternal<T>> channels = new HashMap<>();
FlushService<T> flushService;
StorageManager<T, ?> storageManager;
IStorageManager<T, ?> storageManager;
StreamingIngestStorage storage;
ParameterProvider parameterProvider;
RegisterService registerService;
Expand Down Expand Up @@ -395,7 +395,7 @@ private static ColumnMetadata createLargeTestTextColumn(String name) {
@Test
public void testGetFilePath() {
TestContext<?> testContext = testContextFactory.create();
StorageManager storageManager = testContext.storageManager;
IStorageManager storageManager = testContext.storageManager;
Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
String clientPrefix = "honk";
String outputString =
Expand Down Expand Up @@ -925,7 +925,7 @@ public void testInvalidateChannels() {
innerData.add(channel1Data);
innerData.add(channel2Data);

StorageManager<StubChunkData, InternalStageLocation> storageManager =
IStorageManager<StubChunkData, InternalStageLocation> storageManager =
Mockito.spy(new InternalStageManager<>(true, "role", "client", null));
FlushService<StubChunkData> flushService =
new FlushService<>(client, channelCache, storageManager, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void testPutRemote() throws Exception {

byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8);

StorageManager<?, ?> storageManager = Mockito.mock(StorageManager.class);
IStorageManager<?, ?> storageManager = Mockito.mock(IStorageManager.class);
Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix");

StreamingIngestStorage<?, ?> stage =
Expand Down Expand Up @@ -201,7 +201,7 @@ public void doTestPutRemoteRefreshes() throws Exception {

byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8);

StorageManager<?, ?> storageManager = Mockito.mock(StorageManager.class);
IStorageManager<?, ?> storageManager = Mockito.mock(IStorageManager.class);
Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix");

StreamingIngestStorage<?, ?> stage =
Expand Down Expand Up @@ -256,7 +256,7 @@ public void testPutRemoteGCS() throws Exception {

byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8);

StorageManager<?, ?> storageManager = Mockito.mock(StorageManager.class);
IStorageManager<?, ?> storageManager = Mockito.mock(IStorageManager.class);
Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix");

StreamingIngestStorage<?, ?> stage =
Expand Down Expand Up @@ -294,7 +294,7 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception {

SnowflakeServiceClient snowflakeServiceClient =
new SnowflakeServiceClient(mockClient, mockBuilder);
StorageManager<?, ?> storageManager =
IStorageManager<?, ?> storageManager =
new InternalStageManager<>(true, "role", "client", snowflakeServiceClient);

StreamingIngestStorage<?, ?> stage =
Expand Down Expand Up @@ -353,7 +353,7 @@ public void testRefreshSnowflakeMetadataDeploymentIdMismatch() throws Exception

SnowflakeServiceClient snowflakeServiceClient =
new SnowflakeServiceClient(mockClient, mockBuilder);
StorageManager<?, ?> storageManager =
IStorageManager<?, ?> storageManager =
new InternalStageManager<>(true, "role", "clientName", snowflakeServiceClient);

StreamingIngestStorage<?, ?> storage = storageManager.getStorage("");
Expand Down Expand Up @@ -385,7 +385,7 @@ public void testFetchSignedURL() throws Exception {
Mockito.when(mockClientInternal.getRole()).thenReturn("role");
SnowflakeServiceClient snowflakeServiceClient =
new SnowflakeServiceClient(mockClient, mockBuilder);
StorageManager<?, ?> storageManager =
IStorageManager<?, ?> storageManager =
new InternalStageManager<>(true, "role", "client", snowflakeServiceClient);
StatusLine mockStatusLine = Mockito.mock(StatusLine.class);
Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200);
Expand Down Expand Up @@ -431,7 +431,7 @@ public void testRefreshSnowflakeMetadataSynchronized() throws Exception {
Mockito.when(mockClientInternal.getRole()).thenReturn("role");
SnowflakeServiceClient snowflakeServiceClient =
new SnowflakeServiceClient(mockClient, mockBuilder);
StorageManager<?, ?> storageManager =
IStorageManager<?, ?> storageManager =
new InternalStageManager<>(true, "role", "client", snowflakeServiceClient);
StatusLine mockStatusLine = Mockito.mock(StatusLine.class);
Mockito.when(mockStatusLine.getStatusCode()).thenReturn(200);
Expand Down Expand Up @@ -574,7 +574,7 @@ public void testRefreshMetadataOnFirstPutException() throws Exception {

byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8);

StorageManager<?, ?> storageManager = Mockito.mock(StorageManager.class);
IStorageManager<?, ?> storageManager = Mockito.mock(IStorageManager.class);
Mockito.when(storageManager.getClientPrefix()).thenReturn("testPrefix");

StreamingIngestStorage<?, ?> stage =
Expand Down

0 comments on commit ca1d767

Please sign in to comment.