Skip to content

Commit

Permalink
introduce subscopedtoken ext vol manager that reuses InternalStage.ja…
Browse files Browse the repository at this point in the history
…va for uploads via jdbc
  • Loading branch information
sfc-gh-hmadan committed Oct 21, 2024
1 parent 051fa5d commit b511743
Show file tree
Hide file tree
Showing 9 changed files with 320 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package net.snowflake.ingest.streaming.internal;

import java.util.Optional;

/**
* Interface to manage {@link InternalStage} and {@link PresignedUrlExternalVolume} for {@link
* FlushService}
Expand Down Expand Up @@ -42,4 +44,13 @@ interface IStorageManager {
* @return the client prefix
*/
String getClientPrefix();

/**
* Get the updated subscoped tokens and location info for this table
*
* @param tableRef
* @param fileName
* @return
*/
FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional<String> fileName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import net.snowflake.ingest.utils.Utils;

/** Handles uploading files to the Snowflake Streaming Ingest Storage */
class InternalStage<T> implements IStorage {
class InternalStage implements IStorage {
private static final ObjectMapper mapper = new ObjectMapper();

/**
Expand All @@ -59,31 +59,45 @@ class InternalStage<T> implements IStorage {

private static final Logging logger = new Logging(InternalStage.class);

private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge;
private final InternalStageManager<T> owningManager;
private final IStorageManager owningManager;
private final String clientName;

private final String clientPrefix;
private final TableRef tableRef;
private final int maxUploadRetries;

// Proxy parameters that we set while calling the Snowflake JDBC to upload the streams
private final Properties proxyProperties;

private FileLocationInfo fileLocationInfo;
private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge;

/**
* Default constructor
*
* @param owningManager the storage manager owning this storage
* @param clientName The client name
* @param clientPrefix client prefix
* @param tableRef
* @param fileLocationInfo The file location information from open channel response
* @param maxUploadRetries The maximum number of retries to attempt
*/
InternalStage(
InternalStageManager<T> owningManager,
IStorageManager owningManager,
String clientName,
String clientPrefix,
TableRef tableRef,
FileLocationInfo fileLocationInfo,
int maxUploadRetries)
throws SnowflakeSQLException, IOException {
this(owningManager, clientName, (SnowflakeFileTransferMetadataWithAge) null, maxUploadRetries);
Utils.assertStringNotNullOrEmpty("client prefix", this.owningManager.getClientPrefix());
this(
owningManager,
clientName,
clientPrefix,
tableRef,
(SnowflakeFileTransferMetadataWithAge) null,
maxUploadRetries);
Utils.assertStringNotNullOrEmpty("client prefix", clientPrefix);
this.fileLocationInfo = fileLocationInfo;
this.fileTransferMetadataWithAge = createFileTransferMetadataWithAge(fileLocationInfo);
}

Expand All @@ -92,17 +106,23 @@ class InternalStage<T> implements IStorage {
*
* @param owningManager the storage manager owning this storage
* @param clientName the client name
* @param clientPrefix
* @param tableRef
* @param testMetadata SnowflakeFileTransferMetadataWithAge to test with
* @param maxUploadRetries the maximum number of retries to attempt
*/
InternalStage(
InternalStageManager<T> owningManager,
IStorageManager owningManager,
String clientName,
String clientPrefix,
TableRef tableRef,
SnowflakeFileTransferMetadataWithAge testMetadata,
int maxUploadRetries)
throws SnowflakeSQLException, IOException {
this.owningManager = owningManager;
this.clientName = clientName;
this.clientPrefix = clientPrefix;
this.tableRef = tableRef;
this.maxUploadRetries = maxUploadRetries;
this.proxyProperties = generateProxyPropertiesForJDBC();
this.fileTransferMetadataWithAge = testMetadata;
Expand Down Expand Up @@ -153,7 +173,7 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
.setUploadStream(inStream)
.setRequireCompress(false)
.setOcspMode(OCSPMode.FAIL_OPEN)
.setStreamingIngestClientKey(this.owningManager.getClientPrefix())
.setStreamingIngestClientKey(this.clientPrefix)
.setStreamingIngestClientName(this.clientName)
.setProxyProperties(this.proxyProperties)
.setDestFileName(fullFilePath)
Expand Down Expand Up @@ -204,8 +224,10 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole
return fileTransferMetadataWithAge;
}

FileLocationInfo location = this.owningManager.getRefreshedLocation(Optional.empty());
FileLocationInfo location =
this.owningManager.getRefreshedLocation(this.tableRef, Optional.empty());
SnowflakeFileTransferMetadataWithAge metadata = createFileTransferMetadataWithAge(location);
this.fileLocationInfo = location;
this.fileTransferMetadataWithAge = metadata;
return metadata;
}
Expand Down Expand Up @@ -265,7 +287,8 @@ static SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge(
SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName)
throws SnowflakeSQLException, IOException {

FileLocationInfo location = this.owningManager.getRefreshedLocation(Optional.of(fileName));
FileLocationInfo location =
this.owningManager.getRefreshedLocation(this.tableRef, Optional.of(fileName));

SnowflakeFileTransferMetadataV1 metadata =
(SnowflakeFileTransferMetadataV1)
Expand Down Expand Up @@ -336,4 +359,8 @@ static void putLocal(String stageLocation, String fullFilePath, byte[] data) {
throw new SFException(ex, ErrorCode.BLOB_UPLOAD_FAILURE);
}
}

FileLocationInfo getFileLocationInfo() {
return this.fileLocationInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
import net.snowflake.ingest.utils.Utils;

/** Class to manage single Snowflake internal stage */
class InternalStageManager<T> implements IStorageManager {
class InternalStageManager implements IStorageManager {
public static final TableRef NO_TABLE_REF = new TableRef("$NO_DB$", "$NO_SCH$", "$NO_TABLE$");
/** Target stage for the client */
private final InternalStage<T> targetStage;
private final InternalStage targetStage;

/** Increasing counter to generate a unique blob name per client */
private final AtomicLong counter;
Expand Down Expand Up @@ -70,15 +71,22 @@ class InternalStageManager<T> implements IStorageManager {
this.clientPrefix = response.getClientPrefix();
this.deploymentId = response.getDeploymentId();
this.targetStage =
new InternalStage<T>(
this, clientName, response.getStageLocation(), DEFAULT_MAX_UPLOAD_RETRIES);
new InternalStage(
this,
clientName,
clientPrefix,
NO_TABLE_REF,
response.getStageLocation(),
DEFAULT_MAX_UPLOAD_RETRIES);
} else {
this.clientPrefix = null;
this.deploymentId = null;
this.targetStage =
new InternalStage<T>(
new InternalStage(
this,
"testClient",
null /* clientPrefix */,
NO_TABLE_REF,
(SnowflakeFileTransferMetadataWithAge) null,
DEFAULT_MAX_UPLOAD_RETRIES);
}
Expand All @@ -98,7 +106,7 @@ class InternalStageManager<T> implements IStorageManager {
*/
@Override
@SuppressWarnings("unused")
public InternalStage<T> getStorage(String fullyQualifiedTableName) {
public InternalStage getStorage(String fullyQualifiedTableName) {
// There's always only one stage for the client in non-iceberg mode
return targetStage;
}
Expand All @@ -117,7 +125,15 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {}
* @param fileName optional filename for single-file signed URL fetch from server
* @return the new location information
*/
FileLocationInfo getRefreshedLocation(Optional<String> fileName) {
@Override
public FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional<String> fileName) {
if (!tableRef.equals(NO_TABLE_REF)) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format(
"getRefreshedLocation received TableRef=%s and expected=%s", tableRef, NO_TABLE_REF));
}

try {
ClientConfigureRequest request = new ClientConfigureRequest(this.role);
fileName.ifPresent(request::setFileName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import net.snowflake.ingest.connection.IngestResponseException;
import net.snowflake.ingest.utils.ErrorCode;
Expand Down Expand Up @@ -158,4 +159,9 @@ private PresignedUrlExternalVolume getVolumeSafe(String fullyQualifiedTableName)

return volume;
}

@Override
public FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional<String> fileName) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Unsupported");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,9 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea

this.storageManager =
isIcebergMode
? new PresignedUrlExternalVolumeManager(
? new SubscopedTokenExternalVolumeManager(
isTestMode, this.role, this.name, this.snowflakeServiceClient)
: new InternalStageManager<T>(
: new InternalStageManager(
isTestMode, this.role, this.name, this.snowflakeServiceClient);

try {
Expand Down
Loading

0 comments on commit b511743

Please sign in to comment.