SNOW-1497358 Support multiple storage for Iceberg mode #783

Merged
12 commits merged on Jul 11, 2024
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012-2017 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
 */

 package net.snowflake.ingest.connection;
@@ -42,7 +42,8 @@ public enum ApiName {
   STREAMING_DROP_CHANNEL("POST"),
   STREAMING_CHANNEL_STATUS("POST"),
   STREAMING_REGISTER_BLOB("POST"),
-  STREAMING_CLIENT_CONFIGURE("POST");
+  STREAMING_CLIENT_CONFIGURE("POST"),
+  STREAMING_CHANNEL_CONFIGURE("POST");
   private final String httpMethod;

   private ApiName(String httpMethod) {
@@ -0,0 +1,188 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries;
import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
import net.snowflake.ingest.connection.IngestResponseException;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.connection.ServiceResponseHandler;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;

/** A class used to configure a Snowflake ingest storage */
class ConfigureCallHandler {

// Object mapper for creating payloads; null fields are ignored
private static final ObjectMapper mapper =
new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
Collaborator: nit: can we reuse object mappers across the process, via some JsonUtils class or something?

Contributor Author: It's possible to create a JsonUtils class with a pool of object mappers to balance concurrent mapping against memory use. Could we add it to the backlog and address this later?
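For illustration, a minimal sketch of the kind of shared holder being discussed; the JsonUtils name and the single shared instance (rather than a pool) are assumptions, since Jackson's ObjectMapper is thread-safe once configured:

    import com.fasterxml.jackson.annotation.JsonInclude;
    import com.fasterxml.jackson.databind.ObjectMapper;

    /** Hypothetical process-wide holder for a shared, preconfigured ObjectMapper. */
    final class JsonUtils {
      // A single instance is typically sufficient: ObjectMapper is thread-safe
      // after configuration, so a pool only helps if mappers get reconfigured.
      private static final ObjectMapper MAPPER =
          new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);

      private JsonUtils() {}

      static ObjectMapper mapper() {
        return MAPPER;
      }
    }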


private final CloseableHttpClient httpClient;
private final RequestBuilder requestBuilder;
private final ServiceResponseHandler.ApiName apiName;
private final String configureEndpoint;
private final String role;
private final String database;
private final String schema;
private final String table;

/**
* Builder method to create a {@link ConfigureCallHandler}
*
* @param httpClient the HTTP client
* @param requestBuilder the request builder
* @param apiName the API name, used for logging
* @param configureEndpoint the configure endpoint
* @return a {@link ConfigureCallHandlerBuilder} object
*/
static ConfigureCallHandlerBuilder builder(
CloseableHttpClient httpClient,
RequestBuilder requestBuilder,
ServiceResponseHandler.ApiName apiName,
String configureEndpoint) {
return new ConfigureCallHandlerBuilder(httpClient, requestBuilder, apiName, configureEndpoint);
}

/** Builder class to build a {@link ConfigureCallHandler} */
static class ConfigureCallHandlerBuilder {
private final CloseableHttpClient httpClient;
private final RequestBuilder requestBuilder;
private final ServiceResponseHandler.ApiName apiName;
private final String configureEndpoint;
private String role;
private String database;
private String schema;
private String table;
private boolean isTestMode;

/**
* Constructor for ConfigureCallHandlerBuilder
*
* @param httpClient the HTTP client
* @param requestBuilder the request builder
* @param apiName the API name, used for logging
* @param configureEndpoint the configure endpoint
*/
ConfigureCallHandlerBuilder(
CloseableHttpClient httpClient,
RequestBuilder requestBuilder,
ServiceResponseHandler.ApiName apiName,
String configureEndpoint) {
this.httpClient = httpClient;
this.requestBuilder = requestBuilder;
this.apiName = apiName;
this.configureEndpoint = configureEndpoint;
}

public ConfigureCallHandlerBuilder setRole(String role) {
this.role = role;
return this;
}

public ConfigureCallHandlerBuilder setDatabase(String database) {
this.database = database;
return this;
}

public ConfigureCallHandlerBuilder setSchema(String schema) {
this.schema = schema;
return this;
}

public ConfigureCallHandlerBuilder setTable(String table) {
this.table = table;
return this;
}

public ConfigureCallHandlerBuilder setIsTestMode(boolean isTestMode) {
this.isTestMode = isTestMode;
return this;
}

public ConfigureCallHandler build() {
return new ConfigureCallHandler(this);
}
}

ConfigureCallHandler(ConfigureCallHandlerBuilder builder) {
if (!builder.isTestMode) {
Utils.assertNotNull("http client", builder.httpClient);
Utils.assertNotNull("request builder", builder.requestBuilder);
Utils.assertNotNull("api name", builder.ApiName);
Utils.assertStringNotNullOrEmpty("configure endpoint", builder.configureEndpoint);
}

this.httpClient = builder.httpClient;
this.requestBuilder = builder.requestBuilder;
this.apiName = builder.apiName;
this.configureEndpoint = builder.configureEndpoint;
this.role = builder.role;
this.database = builder.database;
this.schema = builder.schema;
this.table = builder.table;
}

/**
* Make a configure call to the Snowflake service
*
* @return the configure response
* @throws IOException if the configure request fails
*/
ConfigureResponse makeConfigureCall() throws IOException {
return makeConfigureCall(makeConfigurePayload());
}

/**
* Make a configure call to the Snowflake service with a file name, used for GCS
*
* @param fileName the file name
* @return the configure response
* @throws IOException if the configure request fails
*/
ConfigureResponse makeConfigureCall(String fileName) throws IOException {
Map<String, String> payload = makeConfigurePayload();
payload.put("file_name", fileName);
return makeConfigureCall(payload);
}

private ConfigureResponse makeConfigureCall(Map<String, String> payload) throws IOException {
try {
ConfigureResponse response =
executeWithRetries(
ConfigureResponse.class,
this.configureEndpoint,
mapper.writeValueAsString(payload),
"client configure",
this.apiName,
this.httpClient,
this.requestBuilder);

// Check for Snowflake specific response code
if (response.getStatusCode() != RESPONSE_SUCCESS) {
throw new SFException(ErrorCode.CONFIGURE_FAILURE, response.getMessage());
}
return response;
} catch (IngestResponseException e) {
throw new SFException(e, ErrorCode.CONFIGURE_FAILURE, e.getMessage());
}
}

private Map<String, String> makeConfigurePayload() {
Map<String, String> payload = new HashMap<>();
payload.put("role", this.role);
payload.put("database", this.database);
payload.put("schema", this.schema);
payload.put("table", this.table);
return payload;
}
}
@@ -0,0 +1,126 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_CONFIGURE;
import static net.snowflake.ingest.utils.Constants.CHANNEL_CONFIGURE_ENDPOINT;

import java.io.IOException;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;

/** Class to manage multiple external volumes */
class ExternalVolumeManager<T> implements StorageManager<T> {

// Reference to the external volume per table
private final Map<String, StreamingIngestStorage> externalVolumeMap;
private final SnowflakeStreamingIngestClientInternal<T> owningClient;

private final boolean isTestMode;

/**
* Constructor for ExternalVolumeManager
*
* @param isTestMode whether the manager is in test mode
* @param client the owning client
*/
ExternalVolumeManager(boolean isTestMode, SnowflakeStreamingIngestClientInternal<T> client) {
this.owningClient = client;
this.isTestMode = isTestMode;
this.externalVolumeMap = new ConcurrentHashMap<>();
}

/**
* Given a blob, return the target storage by looking up the table name from the channel context
*
* @param blobData the blob to upload
* @return target storage
*/
@Override
public StreamingIngestStorage getStorage(List<List<ChannelData<T>>> blobData) {
// Only one chunk per blob in Iceberg mode.
ChannelFlushContext channelContext = blobData.get(0).get(0).getChannelContext();
Collaborator: let's pass the channelContext in via the method signature instead of assuming blobData's structure is constructed a certain way by the caller.
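For reference, a sketch of the signature the reviewer is asking for, reusing the ChannelFlushContext and StreamingIngestStorage types from this diff (the exact shape the PR adopted may differ):

    // Hypothetical variant of the interface method: take the context directly
    // so getStorage() makes no assumptions about blobData's layout (such as
    // one chunk per blob in Iceberg mode).
    interface StorageManagerSketch<T> {
      StreamingIngestStorage getStorage(ChannelFlushContext channelContext);
    }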

StreamingIngestStorage stage =
this.externalVolumeMap.get(channelContext.getFullyQualifiedTableName());

if (stage == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
Collaborator: same question as another thread: does customer code get a chance to intercept this exception? Does it cause a process crash, or does it silently get ignored?

Contributor Author (sfc-gh-alhuang, Jun 28, 2024): This causes a process crash. I don't think this is supposed to happen, given that it seems impossible for a client to ingest to a channel without opening it via StreamingIngestClient.

String.format(
"No storage found for table %s", channelContext.getFullyQualifiedTableName()));
}

return stage;
}

/**
* Add a storage to the manager by looking up the table name from the open channel response
*
* @param openChannelResponse response from open channel
*/
@Override
public void addStorage(OpenChannelResponse openChannelResponse) {
String fullyQualifiedTableName =
String.format(
"%s.%s.%s",
Collaborator:

  1. Do we have other places that do this concatenation? Let's unify all these fully-qualified-name-construction pieces of code into one util method.
  2. How is each component's encoding taken care of? There's some single-quote wrapping/unwrapping business that probably needs to be accounted for.
  3. String.format behaves differently for different "cultures" (String.format for English typically behaves differently than String.format for Arabic). Please investigate how String.format infers the culture: is it some context on the thread? On the process? You might have to explicitly pass in an "invariant" culture if such a thing exists.

Contributor Author: Created a unified method for fully-qualified-name construction in StreamingIngestUtils. I don't think we deal with the cultures issue explicitly right now; if the naming pattern follows the Snowflake identifier rules, we should be fine.
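A minimal sketch of such a helper, assuming it lands in StreamingIngestUtils; the method name and the Locale.ROOT choice are illustrative, not the PR's exact code (Locale.ROOT is Java's invariant locale, which addresses point 3 above):

    import java.util.Locale;

    final class StreamingIngestUtilsSketch {
      /** Hypothetical unified builder for fully qualified table names. */
      static String getFullyQualifiedTableName(String db, String schema, String table) {
        // Locale.ROOT pins String.format to locale-invariant formatting
        // instead of the JVM's default locale.
        return String.format(Locale.ROOT, "%s.%s.%s", db, schema, table);
      }
    }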

openChannelResponse.getDBName(),
openChannelResponse.getSchemaName(),
openChannelResponse.getTableName());
if (!this.externalVolumeMap.containsKey(fullyQualifiedTableName)) {
try {
ConfigureCallHandler configureCallHandler =
ConfigureCallHandler.builder(
this.owningClient.getHttpClient(),
this.owningClient.getRequestBuilder(),
STREAMING_CHANNEL_CONFIGURE,
CHANNEL_CONFIGURE_ENDPOINT)
.setRole(this.owningClient.getRole())
.setDatabase(openChannelResponse.getDBName())
.setSchema(openChannelResponse.getSchemaName())
.setTable(openChannelResponse.getTableName())
.build();
this.externalVolumeMap.put(
Collaborator: what if there were two concurrent addStorage calls for the same table? Suggest taking a lock after the if (!containsKey) check in line 76 so that we only ever construct one StreamingIngestStorage object.

Contributor Author: Changed this to putIfAbsent with ConcurrentHashMap to address concurrent access.
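For illustration, the two ConcurrentHashMap idioms in play, with Object standing in for StreamingIngestStorage; the difference matters when construction is expensive:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class StorageMapSketch {
      private final ConcurrentMap<String, Object> map = new ConcurrentHashMap<>();

      void add(String table) {
        // putIfAbsent is race-free for the map itself, but its argument is
        // evaluated eagerly: a losing thread still constructs a throwaway value.
        map.putIfAbsent(table, new Object());

        // computeIfAbsent invokes the factory atomically and only when the key
        // is absent, so at most one value is ever built per table.
        map.computeIfAbsent(table, k -> new Object());
      }
    }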

fullyQualifiedTableName,
new StreamingIngestStorage(
isTestMode,
configureCallHandler,
Collaborator: I like the thought process here, but here's the problem: you're forcing the CLIENT_CONFIGURE and CHANNEL_CONFIGURE APIs to behave almost exactly the same.

  • The goal is for StreamingIngestStorage to have a clean way to call the configure API without knowing whether we're in Iceberg mode or not, and without knowing which API was called to get that info.
  • The StorageManager abstraction is serving pretty much this purpose already.
  • I suggest passing the StorageManager interface object in to StreamingIngestStorage, and adding a configure() method on that interface that StreamingIngestStorage can call. Let this method return a strongly typed object, and each concrete impl of that interface will call whatever it wants on the service side to get the latest tokens / figs ids / etc.

Contributor Author: Added an owningManager reference to StreamingIngestStorage to address this.
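A sketch of the direction the reviewer describes, with hypothetical names (the PR's actual resolution was the owningManager reference mentioned above); ConfigureResponse is the response type already used in this diff:

    /** Hypothetical: storage refreshes its configuration through this interface
     *  without knowing which endpoint (client vs. channel configure) backs it. */
    interface ConfigureSource {
      ConfigureResponse refreshConfiguration();
    }

    final class ClientConfigureSource implements ConfigureSource {
      @Override
      public ConfigureResponse refreshConfiguration() {
        // Non-Iceberg path: would call the client configure endpoint.
        throw new UnsupportedOperationException("sketch only");
      }
    }

    final class ChannelConfigureSource implements ConfigureSource {
      @Override
      public ConfigureResponse refreshConfiguration() {
        // Iceberg path: would call the channel configure endpoint.
        throw new UnsupportedOperationException("sketch only");
      }
    }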

this.owningClient.getName(),
DEFAULT_MAX_UPLOAD_RETRIES));
} catch (SnowflakeSQLException | IOException err) {
throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE);
Collaborator:

  1. What happens when this exception is thrown? Does the customer's code get an opportunity to handle this exception, or does it go unhandled and cause a process crash?
  2. Let's add retry handling to all API calls made via the new SnowflakeServiceClient I proposed in another thread. Need retry with backoff and jitter.
  3. Can we log to whatever log provider is used by the SDK before throwing?

Contributor Author: Added a log for this; retry logic is in executeWithRetries. If the error is thrown, the process stops immediately.
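As a reference for point 2, a standalone sketch of retry with exponential backoff and full jitter; the PR itself relies on the existing executeWithRetries, so this is illustrative only:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ThreadLocalRandom;

    final class RetrySketch {
      /** Runs the call up to maxAttempts times, sleeping a random interval in
       *  [0, base * 2^attempt] between failures ("full jitter"). */
      static <T> T withBackoffAndJitter(Callable<T> call, int maxAttempts, long baseMillis)
          throws Exception {
        if (maxAttempts < 1) {
          throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
          try {
            return call.call();
          } catch (Exception e) {
            last = e;
            if (attempt < maxAttempts - 1) {
              long cap = baseMillis << attempt; // exponential backoff window
              Thread.sleep(ThreadLocalRandom.current().nextLong(cap + 1));
            }
          }
        }
        throw last;
      }
    }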

}
}
}

// TODO: SNOW-1502887 Blob path generation for iceberg table
@Override
public String generateBlobPath() {
return "snow_dummy_file_name";
}

// TODO: SNOW-1502887 Blob path generation for iceberg table
@Override
public String getBlobPath(Calendar calendar, String clientPrefix) {
return "";
}

/**
* Get the client prefix from the first external volume in the map
*
* @return the client prefix
*/
@Override
public String getClientPrefix() {
if (this.externalVolumeMap.isEmpty()) {
return null;
}
return this.externalVolumeMap.values().iterator().next().getClientPrefix();
Collaborator: let's chat about this, I'm not clear on what's being done here. A couple of issues I see:

  1. The return type should be Optional if there's a chance we'll return null.
  2. Why is the first external volume "special"?
  3. map.values doesn't typically guarantee what order it iterates in (for some map implementations, if not all), so there's no guarantee that every call to map.values().iterator().next() will give you the same object.
  4. It appears that clientPrefix is a static value that isn't meant to change, so we should pass it in via the ctor if possible?

Contributor Author: Moved the clientPrefix logic to the first configure call.
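For point 1, a small sketch of an Optional-returning shape (hypothetical; as noted, the merged code moves the prefix to the first configure call instead), with the field mirroring the one in this class:

    import java.util.Map;
    import java.util.Optional;
    import java.util.concurrent.ConcurrentHashMap;

    class PrefixSketch {
      private final Map<String, StreamingIngestStorage> externalVolumeMap =
          new ConcurrentHashMap<>();

      // Hypothetical accessor: callers must handle the "not yet configured"
      // case explicitly instead of receiving a bare null. The ordering caveat
      // from point 3 still applies to any values()-based lookup.
      Optional<String> getClientPrefixIfKnown() {
        return externalVolumeMap.values().stream()
            .findFirst()
            .map(StreamingIngestStorage::getClientPrefix);
      }
    }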

}
}