Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1754295 - Prep for upcoming subscoped tokens PR #867

Merged
merged 2 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,22 @@
package net.snowflake.ingest.streaming.internal;

/**
* Class to manage blob path strings that might have an embedded security token if it's a presigned
* URL
* Class to maintain the upload-path (relative to the location for which we have authorized access)
* and the file registration path (relative to the volume location).
*
* <p>In the case of FDN tables, these values are identical as we get access to the account's
* streaming_ingest volume.
*
* <p>In the case of Iceberg tables, these values are different since we scope the token down to a
* per-session subpath under the external volume's location, whereas the file registration still
* needs to happen relative to the external volume's location.
*/
public class BlobPath {
public final String blobPath;
public final Boolean hasToken;
public final String fileName;
class BlobPath {
public final String uploadPath;
public final String fileRegistrationPath;

private BlobPath(String fileName, String blobPath, Boolean hasToken) {
this.blobPath = blobPath;
this.hasToken = hasToken;
this.fileName = fileName;
}

public static BlobPath fileNameWithoutToken(String fileName) {
return new BlobPath(fileName, fileName, false);
}

public static BlobPath presignedUrlWithToken(String fileName, String url) {
return new BlobPath(fileName, url, true);
public BlobPath(String uploadPath, String fileRegistrationPath) {
this.uploadPath = uploadPath;
this.fileRegistrationPath = fileRegistrationPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ && shouldStopProcessing(
long flushStartMs = System.currentTimeMillis();
if (this.owningClient.flushLatency != null) {
latencyTimerContextMap.putIfAbsent(
blobPath.fileName, this.owningClient.flushLatency.time());
blobPath.fileRegistrationPath, this.owningClient.flushLatency.time());
}

Supplier<BlobMetadata> supplier =
Expand All @@ -510,7 +510,7 @@ && shouldStopProcessing(
+ " detail=%s, trace=%s, all channels in the blob will be"
+ " invalidated",
this.owningClient.getName(),
blobPath.fileName,
blobPath.fileRegistrationPath,
ex,
ex.getMessage(),
getStackTrace(ex));
Expand Down Expand Up @@ -540,7 +540,7 @@ && shouldStopProcessing(

blobs.add(
new Pair<>(
new BlobData<>(blobPath.fileName, blobData),
new BlobData<>(blobPath.fileRegistrationPath, blobData),
CompletableFuture.supplyAsync(supplier, this.buildUploadWorkers)));

logger.logInfo(
Expand Down Expand Up @@ -600,7 +600,7 @@ BlobMetadata buildAndUpload(
// Construct the blob along with the metadata of the blob
BlobBuilder.Blob blob =
BlobBuilder.constructBlobAndMetadata(
blobPath.fileName,
blobPath.fileRegistrationPath,
blobData,
bdecVersion,
this.owningClient.getInternalParameterProvider());
Expand Down Expand Up @@ -632,7 +632,7 @@ BlobMetadata upload(
List<ChunkMetadata> metadata,
BlobStats blobStats)
throws NoSuchAlgorithmException {
logger.logInfo("Start uploading blob={}, size={}", blobPath.fileName, blob.length);
logger.logInfo("Start uploading blob={}, size={}", blobPath.fileRegistrationPath, blob.length);
long startTime = System.currentTimeMillis();

Timer.Context uploadContext = Utils.createTimerContext(this.owningClient.uploadLatency);
Expand All @@ -648,14 +648,14 @@ BlobMetadata upload(

logger.logInfo(
"Finish uploading blob={}, size={}, timeInMillis={}",
blobPath.fileName,
blobPath.fileRegistrationPath,
blob.length,
System.currentTimeMillis() - startTime);

// At this point we know for sure whether the BDEC file has data for more than one chunk, i.e.
// whether it spans mixed tables.
return BlobMetadata.createBlobMetadata(
blobPath.fileName,
blobPath.fileRegistrationPath,
BlobBuilder.computeMD5(blob),
bdecVersion,
metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package net.snowflake.ingest.streaming.internal;

/** Interface to manage {@link InternalStage} and {@link ExternalVolume} for {@link FlushService} */
/**
* Interface to manage {@link InternalStage} and {@link PresignedUrlExternalVolume} for {@link
* FlushService}
*/
interface IStorageManager {

/** Default max upload retries for streaming ingest storage */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,11 @@ SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName)

/** Upload file to internal stage */
public void put(BlobPath blobPath, byte[] blob) {
String filePath = blobPath.fileName;
if (this.isLocalFS()) {
putLocal(this.fileTransferMetadataWithAge.localLocation, filePath, blob);
putLocal(this.fileTransferMetadataWithAge.localLocation, blobPath.fileRegistrationPath, blob);
} else {
try {
putRemote(filePath, blob, 0);
putRemote(blobPath.uploadPath, blob, 0);
} catch (SnowflakeSQLException | IOException e) {
throw new SFException(e, ErrorCode.BLOB_UPLOAD_FAILURE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public BlobPath generateBlobPath(String fullyQualifiedTableName) {
// other implementation
// of IStorageManager does end up using this argument.
Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
return BlobPath.fileNameWithoutToken(getNextFileName(calendar, this.clientPrefix));
String fileName = getNextFileName(calendar, this.clientPrefix);
return new BlobPath(fileName /* uploadPath */, fileName /* fileRegistrationPath */);
}

/** For TESTING */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import net.snowflake.ingest.utils.SFException;

/** Handles uploading files to the Iceberg Table's external volume's table data path */
class ExternalVolume implements IStorage {
class PresignedUrlExternalVolume implements IStorage {
// TODO everywhere: static final should be named in all capitals
private static final Logging logger = new Logging(ExternalVolume.class);
private static final Logging logger = new Logging(PresignedUrlExternalVolume.class);
private static final int DEFAULT_PRESIGNED_URL_COUNT = 10;
private static final int DEFAULT_PRESIGNED_URL_TIMEOUT_IN_SECONDS = 900;

Expand Down Expand Up @@ -74,7 +74,7 @@ class ExternalVolume implements IStorage {
private final FileLocationInfo locationInfo;
private final SnowflakeFileTransferMetadataWithAge fileTransferMetadata;

ExternalVolume(
PresignedUrlExternalVolume(
String clientName,
String clientPrefix,
Long deploymentId,
Expand Down Expand Up @@ -123,12 +123,13 @@ class ExternalVolume implements IStorage {
@Override
public void put(BlobPath blobPath, byte[] blob) {
if (this.fileTransferMetadata.isLocalFS) {
InternalStage.putLocal(this.fileTransferMetadata.localLocation, blobPath.fileName, blob);
InternalStage.putLocal(
this.fileTransferMetadata.localLocation, blobPath.fileRegistrationPath, blob);
return;
}

try {
putRemote(blobPath.blobPath, blob);
putRemote(blobPath.uploadPath, blob);
} catch (Throwable e) {
throw new SFException(e, ErrorCode.BLOB_UPLOAD_FAILURE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
import net.snowflake.ingest.utils.SFException;

/** Class to manage multiple external volumes */
class ExternalVolumeManager implements IStorageManager {
class PresignedUrlExternalVolumeManager implements IStorageManager {
// TODO: Rename all logger members to LOGGER and check in code formatting rules
private static final Logging logger = new Logging(ExternalVolumeManager.class);
private static final Logging logger = new Logging(PresignedUrlExternalVolumeManager.class);
// Reference to the external volume per table
private final Map<String, ExternalVolume> externalVolumeMap;
private final Map<String, PresignedUrlExternalVolume> externalVolumeMap;

// name of the owning client
private final String clientName;
Expand Down Expand Up @@ -48,7 +48,7 @@ class ExternalVolumeManager implements IStorageManager {
* @param clientName the name of the client
* @param snowflakeServiceClient the Snowflake service client used for configure calls
*/
ExternalVolumeManager(
PresignedUrlExternalVolumeManager(
boolean isTestMode,
String role,
String clientName,
Expand All @@ -66,7 +66,7 @@ class ExternalVolumeManager implements IStorageManager {
throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage());
}
logger.logDebug(
"Created ExternalVolumeManager with clientName=%s and clientPrefix=%s",
"Created PresignedUrlExternalVolumeManager with clientName=%s and clientPrefix=%s",
clientName, clientPrefix);
}

Expand All @@ -77,7 +77,7 @@ class ExternalVolumeManager implements IStorageManager {
* @return target storage
*/
@Override
public ExternalVolume getStorage(String fullyQualifiedTableName) {
public PresignedUrlExternalVolume getStorage(String fullyQualifiedTableName) {
// Only one chunk per blob in Iceberg mode.
return getVolumeSafe(fullyQualifiedTableName);
}
Expand All @@ -104,8 +104,8 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {
}

try {
ExternalVolume externalVolume =
new ExternalVolume(
PresignedUrlExternalVolume externalVolume =
new PresignedUrlExternalVolume(
clientName,
getClientPrefix(),
deploymentId,
Expand All @@ -132,9 +132,9 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {

@Override
public BlobPath generateBlobPath(String fullyQualifiedTableName) {
ExternalVolume volume = getVolumeSafe(fullyQualifiedTableName);
PresignedUrlExternalVolume volume = getVolumeSafe(fullyQualifiedTableName);
PresignedUrlInfo urlInfo = volume.dequeueUrlInfo();
return BlobPath.presignedUrlWithToken(urlInfo.fileName, urlInfo.url);
return new BlobPath(urlInfo.url /* uploadPath */, urlInfo.fileName /* fileRegistrationPath */);
}

/**
Expand All @@ -147,8 +147,8 @@ public String getClientPrefix() {
return this.clientPrefix;
}

private ExternalVolume getVolumeSafe(String fullyQualifiedTableName) {
ExternalVolume volume = this.externalVolumeMap.get(fullyQualifiedTableName);
private PresignedUrlExternalVolume getVolumeSafe(String fullyQualifiedTableName) {
PresignedUrlExternalVolume volume = this.externalVolumeMap.get(fullyQualifiedTableName);

if (volume == null) {
throw new SFException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea

this.storageManager =
isIcebergMode
? new ExternalVolumeManager(
? new PresignedUrlExternalVolumeManager(
isTestMode, this.role, this.name, this.snowflakeServiceClient)
: new InternalStageManager<T>(
isTestMode, this.role, this.name, this.snowflakeServiceClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ private abstract static class TestContext<T> implements AutoCloseable {
FlushService<T> flushService;
IStorageManager storageManager;
InternalStage storage;
ExternalVolume extVolume;
PresignedUrlExternalVolume extVolume;
ParameterProvider parameterProvider;
RegisterService registerService;

final List<ChannelData<T>> channelData = new ArrayList<>();

TestContext() {
storage = Mockito.mock(InternalStage.class);
extVolume = Mockito.mock(ExternalVolume.class);
extVolume = Mockito.mock(PresignedUrlExternalVolume.class);
parameterProvider = new ParameterProvider(isIcebergMode);
InternalParameterProvider internalParameterProvider =
new InternalParameterProvider(isIcebergMode);
Expand All @@ -118,7 +118,7 @@ private abstract static class TestContext<T> implements AutoCloseable {
storageManager =
Mockito.spy(
isIcebergMode
? new ExternalVolumeManager(
? new PresignedUrlExternalVolumeManager(
true, "role", "client", MockSnowflakeServiceClient.create())
: new InternalStageManager(true, "role", "client", null));
Mockito.doReturn(isIcebergMode ? extVolume : storage)
Expand Down Expand Up @@ -148,7 +148,7 @@ ChannelData<T> flushChannel(String name) {
BlobMetadata buildAndUpload() throws Exception {
List<List<ChannelData<T>>> blobData = Collections.singletonList(channelData);
return flushService.buildAndUpload(
BlobPath.fileNameWithoutToken("file_name"),
new BlobPath("file_name" /* uploadPath */, "file_name" /* fileRegistrationPath */),
blobData,
blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName());
}
Expand Down Expand Up @@ -966,7 +966,7 @@ public void testBuildAndUpload() throws Exception {
blobCaptor.capture(),
metadataCaptor.capture(),
ArgumentMatchers.any());
Assert.assertEquals("file_name", nameCaptor.getValue().fileName);
Assert.assertEquals("file_name", nameCaptor.getValue().fileRegistrationPath);

ChunkMetadata metadataResult = metadataCaptor.getValue().get(0);
List<ChannelMetadata> channelMetadataResult = metadataResult.getChannels();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ public void testPutRemote() throws Exception {
final ArgumentCaptor<SnowflakeFileTransferConfig> captor =
ArgumentCaptor.forClass(SnowflakeFileTransferConfig.class);

stage.put(BlobPath.fileNameWithoutToken("test/path"), dataBytes);
stage.put(
new BlobPath("test/path" /* uploadPath */, "test/path" /* fileRegistrationPath */),
dataBytes);
PowerMockito.verifyStatic(SnowflakeFileTransferAgent.class);
SnowflakeFileTransferAgent.uploadWithoutConnection(captor.capture());
SnowflakeFileTransferConfig capturedConfig = captor.getValue();
Expand Down Expand Up @@ -186,7 +188,8 @@ public void testPutLocal() throws Exception {
1));
Mockito.doReturn(true).when(stage).isLocalFS();

stage.put(BlobPath.fileNameWithoutToken(fileName), dataBytes);
stage.put(
new BlobPath(fileName /* uploadPath */, fileName /* fileRegistrationPath */), dataBytes);
Path outputPath = Paths.get(fullFilePath, fileName);
List<String> output = Files.readAllLines(outputPath);
Assert.assertEquals(1, output.size());
Expand Down Expand Up @@ -223,7 +226,9 @@ public void doTestPutRemoteRefreshes() throws Exception {
ArgumentCaptor.forClass(SnowflakeFileTransferConfig.class);

try {
stage.put(BlobPath.fileNameWithoutToken("test/path"), dataBytes);
stage.put(
new BlobPath("test/path" /* uploadPath */, "test/path" /* fileRegistrationPath */),
dataBytes);
Assert.fail("Should not succeed");
} catch (SFException ex) {
// Expected behavior given mocked response
Expand Down Expand Up @@ -272,7 +277,9 @@ public void testPutRemoteGCS() throws Exception {
SnowflakeFileTransferMetadataV1 metaMock = Mockito.mock(SnowflakeFileTransferMetadataV1.class);

Mockito.doReturn(metaMock).when(stage).fetchSignedURL(Mockito.any());
stage.put(BlobPath.fileNameWithoutToken("test/path"), dataBytes);
stage.put(
new BlobPath("test/path" /* uploadPath */, "test/path" /* fileRegistrationPath */),
dataBytes);
SnowflakeFileTransferAgent.uploadWithoutConnection(Mockito.any());
Mockito.verify(stage, times(1)).fetchSignedURL("test/path");
}
Expand Down Expand Up @@ -593,7 +600,9 @@ public Object answer(org.mockito.invocation.InvocationOnMock invocation)
final ArgumentCaptor<SnowflakeFileTransferConfig> captor =
ArgumentCaptor.forClass(SnowflakeFileTransferConfig.class);

stage.put(BlobPath.fileNameWithoutToken("test/path"), dataBytes);
stage.put(
new BlobPath("test/path" /* uploadPath */, "test/path" /* fileRegistrationPath */),
dataBytes);

PowerMockito.verifyStatic(SnowflakeFileTransferAgent.class, times(maxUploadRetryCount));
SnowflakeFileTransferAgent.uploadWithoutConnection(captor.capture());
Expand Down
Loading
Loading