Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1497358 Support multiple storage for Iceberg mode #783

Merged
merged 12 commits into from
Jul 11, 2024
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012-2017 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.connection;
Expand Down Expand Up @@ -42,7 +42,8 @@ public enum ApiName {
STREAMING_DROP_CHANNEL("POST"),
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST");
STREAMING_CLIENT_CONFIGURE("POST"),
STREAMING_CHANNEL_CONFIGURE("POST");
private final String httpMethod;

private ApiName(String httpMethod) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming;
Expand Down Expand Up @@ -150,7 +150,7 @@ public ZoneId getDefaultTimezone() {
}

public String getFullyQualifiedTableName() {
return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName);
return Utils.getFullyQualifiedTableName(this.dbName, this.schemaName, this.tableName);
}

public OnErrorOption getOnErrorOption() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
Expand Down Expand Up @@ -101,4 +101,11 @@ void invalidateChannelIfSequencersMatch(
int getSize() {
return cache.size();
}

/** Get the number of channels for a given table */
int getSizePerTable(String fullyQualifiedTableName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int getSizePerTable(String fullyQualifiedTableName) {
int getChannelCountForTable(String fullyQualifiedTableName) {

ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>> channelsMapPerTable =
cache.get(fullyQualifiedTableName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this thread safe? It's possible that the count changes between these two calls, right?

return channelsMapPerTable == null ? 0 : channelsMapPerTable.size();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to serialize the channel configure request. */
class ChannelConfigureRequest extends ConfigureRequest {
@JsonProperty("database")
private String database;

@JsonProperty("schema")
private String schema;

@JsonProperty("table")
private String table;

/**
* Constructor for channel configure request
*
* @param role Role to be used for the request.
* @param database Database name.
* @param schema Schema name.
* @param table Table name.
*/
ChannelConfigureRequest(String role, String database, String schema, String table) {
setRole(role);
this.database = database;
this.schema = schema;
this.table = table;
}

String getDatabase() {
return database;
}

String getSchema() {
return schema;
}

String getTable() {
return table;
}

@Override
public String getStringForLogging() {
return String.format(
"ChannelConfigureRequest(role=%s, db=%s, schema=%s, table=%s, file_name=%s)",
getRole(), database, schema, table, getFileName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to deserialize responses from channel configure endpoint */
@JsonIgnoreProperties(ignoreUnknown = true)
class ChannelConfigureResponse extends StreamingIngestResponse {
@JsonProperty("status_code")
private Long statusCode;

@JsonProperty("message")
private String message;

@JsonProperty("stage_location")
private FileLocationInfo stageLocation;

@Override
Long getStatusCode() {
return statusCode;
}

void setStatusCode(Long statusCode) {
this.statusCode = statusCode;
}

String getMessage() {
return message;
}

void setMessage(String message) {
this.message = message;
}

FileLocationInfo getStageLocation() {
return stageLocation;
}

void setStageLocation(FileLocationInfo stageLocation) {
this.stageLocation = stageLocation;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
/*
* Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.utils.Utils;

/**
* Channel immutable identification and encryption attributes.
*
Expand Down Expand Up @@ -36,12 +38,12 @@ class ChannelFlushContext {
String encryptionKey,
Long encryptionKeyId) {
this.name = name;
this.fullyQualifiedName = String.format("%s.%s.%s.%s", dbName, schemaName, tableName, name);
this.fullyQualifiedName =
Utils.getFullyQualifiedChannelName(dbName, schemaName, tableName, name);
this.dbName = dbName;
this.schemaName = schemaName;
this.tableName = tableName;
this.fullyQualifiedTableName =
String.format("%s.%s.%s", this.getDbName(), this.getSchemaName(), this.getTableName());
this.fullyQualifiedTableName = Utils.getFullyQualifiedTableName(dbName, schemaName, tableName);
this.channelSequencer = channelSequencer;
this.encryptionKey = encryptionKey;
this.encryptionKeyId = encryptionKeyId;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.stream.Collectors;
import net.snowflake.ingest.utils.Utils;

/** Class to deserialize a request from a channel status request */
class ChannelsStatusRequest {
class ChannelsStatusRequest implements StreamingIngestRequest {

// Used to deserialize a channel request
static class ChannelStatusRequestDTO {
Expand Down Expand Up @@ -99,4 +101,21 @@ void setChannels(List<ChannelStatusRequestDTO> channels) {
List<ChannelStatusRequestDTO> getChannels() {
return channels;
}

@Override
public String getStringForLogging() {
return String.format(
"ChannelsStatusRequest(requestId=%s, role=%s, channels=[%s])",
requestId,
role,
channels.stream()
.map(
r ->
Utils.getFullyQualifiedChannelName(
r.getDatabaseName(),
r.getSchemaName(),
r.getTableName(),
r.getChannelName()))
.collect(Collectors.joining(", ")));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

/** Class used to serialize client configure request */
class ClientConfigureRequest extends ConfigureRequest {
/**
* Constructor for client configure request
*
* @param role Role to be used for the request.
*/
ClientConfigureRequest(String role) {
setRole(role);
}

@Override
public String getStringForLogging() {
return String.format("ClientConfigureRequest(role=%s, file_name=%s)", getRole(), getFileName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to deserialize responses from configure endpoint */
class ConfigureResponse extends StreamingIngestResponse {
@JsonIgnoreProperties(ignoreUnknown = true)
class ClientConfigureResponse extends StreamingIngestResponse {
@JsonProperty("prefix")
private String prefix;

Expand Down Expand Up @@ -63,4 +65,11 @@ Long getDeploymentId() {
void setDeploymentId(Long deploymentId) {
this.deploymentId = deploymentId;
}

String getClientPrefix() {
if (this.deploymentId == null) {
return this.prefix;
}
return this.prefix + "_" + this.deploymentId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Abstract class for {@link ChannelConfigureRequest} and {@link ClientConfigureRequest} */
abstract class ConfigureRequest implements StreamingIngestRequest {
@JsonProperty("role")
private String role;

// File name for the GCS signed url request
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("file_name")
private String fileName;

String getRole() {
return role;
}

void setRole(String role) {
this.role = role;
}

String getFileName() {
return fileName;
}

void setFileName(String fileName) {
this.fileName = fileName;
}

@Override
public abstract String getStringForLogging();
}
Loading
Loading