Skip to content

Commit

Permalink
add RefreshTableInformation API
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-hmadan committed Oct 21, 2024
1 parent b26feec commit 051fa5d
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public enum ApiName {
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST"),
GENERATE_PRESIGNED_URLS("POST");
GENERATE_PRESIGNED_URLS("POST"),
REFRESH_TABLE_INFORMATION("POST");

private final String httpMethod;

private ApiName(String httpMethod) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;

public class RefreshTableInformationRequest implements IStreamingIngestRequest {
@JsonProperty("database")
private String dbName;

@JsonProperty("schema")
private String schemaName;

@JsonProperty("table")
private String tableName;

@JsonProperty("role")
private String role;

@JsonProperty("is_iceberg")
private boolean isIceberg;

public RefreshTableInformationRequest(TableRef tableRef, String role, boolean isIceberg) {
this.dbName = tableRef.dbName;
this.schemaName = tableRef.schemaName;
this.tableName = tableRef.tableName;
this.role = role;
this.isIceberg = isIceberg;
}

String getDBName() {
return this.dbName;
}

String getSchemaName() {
return this.schemaName;
}

String getTableName() {
return this.tableName;
}

String getRole() {
return this.role;
}

boolean getIsIceberg() {
return this.isIceberg;
}

@Override
public String getStringForLogging() {
return String.format(
"RefreshTableInformation(db=%s, schema=%s, table=%s, role=%s, isIceberg=%s)",
dbName, schemaName, tableName, role, isIceberg);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;

public class RefreshTableInformationResponse extends StreamingIngestResponse {
@JsonProperty("status_code")
private Long statusCode;

@JsonProperty("message")
private String message;

@JsonProperty("iceberg_location")
private FileLocationInfo icebergLocationInfo;

@Override
Long getStatusCode() {
return this.statusCode;
}

String getMessage() {
return this.message;
}

FileLocationInfo getIcebergLocationInfo() {
return this.icebergLocationInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.GENERATE_PRESIGNED_URLS;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.REFRESH_TABLE_INFORMATION;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_STATUS;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_DROP_CHANNEL;
Expand All @@ -16,6 +17,7 @@
import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.GENERATE_PRESIGNED_URLS_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.REFRESH_TABLE_INFORMATION_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS;

Expand Down Expand Up @@ -97,6 +99,27 @@ GeneratePresignedUrlsResponse generatePresignedUrls(GeneratePresignedUrlsRequest
return response;
}

/** Generates a batch of presigned URLs for a table */
RefreshTableInformationResponse refreshTableInformation(RefreshTableInformationRequest request)
throws IngestResponseException, IOException {
RefreshTableInformationResponse response =
executeApiRequestWithRetries(
RefreshTableInformationResponse.class,
request,
REFRESH_TABLE_INFORMATION_ENDPOINT,
"refresh table information",
REFRESH_TABLE_INFORMATION);

if (response.getStatusCode() != RESPONSE_SUCCESS) {
logger.logDebug(
"RefreshTableInformation request failed, request={}, response={}",
request.getStringForLogging(),
response.getMessage());
throw new SFException(ErrorCode.REFRESH_TABLE_INFORMATION_FAILURE, response.getMessage());
}
return response;
}

/**
* Opens a channel given a {@link OpenChannelRequestInternal}.
*
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class Constants {
public static final String CLIENT_CONFIGURE_ENDPOINT = "/v1/streaming/client/configure/";
public static final String GENERATE_PRESIGNED_URLS_ENDPOINT =
"/v1/streaming/presignedurls/generate/";
public static final String REFRESH_TABLE_INFORMATION_ENDPOINT =
"/v1/streaming/tableinformation/refresh/";
public static final int COMMIT_MAX_RETRY_COUNT = 60;
public static final int COMMIT_RETRY_INTERVAL_IN_MS = 1000;
public static final String ENCRYPTION_ALGORITHM = "AES/CTR/NoPadding";
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/net/snowflake/ingest/utils/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public enum ErrorCode {
CRYPTO_PROVIDER_ERROR("0035"),
DROP_CHANNEL_FAILURE("0036"),
CLIENT_DEPLOYMENT_ID_MISMATCH("0037"),
GENERATE_PRESIGNED_URLS_FAILURE("0038");
GENERATE_PRESIGNED_URLS_FAILURE("0038"),
REFRESH_TABLE_INFORMATION_FAILURE("0039");

public static final String errorMessageResource = "net.snowflake.ingest.ingest_error_messages";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,5 @@
0035=Failed to load {0}. If you use FIPS, import BouncyCastleFipsProvider in the application: {1}
0036=Failed to drop channel: {0}
0037=Deployment ID mismatch, Client was created on: {0}, Got upload location for: {1}. Please restart client: {2}.
0038=Generate presigned URLs request failed: {0}.
0038=Generate presigned URLs request failed: {0}.
0039=Refresh Table Information request failed: {0}.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.GENERATE_PRESIGNED_URLS_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.REFRESH_TABLE_INFORMATION_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -130,6 +131,15 @@ public static CloseableHttpClient createHttpClient(ApiOverride apiOverride) {
clientConfigresponseMap.put("deployment_id", 123L);
return buildStreamingIngestResponse(
HttpStatus.SC_OK, clientConfigresponseMap);
case REFRESH_TABLE_INFORMATION_ENDPOINT:
Thread.sleep(1);
Map<String, Object> refreshTableInformationMap = new HashMap<>();
refreshTableInformationMap.put("status_code", 0L);
refreshTableInformationMap.put("message", "OK");
refreshTableInformationMap.put("iceberg_location", getStageLocationMap());
return buildStreamingIngestResponse(
HttpStatus.SC_OK, refreshTableInformationMap);

case GENERATE_PRESIGNED_URLS_ENDPOINT:
Thread.sleep(1);
Map<String, Object> generateUrlsResponseMap = new HashMap<>();
Expand Down

0 comments on commit 051fa5d

Please sign in to comment.