Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1754295 - Start using subscoped tokens for iceberg ingestion #868

Merged
merged 5 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public enum ApiName {
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST"),
GENERATE_PRESIGNED_URLS("POST");
GENERATE_PRESIGNED_URLS("POST"),
REFRESH_TABLE_INFORMATION("POST");

private final String httpMethod;

private ApiName(String httpMethod) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package net.snowflake.ingest.streaming.internal;

import java.util.Optional;

/**
* Interface to manage {@link InternalStage} and {@link PresignedUrlExternalVolume} for {@link
* FlushService}
Expand All @@ -24,7 +26,7 @@ interface IStorageManager {
IStorage getStorage(String fullyQualifiedTableName);

/** Informs the storage manager about a new table that's being ingested into by the client. */
void registerTable(TableRef tableRef, FileLocationInfo locationInfo);
void registerTable(TableRef tableRef);

/**
* Generate a unique blob path and increment the blob sequencer
Expand All @@ -42,4 +44,14 @@ interface IStorageManager {
* @return the client prefix
*/
String getClientPrefix();

/**
* Get the updated subscoped tokens and location info for this table
*
* @param tableRef The table for which to get the location
* @param fileName Legacy, was used by deprecated GCS codepaths when it didn't support subscoped
* tokens. Not in use.
* @return
*/
FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional<String> fileName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import net.snowflake.ingest.utils.Utils;

/** Handles uploading files to the Snowflake Streaming Ingest Storage */
class InternalStage<T> implements IStorage {
class InternalStage implements IStorage {
private static final ObjectMapper mapper = new ObjectMapper();

/**
Expand All @@ -59,50 +59,69 @@ class InternalStage<T> implements IStorage {

private static final Logging logger = new Logging(InternalStage.class);

private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge;
private final InternalStageManager<T> owningManager;
private final IStorageManager owningManager;
private final String clientName;

private final String clientPrefix;
private final TableRef tableRef;
private final int maxUploadRetries;

// Proxy parameters that we set while calling the Snowflake JDBC to upload the streams
private final Properties proxyProperties;

private FileLocationInfo fileLocationInfo;
private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge;

/**
* Default constructor
*
* @param owningManager the storage manager owning this storage
* @param clientName The client name
* @param clientPrefix client prefix
* @param tableRef
* @param fileLocationInfo The file location information from open channel response
* @param maxUploadRetries The maximum number of retries to attempt
*/
InternalStage(
InternalStageManager<T> owningManager,
IStorageManager owningManager,
String clientName,
String clientPrefix,
TableRef tableRef,
FileLocationInfo fileLocationInfo,
int maxUploadRetries)
throws SnowflakeSQLException, IOException {
this(owningManager, clientName, (SnowflakeFileTransferMetadataWithAge) null, maxUploadRetries);
Utils.assertStringNotNullOrEmpty("client prefix", this.owningManager.getClientPrefix());
this.fileTransferMetadataWithAge = createFileTransferMetadataWithAge(fileLocationInfo);
this(
owningManager,
clientName,
clientPrefix,
tableRef,
(SnowflakeFileTransferMetadataWithAge) null,
maxUploadRetries);
Utils.assertStringNotNullOrEmpty("client prefix", clientPrefix);
setFileLocationInfo(fileLocationInfo);
}

/**
* Constructor for TESTING that takes SnowflakeFileTransferMetadataWithAge as input
*
* @param owningManager the storage manager owning this storage
* @param clientName the client name
* @param clientPrefix
* @param tableRef
* @param testMetadata SnowflakeFileTransferMetadataWithAge to test with
* @param maxUploadRetries the maximum number of retries to attempt
*/
InternalStage(
InternalStageManager<T> owningManager,
IStorageManager owningManager,
String clientName,
String clientPrefix,
TableRef tableRef,
SnowflakeFileTransferMetadataWithAge testMetadata,
int maxUploadRetries)
throws SnowflakeSQLException, IOException {
this.owningManager = owningManager;
this.clientName = clientName;
this.clientPrefix = clientPrefix;
this.tableRef = tableRef;
this.maxUploadRetries = maxUploadRetries;
this.proxyProperties = generateProxyPropertiesForJDBC();
this.fileTransferMetadataWithAge = testMetadata;
Expand Down Expand Up @@ -153,7 +172,7 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
.setUploadStream(inStream)
.setRequireCompress(false)
.setOcspMode(OCSPMode.FAIL_OPEN)
.setStreamingIngestClientKey(this.owningManager.getClientPrefix())
.setStreamingIngestClientKey(this.clientPrefix)
.setStreamingIngestClientName(this.clientName)
.setProxyProperties(this.proxyProperties)
.setDestFileName(fullFilePath)
Expand Down Expand Up @@ -194,8 +213,6 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
*/
synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boolean force)
throws SnowflakeSQLException, IOException {
logger.logInfo("Refresh Snowflake metadata, client={} force={}", clientName, force);

if (!force
&& fileTransferMetadataWithAge != null
&& fileTransferMetadataWithAge.timestamp.isPresent()
Expand All @@ -204,10 +221,19 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole
return fileTransferMetadataWithAge;
}

FileLocationInfo location = this.owningManager.getRefreshedLocation(Optional.empty());
SnowflakeFileTransferMetadataWithAge metadata = createFileTransferMetadataWithAge(location);
this.fileTransferMetadataWithAge = metadata;
return metadata;
logger.logInfo(
"Refresh Snowflake metadata, client={} force={} tableRef={}", clientName, force, tableRef);

FileLocationInfo location =
this.owningManager.getRefreshedLocation(this.tableRef, Optional.empty());
setFileLocationInfo(location);
return this.fileTransferMetadataWithAge;
}

private synchronized void setFileLocationInfo(FileLocationInfo fileLocationInfo)
throws SnowflakeSQLException, IOException {
this.fileTransferMetadataWithAge = createFileTransferMetadataWithAge(fileLocationInfo);
this.fileLocationInfo = fileLocationInfo;
}

static SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge(
Expand Down Expand Up @@ -265,7 +291,8 @@ static SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge(
SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName)
throws SnowflakeSQLException, IOException {

FileLocationInfo location = this.owningManager.getRefreshedLocation(Optional.of(fileName));
FileLocationInfo location =
this.owningManager.getRefreshedLocation(this.tableRef, Optional.of(fileName));

SnowflakeFileTransferMetadataV1 metadata =
(SnowflakeFileTransferMetadataV1)
Expand Down Expand Up @@ -336,4 +363,8 @@ static void putLocal(String stageLocation, String fullFilePath, byte[] data) {
throw new SFException(ex, ErrorCode.BLOB_UPLOAD_FAILURE);
}
}

FileLocationInfo getFileLocationInfo() {
return this.fileLocationInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
import net.snowflake.ingest.utils.Utils;

/** Class to manage single Snowflake internal stage */
class InternalStageManager<T> implements IStorageManager {
class InternalStageManager implements IStorageManager {
public static final TableRef NO_TABLE_REF = new TableRef("$NO_DB$", "$NO_SCH$", "$NO_TABLE$");
/** Target stage for the client */
private final InternalStage<T> targetStage;
private final InternalStage targetStage;

/** Increasing counter to generate a unique blob name per client */
private final AtomicLong counter;
Expand Down Expand Up @@ -70,15 +71,22 @@ class InternalStageManager<T> implements IStorageManager {
this.clientPrefix = response.getClientPrefix();
this.deploymentId = response.getDeploymentId();
this.targetStage =
new InternalStage<T>(
this, clientName, response.getStageLocation(), DEFAULT_MAX_UPLOAD_RETRIES);
new InternalStage(
this,
clientName,
clientPrefix,
NO_TABLE_REF,
response.getStageLocation(),
DEFAULT_MAX_UPLOAD_RETRIES);
} else {
this.clientPrefix = null;
this.deploymentId = null;
this.targetStage =
new InternalStage<T>(
new InternalStage(
this,
"testClient",
null /* clientPrefix */,
NO_TABLE_REF,
(SnowflakeFileTransferMetadataWithAge) null,
DEFAULT_MAX_UPLOAD_RETRIES);
}
Expand All @@ -98,7 +106,7 @@ class InternalStageManager<T> implements IStorageManager {
*/
@Override
@SuppressWarnings("unused")
public InternalStage<T> getStorage(String fullyQualifiedTableName) {
public InternalStage getStorage(String fullyQualifiedTableName) {
// There's always only one stage for the client in non-iceberg mode
return targetStage;
}
Expand All @@ -108,7 +116,7 @@ public InternalStage<T> getStorage(String fullyQualifiedTableName) {
* nothing as there's no per-table state yet for FDN tables (that use internal stages).
*/
@Override
public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {}
public void registerTable(TableRef tableRef) {}

/**
* Gets the latest file location info (with a renewed short-lived access token) for the specified
Expand All @@ -117,7 +125,15 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {}
* @param fileName optional filename for single-file signed URL fetch from server
* @return the new location information
*/
FileLocationInfo getRefreshedLocation(Optional<String> fileName) {
@Override
public FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional<String> fileName) {
if (!tableRef.equals(NO_TABLE_REF)) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format(
"getRefreshedLocation received TableRef=%s and expected=%s", tableRef, NO_TABLE_REF));
}

try {
ClientConfigureRequest request = new ClientConfigureRequest(this.role);
fileName.ifPresent(request::setFileName);
Expand Down
Loading
Loading