Skip to content

Commit

Permalink
Support multiple stages for Iceberg mode
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Jun 24, 2024
1 parent a9aa682 commit 44cc28e
Show file tree
Hide file tree
Showing 12 changed files with 682 additions and 231 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012-2017 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.connection;
Expand Down Expand Up @@ -42,7 +42,8 @@ public enum ApiName {
STREAMING_DROP_CHANNEL("POST"),
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST");
STREAMING_CLIENT_CONFIGURE("POST"),
STREAMING_CHANNEL_CONFIGURE("POST");
private final String httpMethod;

private ApiName(String httpMethod) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries;
import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
import net.snowflake.ingest.connection.IngestResponseException;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.connection.ServiceResponseHandler;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;

/**
 * A class that is used to configure a Snowflake Ingest Storage.
 *
 * <p>An instance holds the HTTP client, request builder, target endpoint and the optional
 * role/database/schema/table values that make up the request payload. Each call to {@code
 * makeConfigureCall} serializes a fresh payload to JSON and sends it through {@code
 * executeWithRetries}.
 */
class ConfigureCallHandler {

  // Object mapper for creating payload, ignore null fields
  private static final ObjectMapper mapper =
      new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);

  private final CloseableHttpClient httpClient;
  private final RequestBuilder requestBuilder;
  private final ServiceResponseHandler.ApiName apiName;
  private final String configureEndpoint;
  private final String role;
  private final String database;
  private final String schema;
  private final String table;

  /**
   * Builder method to create a {@link ConfigureCallHandler}
   *
   * @param httpClient the HTTP client
   * @param requestBuilder the request builder
   * @param apiName the API name, used for logging
   * @param configureEndpoint the configure endpoint
   * @return a {@link ConfigureCallHandlerBuilder} object
   */
  static ConfigureCallHandlerBuilder builder(
      CloseableHttpClient httpClient,
      RequestBuilder requestBuilder,
      ServiceResponseHandler.ApiName apiName,
      String configureEndpoint) {
    return new ConfigureCallHandlerBuilder(httpClient, requestBuilder, apiName, configureEndpoint);
  }

  /**
   * Builder class to build a {@link ConfigureCallHandler}.
   *
   * <p>The four constructor arguments are fixed at creation; role, database, schema and table are
   * optional and default to {@code null}, test mode defaults to {@code false}.
   */
  static class ConfigureCallHandlerBuilder {
    private final CloseableHttpClient httpClient;
    private final RequestBuilder requestBuilder;
    // Named in lowerCamelCase so it does not shadow the ServiceResponseHandler.ApiName type.
    private final ServiceResponseHandler.ApiName apiName;
    private final String configureEndpoint;
    private String role;
    private String database;
    private String schema;
    private String table;
    private boolean isTestMode;

    /**
     * Constructor for ConfigureCallHandlerBuilder
     *
     * @param httpClient the HTTP client
     * @param requestBuilder the request builder
     * @param apiName the API name, used for logging
     * @param configureEndpoint the configure endpoint
     */
    ConfigureCallHandlerBuilder(
        CloseableHttpClient httpClient,
        RequestBuilder requestBuilder,
        ServiceResponseHandler.ApiName apiName,
        String configureEndpoint) {
      this.httpClient = httpClient;
      this.requestBuilder = requestBuilder;
      this.apiName = apiName;
      this.configureEndpoint = configureEndpoint;
    }

    /**
     * @param role value sent under the {@code "role"} payload key; omitted when null
     * @return this builder
     */
    public ConfigureCallHandlerBuilder setRole(String role) {
      this.role = role;
      return this;
    }

    /**
     * @param database value sent under the {@code "database"} payload key; omitted when null
     * @return this builder
     */
    public ConfigureCallHandlerBuilder setDatabase(String database) {
      this.database = database;
      return this;
    }

    /**
     * @param schema value sent under the {@code "schema"} payload key; omitted when null
     * @return this builder
     */
    public ConfigureCallHandlerBuilder setSchema(String schema) {
      this.schema = schema;
      return this;
    }

    /**
     * @param table value sent under the {@code "table"} payload key; omitted when null
     * @return this builder
     */
    public ConfigureCallHandlerBuilder setTable(String table) {
      this.table = table;
      return this;
    }

    /**
     * @param isTestMode when true, the handler constructor skips its argument validation
     * @return this builder
     */
    public ConfigureCallHandlerBuilder setIsTestMode(boolean isTestMode) {
      this.isTestMode = isTestMode;
      return this;
    }

    /**
     * @return a new {@link ConfigureCallHandler} populated from this builder
     */
    public ConfigureCallHandler build() {
      return new ConfigureCallHandler(this);
    }
  }

  /**
   * Creates a handler from the given builder.
   *
   * <p>Unless the builder is in test mode, the HTTP client, request builder, API name and
   * configure endpoint are validated via the {@code Utils.assert*} helpers before being stored.
   *
   * @param builder the builder carrying all settings
   */
  ConfigureCallHandler(ConfigureCallHandlerBuilder builder) {
    if (!builder.isTestMode) {
      Utils.assertNotNull("http client", builder.httpClient);
      Utils.assertNotNull("request builder", builder.requestBuilder);
      Utils.assertNotNull("api name", builder.apiName);
      Utils.assertStringNotNullOrEmpty("configure endpoint", builder.configureEndpoint);
    }

    this.httpClient = builder.httpClient;
    this.requestBuilder = builder.requestBuilder;
    this.apiName = builder.apiName;
    this.configureEndpoint = builder.configureEndpoint;
    this.role = builder.role;
    this.database = builder.database;
    this.schema = builder.schema;
    this.table = builder.table;
  }

  /**
   * Make a configure call to the Snowflake service
   *
   * @return the configure response
   * @throws IOException if serializing the payload or executing the request raises one
   * @throws SFException with {@code CONFIGURE_FAILURE} if the response status code is not {@code
   *     RESPONSE_SUCCESS} or an {@link IngestResponseException} is raised
   */
  ConfigureResponse makeConfigureCall() throws IOException {
    return makeConfigureCall(makeConfigurePayload());
  }

  /**
   * Make a configure call to the Snowflake service with a file name, used for GCS
   *
   * @param fileName the file name, sent under the {@code "file_name"} payload key; omitted when
   *     null
   * @return the configure response
   * @throws IOException if serializing the payload or executing the request raises one
   * @throws SFException with {@code CONFIGURE_FAILURE} if the response status code is not {@code
   *     RESPONSE_SUCCESS} or an {@link IngestResponseException} is raised
   */
  ConfigureResponse makeConfigureCall(String fileName) throws IOException {
    Map<String, String> payload = makeConfigurePayload();
    putIfNotNull(payload, "file_name", fileName);
    return makeConfigureCall(payload);
  }

  /** Serializes the payload, sends it with retries, and validates the response status code. */
  private ConfigureResponse makeConfigureCall(Map<String, String> payload) throws IOException {
    try {
      ConfigureResponse response =
          executeWithRetries(
              ConfigureResponse.class,
              this.configureEndpoint,
              mapper.writeValueAsString(payload),
              "client configure",
              this.apiName,
              this.httpClient,
              this.requestBuilder);

      // Check for Snowflake specific response code
      if (response.getStatusCode() != RESPONSE_SUCCESS) {
        throw new SFException(ErrorCode.CONFIGURE_FAILURE, response.getMessage());
      }
      return response;
    } catch (IngestResponseException e) {
      throw new SFException(e, ErrorCode.CONFIGURE_FAILURE, e.getMessage());
    }
  }

  /**
   * Builds a new mutable payload from the configured role, database, schema and table.
   *
   * <p>Unset (null) values are left out of the map so that the request never carries explicit
   * nulls, without depending on how the mapper's inclusion setting treats {@code Map} entries.
   */
  private Map<String, String> makeConfigurePayload() {
    Map<String, String> payload = new HashMap<>();
    putIfNotNull(payload, "role", this.role);
    putIfNotNull(payload, "database", this.database);
    putIfNotNull(payload, "schema", this.schema);
    putIfNotNull(payload, "table", this.table);
    return payload;
  }

  /** Adds the entry to the payload only when the value is present. */
  private static void putIfNotNull(Map<String, String> payload, String key, String value) {
    if (value != null) {
      payload.put(key, value);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_CONFIGURE;
import static net.snowflake.ingest.utils.Constants.CHANNEL_CONFIGURE_ENDPOINT;

import java.io.IOException;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;

/**
 * Class to manage multiple external volumes.
 *
 * <p>Keeps one {@link StreamingIngestStorage} per table, keyed by a {@code db.schema.table}
 * string. Storages are registered from open-channel responses and looked up from the channel
 * context of the blob being uploaded.
 */
class ExternalVolumeManager<T> implements StorageManager<T> {

  // Reference to the external volume per table
  private final Map<String, StreamingIngestStorage> externalVolumeMap;
  private final SnowflakeStreamingIngestClientInternal<T> owningClient;

  private final boolean isTestMode;

  /**
   * Constructor for ExternalVolumeManager
   *
   * @param isTestMode whether the manager in test mode
   * @param client the owning client
   */
  ExternalVolumeManager(boolean isTestMode, SnowflakeStreamingIngestClientInternal<T> client) {
    this.owningClient = client;
    this.isTestMode = isTestMode;
    this.externalVolumeMap = new ConcurrentHashMap<>();
  }

  /**
   * Given a blob, return the target storage by looking up the table name from the channel context
   *
   * @param blobData the blob to upload
   * @return target storage
   * @throws SFException with {@code INTERNAL_ERROR} if the blob has no channel data or no storage
   *     is registered for the blob's table
   */
  @Override
  public StreamingIngestStorage getStorage(List<List<ChannelData<T>>> blobData) {
    // Fail with a descriptive internal error rather than an IndexOutOfBoundsException or
    // NullPointerException when there is no channel data to derive the table from.
    if (blobData == null
        || blobData.isEmpty()
        || blobData.get(0) == null
        || blobData.get(0).isEmpty()) {
      throw new SFException(
          ErrorCode.INTERNAL_ERROR, "Cannot look up storage for a blob without channel data");
    }

    // Only one chunk per blob in Iceberg mode.
    ChannelFlushContext channelContext = blobData.get(0).get(0).getChannelContext();
    String fullyQualifiedTableName = channelContext.getFullyQualifiedTableName();
    StreamingIngestStorage stage = this.externalVolumeMap.get(fullyQualifiedTableName);

    if (stage == null) {
      throw new SFException(
          ErrorCode.INTERNAL_ERROR,
          String.format("No storage found for table %s", fullyQualifiedTableName));
    }

    return stage;
  }

  /**
   * Add a storage to the manager by looking up the table name from the open channel response
   *
   * <p>Does nothing if a storage is already registered for the table.
   *
   * @param openChannelResponse response from open channel
   * @throws SFException with {@code UNABLE_TO_CONNECT_TO_STAGE} if the storage cannot be created
   */
  @Override
  public void addStorage(OpenChannelResponse openChannelResponse) {
    // NOTE(review): this key must match ChannelFlushContext.getFullyQualifiedTableName() for
    // getStorage to find the entry - confirm both use the same format.
    String fullyQualifiedTableName =
        String.format(
            "%s.%s.%s",
            openChannelResponse.getDBName(),
            openChannelResponse.getSchemaName(),
            openChannelResponse.getTableName());

    // Atomic check-and-insert: concurrent channel opens for the same table create the storage
    // once instead of racing between containsKey and put. If creation throws, no mapping is
    // recorded and the exception propagates to the caller.
    this.externalVolumeMap.computeIfAbsent(
        fullyQualifiedTableName, unusedKey -> createStorage(openChannelResponse));
  }

  /**
   * Builds the configure call handler and storage for the table named in the response, wrapping
   * checked failures in an {@link SFException}.
   */
  private StreamingIngestStorage createStorage(OpenChannelResponse openChannelResponse) {
    try {
      ConfigureCallHandler configureCallHandler =
          ConfigureCallHandler.builder(
                  this.owningClient.getHttpClient(),
                  this.owningClient.getRequestBuilder(),
                  STREAMING_CHANNEL_CONFIGURE,
                  CHANNEL_CONFIGURE_ENDPOINT)
              .setRole(this.owningClient.getRole())
              .setDatabase(openChannelResponse.getDBName())
              .setSchema(openChannelResponse.getSchemaName())
              .setTable(openChannelResponse.getTableName())
              .build();
      return new StreamingIngestStorage(
          isTestMode,
          configureCallHandler,
          this.owningClient.getName(),
          DEFAULT_MAX_UPLOAD_RETRIES);
    } catch (SnowflakeSQLException | IOException err) {
      throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE);
    }
  }

  // TODO: SNOW-1502887 Blob path generation for iceberg table
  /** Placeholder: currently always returns the fixed name {@code "snow_dummy_file_name"}. */
  @Override
  public String generateBlobPath() {
    return "snow_dummy_file_name";
  }

  // TODO: SNOW-1502887 Blob path generation for iceberg table
  /** Placeholder: currently ignores both arguments and returns an empty string. */
  @Override
  public String getBlobPath(Calendar calendar, String clientPrefix) {
    return "";
  }

  /**
   * Get the client prefix from first external volume in the map
   *
   * <p>"First" means whichever storage the map's iterator yields first; the map is a {@link
   * ConcurrentHashMap}, so this is not tied to registration order.
   *
   * @return the client prefix, or null if no storage has been registered yet
   */
  @Override
  public String getClientPrefix() {
    if (this.externalVolumeMap.isEmpty()) {
      return null;
    }
    return this.externalVolumeMap.values().iterator().next().getClientPrefix();
  }
}
Loading

0 comments on commit 44cc28e

Please sign in to comment.