SNOW-1057812: Support Open Rowset API Channel #731

Closed
wants to merge 5 commits into from

Conversation

sfc-gh-tzhang
Contributor

@sfc-gh-tzhang sfc-gh-tzhang commented Mar 26, 2024

This PR adds support for creating a Rowset API channel:

  • No behavior change to the existing CLOUD_STORAGE interface
  • Adds an option to the existing OpenChannelRequest that allows creating a Rowset API channel with ChannelType=ROWSET_API
  • The OpenChannelRequest is converted internally to a call to a new REST API added for the Rowset API, and there is a new channel implementation (SnowflakeStreamingIngestChannelRowset) that is much simpler than the one for CLOUD_STORAGE

Note that we're merging into a feature branch instead of master.

@sfc-gh-tzhang sfc-gh-tzhang marked this pull request as ready for review March 27, 2024 05:27
@sfc-gh-tzhang sfc-gh-tzhang requested review from a team as code owners March 27, 2024 05:27
Contributor

@sfc-gh-asen sfc-gh-asen left a comment

Thanks for the changes

@@ -114,19 +135,23 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) {
Utils.assertStringNotNullOrEmpty("channel name", builder.channelName);
Utils.assertStringNotNullOrEmpty("database name", builder.dbName);
Utils.assertStringNotNullOrEmpty("schema name", builder.schemaName);
Utils.assertStringNotNullOrEmpty("table name", builder.tableName);
Utils.assertStringsNotNullOrEmpty(
Contributor

should we also assert that both table and pipe name are not set at the same time?

Contributor

Yeah this is a good edge case check to add.

Contributor Author

The current behavior is that the table name is ignored if a pipe name is provided. I thought about asserting when both are provided, but I think that's too restrictive: it's perfectly valid for a customer to supply both when the pipe is created on that table.
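
A minimal, self-contained sketch of the precedence rule described in this thread, for illustration only. It is not the SDK's code: `TargetResolverSketch` and `resolveTarget` are hypothetical names, and `IllegalArgumentException` stands in for whatever exception the SDK actually throws.

```java
// Hypothetical sketch of "pipe name wins when both are set"; not SDK code.
final class TargetResolverSketch {
  private TargetResolverSketch() {}

  static boolean isNullOrEmpty(String s) {
    return s == null || s.isEmpty();
  }

  /**
   * Returns the name the channel would be opened against: the pipe name if
   * present, otherwise the table name. Fails only when neither is provided.
   */
  static String resolveTarget(String tableName, String pipeName) {
    if (isNullOrEmpty(tableName) && isNullOrEmpty(pipeName)) {
      throw new IllegalArgumentException("table name or pipe name must be set");
    }
    // Supplying both is allowed; the table name is simply ignored.
    return isNullOrEmpty(pipeName) ? tableName : pipeName;
  }

  public static void main(String[] args) {
    System.out.println(resolveTarget("T1", null));
    System.out.println(resolveTarget("T1", "P1"));
  }
}
```

With both names supplied, the sketch returns the pipe name, which mirrors the "table name will be ignored" behavior rather than rejecting the request.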

@@ -247,6 +262,8 @@ public interface SnowflakeStreamingIngestChannel {
*/
InsertValidationResponse insertRow(Map<String, Object> row, @Nullable String offsetToken);

InsertValidationResponse insertRow(Map<String, Object> row);
Contributor

In which cases will a customer use insertRow without an offsetToken?

Contributor

Yeah shouldn't this be InsertValidationResponse insertRow(Map<String, Object> row, @Nullable String offsetToken);?

Contributor Author

Remember that we said the offset_token will be per row for the Rowset API?

* Get the fully qualified channel name
*
* @return fully qualified name of the channel, in the format of
* dbName.schemaName.tableName.channelName
Contributor

nit: or dbName.schemaName.pipeName.channelName

Comment on lines 288 to 289
InsertValidationResponse insertRows(
Iterable<Map<String, Object>> rows, @Nullable String offsetToken);
Contributor

we might need to add something like

insertRows(Iterable<Map<String, Object>> rows, List<String> offsetTokens)

for partial ingestion

Contributor Author

Yes, that's the one below, offset_token will be part of the row now.
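
To illustrate why a per-row offset token helps with partial ingestion, here is a rough sketch under stated assumptions. It is one possible reading of the idea, not the Rowset API's actual behavior: `PerRowOffsetSketch`, `RowWithOffset`, and `ingestUntilFirstError` are hypothetical names, and "an empty row is invalid" is a stand-in for real validation.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch; none of these names come from the SDK.
final class PerRowOffsetSketch {
  private PerRowOffsetSketch() {}

  /** A row that carries its own offset token (assumed shape). */
  static final class RowWithOffset {
    final Map<String, Object> row;
    final String offsetToken;

    RowWithOffset(Map<String, Object> row, String offsetToken) {
      this.row = row;
      this.offsetToken = offsetToken;
    }
  }

  /**
   * Accepts rows in order and stops at the first invalid one. Returns the
   * offset token of the last accepted row, or null if no row was accepted.
   */
  static String ingestUntilFirstError(List<RowWithOffset> rows) {
    String lastAccepted = null;
    for (RowWithOffset r : rows) {
      if (r.row == null || r.row.isEmpty()) {
        break; // placeholder validation: treat an empty row as invalid
      }
      lastAccepted = r.offsetToken;
    }
    return lastAccepted;
  }
}
```

Because each row has its own token, a caller could resume from the token returned here instead of replaying the whole batch, which a single batch-level offsetToken cannot express.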

* @throws SFException
*/
/** Assert when all the Strings are either null or Empty */
public static void assertStringsNotNullOrEmpty(String name, Collection<String> values)
Contributor

nit: When I first saw this method, I assumed it asserted that every string in the collection must be non-null. Maybe we can rename it to something like assertAtLeastOneStringNotNullOrEmpty to be extra clear?
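
A minimal sketch of what the suggested name would promise, assuming the intended semantics are "fail only when every value is null or empty". The class name `AtLeastOneSketch` is hypothetical, and `IllegalArgumentException` is used here in place of the SDK's SFException so the example stays self-contained.

```java
import java.util.Collection;

// Hypothetical sketch of the renamed helper; not the SDK's Utils class.
final class AtLeastOneSketch {
  private AtLeastOneSketch() {}

  /** Passes if at least one value is non-null and non-empty; throws otherwise. */
  static void assertAtLeastOneStringNotNullOrEmpty(String name, Collection<String> values) {
    if (values != null) {
      for (String v : values) {
        if (v != null && !v.isEmpty()) {
          return; // one usable value is enough
        }
      }
    }
    throw new IllegalArgumentException(name + " must have at least one non-empty value");
  }
}
```

Under this reading, a call with a null table name and a non-empty pipe name passes, while a call where both are null or empty fails.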

Comment on lines +475 to +485
/**
* Get the type of channel, please referring to {@link
* net.snowflake.ingest.streaming.OpenChannelRequest.ChannelType} for the supported channel type
*
* @return type of the channel
*/
@Override
public OpenChannelRequest.ChannelType getType() {
return OpenChannelRequest.ChannelType.CLOUD_STORAGE;
}

Contributor

Should we rename this class to SnowflakeStreamingIngestChannelCloud to be parallel with the rowset?

Contributor Author

Yes, I can do that in a separate PR

return SnowflakeStreamingIngestChannelFactory.<T>builder(request.getChannelName())
.setDBName(request.getDBName())
.setSchemaName(request.getSchemaName())
.setTableName(request.getTableName())
Contributor

we need to setPipeName as well

Comment on lines +20 to +23
public enum ChannelType {
CLOUD_STORAGE,
ROWSET_API,
}
Contributor

I think this exists already? If you look at StreamingResource there's a check somewhere that compares against an enum.

Contributor Author

Yes, but the one used before was hidden inside the SDK; this one is a public enum in a public class.

3 participants