Skip to content

Commit

Permalink
SNOW-1497358 Support multiple storage for Iceberg mode (#783)
Browse files Browse the repository at this point in the history
Co-authored-by: Hitesh Madan <[email protected]>
  • Loading branch information
sfc-gh-alhuang and sfc-gh-hmadan authored Jul 11, 2024
1 parent a9aa682 commit eac448a
Show file tree
Hide file tree
Showing 31 changed files with 1,522 additions and 490 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012-2017 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.connection;
Expand Down Expand Up @@ -42,7 +42,8 @@ public enum ApiName {
STREAMING_DROP_CHANNEL("POST"),
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST");
STREAMING_CLIENT_CONFIGURE("POST"),
STREAMING_CHANNEL_CONFIGURE("POST");
private final String httpMethod;

private ApiName(String httpMethod) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming;
Expand Down Expand Up @@ -41,7 +41,6 @@ public enum OnErrorOption {
private final ZoneId defaultTimezone;

private final String offsetToken;
private final boolean isOffsetTokenProvided;

private final OffsetTokenVerificationFunction offsetTokenVerificationFunction;

Expand All @@ -59,7 +58,6 @@ public static class OpenChannelRequestBuilder {
private ZoneId defaultTimezone;

private String offsetToken;
private boolean isOffsetTokenProvided = false;

private OffsetTokenVerificationFunction offsetTokenVerificationFunction;

Expand Down Expand Up @@ -95,7 +93,6 @@ public OpenChannelRequestBuilder setDefaultTimezone(ZoneId defaultTimezone) {

public OpenChannelRequestBuilder setOffsetToken(String offsetToken) {
this.offsetToken = offsetToken;
this.isOffsetTokenProvided = true;
return this;
}

Expand Down Expand Up @@ -125,7 +122,6 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) {
this.onErrorOption = builder.onErrorOption;
this.defaultTimezone = builder.defaultTimezone;
this.offsetToken = builder.offsetToken;
this.isOffsetTokenProvided = builder.isOffsetTokenProvided;
this.offsetTokenVerificationFunction = builder.offsetTokenVerificationFunction;
}

Expand All @@ -150,7 +146,7 @@ public ZoneId getDefaultTimezone() {
}

public String getFullyQualifiedTableName() {
return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName);
return Utils.getFullyQualifiedTableName(this.dbName, this.schemaName, this.tableName);
}

public OnErrorOption getOnErrorOption() {
Expand All @@ -161,10 +157,6 @@ public String getOffsetToken() {
return this.offsetToken;
}

public boolean isOffsetTokenProvided() {
return this.isOffsetTokenProvided;
}

public OffsetTokenVerificationFunction getOffsetTokenVerificationFunction() {
return this.offsetTokenVerificationFunction;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to serialize the channel configure request. */
class ChannelConfigureRequest extends ConfigureRequest {
@JsonProperty("database")
private String database;

@JsonProperty("schema")
private String schema;

@JsonProperty("table")
private String table;

/**
* Constructor for channel configure request
*
* @param role Role to be used for the request.
* @param database Database name.
* @param schema Schema name.
* @param table Table name.
*/
ChannelConfigureRequest(String role, String database, String schema, String table) {
setRole(role);
this.database = database;
this.schema = schema;
this.table = table;
}

String getDatabase() {
return database;
}

String getSchema() {
return schema;
}

String getTable() {
return table;
}

@Override
public String getStringForLogging() {
return String.format(
"ChannelConfigureRequest(role=%s, db=%s, schema=%s, table=%s, file_name=%s)",
getRole(), database, schema, table, getFileName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to deserialize responses from channel configure endpoint */
@JsonIgnoreProperties(ignoreUnknown = true)
class ChannelConfigureResponse extends StreamingIngestResponse {
@JsonProperty("status_code")
private Long statusCode;

@JsonProperty("message")
private String message;

@JsonProperty("stage_location")
private FileLocationInfo stageLocation;

@Override
Long getStatusCode() {
return statusCode;
}

void setStatusCode(Long statusCode) {
this.statusCode = statusCode;
}

String getMessage() {
return message;
}

void setMessage(String message) {
this.message = message;
}

FileLocationInfo getStageLocation() {
return stageLocation;
}

void setStageLocation(FileLocationInfo stageLocation) {
this.stageLocation = stageLocation;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
/*
* Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.utils.Utils;

/**
* Channel immutable identification and encryption attributes.
*
Expand Down Expand Up @@ -36,12 +38,12 @@ class ChannelFlushContext {
String encryptionKey,
Long encryptionKeyId) {
this.name = name;
this.fullyQualifiedName = String.format("%s.%s.%s.%s", dbName, schemaName, tableName, name);
this.fullyQualifiedName =
Utils.getFullyQualifiedChannelName(dbName, schemaName, tableName, name);
this.dbName = dbName;
this.schemaName = schemaName;
this.tableName = tableName;
this.fullyQualifiedTableName =
String.format("%s.%s.%s", this.getDbName(), this.getSchemaName(), this.getTableName());
this.fullyQualifiedTableName = Utils.getFullyQualifiedTableName(dbName, schemaName, tableName);
this.channelSequencer = channelSequencer;
this.encryptionKey = encryptionKey;
this.encryptionKeyId = encryptionKeyId;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.stream.Collectors;
import net.snowflake.ingest.utils.Utils;

/** Class to deserialize a request from a channel status request */
class ChannelsStatusRequest {
class ChannelsStatusRequest implements StreamingIngestRequest {

// Used to deserialize a channel request
static class ChannelStatusRequestDTO {
Expand Down Expand Up @@ -99,4 +101,21 @@ void setChannels(List<ChannelStatusRequestDTO> channels) {
List<ChannelStatusRequestDTO> getChannels() {
return channels;
}

@Override
public String getStringForLogging() {
return String.format(
"ChannelsStatusRequest(requestId=%s, role=%s, channels=[%s])",
requestId,
role,
channels.stream()
.map(
r ->
Utils.getFullyQualifiedChannelName(
r.getDatabaseName(),
r.getSchemaName(),
r.getTableName(),
r.getChannelName()))
.collect(Collectors.joining(", ")));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

/** Class used to serialize client configure request */
class ClientConfigureRequest extends ConfigureRequest {
/**
* Constructor for client configure request
*
* @param role Role to be used for the request.
*/
ClientConfigureRequest(String role) {
setRole(role);
}

@Override
public String getStringForLogging() {
return String.format("ClientConfigureRequest(role=%s, file_name=%s)", getRole(), getFileName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to deserialize responses from configure endpoint */
class ConfigureResponse extends StreamingIngestResponse {
@JsonIgnoreProperties(ignoreUnknown = true)
class ClientConfigureResponse extends StreamingIngestResponse {
@JsonProperty("prefix")
private String prefix;

Expand Down Expand Up @@ -63,4 +65,11 @@ Long getDeploymentId() {
void setDeploymentId(Long deploymentId) {
this.deploymentId = deploymentId;
}

String getClientPrefix() {
if (this.deploymentId == null) {
return this.prefix;
}
return this.prefix + "_" + this.deploymentId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Abstract class for {@link ChannelConfigureRequest} and {@link ClientConfigureRequest} */
abstract class ConfigureRequest implements StreamingIngestRequest {
@JsonProperty("role")
private String role;

// File name for the GCS signed url request
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("file_name")
private String fileName;

String getRole() {
return role;
}

void setRole(String role) {
this.role = role;
}

String getFileName() {
return fileName;
}

void setFileName(String fileName) {
this.fileName = fileName;
}

@Override
public abstract String getStringForLogging();
}
Loading

0 comments on commit eac448a

Please sign in to comment.