Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1057812: Support Open Rowset API Channel #731

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.security.KeyPair;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import net.snowflake.client.jdbc.internal.apache.http.HttpHeaders;
import net.snowflake.client.jdbc.internal.apache.http.NameValuePair;
import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpGet;
import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpPost;
import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpUriRequest;
Expand Down Expand Up @@ -674,16 +676,24 @@ public HttpGet generateHistoryRangeRequest(
* @param payload POST request payload as string
* @param endPoint REST API endpoint
* @param message error message if there are failures during HTTP building
* @param queryParameters POST request query parameters
* @return URI for the POST request
*/
public HttpPost generateStreamingIngestPostRequest(
String payload, String endPoint, String message) {
String payload, String endPoint, String message, List<NameValuePair> queryParameters) {
LOGGER.debug("Generate Snowpipe streaming request: endpoint={}, payload={}", endPoint, payload);
// Make the corresponding URI
URI uri = null;
queryParameters = queryParameters == null ? new ArrayList<>() : queryParameters;
try {
uri =
new URIBuilder().setScheme(scheme).setHost(host).setPort(port).setPath(endPoint).build();
new URIBuilder()
.setScheme(scheme)
.setHost(host)
.setPort(port)
.setPath(endPoint)
.setParameters(queryParameters)
.build();
} catch (URISyntaxException e) {
throw new SFException(e, ErrorCode.BUILD_REQUEST_FAILURE, message);
}
Expand All @@ -706,10 +716,14 @@ public HttpPost generateStreamingIngestPostRequest(
* @param payload POST request payload
* @param endPoint REST API endpoint
* @param message error message if there are failures during HTTP building
* @param queryParameters POST request query parameters
* @return URI for the POST request
*/
public HttpPost generateStreamingIngestPostRequest(
Map<Object, Object> payload, String endPoint, String message) {
Map<Object, Object> payload,
String endPoint,
String message,
List<NameValuePair> queryParameters) {
// Convert the payload to string
String payloadInString = null;
try {
Expand All @@ -718,7 +732,8 @@ public HttpPost generateStreamingIngestPostRequest(
throw new SFException(e, ErrorCode.BUILD_REQUEST_FAILURE, message);
}

return this.generateStreamingIngestPostRequest(payloadInString, endPoint, message);
return this.generateStreamingIngestPostRequest(
payloadInString, endPoint, message, queryParameters);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ public enum ApiName {
STREAMING_DROP_CHANNEL("POST"),
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST");
STREAMING_CLIENT_CONFIGURE("POST"),
STREAMING_ROWSET_OPEN_CHANNEL("POST"),
STREAMING_ROWSET_INSERT_ROWS("POST");
private final String httpMethod;

private ApiName(String httpMethod) {
Expand All @@ -53,6 +55,7 @@ public String getHttpMethod() {
return httpMethod;
}
}

// the object mapper we use for deserialization
static ObjectMapper mapper = new ObjectMapper();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package net.snowflake.ingest.streaming;

import java.time.ZoneId;
import java.util.Arrays;
import net.snowflake.ingest.utils.Utils;

/** A class that is used to open/create a {@link SnowflakeStreamingIngestChannel} */
Expand All @@ -16,6 +17,11 @@ public enum OnErrorOption {
// indexes. No data is ingested
}

public enum ChannelType {
CLOUD_STORAGE,
ROWSET_API,
}
Comment on lines +20 to +23
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this exists already? If you look at StreamingResource there's a check somewhere that compares against an enum.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but the one used before was hided in the SDK, now this is a public enum in the public class


/**
* Default value of the timezone, which will be used for TIMESTAMP_LTZ and TIMESTAMP_TZ column
* types when the user input does not have any timezone information.
Expand All @@ -34,6 +40,9 @@ public enum OnErrorOption {
// Name of the table that the channel belongs to
private final String tableName;

// Name of the pipe that the channel belongs to
private final String pipeName;

// On_error option on this channel
private final OnErrorOption onErrorOption;

Expand All @@ -45,6 +54,8 @@ public enum OnErrorOption {

private final OffsetTokenVerificationFunction offsetTokenVerificationFunction;

private final ChannelType channelType;

public static OpenChannelRequestBuilder builder(String channelName) {
return new OpenChannelRequestBuilder(channelName);
}
Expand All @@ -55,13 +66,13 @@ public static class OpenChannelRequestBuilder {
private String dbName;
private String schemaName;
private String tableName;
private String pipeName;
private OnErrorOption onErrorOption;
private ZoneId defaultTimezone;

private String offsetToken;
private boolean isOffsetTokenProvided = false;

private OffsetTokenVerificationFunction offsetTokenVerificationFunction;
private ChannelType channelType = ChannelType.CLOUD_STORAGE;

public OpenChannelRequestBuilder(String channelName) {
this.channelName = channelName;
Expand All @@ -83,6 +94,11 @@ public OpenChannelRequestBuilder setTableName(String tableName) {
return this;
}

public OpenChannelRequestBuilder setPipeName(String pipeName) {
this.pipeName = pipeName;
return this;
}

public OpenChannelRequestBuilder setOnErrorOption(OnErrorOption onErrorOption) {
this.onErrorOption = onErrorOption;
return this;
Expand All @@ -105,6 +121,11 @@ public OpenChannelRequestBuilder setOffsetTokenVerificationFunction(
return this;
}

public OpenChannelRequestBuilder setChannelType(ChannelType type) {
this.channelType = type;
return this;
}

public OpenChannelRequest build() {
return new OpenChannelRequest(this);
}
Expand All @@ -114,19 +135,23 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) {
Utils.assertStringNotNullOrEmpty("channel name", builder.channelName);
Utils.assertStringNotNullOrEmpty("database name", builder.dbName);
Utils.assertStringNotNullOrEmpty("schema name", builder.schemaName);
Utils.assertStringNotNullOrEmpty("table name", builder.tableName);
Utils.assertStringsNotNullOrEmpty(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also assert that both table and pipe name are not set at the same time?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is a good edge case check to add.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behavior right now is that the table name will be ignored if there is a pipe name, I thought about assert if both are provided but I think it's too restricted, it's perfect valid if customer inputs both and pipe is created on that table

"table or pipe name", Arrays.asList(builder.tableName, builder.pipeName));
Utils.assertNotNull("on_error option", builder.onErrorOption);
Utils.assertNotNull("default_timezone", builder.defaultTimezone);
Utils.assertNotNull("channel_type", builder.channelType);

this.channelName = builder.channelName;
this.dbName = builder.dbName;
this.schemaName = builder.schemaName;
this.tableName = builder.tableName;
this.pipeName = builder.pipeName;
this.onErrorOption = builder.onErrorOption;
this.defaultTimezone = builder.defaultTimezone;
this.offsetToken = builder.offsetToken;
this.isOffsetTokenProvided = builder.isOffsetTokenProvided;
this.offsetTokenVerificationFunction = builder.offsetTokenVerificationFunction;
this.channelType = builder.channelType;
}

public String getDBName() {
Expand All @@ -141,6 +166,10 @@ public String getTableName() {
return this.tableName;
}

public String getPipeName() {
return this.pipeName;
}

public String getChannelName() {
return this.channelName;
}
Expand All @@ -153,6 +182,10 @@ public String getFullyQualifiedTableName() {
return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName);
}

public String getFullyQualifiedPipeName() {
return String.format("%s.%s.%s", this.dbName, this.schemaName, this.pipeName);
}

public OnErrorOption getOnErrorOption() {
return this.onErrorOption;
}
Expand All @@ -168,4 +201,8 @@ public boolean isOffsetTokenProvided() {
public OffsetTokenVerificationFunction getOffsetTokenVerificationFunction() {
return this.offsetTokenVerificationFunction;
}

public ChannelType getChannelType() {
return this.channelType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,28 @@ public interface SnowflakeStreamingIngestChannel {
*/
String getTableName();

/**
* Get the name of the table or pipe based on the ownership of the channel (either a table or
* pipe)
*
* @return name of the table or pipe
*/
String getTableOrPipeName();

/**
* Get the fully qualified table name that the channel belongs to
*
* @return fully qualified table name, in the format of dbName.schemaName.tableName
*/
String getFullyQualifiedTableName();

/**
* Get the fully qualified table or pipe name that the channel belongs to
*
* @return fully qualified table or pipe name
*/
String getFullyQualifiedTableOrPipeName();

/** @return a boolean which indicates whether the channel is valid */
boolean isValid();

Expand Down Expand Up @@ -247,6 +262,8 @@ public interface SnowflakeStreamingIngestChannel {
*/
InsertValidationResponse insertRow(Map<String, Object> row, @Nullable String offsetToken);

InsertValidationResponse insertRow(Map<String, Object> row);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in which cases will a customers use insertRow without an offsetToken?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah shouldn't this be InsertValidationResponse insertRow(Map<String, Object> row, @Nullable String offsetToken);?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remember that we say the offset_token will be per row for Rowset API?


/**
* Insert a batch of rows into the channel, each row is represented using Map where the key is
* column name and the value is a row of data. See {@link
Expand All @@ -271,6 +288,8 @@ InsertValidationResponse insertRows(
InsertValidationResponse insertRows(
Iterable<Map<String, Object>> rows, @Nullable String offsetToken);
Comment on lines 288 to 289
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might need to add something like

insertRows(Iterable<Map<String, Object>> rows, List offsetTokens)

for partial ingestion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's the one below, offset_token will be part of the row now.


InsertValidationResponse insertRows(Iterable<Map<String, Object>> rows);

/**
* Get the latest committed offset token from Snowflake
*
Expand All @@ -287,4 +306,12 @@ InsertValidationResponse insertRows(
* @return map representing Column Name to Column Properties
*/
Map<String, ColumnProperties> getTableSchema();

/**
* Get the type of channel, please referring to {@link
* net.snowflake.ingest.streaming.OpenChannelRequest.ChannelType} for the supported channel type
*
* @return type of the channel
*/
OpenChannelRequest.ChannelType getType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.example;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;

/**
* Example on how to use the Streaming Ingest client APIs.
*
* <p>Please read the README.md file for detailed steps
*/
public class SnowflakeStreamingIngestRowsetExample {
// Please follow the example in profile_streaming.json.example to see the required properties, or
// if you have already set up profile.json with Snowpipe before, all you need is to add the "role"
// property. If the "role" is not specified, the default user role will be applied.
private static String PROFILE_PATH = "profile.json";
private static final ObjectMapper mapper = new ObjectMapper();

public static void main(String[] args) throws Exception {
Properties props = new Properties();
Iterator<Map.Entry<String, JsonNode>> propIt =
mapper.readTree(new String(Files.readAllBytes(Paths.get(PROFILE_PATH)))).fields();
while (propIt.hasNext()) {
Map.Entry<String, JsonNode> prop = propIt.next();
props.put(prop.getKey(), prop.getValue().asText());
}

// Create a streaming ingest client
try (SnowflakeStreamingIngestClient client =
SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT").setProperties(props).build()) {

// Create an open channel request on table MY_TABLE, note that the corresponding
// db/schema/table needs to be present
// Example: create or replace table MY_TABLE(c1 number);
OpenChannelRequest request =
OpenChannelRequest.builder("MY_CHANNEL")
.setDBName("MY_DATABASE")
.setSchemaName("MY_SCHEMA")
.setPipeName("MY_PIPE")
.setChannelType(OpenChannelRequest.ChannelType.ROWSET_API)
.setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
.build();

// Open a streaming ingest channel from the given client
SnowflakeStreamingIngestChannel channel1 = client.openChannel(request);

// Insert rows into the channel (Using insertRows API)
final int totalRowsInTable = 1000;
for (int val = 0; val < totalRowsInTable; val++) {
Map<String, Object> row = new HashMap<>();

// c1 corresponds to the column name in table
row.put("c1", val);

// Insert the row with the current offset_token
InsertValidationResponse response = channel1.insertRow(row, String.valueOf(val));
if (response.hasErrors()) {
// Simply throw if there is an exception, or you can do whatever you want with the
// erroneous row
throw response.getInsertErrors().get(0).getException();
}
}

// TODO: Wait for 20sec for now, will use get status endpoint once we have it
Thread.sleep(20000);

// Close the channel, the function internally will make sure everything is committed (or throw
// an exception if there is any issue)
channel1.close().get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ void setMessage(String message) {
}

@JsonProperty("message")
@Override
String getMessage() {
return this.message;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ void setMessage(String message) {
this.message = message;
}

@Override
String getMessage() {
return this.message;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ void setMessage(String message) {
this.message = message;
}

@Override
String getMessage() {
return this.message;
}
Expand Down
Loading
Loading