
SNOW-1497358 Support multiple storage for Iceberg mode #783

Merged
merged 12 commits into iceberg-support from alhuang-iceberg-multiple-stages on Jul 11, 2024

Conversation

sfc-gh-alhuang
Contributor

This PR includes several refactorings of FlushService and StreamingIngestInternalStage to support file upload in both Iceberg and non-Iceberg modes:

  • Create ConfigureCallHandler to wrap the different configure calls: per client (non-Iceberg mode) and per channel (Iceberg mode).
  • Create StorageManager to manage storage and blob path generation.

Tests for Iceberg mode will be added once the refactor review is done.

@sfc-gh-alhuang sfc-gh-alhuang changed the base branch from master to iceberg-support on June 24, 2024 23:08
@sfc-gh-alhuang sfc-gh-alhuang changed the title from "SNOW-1497358 Support multiple storages for Iceberg mode" to "SNOW-1497358 Support multiple storage for Iceberg mode" on Jun 24, 2024
@sfc-gh-alhuang sfc-gh-alhuang force-pushed the alhuang-iceberg-multiple-stages branch from d98dabd to 082bf07 on June 24, 2024 23:16
@sfc-gh-alhuang sfc-gh-alhuang force-pushed the alhuang-iceberg-multiple-stages branch from 082bf07 to 44cc28e on June 24, 2024 23:45
*
* @param openChannelResponse response from open channel
*/
void addStorage(OpenChannelResponse openChannelResponse);
Collaborator

let's not make the StorageManager interface directly interact with OpenChannelResponse; pass in the storage info object instead. It leaves the door open in case we want to use this one interface for non-iceberg tables too, over time.

Contributor Author

Changed the signature to addStorage(dbName, schemaName, tableName, channelContext)
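For illustration, a minimal sketch of the decoupled interface (a sketch only, reusing the SDK's ChannelFlushContext type; the real interface may differ):

// Hypothetical sketch: StorageManager no longer sees OpenChannelResponse.
interface StorageManager<T> {
  // The caller extracts db/schema/table from the open-channel response and
  // passes them in, which keeps this interface usable for non-Iceberg tables.
  void addStorage(
      String dbName, String schemaName, String tableName, ChannelFlushContext channelContext);
}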


// Object mapper for creating payload, ignore null fields
private static final ObjectMapper mapper =
new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
Collaborator

nit: can we reuse object mappers across the process, via some JsonUtils class or something?

Contributor Author

It's possible to create JsonUtils with a pool of object mappers to balance between concurrent mapping and memory. Could we add it to the backlog and address this later?
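For what it's worth, Jackson's ObjectMapper is thread-safe once configured, so a single shared instance (rather than a pool) is usually enough. A minimal sketch of such a JsonUtils, as a hypothetical class:

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;

final class JsonUtils {
  // One pre-configured mapper shared across the process; safe for concurrent
  // readValue/writeValueAsString calls once configuration is done.
  static final ObjectMapper MAPPER =
      new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);

  private JsonUtils() {}
}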

@Override
public StreamingIngestStorage getStorage(List<List<ChannelData<T>>> blobData) {
// Only one chunk per blob in Iceberg mode.
ChannelFlushContext channelContext = blobData.get(0).get(0).getChannelContext();
Collaborator

let's pass the channelContext in via the method signature instead of assuming blobData's structure is constructed a certain way by the caller.

Comment on lines 71 to 72
String.format(
"%s.%s.%s",
Collaborator

  1. Do we have other places that do this concatenation? Let's unify all these fully-qualified-name-construction pieces of code into one util method.
  2. How is each component's encoding taken care of? There's some single-quote wrapping/unwrapping business that probably needs to be accounted for.
  3. String.format behaves differently for different "cultures" (String.format for English typically behaves differently than String.format for Arabic). Please investigate how String.format infers the culture - is it some context on the thread? On the process? You might have to explicitly pass in an "invariant" culture if such a thing exists.

Contributor Author

Created a unified method for fully-qualified-name construction in StreamingIngestUtils. We don't deal with the cultures issue explicitly right now; as long as the naming pattern follows the Snowflake identifier rules, we should be fine.
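On the culture question: in Java, String.format(String, Object...) formats with Locale.getDefault(Locale.Category.FORMAT), and the invariant option is to pass Locale.ROOT explicitly. A sketch of what the unified util could look like (illustrative; the actual method in StreamingIngestUtils may differ):

import java.util.Locale;

// Locale.ROOT pins formatting to locale-neutral behavior regardless of the
// JVM's default locale. The %s conversion on plain strings is not localized
// anyway, so this mainly matters if numeric components ever get formatted.
static String getFullyQualifiedTableName(String db, String schema, String table) {
  return String.format(Locale.ROOT, "%s.%s.%s", db, schema, table);
}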

this.owningClient.getName(),
DEFAULT_MAX_UPLOAD_RETRIES));
} catch (SnowflakeSQLException | IOException err) {
throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE);
Collaborator

  1. What happens when this exception is thrown? Does the customer's code get an opportunity to handle this exception, or does it go unhandled and cause a process crash?
  2. Let's add retry handling to all API calls made via the new SnowflakeServiceClient I proposed in another thread. We need retry with backoff and jitter.
  3. Can we log to whatever log provider is used by the SDK before throwing?

Contributor Author

Added a log for this; the retry logic is in executeWithRetries. If the error is thrown, the process stops immediately.
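For reference, a minimal sketch of the backoff-plus-jitter retry loop being asked for (illustrative only; the SDK's executeWithRetries may be implemented differently):

import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

// Retries up to maxAttempts, doubling a backoff cap after each failure and
// sleeping a random ("full jitter") duration below that cap.
static <T> T executeWithRetries(Callable<T> call, int maxAttempts) throws Exception {
  long backoffCapMs = 100;
  for (int attempt = 1; ; attempt++) {
    try {
      return call.call();
    } catch (Exception e) {
      if (attempt >= maxAttempts) {
        throw e; // out of retries; surface the original failure
      }
      Thread.sleep(ThreadLocalRandom.current().nextLong(backoffCapMs));
      backoffCapMs = Math.min(backoffCapMs * 2, 10_000);
    }
  }
}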


if (stage == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
Collaborator

same question as another thread: does customer code get a chance to intercept this exception? Does it cause a process crash (or does it silently get ignored)?

Contributor Author (@sfc-gh-alhuang, Jun 28, 2024)

This causes a process crash. I don't think this is supposed to happen, given that it seems impossible for a client to ingest to a channel without opening it via StreamingIngestClient.

fullyQualifiedTableName,
new StreamingIngestStorage(
isTestMode,
configureCallHandler,
Collaborator

I like the thought process here, but here's the problem - you're forcing the CLIENT_CONFIGURE and CHANNEL_CONFIGURE APIs to behave almost exactly the same.

  • The goal is for StreamingIngestStorage to have a clean way to call the configure API without knowing whether we're in Iceberg mode or not, and without knowing which API was called to get that info.
  • The StorageManager abstraction is serving pretty much this purpose already.
  • I suggest passing the StorageManager interface object into StreamingIngestStorage, and adding a configure() method on that interface that StreamingIngestStorage can call. Let this method return a strongly typed object, and each concrete impl of that interface will call whatever it wants on the service side to get the latest tokens / figs ids / etc. (rough sketch after this list)
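A rough sketch of that suggestion (illustrative shape, not actual SDK code):

// Hypothetical: StreamingIngestStorage depends only on this interface and
// never learns whether client_configure or channel_configure backs it.
interface StorageManager<T, TLocation> {
  // Each concrete impl calls its own service API and returns the refreshed,
  // strongly typed location info (tokens, stage/volume metadata, etc.).
  FileLocationInfo configure(StreamingIngestStorage<T, TLocation> storage);
}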

Contributor Author

Added an owningManager reference to StreamingIngestStorage for this.

.setSchema(openChannelResponse.getSchemaName())
.setTable(openChannelResponse.getTableName())
.build();
this.externalVolumeMap.put(
Collaborator

what if there were two concurrent addStorage calls for the same table? Suggest taking a lock after the if (!containsKey) check on line 76 so that we only ever construct one StreamingIngestStorage object.

Contributor Author

Changed this to putIfAbsent with ConcurrentHashMap to address concurrent access.
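For reference, a sketch of the pattern (illustrative; createStorage is a hypothetical factory). computeIfAbsent goes one step further than putIfAbsent by also guaranteeing the value is constructed at most once per key:

import java.util.concurrent.ConcurrentHashMap;

ConcurrentHashMap<String, StreamingIngestStorage> externalVolumeMap = new ConcurrentHashMap<>();

// putIfAbsent keeps the map consistent but may construct a losing
// StreamingIngestStorage that is immediately discarded; computeIfAbsent
// runs the factory at most once per key, even under concurrent calls.
StreamingIngestStorage getOrCreateStorage(String fullyQualifiedTableName) {
  return externalVolumeMap.computeIfAbsent(fullyQualifiedTableName, name -> createStorage(name));
}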

Comment on lines 121 to 124
if (this.externalVolumeMap.isEmpty()) {
return null;
}
return this.externalVolumeMap.values().iterator().next().getClientPrefix();
Collaborator

let's chat about this, I'm not clear on what's being done here. A couple of issues I see:

  1. The return type should be Optional if there's a chance we'll return null.
  2. Why is the first external volume "special"?
  3. map.values doesn't typically guarantee what order it iterates in (for some map implementations, if not all), so there's no guarantee that every call to map.values.iterator.next will give you the same object.
  4. It appears that clientPrefix is a static value that isn't meant to change, so we should pass it in via the ctor if possible?

Contributor Author

Moved the clientPrefix logic to the first configure call.
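A sketch of that resolution (illustrative; onFirstConfigure is a hypothetical hook): capture the prefix once, on the first configure call, and expose it as Optional instead of peeking into an unordered map:

import java.util.Optional;

private volatile String clientPrefix; // written once, on the first configure call

void onFirstConfigure(String prefixFromResponse) { // hypothetical hook
  if (this.clientPrefix == null) {
    this.clientPrefix = prefixFromResponse;
  }
}

Optional<String> getClientPrefix() {
  return Optional.ofNullable(this.clientPrefix);
}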

@@ -444,11 +408,8 @@ && shouldStopProcessing(
}

// Kick off a build job
Collaborator

The counter.decrement is gone, which means blob names can have gaps now?

@@ -116,26 +111,43 @@ state to record unknown age.
}
}

/**
* Default constructor for external volume
Collaborator

Hmm, this looks odd - having different constructors for different flows when we're already trying to abstract away the differences with the StorageManager interface.
I'm guessing what's happening here is that in Iceberg mode we first get the fileLocation from open_channel but subsequently call channel_configure for token renewal, whereas in non-Iceberg mode we get the file location and renewal tokens from the same CLIENT_CONFIGURE API call, thus you're doing it this way? If so, I have a suggestion:

  1. As suggested in another thread (ExternalVolumeManager's line 93), the StreamingIngestStorage class should just take in an object that knows how to renew tokens, and not impose that client_configure / channel_configure have the same call handler.
  2. Always take in a FileLocationInfo object for both Iceberg and non-Iceberg modes, so that the "bootstrap" codepath is also identical.

This means that for non-Iceberg, you'll have to make an explicit call to client_configure at the call site of the StreamingIngestStorage ctor to get the FileLocationInfo for the internal stage, which IMO is okay.
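A sketch of the suggested shape (field and parameter names are assumptions):

// Hypothetical: one ctor for both modes. The bootstrap location always
// arrives as a FileLocationInfo (from client_configure in non-Iceberg mode,
// from open_channel in Iceberg mode); token renewal goes through the manager.
class StreamingIngestStorage<T, TLocation> {
  private final StorageManager<T, TLocation> owningManager; // knows how to renew tokens
  private FileLocationInfo fileLocationInfo;

  StreamingIngestStorage(
      StorageManager<T, TLocation> owningManager, FileLocationInfo initialLocation) {
    this.owningManager = owningManager;
    this.fileLocationInfo = initialLocation;
  }
}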

Contributor Author

Refactored to the same ctor.

Contributor Author (@sfc-gh-alhuang, Jun 28, 2024)

Should we move these API request and response classes to a separate package/directory?

@@ -560,26 +518,36 @@ BlobMetadata buildAndUpload(String blobPath, List<List<ChannelData<T>>> blobData

blob.blobStats.setBuildDurationMs(buildContext);

return upload(blobPath, blob.blobBytes, blob.chunksMetadataList, blob.blobStats);
return upload(
this.storageManager.getStorage(blobData.get(0).get(0).getChannelContext()),
Collaborator

let's chat about this f2f. Looks ripe for a null ref or a logical bug where the wrong channel context gets passed in.
Ideally you want to pass the channelContext into this method as an argument and not bake in this get(0).get(0) assumption.

Contributor Author

Moved the channel-context retrieval logic to the call site.
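Roughly, the call site now looks something like this (illustrative, not the exact diff):

// The call site owns the single-chunk-per-blob assumption for Iceberg mode,
// extracts the context once, and passes the resolved storage down explicitly.
ChannelFlushContext channelContext = blobData.get(0).get(0).getChannelContext();
StreamingIngestStorage storage = this.storageManager.getStorage(channelContext);
return upload(storage, blobPath, blob.blobBytes, blob.chunksMetadataList, blob.blobStats);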

sfc-gh-alhuang and others added 3 commits July 1, 2024 17:32
… configure response (#787)

cleanup configurerequest / configureresponse and separate out channelConfigureResponse class
@sfc-gh-alhuang sfc-gh-alhuang marked this pull request as ready for review on July 3, 2024 20:54
@sfc-gh-alhuang sfc-gh-alhuang requested review from sfc-gh-tzhang and a team as code owners on July 3, 2024 20:54
Contributor

@sfc-gh-tzhang left a comment

Left some comments, PTAL!

ChannelFlushContext channelFlushContext) {
// Only one chunk per blob in Iceberg mode.
StreamingIngestStorage<T, ExternalVolumeLocation> stage =
this.externalVolumeMap.get(channelFlushContext.getFullyQualifiedTableName());
Contributor

Does channelFlushContext.getFullyQualifiedTableName take care of name resolution? For example, A.B.C and a.b.c are the same table.

Contributor Author

getFullyQualifiedTableName uses the information directly from the server response. As long as the table name from the server is consistent, it should be fine.

Contributor

As long as the table name from the server is consistent, it should be fine.

Have you verified this? Basically, do an experiment with A.B.C and a.b.c; the fully qualified name should be A.B.C in both cases.

/**
* The SnowflakeServiceClient class is responsible for making API requests to the Snowflake service.
*/
class SnowflakeServiceClient {
Contributor

It looks to me like there's a lot of duplicated logic across these functions; is there any room for consolidation?
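One possible consolidation, sketched (hypothetical helper, not actual SDK code): a single generic method that serializes the request, posts it, and deserializes the typed response, so each API wrapper shrinks to roughly one call:

// MAPPER is a shared Jackson ObjectMapper; post() is a hypothetical HTTP
// helper that sends the payload to the given endpoint and returns the body.
private <Req, Resp> Resp executeApiRequest(String endpoint, Req request, Class<Resp> respClass)
    throws java.io.IOException {
  String payload = MAPPER.writeValueAsString(request);
  String responseBody = post(endpoint, payload);
  return MAPPER.readValue(responseBody, respClass);
}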

Contributor

Any reason this is marked as resolved?

throw new SFException(ErrorCode.OPEN_CHANNEL_FAILURE, response.getMessage());
}
OpenChannelRequestInternal openChannelRequest =
new OpenChannelRequestInternal(
Contributor

looks like you missed isOffsetTokenProvided?

Contributor Author

request.getOffsetToken() is okay to be null, since @JsonInclude(JsonInclude.Include.NON_NULL) on OpenChannelRequest will make Jackson ignore it automatically.

Contributor

not sure if I understand, do you mean that the isOffsetTokenProvided field is redundant? I'm assuming it was added for a reason.

Contributor Author

It was used to avoid putting a null value in the payload string. Now that we are switching to JSON serialization, @JsonInclude(JsonInclude.Include.NON_NULL) ignores the null fields, so I removed isOffsetTokenProvided from OpenChannelRequest.
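A self-contained illustration of the Jackson behavior relied on here (OpenChannelPayload is a hypothetical stand-in for OpenChannelRequestInternal):

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;

@JsonInclude(JsonInclude.Include.NON_NULL)
class OpenChannelPayload {
  public String channel = "my_channel";
  public String offsetToken; // null, so Jackson omits the key entirely
}

// new ObjectMapper().writeValueAsString(new OpenChannelPayload())
// yields {"channel":"my_channel"} - no offsetToken key, no explicit
// isOffsetTokenProvided flag needed.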

@sfc-gh-alhuang sfc-gh-alhuang force-pushed the iceberg-support branch 2 times, most recently from 1678a6d to a9aa682 on July 10, 2024 21:01
Collaborator

@sfc-gh-hmadan left a comment

lgtm!

@@ -102,8 +94,8 @@ List<List<ChannelData<T>>> getData() {
// Reference to the channel cache
private final ChannelCache<T> channelCache;

// Reference to the Streaming Ingest stage
private final StreamingIngestStage targetStage;
// Reference to the Stream Ingest storage manager
Contributor

Stream Ingest?

@@ -101,4 +101,11 @@ void invalidateChannelIfSequencersMatch(
int getSize() {
return cache.size();
}

/** Get the number of channels for a given table */
int getSizePerTable(String fullyQualifiedTableName) {
Contributor

Suggested change
int getSizePerTable(String fullyQualifiedTableName) {
int getChannelCountForTable(String fullyQualifiedTableName) {

Comment on lines 107 to 108
ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>> channelsMapPerTable =
cache.get(fullyQualifiedTableName);
Contributor

Is this thread safe? It's possible that the count changes between these two calls, right?
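For reference, a null-safe version of the read (illustrative). The count is inherently a point-in-time snapshot; ConcurrentHashMap guarantees the reads won't see corrupt state, but the value can be stale the moment it's computed:

// Guard the absent-table case and accept that the count is a snapshot.
int getChannelCountForTable(String fullyQualifiedTableName) {
  ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>> channels =
      cache.get(fullyQualifiedTableName);
  return channels == null ? 0 : channels.size();
}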

@@ -93,25 +93,28 @@ private abstract static class TestContext<T> implements AutoCloseable {
ChannelCache<T> channelCache;
final Map<String, SnowflakeStreamingIngestChannelInternal<T>> channels = new HashMap<>();
FlushService<T> flushService;
StreamingIngestStage stage;
StorageManager<T, InternalStageLocation> storageManager;
Contributor

Do we have tests for both the storage types?

Contributor

@sfc-gh-tzhang left a comment

Discussed offline with @sfc-gh-alhuang, thanks!

@sfc-gh-alhuang sfc-gh-alhuang merged commit eac448a into iceberg-support Jul 11, 2024
13 checks passed
@sfc-gh-alhuang sfc-gh-alhuang deleted the alhuang-iceberg-multiple-stages branch July 11, 2024 20:01