Skip to content

Commit

Permalink
SNOW-1754295 - Start using subscoped tokens for iceberg ingestion (#868)
Browse files Browse the repository at this point in the history
* add RefreshTableInformation API

* introduce subscopedtoken ext vol manager that reuses InternalStage.java for uploads via jdbc

* add tests, add two-hex-chars on client instead of server, add refreshTableInformation call right on registerTable

* pr comments

* pr comments
  • Loading branch information
sfc-gh-hmadan authored Oct 28, 2024
1 parent b1f76d9 commit a4ca491
Show file tree
Hide file tree
Showing 21 changed files with 694 additions and 841 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public enum ApiName {
STREAMING_CHANNEL_STATUS("POST"),
STREAMING_REGISTER_BLOB("POST"),
STREAMING_CLIENT_CONFIGURE("POST"),
GENERATE_PRESIGNED_URLS("POST");
GENERATE_PRESIGNED_URLS("POST"),
REFRESH_TABLE_INFORMATION("POST");

private final String httpMethod;

private ApiName(String httpMethod) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package net.snowflake.ingest.streaming.internal;

import java.util.Optional;

/**
* Interface to manage {@link InternalStage} and {@link PresignedUrlExternalVolume} for {@link
* FlushService}
Expand All @@ -24,7 +26,7 @@ interface IStorageManager {
IStorage getStorage(String fullyQualifiedTableName);

/** Informs the storage manager about a new table that's being ingested into by the client. */
void registerTable(TableRef tableRef, FileLocationInfo locationInfo);
void registerTable(TableRef tableRef);

/**
* Generate a unique blob path and increment the blob sequencer
Expand All @@ -42,4 +44,14 @@ interface IStorageManager {
* @return the client prefix
*/
String getClientPrefix();

/**
* Get the updated subscoped tokens and location info for this table
*
* @param tableRef The table for which to get the location
* @param fileName Legacy, was used by deprecated GCS codepaths when it didn't support subscoped
* tokens. Not in use.
* @return
*/
FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional<String> fileName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import net.snowflake.ingest.utils.Utils;

/** Handles uploading files to the Snowflake Streaming Ingest Storage */
class InternalStage<T> implements IStorage {
class InternalStage implements IStorage {
private static final ObjectMapper mapper = new ObjectMapper();

/**
Expand All @@ -59,50 +59,69 @@ class InternalStage<T> implements IStorage {

private static final Logging logger = new Logging(InternalStage.class);

private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge;
private final InternalStageManager<T> owningManager;
private final IStorageManager owningManager;
private final String clientName;

private final String clientPrefix;
private final TableRef tableRef;
private final int maxUploadRetries;

// Proxy parameters that we set while calling the Snowflake JDBC to upload the streams
private final Properties proxyProperties;

private FileLocationInfo fileLocationInfo;
private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge;

/**
* Default constructor
*
* @param owningManager the storage manager owning this storage
* @param clientName The client name
* @param clientPrefix client prefix
* @param tableRef
* @param fileLocationInfo The file location information from open channel response
* @param maxUploadRetries The maximum number of retries to attempt
*/
InternalStage(
InternalStageManager<T> owningManager,
IStorageManager owningManager,
String clientName,
String clientPrefix,
TableRef tableRef,
FileLocationInfo fileLocationInfo,
int maxUploadRetries)
throws SnowflakeSQLException, IOException {
this(owningManager, clientName, (SnowflakeFileTransferMetadataWithAge) null, maxUploadRetries);
Utils.assertStringNotNullOrEmpty("client prefix", this.owningManager.getClientPrefix());
this.fileTransferMetadataWithAge = createFileTransferMetadataWithAge(fileLocationInfo);
this(
owningManager,
clientName,
clientPrefix,
tableRef,
(SnowflakeFileTransferMetadataWithAge) null,
maxUploadRetries);
Utils.assertStringNotNullOrEmpty("client prefix", clientPrefix);
setFileLocationInfo(fileLocationInfo);
}

/**
* Constructor for TESTING that takes SnowflakeFileTransferMetadataWithAge as input
*
* @param owningManager the storage manager owning this storage
* @param clientName the client name
* @param clientPrefix
* @param tableRef
* @param testMetadata SnowflakeFileTransferMetadataWithAge to test with
* @param maxUploadRetries the maximum number of retries to attempt
*/
InternalStage(
InternalStageManager<T> owningManager,
IStorageManager owningManager,
String clientName,
String clientPrefix,
TableRef tableRef,
SnowflakeFileTransferMetadataWithAge testMetadata,
int maxUploadRetries)
throws SnowflakeSQLException, IOException {
this.owningManager = owningManager;
this.clientName = clientName;
this.clientPrefix = clientPrefix;
this.tableRef = tableRef;
this.maxUploadRetries = maxUploadRetries;
this.proxyProperties = generateProxyPropertiesForJDBC();
this.fileTransferMetadataWithAge = testMetadata;
Expand Down Expand Up @@ -153,7 +172,7 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
.setUploadStream(inStream)
.setRequireCompress(false)
.setOcspMode(OCSPMode.FAIL_OPEN)
.setStreamingIngestClientKey(this.owningManager.getClientPrefix())
.setStreamingIngestClientKey(this.clientPrefix)
.setStreamingIngestClientName(this.clientName)
.setProxyProperties(this.proxyProperties)
.setDestFileName(fullFilePath)
Expand Down Expand Up @@ -194,8 +213,6 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
*/
synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boolean force)
throws SnowflakeSQLException, IOException {
logger.logInfo("Refresh Snowflake metadata, client={} force={}", clientName, force);

if (!force
&& fileTransferMetadataWithAge != null
&& fileTransferMetadataWithAge.timestamp.isPresent()
Expand All @@ -204,10 +221,19 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole
return fileTransferMetadataWithAge;
}

FileLocationInfo location = this.owningManager.getRefreshedLocation(Optional.empty());
SnowflakeFileTransferMetadataWithAge metadata = createFileTransferMetadataWithAge(location);
this.fileTransferMetadataWithAge = metadata;
return metadata;
logger.logInfo(
"Refresh Snowflake metadata, client={} force={} tableRef={}", clientName, force, tableRef);

FileLocationInfo location =
this.owningManager.getRefreshedLocation(this.tableRef, Optional.empty());
setFileLocationInfo(location);
return this.fileTransferMetadataWithAge;
}

private synchronized void setFileLocationInfo(FileLocationInfo fileLocationInfo)
throws SnowflakeSQLException, IOException {
this.fileTransferMetadataWithAge = createFileTransferMetadataWithAge(fileLocationInfo);
this.fileLocationInfo = fileLocationInfo;
}

static SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge(
Expand Down Expand Up @@ -265,7 +291,8 @@ static SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge(
SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName)
throws SnowflakeSQLException, IOException {

FileLocationInfo location = this.owningManager.getRefreshedLocation(Optional.of(fileName));
FileLocationInfo location =
this.owningManager.getRefreshedLocation(this.tableRef, Optional.of(fileName));

SnowflakeFileTransferMetadataV1 metadata =
(SnowflakeFileTransferMetadataV1)
Expand Down Expand Up @@ -336,4 +363,8 @@ static void putLocal(String stageLocation, String fullFilePath, byte[] data) {
throw new SFException(ex, ErrorCode.BLOB_UPLOAD_FAILURE);
}
}

FileLocationInfo getFileLocationInfo() {
return this.fileLocationInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
import net.snowflake.ingest.utils.Utils;

/** Class to manage single Snowflake internal stage */
class InternalStageManager<T> implements IStorageManager {
class InternalStageManager implements IStorageManager {
public static final TableRef NO_TABLE_REF = new TableRef("$NO_DB$", "$NO_SCH$", "$NO_TABLE$");
/** Target stage for the client */
private final InternalStage<T> targetStage;
private final InternalStage targetStage;

/** Increasing counter to generate a unique blob name per client */
private final AtomicLong counter;
Expand Down Expand Up @@ -70,15 +71,22 @@ class InternalStageManager<T> implements IStorageManager {
this.clientPrefix = response.getClientPrefix();
this.deploymentId = response.getDeploymentId();
this.targetStage =
new InternalStage<T>(
this, clientName, response.getStageLocation(), DEFAULT_MAX_UPLOAD_RETRIES);
new InternalStage(
this,
clientName,
clientPrefix,
NO_TABLE_REF,
response.getStageLocation(),
DEFAULT_MAX_UPLOAD_RETRIES);
} else {
this.clientPrefix = null;
this.deploymentId = null;
this.targetStage =
new InternalStage<T>(
new InternalStage(
this,
"testClient",
null /* clientPrefix */,
NO_TABLE_REF,
(SnowflakeFileTransferMetadataWithAge) null,
DEFAULT_MAX_UPLOAD_RETRIES);
}
Expand All @@ -98,7 +106,7 @@ class InternalStageManager<T> implements IStorageManager {
*/
@Override
@SuppressWarnings("unused")
public InternalStage<T> getStorage(String fullyQualifiedTableName) {
public InternalStage getStorage(String fullyQualifiedTableName) {
// There's always only one stage for the client in non-iceberg mode
return targetStage;
}
Expand All @@ -108,7 +116,7 @@ public InternalStage<T> getStorage(String fullyQualifiedTableName) {
* nothing as there's no per-table state yet for FDN tables (that use internal stages).
*/
@Override
public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {}
public void registerTable(TableRef tableRef) {}

/**
* Gets the latest file location info (with a renewed short-lived access token) for the specified
Expand All @@ -117,7 +125,15 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {}
* @param fileName optional filename for single-file signed URL fetch from server
* @return the new location information
*/
FileLocationInfo getRefreshedLocation(Optional<String> fileName) {
@Override
public FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional<String> fileName) {
if (!tableRef.equals(NO_TABLE_REF)) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format(
"getRefreshedLocation received TableRef=%s and expected=%s", tableRef, NO_TABLE_REF));
}

try {
ClientConfigureRequest request = new ClientConfigureRequest(this.role);
fileName.ifPresent(request::setFileName);
Expand Down
Loading

0 comments on commit a4ca491

Please sign in to comment.