Skip to content

Commit

Permalink
Merge branch 'master' into alhuang-table-level-flush
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Jul 23, 2024
2 parents 5f57e88 + e808bbd commit 90b2e2f
Show file tree
Hide file tree
Showing 36 changed files with 1,693 additions and 505 deletions.
2 changes: 1 addition & 1 deletion e2e-jar-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>2.1.2-SNAPSHOT</version>
<version>2.1.2</version>
</dependency>

<dependency>
Expand Down
5 changes: 3 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<!-- Arifact name and version information -->
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>2.1.2-SNAPSHOT</version>
<version>2.1.2</version>
<packaging>jar</packaging>
<name>Snowflake Ingest SDK</name>
<description>Snowflake Ingest SDK</description>
Expand Down Expand Up @@ -63,7 +63,7 @@
<parquet.version>1.14.1</parquet.version>
<powermock.version>2.0.9</powermock.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<protobuf.version>3.19.6</protobuf.version>
<protobuf.version>4.27.2</protobuf.version>
<shadeBase>net.snowflake.ingest.internal</shadeBase>
<slf4j.version>1.7.36</slf4j.version>
<snappy.version>1.1.10.5</snappy.version>
Expand Down Expand Up @@ -873,6 +873,7 @@
<licenseMerge>BSD 2-Clause License
|The BSD License</licenseMerge>
<licenseMerge>The MIT License|MIT License</licenseMerge>
<licenseMerge>3-Clause BSD License|BSD-3-Clause</licenseMerge>
</licenseMerges>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public void refreshToken() throws IOException {

/** Helper method for making refresh request */
private HttpUriRequest makeRefreshTokenRequest() {
// TODO SNOW-1538108 Use SnowflakeServiceClient to make the request
HttpPost post = new HttpPost(oAuthCredential.get().getOAuthTokenEndpoint());
post.addHeader(HttpHeaders.CONTENT_TYPE, OAUTH_CONTENT_TYPE_HEADER);
post.addHeader(HttpHeaders.AUTHORIZATION, oAuthCredential.get().getAuthHeader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public class RequestBuilder {
// Don't change!
public static final String CLIENT_NAME = "SnowpipeJavaSDK";

public static final String DEFAULT_VERSION = "2.1.2-SNAPSHOT";
public static final String DEFAULT_VERSION = "2.1.2";

public static final String JAVA_USER_AGENT = "JAVA";

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming;
Expand Down Expand Up @@ -150,7 +150,7 @@ public ZoneId getDefaultTimezone() {
}

public String getFullyQualifiedTableName() {
return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName);
return Utils.getFullyQualifiedTableName(this.dbName, this.schemaName, this.tableName);
}

public OnErrorOption getOnErrorOption() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
/*
* Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.utils.Utils;

/**
* Channel immutable identification and encryption attributes.
*
Expand Down Expand Up @@ -36,12 +38,12 @@ class ChannelFlushContext {
String encryptionKey,
Long encryptionKeyId) {
this.name = name;
this.fullyQualifiedName = String.format("%s.%s.%s.%s", dbName, schemaName, tableName, name);
this.fullyQualifiedName =
Utils.getFullyQualifiedChannelName(dbName, schemaName, tableName, name);
this.dbName = dbName;
this.schemaName = schemaName;
this.tableName = tableName;
this.fullyQualifiedTableName =
String.format("%s.%s.%s", this.getDbName(), this.getSchemaName(), this.getTableName());
this.fullyQualifiedTableName = Utils.getFullyQualifiedTableName(dbName, schemaName, tableName);
this.channelSequencer = channelSequencer;
this.encryptionKey = encryptionKey;
this.encryptionKeyId = encryptionKeyId;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.stream.Collectors;
import net.snowflake.ingest.utils.Utils;

/** Class to deserialize a request from a channel status request */
class ChannelsStatusRequest {
class ChannelsStatusRequest implements IStreamingIngestRequest {

// Used to deserialize a channel request
static class ChannelStatusRequestDTO {
Expand Down Expand Up @@ -86,4 +88,20 @@ void setChannels(List<ChannelStatusRequestDTO> channels) {
List<ChannelStatusRequestDTO> getChannels() {
return channels;
}

@Override
public String getStringForLogging() {
return String.format(
"ChannelsStatusRequest(role=%s, channels=[%s])",
role,
channels.stream()
.map(
r ->
Utils.getFullyQualifiedChannelName(
r.getDatabaseName(),
r.getSchemaName(),
r.getTableName(),
r.getChannelName()))
.collect(Collectors.joining(", ")));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to serialize client configure request */
class ClientConfigureRequest implements IStreamingIngestRequest {
/**
* Constructor for client configure request
*
* @param role Role to be used for the request.
*/
ClientConfigureRequest(String role) {
this.role = role;
}

@JsonProperty("role")
private String role;

// File name for the GCS signed url request
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("file_name")
private String fileName;

String getRole() {
return role;
}

void setRole(String role) {
this.role = role;
}

String getFileName() {
return fileName;
}

void setFileName(String fileName) {
this.fileName = fileName;
}

@Override
public String getStringForLogging() {
return String.format("ClientConfigureRequest(role=%s, file_name=%s)", getRole(), getFileName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to deserialize responses from configure endpoint */
@JsonIgnoreProperties(ignoreUnknown = true)
class ClientConfigureResponse extends StreamingIngestResponse {
@JsonProperty("prefix")
private String prefix;

@JsonProperty("status_code")
private Long statusCode;

@JsonProperty("message")
private String message;

@JsonProperty("stage_location")
private FileLocationInfo stageLocation;

@JsonProperty("deployment_id")
private Long deploymentId;

String getPrefix() {
return prefix;
}

void setPrefix(String prefix) {
this.prefix = prefix;
}

@Override
Long getStatusCode() {
return statusCode;
}

void setStatusCode(Long statusCode) {
this.statusCode = statusCode;
}

String getMessage() {
return message;
}

void setMessage(String message) {
this.message = message;
}

FileLocationInfo getStageLocation() {
return stageLocation;
}

void setStageLocation(FileLocationInfo stageLocation) {
this.stageLocation = stageLocation;
}

Long getDeploymentId() {
return deploymentId;
}

void setDeploymentId(Long deploymentId) {
this.deploymentId = deploymentId;
}

String getClientPrefix() {
if (this.deploymentId == null) {
return this.prefix;
}
return this.prefix + "_" + this.deploymentId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import net.snowflake.ingest.streaming.DropChannelRequest;
import net.snowflake.ingest.utils.Utils;

/** Class used to serialize the {@link DropChannelRequest} */
class DropChannelRequestInternal implements IStreamingIngestRequest {
@JsonProperty("request_id")
private String requestId;

@JsonProperty("role")
private String role;

@JsonProperty("channel")
private String channel;

@JsonProperty("table")
private String table;

@JsonProperty("database")
private String database;

@JsonProperty("schema")
private String schema;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("client_sequencer")
Long clientSequencer;

DropChannelRequestInternal(
String requestId,
String role,
String database,
String schema,
String table,
String channel,
Long clientSequencer) {
this.requestId = requestId;
this.role = role;
this.database = database;
this.schema = schema;
this.table = table;
this.channel = channel;
this.clientSequencer = clientSequencer;
}

String getRequestId() {
return requestId;
}

String getRole() {
return role;
}

String getChannel() {
return channel;
}

String getTable() {
return table;
}

String getDatabase() {
return database;
}

String getSchema() {
return schema;
}

Long getClientSequencer() {
return clientSequencer;
}

String getFullyQualifiedTableName() {
return Utils.getFullyQualifiedTableName(database, schema, table);
}

@Override
public String getStringForLogging() {
return String.format(
"DropChannelRequest(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s,"
+ " clientSequencer=%s)",
requestId, role, database, schema, table, channel, clientSequencer);
}
}
Loading

0 comments on commit 90b2e2f

Please sign in to comment.