Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup configurerequest / configureresponse and separate out channel configure response #787

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void setFileName(String fileName) {
@Override
public String getStringForLogging() {
return String.format(
"ChannelConfigureResponse(role=%s, table=%s, file_name=%s)",
role, Utils.getFullyQualifiedTableName(database, schema, table), fileName);
"ChannelConfigureRequest(role=%s, db=%s, schema=%s, table=%s, file_name=%s)",
role, database, schema, table, fileName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Class used to deserialize responses from channel configure endpoint
*/
@JsonIgnoreProperties(ignoreUnknown = true)
class ChannelConfigureResponse extends StreamingIngestResponse {
@JsonProperty("status_code")
private Long statusCode;

@JsonProperty("message")
private String message;

@JsonProperty("stage_location")
private FileLocationInfo stageLocation;

@Override
Long getStatusCode() {
return statusCode;
}

void setStatusCode(Long statusCode) {
this.statusCode = statusCode;
}

String getMessage() {
return message;
}

void setMessage(String message) {
this.message = message;
}

FileLocationInfo getStageLocation() {
return stageLocation;
}

void setStageLocation(FileLocationInfo stageLocation) {
this.stageLocation = stageLocation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

/** Class used to deserialize responses from configure endpoint */
@JsonIgnoreProperties(ignoreUnknown = true)
class ConfigureResponse extends StreamingIngestResponse {
class ClientConfigureResponse extends StreamingIngestResponse {
@JsonProperty("prefix")
private String prefix;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,16 @@ String getFullyQualifiedTableName() {

@Override
public String getStringForLogging() {
return String.format(
"DropChannelRequestInternal(requestId=%s, role=%s, channel=%s, isIceberg=%s,"
return String.format(
"DropChannelRequest(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s, isIceberg=%s,"
+ " clientSequencer=%s)",
requestId,
role,
Utils.getFullyQualifiedChannelName(database, schema, table, channel),
isIceberg,
clientSequencer);
requestId,
role,
database,
schema,
table,
channel,
isIceberg,
clientSequencer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,33 @@

package net.snowflake.ingest.streaming.internal;

import java.io.IOException;
import java.util.Calendar;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.ingest.connection.IngestResponseException;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;

/** Class to manage multiple external volumes */
class ExternalVolumeManager<T> implements StorageManager<T> {
import java.io.IOException;
import java.util.Calendar;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

class ExternalVolumeLocation {
public final String dbName;
public final String schemaName;
public final String tableName;

public ExternalVolumeLocation(String dbName, String schemaName, String tableName) {
this.dbName = dbName;
this.schemaName = schemaName;
this.tableName = tableName;
}
}

/** Class to manage multiple external volumes */
class ExternalVolumeManager<T> implements StorageManager<T, ExternalVolumeLocation> {
// Reference to the external volume per table
private final Map<String, StreamingIngestStorage<T>> externalVolumeMap;
private final Map<String, StreamingIngestStorage<T, ExternalVolumeLocation>> externalVolumeMap;

// name of the owning client
private final String clientName;
Expand Down Expand Up @@ -67,9 +80,9 @@ class ExternalVolumeManager<T> implements StorageManager<T> {
* @return target storage
*/
@Override
public StreamingIngestStorage<T> getStorage(ChannelFlushContext channelFlushContext) {
public StreamingIngestStorage<T, ExternalVolumeLocation> getStorage(ChannelFlushContext channelFlushContext) {
// Only one chunk per blob in Iceberg mode.
StreamingIngestStorage<T> stage =
StreamingIngestStorage<T, ExternalVolumeLocation> stage =
this.externalVolumeMap.get(channelFlushContext.getFullyQualifiedTableName());

if (stage == null) {
Expand Down Expand Up @@ -98,27 +111,32 @@ public void addStorage(
try {
this.externalVolumeMap.putIfAbsent(
fullyQualifiedTableName,
new StreamingIngestStorage<>(
new StreamingIngestStorage<T, ExternalVolumeLocation>(
this,
this.clientName,
fileLocationInfo,
new ChannelConfigureRequest(this.role, dbName, schemaName, tableName),
new ExternalVolumeLocation(dbName, schemaName, tableName),
DEFAULT_MAX_UPLOAD_RETRIES));
} catch (SnowflakeSQLException | IOException err) {
throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STORAGE);
}
}

/**
* Configure method for storage
* Gets the latest file location info (with a renewed short-lived access token) for the specified location
*
* @param request the configure request
* @return the configure response
* @param location A reference to the target location
* @param fileName optional filename for single-file signed URL fetch from server
* @return the new location information
*/
@Override
public ConfigureResponse configure(ConfigureRequest request) {
public FileLocationInfo refreshLocation(ExternalVolumeLocation location, Optional<String> fileName) {
try {
return this.snowflakeServiceClient.channelConfigure((ChannelConfigureRequest) request);
ChannelConfigureRequest request = new ChannelConfigureRequest(
this.role, location.dbName, location.schemaName, location.tableName);
fileName.ifPresent(request::setFileName);
ChannelConfigureResponse response = this.snowflakeServiceClient.channelConfigure(request);
return response.getStageLocation();
} catch (IngestResponseException | IOException e) {
throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ List<List<ChannelData<T>>> getData() {
private final ChannelCache<T> channelCache;

// Reference to the Stream Ingest storage manager
private final StorageManager<T> storageManager;
private final StorageManager<T, ?> storageManager;

// Reference to register service
private final RegisterService<T> registerService;
Expand Down Expand Up @@ -126,7 +126,7 @@ List<List<ChannelData<T>>> getData() {
FlushService(
SnowflakeStreamingIngestClientInternal<T> client,
ChannelCache<T> cache,
StorageManager<T> storageManager,
StorageManager<T, ?> storageManager,
boolean isTestMode) {
this.owningClient = client;
this.channelCache = cache;
Expand Down Expand Up @@ -545,7 +545,7 @@ BlobMetadata buildAndUpload(
* @return BlobMetadata object used to create the register blob request
*/
BlobMetadata upload(
StreamingIngestStorage storage,
StreamingIngestStorage<T, ?> storage,
String blobPath,
byte[] blob,
List<ChunkMetadata> metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,31 @@

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.utils.Constants.BLOB_EXTENSION_TYPE;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Calendar;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.ingest.connection.IngestResponseException;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;

import java.io.IOException;
import java.util.Calendar;
import java.util.Optional;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static net.snowflake.ingest.utils.Constants.BLOB_EXTENSION_TYPE;

class InternalStageLocation {
public InternalStageLocation() {
}
}

/** Class to manage single Snowflake internal stage */
class InternalStageManager<T> implements StorageManager<T> {
class InternalStageManager<T> implements StorageManager<T, InternalStageLocation> {
// Target stage for the client
private final StreamingIngestStorage<T> targetStage;
private final StreamingIngestStorage<T, InternalStageLocation> targetStage;

// Increasing counter to generate a unique blob name per client
private final AtomicLong counter;
Expand All @@ -32,6 +39,9 @@ class InternalStageManager<T> implements StorageManager<T> {
// Snowflake service client used for configure calls
private final SnowflakeServiceClient snowflakeServiceClient;

// The role of the client
private final String role;

// Client prefix generated by the Snowflake server
private final String clientPrefix;

Expand All @@ -50,23 +60,23 @@ class InternalStageManager<T> implements StorageManager<T> {
SnowflakeServiceClient snowflakeServiceClient) {
this.snowflakeServiceClient = snowflakeServiceClient;
this.isTestMode = isTestMode;
this.role = role;
this.counter = new AtomicLong(0);
try {
if (!isTestMode) {
ClientConfigureRequest request = new ClientConfigureRequest(role);
ConfigureResponse response = this.snowflakeServiceClient.clientConfigure(request);
ClientConfigureResponse response = this.snowflakeServiceClient.clientConfigure(new ClientConfigureRequest(role));
this.clientPrefix = response.getClientPrefix();
this.targetStage =
new StreamingIngestStorage<>(
this, clientName, response.getStageLocation(), request, DEFAULT_MAX_UPLOAD_RETRIES);
new StreamingIngestStorage<T, InternalStageLocation>(
this, clientName, response.getStageLocation(), new InternalStageLocation(), DEFAULT_MAX_UPLOAD_RETRIES);
} else {
this.clientPrefix = "testPrefix";
this.targetStage =
new StreamingIngestStorage<>(
new StreamingIngestStorage<T, InternalStageLocation>(
this,
"testClient",
(StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge) null,
new ClientConfigureRequest(role),
new InternalStageLocation(),
DEFAULT_MAX_UPLOAD_RETRIES);
}
} catch (IngestResponseException | IOException e) {
Expand All @@ -85,33 +95,32 @@ class InternalStageManager<T> implements StorageManager<T> {
*/
@Override
@SuppressWarnings("unused")
public StreamingIngestStorage<T> getStorage(ChannelFlushContext channelFlushContext) {
public StreamingIngestStorage<T, InternalStageLocation> getStorage(ChannelFlushContext channelFlushContext) {
// There's always only one stage for the client in non-iceberg mode
return targetStage;
}

/**
* Add storage to the manager. Do nothing as there's only one stage in non-Iceberg mode.
*
* @param dbName
* @param schemaName
* @param tableName
* @param fileLocationInfo
*/
@Override
public void addStorage(
String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) {}

/**
* Configure method for storage
* Gets the latest file location info (with a renewed short-lived access token) for the specified location
*
* @param request the configure request
* @return the configure response
* @param location A reference to the target location
* @param fileName optional filename for single-file signed URL fetch from server
* @return the new location information
*/
@Override
public ConfigureResponse configure(ConfigureRequest request) {
public FileLocationInfo refreshLocation(InternalStageLocation location, Optional<String> fileName) {
try {
return snowflakeServiceClient.clientConfigure((ClientConfigureRequest) request);
ClientConfigureRequest request = new ClientConfigureRequest(this.role);
fileName.ifPresent(request::setFileName);
ClientConfigureResponse response = snowflakeServiceClient.clientConfigure(request);
return response.getStageLocation();
} catch (IngestResponseException | IOException e) {
throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,16 @@ String getFullyQualifiedTableName() {
@Override
public String getStringForLogging() {
return String.format(
"OpenChannelRequestInternal(requestId=%s, role=%s, channel=%s, writeMode=%s, isIceberg=%s,"
"OpenChannelRequestInternal(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s, writeMode=%s, isIceberg=%s,"
+ " offsetToken=%s)",
requestId,
role,
Utils.getFullyQualifiedChannelName(database, schema, table, channel),
writeMode,
isIceberg,
offsetToken);
requestId,
role,
database,
schema,
table,
channel,
writeMode,
isIceberg,
offsetToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ class SnowflakeServiceClient {
* @param request the client configuration request
* @return the response from the configuration request
*/
ConfigureResponse clientConfigure(ClientConfigureRequest request)
ClientConfigureResponse clientConfigure(ClientConfigureRequest request)
throws IngestResponseException, IOException {
ConfigureResponse response =
ClientConfigureResponse response =
executeApiRequestWithRetries(
ConfigureResponse.class,
ClientConfigureResponse.class,
request,
CLIENT_CONFIGURE_ENDPOINT,
"client configure",
Expand All @@ -83,11 +83,11 @@ ConfigureResponse clientConfigure(ClientConfigureRequest request)
* @param request the channel configuration request
* @return the response from the configuration request
*/
ConfigureResponse channelConfigure(ChannelConfigureRequest request)
ChannelConfigureResponse channelConfigure(ChannelConfigureRequest request)
throws IngestResponseException, IOException {
ConfigureResponse response =
ChannelConfigureResponse response =
executeApiRequestWithRetries(
ConfigureResponse.class,
ChannelConfigureResponse.class,
request,
CHANNEL_CONFIGURE_ENDPOINT,
"channel configure",
Expand Down
Loading
Loading