Skip to content

Commit

Permalink
Code refactor for Iceberg support (snowflakedb#792)
Browse files Browse the repository at this point in the history
Co-authored-by: Hitesh Madan <[email protected]>
  • Loading branch information
2 people authored and sfc-gh-kgaputis committed Sep 12, 2024
1 parent ecaec0f commit 334767d
Show file tree
Hide file tree
Showing 25 changed files with 1,379 additions and 511 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public void refreshToken() throws IOException {

/** Helper method for making refresh request */
private HttpUriRequest makeRefreshTokenRequest() {
// TODO SNOW-1538108 Use SnowflakeServiceClient to make the request
HttpPost post = new HttpPost(oAuthCredential.get().getOAuthTokenEndpoint());
post.addHeader(HttpHeaders.CONTENT_TYPE, OAUTH_CONTENT_TYPE_HEADER);
post.addHeader(HttpHeaders.AUTHORIZATION, oAuthCredential.get().getAuthHeader());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming;
Expand Down Expand Up @@ -150,7 +150,7 @@ public ZoneId getDefaultTimezone() {
}

public String getFullyQualifiedTableName() {
return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName);
return Utils.getFullyQualifiedTableName(this.dbName, this.schemaName, this.tableName);
}

public OnErrorOption getOnErrorOption() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
/*
* Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.utils.Utils;

/**
* Channel immutable identification and encryption attributes.
*
Expand Down Expand Up @@ -36,12 +38,12 @@ class ChannelFlushContext {
String encryptionKey,
Long encryptionKeyId) {
this.name = name;
this.fullyQualifiedName = String.format("%s.%s.%s.%s", dbName, schemaName, tableName, name);
this.fullyQualifiedName =
Utils.getFullyQualifiedChannelName(dbName, schemaName, tableName, name);
this.dbName = dbName;
this.schemaName = schemaName;
this.tableName = tableName;
this.fullyQualifiedTableName =
String.format("%s.%s.%s", this.getDbName(), this.getSchemaName(), this.getTableName());
this.fullyQualifiedTableName = Utils.getFullyQualifiedTableName(dbName, schemaName, tableName);
this.channelSequencer = channelSequencer;
this.encryptionKey = encryptionKey;
this.encryptionKeyId = encryptionKeyId;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.stream.Collectors;
import net.snowflake.ingest.utils.Utils;

/** Class to deserialize a request from a channel status request */
class ChannelsStatusRequest {
class ChannelsStatusRequest implements IStreamingIngestRequest {

// Used to deserialize a channel request
static class ChannelStatusRequestDTO {
Expand Down Expand Up @@ -86,4 +88,20 @@ void setChannels(List<ChannelStatusRequestDTO> channels) {
List<ChannelStatusRequestDTO> getChannels() {
return channels;
}

@Override
public String getStringForLogging() {
return String.format(
"ChannelsStatusRequest(role=%s, channels=[%s])",
role,
channels.stream()
.map(
r ->
Utils.getFullyQualifiedChannelName(
r.getDatabaseName(),
r.getSchemaName(),
r.getTableName(),
r.getChannelName()))
.collect(Collectors.joining(", ")));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to serialize client configure request */
class ClientConfigureRequest implements IStreamingIngestRequest {
/**
* Constructor for client configure request
*
* @param role Role to be used for the request.
*/
ClientConfigureRequest(String role) {
this.role = role;
}

@JsonProperty("role")
private String role;

// File name for the GCS signed url request
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("file_name")
private String fileName;

String getRole() {
return role;
}

void setRole(String role) {
this.role = role;
}

String getFileName() {
return fileName;
}

void setFileName(String fileName) {
this.fileName = fileName;
}

@Override
public String getStringForLogging() {
return String.format("ClientConfigureRequest(role=%s, file_name=%s)", getRole(), getFileName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to deserialize responses from configure endpoint */
@JsonIgnoreProperties(ignoreUnknown = true)
class ClientConfigureResponse extends StreamingIngestResponse {
@JsonProperty("prefix")
private String prefix;

@JsonProperty("status_code")
private Long statusCode;

@JsonProperty("message")
private String message;

@JsonProperty("stage_location")
private FileLocationInfo stageLocation;

@JsonProperty("deployment_id")
private Long deploymentId;

String getPrefix() {
return prefix;
}

void setPrefix(String prefix) {
this.prefix = prefix;
}

@Override
Long getStatusCode() {
return statusCode;
}

void setStatusCode(Long statusCode) {
this.statusCode = statusCode;
}

String getMessage() {
return message;
}

void setMessage(String message) {
this.message = message;
}

FileLocationInfo getStageLocation() {
return stageLocation;
}

void setStageLocation(FileLocationInfo stageLocation) {
this.stageLocation = stageLocation;
}

Long getDeploymentId() {
return deploymentId;
}

void setDeploymentId(Long deploymentId) {
this.deploymentId = deploymentId;
}

String getClientPrefix() {
if (this.deploymentId == null) {
return this.prefix;
}
return this.prefix + "_" + this.deploymentId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import net.snowflake.ingest.streaming.DropChannelRequest;
import net.snowflake.ingest.utils.Utils;

/** Class used to serialize the {@link DropChannelRequest} */
class DropChannelRequestInternal implements IStreamingIngestRequest {
@JsonProperty("request_id")
private String requestId;

@JsonProperty("role")
private String role;

@JsonProperty("channel")
private String channel;

@JsonProperty("table")
private String table;

@JsonProperty("database")
private String database;

@JsonProperty("schema")
private String schema;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("client_sequencer")
Long clientSequencer;

DropChannelRequestInternal(
String requestId,
String role,
String database,
String schema,
String table,
String channel,
Long clientSequencer) {
this.requestId = requestId;
this.role = role;
this.database = database;
this.schema = schema;
this.table = table;
this.channel = channel;
this.clientSequencer = clientSequencer;
}

String getRequestId() {
return requestId;
}

String getRole() {
return role;
}

String getChannel() {
return channel;
}

String getTable() {
return table;
}

String getDatabase() {
return database;
}

String getSchema() {
return schema;
}

Long getClientSequencer() {
return clientSequencer;
}

String getFullyQualifiedTableName() {
return Utils.getFullyQualifiedTableName(database, schema, table);
}

@Override
public String getStringForLogging() {
return String.format(
"DropChannelRequest(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s,"
+ " clientSequencer=%s)",
requestId, role, database, schema, table, channel, clientSequencer);
}
}
Loading

0 comments on commit 334767d

Please sign in to comment.