Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1497358 Support multiple storage for Iceberg mode #783

Merged
merged 12 commits into from
Jul 11, 2024
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012-2017 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.connection;
Expand Down Expand Up @@ -42,7 +42,8 @@ public enum ApiName {
STREAMING_DROP_CHANNEL("POST"),
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST");
STREAMING_CLIENT_CONFIGURE("POST"),
STREAMING_CHANNEL_CONFIGURE("POST");
private final String httpMethod;

private ApiName(String httpMethod) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming;
Expand Down Expand Up @@ -150,7 +150,7 @@ public ZoneId getDefaultTimezone() {
}

public String getFullyQualifiedTableName() {
return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName);
return Utils.getFullyQualifiedTableName(this.dbName, this.schemaName, this.tableName);
}

public OnErrorOption getOnErrorOption() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to serialize the client / channel configure request. */
class ChannelConfigureRequest implements ConfigureRequest {
@JsonProperty("role")
private String role;
sfc-gh-alhuang marked this conversation as resolved.
Show resolved Hide resolved

@JsonProperty("database")
private String database;

@JsonProperty("schema")
private String schema;

@JsonProperty("table")
private String table;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("file_name")
private String fileName;
sfc-gh-alhuang marked this conversation as resolved.
Show resolved Hide resolved

/**
* Constructor for channel configure request
*
* @param role Role to be used for the request.
* @param database Database name.
* @param schema Schema name.
* @param table Table name.
*/
ChannelConfigureRequest(String role, String database, String schema, String table) {
this.role = role;
this.database = database;
this.schema = schema;
this.table = table;
}

@Override
public String getRole() {
return role;
}

String getDatabase() {
return database;
}

String getSchema() {
return schema;
}

String getTable() {
return table;
}

String getFileName() {
return fileName;
}

/** Set the file name for the GCS signed url request. */
@Override
public void setFileName(String fileName) {
this.fileName = fileName;
}

@Override
public String getStringForLogging() {
return String.format(
"ChannelConfigureRequest(role=%s, db=%s, schema=%s, table=%s, file_name=%s)",
role, database, schema, table, fileName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to deserialize responses from channel configure endpoint */
@JsonIgnoreProperties(ignoreUnknown = true)
class ChannelConfigureResponse extends StreamingIngestResponse {
@JsonProperty("status_code")
private Long statusCode;

@JsonProperty("message")
private String message;

@JsonProperty("stage_location")
private FileLocationInfo stageLocation;

@Override
Long getStatusCode() {
return statusCode;
}

void setStatusCode(Long statusCode) {
this.statusCode = statusCode;
}

String getMessage() {
return message;
}

void setMessage(String message) {
this.message = message;
}

FileLocationInfo getStageLocation() {
return stageLocation;
}

void setStageLocation(FileLocationInfo stageLocation) {
this.stageLocation = stageLocation;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
/*
* Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.utils.Utils;

/**
* Channel immutable identification and encryption attributes.
*
Expand Down Expand Up @@ -36,12 +38,12 @@ class ChannelFlushContext {
String encryptionKey,
Long encryptionKeyId) {
this.name = name;
this.fullyQualifiedName = String.format("%s.%s.%s.%s", dbName, schemaName, tableName, name);
this.fullyQualifiedName =
Utils.getFullyQualifiedChannelName(dbName, schemaName, tableName, name);
this.dbName = dbName;
this.schemaName = schemaName;
this.tableName = tableName;
this.fullyQualifiedTableName =
String.format("%s.%s.%s", this.getDbName(), this.getSchemaName(), this.getTableName());
this.fullyQualifiedTableName = Utils.getFullyQualifiedTableName(dbName, schemaName, tableName);
this.channelSequencer = channelSequencer;
this.encryptionKey = encryptionKey;
this.encryptionKeyId = encryptionKeyId;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.stream.Collectors;
import net.snowflake.ingest.utils.Utils;

/** Class to deserialize a request from a channel status request */
class ChannelsStatusRequest {
class ChannelsStatusRequest implements StreamingIngestRequest {

// Used to deserialize a channel request
static class ChannelStatusRequestDTO {
Expand Down Expand Up @@ -99,4 +101,21 @@ void setChannels(List<ChannelStatusRequestDTO> channels) {
List<ChannelStatusRequestDTO> getChannels() {
return channels;
}

@Override
public String getStringForLogging() {
return String.format(
"ChannelsStatusRequest(requestId=%s, role=%s, channels=[%s])",
requestId,
role,
channels.stream()
.map(
r ->
Utils.getFullyQualifiedChannelName(
r.getDatabaseName(),
r.getSchemaName(),
r.getTableName(),
r.getChannelName()))
.collect(Collectors.joining(", ")));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to serialize client configure request */
class ClientConfigureRequest implements ConfigureRequest {
@JsonProperty("role")
private String role;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("file_name")
private String fileName;

/**
* Constructor for client configure request
*
* @param role Role to be used for the request.
*/
ClientConfigureRequest(String role) {
this.role = role;
}

@Override
public String getRole() {
return role;
}

String getFileName() {
return fileName;
}

/** Set the file name for the GCS signed url request. */
@Override
public void setFileName(String fileName) {
this.fileName = fileName;
}

@Override
public String getStringForLogging() {
return String.format("ClientConfigureRequest(role=%s, file_name=%s)", role, fileName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to deserialize responses from configure endpoint */
class ConfigureResponse extends StreamingIngestResponse {
@JsonIgnoreProperties(ignoreUnknown = true)
class ClientConfigureResponse extends StreamingIngestResponse {
@JsonProperty("prefix")
private String prefix;

Expand Down Expand Up @@ -63,4 +65,11 @@ Long getDeploymentId() {
void setDeploymentId(Long deploymentId) {
this.deploymentId = deploymentId;
}

String getClientPrefix() {
if (this.deploymentId == null) {
return this.prefix;
}
return this.prefix + "_" + this.deploymentId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

/** Interface for {@link ChannelConfigureRequest} and {@link ClientConfigureRequest} */
interface ConfigureRequest extends StreamingIngestRequest {
sfc-gh-alhuang marked this conversation as resolved.
Show resolved Hide resolved
String getRole();

void setFileName(String fileName);
}
Loading
Loading