diff --git a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java index 822c969a1..c9968bcf3 100644 --- a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java +++ b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java @@ -43,7 +43,9 @@ public enum ApiName { STREAMING_CHANNEL_STATUS("POST"), STREAMING_REGISTER_BLOB("POST"), STREAMING_CLIENT_CONFIGURE("POST"), - GENERATE_PRESIGNED_URLS("POST"); + GENERATE_PRESIGNED_URLS("POST"), + REFRESH_TABLE_INFORMATION("POST"); + private final String httpMethod; private ApiName(String httpMethod) { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RefreshTableInformationRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/RefreshTableInformationRequest.java new file mode 100644 index 000000000..97e6e29f5 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RefreshTableInformationRequest.java @@ -0,0 +1,55 @@ +package net.snowflake.ingest.streaming.internal; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class RefreshTableInformationRequest implements IStreamingIngestRequest { + @JsonProperty("database") + private String dbName; + + @JsonProperty("schema") + private String schemaName; + + @JsonProperty("table") + private String tableName; + + @JsonProperty("role") + private String role; + + @JsonProperty("is_iceberg") + private boolean isIceberg; + + public RefreshTableInformationRequest(TableRef tableRef, String role, boolean isIceberg) { + this.dbName = tableRef.dbName; + this.schemaName = tableRef.schemaName; + this.tableName = tableRef.tableName; + this.role = role; + this.isIceberg = isIceberg; + } + + String getDBName() { + return this.dbName; + } + + String getSchemaName() { + return this.schemaName; + } + + String getTableName() { + return this.tableName; + } + + String getRole() { + return this.role; + } + + boolean getIsIceberg() { + return this.isIceberg; + } + + @Override + public String getStringForLogging() { + return String.format( + "RefreshTableInformation(db=%s, schema=%s, table=%s, role=%s, isIceberg=%s)", + dbName, schemaName, tableName, role, isIceberg); + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RefreshTableInformationResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/RefreshTableInformationResponse.java new file mode 100644 index 000000000..d4a88a818 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RefreshTableInformationResponse.java @@ -0,0 +1,27 @@ +package net.snowflake.ingest.streaming.internal; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class RefreshTableInformationResponse extends StreamingIngestResponse { + @JsonProperty("status_code") + private Long statusCode; + + @JsonProperty("message") + private String message; + + @JsonProperty("iceberg_location") + private FileLocationInfo icebergLocationInfo; + + @Override + Long getStatusCode() { + return this.statusCode; + } + + String getMessage() { + return this.message; + } + + FileLocationInfo getIcebergLocationInfo() { + return this.icebergLocationInfo; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java index 7d4b52ef9..41c803689 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java @@ -5,6 +5,7 @@ package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.GENERATE_PRESIGNED_URLS; +import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.REFRESH_TABLE_INFORMATION; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_STATUS; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_DROP_CHANNEL; @@ -16,6 +17,7 @@ import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT; import static net.snowflake.ingest.utils.Constants.GENERATE_PRESIGNED_URLS_ENDPOINT; import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.REFRESH_TABLE_INFORMATION_ENDPOINT; import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT; import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; @@ -97,6 +99,27 @@ GeneratePresignedUrlsResponse generatePresignedUrls(GeneratePresignedUrlsRequest return response; } + /** Generates a batch of presigned URLs for a table */ + RefreshTableInformationResponse refreshTableInformation(RefreshTableInformationRequest request) + throws IngestResponseException, IOException { + RefreshTableInformationResponse response = + executeApiRequestWithRetries( + RefreshTableInformationResponse.class, + request, + REFRESH_TABLE_INFORMATION_ENDPOINT, + "refresh table information", + REFRESH_TABLE_INFORMATION); + + if (response.getStatusCode() != RESPONSE_SUCCESS) { + logger.logDebug( + "RefreshTableInformation request failed, request={}, response={}", + request.getStringForLogging(), + response.getMessage()); + throw new SFException(ErrorCode.REFRESH_TABLE_INFORMATION_FAILURE, response.getMessage()); + } + return response; + } + /** * Opens a channel given a {@link OpenChannelRequestInternal}. * diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index a5e04e21e..754c81cff 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -55,6 +55,8 @@ public class Constants { public static final String CLIENT_CONFIGURE_ENDPOINT = "/v1/streaming/client/configure/"; public static final String GENERATE_PRESIGNED_URLS_ENDPOINT = "/v1/streaming/presignedurls/generate/"; + public static final String REFRESH_TABLE_INFORMATION_ENDPOINT = + "/v1/streaming/tableinformation/refresh/"; public static final int COMMIT_MAX_RETRY_COUNT = 60; public static final int COMMIT_RETRY_INTERVAL_IN_MS = 1000; public static final String ENCRYPTION_ALGORITHM = "AES/CTR/NoPadding"; diff --git a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java index dc7d1c631..0bb75b174 100644 --- a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java +++ b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java @@ -43,7 +43,8 @@ public enum ErrorCode { CRYPTO_PROVIDER_ERROR("0035"), DROP_CHANNEL_FAILURE("0036"), CLIENT_DEPLOYMENT_ID_MISMATCH("0037"), - GENERATE_PRESIGNED_URLS_FAILURE("0038"); + GENERATE_PRESIGNED_URLS_FAILURE("0038"), + REFRESH_TABLE_INFORMATION_FAILURE("0039"); public static final String errorMessageResource = "net.snowflake.ingest.ingest_error_messages"; diff --git a/src/main/resources/net/snowflake/ingest/ingest_error_messages.properties b/src/main/resources/net/snowflake/ingest/ingest_error_messages.properties index 193ae0105..a6e53a2cc 100644 --- a/src/main/resources/net/snowflake/ingest/ingest_error_messages.properties +++ b/src/main/resources/net/snowflake/ingest/ingest_error_messages.properties @@ -44,4 +44,5 @@ 0035=Failed to load {0}. If you use FIPS, import BouncyCastleFipsProvider in the application: {1} 0036=Failed to drop channel: {0} 0037=Deployment ID mismatch, Client was created on: {0}, Got upload location for: {1}. Please restart client: {2}. -0038=Generate presigned URLs request failed: {0}. \ No newline at end of file +0038=Generate presigned URLs request failed: {0}. +0039=Refresh Table Information request failed: {0}. \ No newline at end of file diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/MockSnowflakeServiceClient.java b/src/test/java/net/snowflake/ingest/streaming/internal/MockSnowflakeServiceClient.java index e632c2ed0..33ccc3824 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/MockSnowflakeServiceClient.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/MockSnowflakeServiceClient.java @@ -5,6 +5,7 @@ import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT; import static net.snowflake.ingest.utils.Constants.GENERATE_PRESIGNED_URLS_ENDPOINT; import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.REFRESH_TABLE_INFORMATION_ENDPOINT; import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT; import com.fasterxml.jackson.databind.ObjectMapper; @@ -130,6 +131,15 @@ public static CloseableHttpClient createHttpClient(ApiOverride apiOverride) { clientConfigresponseMap.put("deployment_id", 123L); return buildStreamingIngestResponse( HttpStatus.SC_OK, clientConfigresponseMap); + case REFRESH_TABLE_INFORMATION_ENDPOINT: + Thread.sleep(1); + Map refreshTableInformationMap = new HashMap<>(); + refreshTableInformationMap.put("status_code", 0L); + refreshTableInformationMap.put("message", "OK"); + refreshTableInformationMap.put("iceberg_location", getStageLocationMap()); + return buildStreamingIngestResponse( + HttpStatus.SC_OK, refreshTableInformationMap); + case GENERATE_PRESIGNED_URLS_ENDPOINT: Thread.sleep(1); Map generateUrlsResponseMap = new HashMap<>();