-
Notifications
You must be signed in to change notification settings - Fork 58
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support multiple stage for iceberg mode
- Loading branch information
1 parent
a9aa682
commit d98dabd
Showing
10 changed files
with
648 additions
and
227 deletions.
There are no files selected for viewing
175 changes: 175 additions & 0 deletions
175
src/main/java/net/snowflake/ingest/streaming/internal/ConfigureCallHandler.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
/* | ||
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. | ||
*/ | ||
|
||
package net.snowflake.ingest.streaming.internal; | ||
|
||
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE; | ||
import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries; | ||
import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; | ||
|
||
import com.fasterxml.jackson.annotation.JsonInclude; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import java.io.IOException; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; | ||
import net.snowflake.ingest.connection.IngestResponseException; | ||
import net.snowflake.ingest.connection.RequestBuilder; | ||
import net.snowflake.ingest.utils.ErrorCode; | ||
import net.snowflake.ingest.utils.SFException; | ||
import net.snowflake.ingest.utils.Utils; | ||
|
||
/** A class that is used to configure a Snowflake Ingest Storage */ | ||
class ConfigureCallHandler { | ||
|
||
// Object mapper for creating payload, ignore null fields | ||
private static final ObjectMapper mapper = | ||
new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL); | ||
|
||
private final CloseableHttpClient httpClient; | ||
private final RequestBuilder requestBuilder; | ||
private final String configureEndpoint; | ||
private final String role; | ||
private final String database; | ||
private final String schema; | ||
private final String table; | ||
|
||
/** | ||
* Builder method to create a {@link ConfigureCallHandler} | ||
* | ||
* @param httpClient the HTTP client | ||
* @param requestBuilder the request builder | ||
* @param configureEndpoint the configure endpoint | ||
* @return a {@link ConfigureCallHandlerBuilder} object | ||
*/ | ||
static ConfigureCallHandlerBuilder builder( | ||
CloseableHttpClient httpClient, RequestBuilder requestBuilder, String configureEndpoint) { | ||
return new ConfigureCallHandlerBuilder(httpClient, requestBuilder, configureEndpoint); | ||
} | ||
|
||
/** Builder class to build a {@link ConfigureCallHandler} */ | ||
static class ConfigureCallHandlerBuilder { | ||
private final CloseableHttpClient httpClient; | ||
private final RequestBuilder requestBuilder; | ||
private final String configureEndpoint; | ||
private String role; | ||
private String database; | ||
private String schema; | ||
private String table; | ||
private boolean isTestMode; | ||
|
||
/** | ||
* Constructor for ConfigureCallHandlerBuilder | ||
* | ||
* @param httpClient the HTTP client | ||
* @param requestBuilder the request builder | ||
* @param configureEndpoint the configure endpoint | ||
*/ | ||
ConfigureCallHandlerBuilder( | ||
CloseableHttpClient httpClient, RequestBuilder requestBuilder, String configureEndpoint) { | ||
this.httpClient = httpClient; | ||
this.requestBuilder = requestBuilder; | ||
this.configureEndpoint = configureEndpoint; | ||
} | ||
|
||
public ConfigureCallHandlerBuilder setRole(String role) { | ||
this.role = role; | ||
return this; | ||
} | ||
|
||
public ConfigureCallHandlerBuilder setDatabase(String database) { | ||
this.database = database; | ||
return this; | ||
} | ||
|
||
public ConfigureCallHandlerBuilder setSchema(String schema) { | ||
this.schema = schema; | ||
return this; | ||
} | ||
|
||
public ConfigureCallHandlerBuilder setTable(String table) { | ||
this.table = table; | ||
return this; | ||
} | ||
|
||
public ConfigureCallHandlerBuilder setIsTestMode(boolean isTestMode) { | ||
this.isTestMode = isTestMode; | ||
return this; | ||
} | ||
|
||
public ConfigureCallHandler build() { | ||
return new ConfigureCallHandler(this); | ||
} | ||
} | ||
|
||
ConfigureCallHandler(ConfigureCallHandlerBuilder builder) { | ||
if (!builder.isTestMode) { | ||
Utils.assertNotNull("http client", builder.httpClient); | ||
Utils.assertNotNull("request builder", builder.requestBuilder); | ||
Utils.assertStringNotNullOrEmpty("configure endpoint", builder.configureEndpoint); | ||
} | ||
|
||
this.httpClient = builder.httpClient; | ||
this.requestBuilder = builder.requestBuilder; | ||
this.configureEndpoint = builder.configureEndpoint; | ||
this.role = builder.role; | ||
this.database = builder.database; | ||
this.schema = builder.schema; | ||
this.table = builder.table; | ||
} | ||
|
||
/** | ||
* Make a configure call to the Snowflake service | ||
* | ||
* @return the configure response | ||
* @throws IOException | ||
*/ | ||
ConfigureResponse makeConfigureCall() throws IOException { | ||
return makeConfigureCall(makeConfigurePayload()); | ||
} | ||
|
||
/** | ||
* Make a configure call to the Snowflake service with a file name, used for GCS | ||
* | ||
* @param fileName the file name | ||
* @return the configure response | ||
* @throws IOException | ||
*/ | ||
ConfigureResponse makeConfigureCall(String fileName) throws IOException { | ||
Map<String, String> payload = makeConfigurePayload(); | ||
payload.put("file_name", fileName); | ||
return makeConfigureCall(payload); | ||
} | ||
|
||
private ConfigureResponse makeConfigureCall(Map<String, String> payload) throws IOException { | ||
try { | ||
ConfigureResponse response = | ||
executeWithRetries( | ||
ConfigureResponse.class, | ||
this.configureEndpoint, | ||
mapper.writeValueAsString(payload), | ||
"client configure", | ||
STREAMING_CLIENT_CONFIGURE, | ||
this.httpClient, | ||
this.requestBuilder); | ||
|
||
// Check for Snowflake specific response code | ||
if (response.getStatusCode() != RESPONSE_SUCCESS) { | ||
throw new SFException(ErrorCode.CLIENT_CONFIGURE_FAILURE, response.getMessage()); | ||
} | ||
return response; | ||
} catch (IngestResponseException e) { | ||
throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); | ||
} | ||
} | ||
|
||
private Map<String, String> makeConfigurePayload() { | ||
Map<String, String> payload = new HashMap<>(); | ||
payload.put("role", this.role); | ||
payload.put("database", this.database); | ||
payload.put("schema", this.schema); | ||
payload.put("table", this.table); | ||
return payload; | ||
} | ||
} |
124 changes: 124 additions & 0 deletions
124
src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
/* | ||
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. | ||
*/ | ||
|
||
package net.snowflake.ingest.streaming.internal; | ||
|
||
import static net.snowflake.ingest.utils.Constants.CHANNEL_CONFIGURE_ENDPOINT; | ||
|
||
import java.io.IOException; | ||
import java.util.Calendar; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import net.snowflake.client.jdbc.SnowflakeSQLException; | ||
import net.snowflake.ingest.utils.ErrorCode; | ||
import net.snowflake.ingest.utils.SFException; | ||
|
||
/** Class to manage multiple external volumes */ | ||
class ExternalVolumeManager<T> implements StorageManager<T> { | ||
|
||
// Reference to the external volume per table | ||
private final Map<String, StreamingIngestStorage> externalVolumeMap; | ||
private final SnowflakeStreamingIngestClientInternal<T> owningClient; | ||
|
||
private final boolean isTestMode; | ||
|
||
/** | ||
* Constructor for ExternalVolumeManager | ||
* | ||
* @param isTestMode whether the manager in test mode | ||
* @param client the owning client | ||
*/ | ||
ExternalVolumeManager(boolean isTestMode, SnowflakeStreamingIngestClientInternal<T> client) { | ||
this.owningClient = client; | ||
this.isTestMode = isTestMode; | ||
this.externalVolumeMap = new ConcurrentHashMap<>(); | ||
} | ||
|
||
/** | ||
* Given a blob, return the target storage by looking up the table name from the channel context | ||
* | ||
* @param blobData the blob to upload | ||
* @return target storage | ||
*/ | ||
@Override | ||
public StreamingIngestStorage getStorage(List<List<ChannelData<T>>> blobData) { | ||
// Only one chunk per blob in Iceberg mode. | ||
ChannelFlushContext channelContext = blobData.get(0).get(0).getChannelContext(); | ||
StreamingIngestStorage stage = | ||
this.externalVolumeMap.get(channelContext.getFullyQualifiedTableName()); | ||
|
||
if (stage == null) { | ||
throw new SFException( | ||
ErrorCode.INTERNAL_ERROR, | ||
String.format( | ||
"No storage found for table %s", channelContext.getFullyQualifiedTableName())); | ||
} | ||
|
||
return stage; | ||
} | ||
|
||
/** | ||
* Add a storage to the manager by looking up the table name from the open channel response | ||
* | ||
* @param openChannelResponse response from open channel | ||
*/ | ||
@Override | ||
public void addStorage(OpenChannelResponse openChannelResponse) { | ||
String fullyQualifiedTableName = | ||
String.format( | ||
"%s.%s.%s", | ||
openChannelResponse.getDBName(), | ||
openChannelResponse.getSchemaName(), | ||
openChannelResponse.getTableName()); | ||
if (!this.externalVolumeMap.containsKey(fullyQualifiedTableName)) { | ||
try { | ||
ConfigureCallHandler configureCallHandler = | ||
ConfigureCallHandler.builder( | ||
this.owningClient.getHttpClient(), | ||
this.owningClient.getRequestBuilder(), | ||
CHANNEL_CONFIGURE_ENDPOINT) | ||
.setRole(this.owningClient.getRole()) | ||
.setDatabase(openChannelResponse.getDBName()) | ||
.setSchema(openChannelResponse.getSchemaName()) | ||
.setTable(openChannelResponse.getTableName()) | ||
.build(); | ||
this.externalVolumeMap.put( | ||
fullyQualifiedTableName, | ||
new StreamingIngestStorage( | ||
isTestMode, | ||
configureCallHandler, | ||
this.owningClient.getName(), | ||
DEFAULT_MAX_UPLOAD_RETRIES)); | ||
} catch (SnowflakeSQLException | IOException err) { | ||
throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE); | ||
} | ||
} | ||
} | ||
|
||
// TODO: SNOW-1502887 Blob path generation for iceberg table | ||
@Override | ||
public String generateBlobPath() { | ||
return "snow_dummy_file_name"; | ||
} | ||
|
||
// TODO: SNOW-1502887 Blob path generation for iceberg table | ||
@Override | ||
public String getBlobPath(Calendar calendar, String clientPrefix) { | ||
return ""; | ||
} | ||
|
||
/** | ||
* Get the client prefix from first external volume in the map | ||
* | ||
* @return the client prefix | ||
*/ | ||
@Override | ||
public String getClientPrefix() { | ||
if (this.externalVolumeMap.isEmpty()) { | ||
return null; | ||
} | ||
return this.externalVolumeMap.values().iterator().next().getClientPrefix(); | ||
} | ||
} |
Oops, something went wrong.