Skip to content

Commit

Permalink
Iceberg Ingestion in CloudStorage mode - Carve out IStorage and Exter…
Browse files Browse the repository at this point in the history
…nalVolume from InternalStage (#828)

* carve out IStorage and ExternalVolume

* fix formatting

* early exit in some testcases, to be reenabled in the next PR when ExternalVolume is filled in

* minor PR feedback, retrigger snyk merge gate too
  • Loading branch information
sfc-gh-hmadan authored Sep 15, 2024
1 parent d69adfd commit c063036
Show file tree
Hide file tree
Showing 14 changed files with 478 additions and 426 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public static class Builder {
// Indicates whether it's under test mode
private boolean isTestMode;

// Whether we are going to ingest into iceberg tables
private boolean isIceberg;

private Builder(String name) {
this.name = name;
}
Expand All @@ -50,6 +53,12 @@ public Builder setIsTestMode(boolean isTestMode) {
return this;
}

// do not make public until the feature is ready
/**
 * Records whether the client being built will ingest into Iceberg tables.
 *
 * @param isIceberg value stored in this builder's {@code isIceberg} field
 * @return this builder, so calls can be chained
 */
Builder setIsIceberg(boolean isIceberg) {
this.isIceberg = isIceberg;
return this;
}

public SnowflakeStreamingIngestClient build() {
Utils.assertStringNotNullOrEmpty("client name", this.name);
Utils.assertNotNull("connection properties", this.prop);
Expand All @@ -58,7 +67,7 @@ public SnowflakeStreamingIngestClient build() {
SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));

return new SnowflakeStreamingIngestClientInternal<>(
this.name, accountURL, prop, this.parameterOverrides, false, this.isTestMode);
this.name, accountURL, prop, this.parameterOverrides, this.isIceberg, this.isTestMode);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

/**
 * Immutable holder pairing a blob's file name with the path used to reach it. The path is either
 * the plain file name or a presigned URL, which may carry an embedded security token.
 */
public class BlobPath {
  /** Path of the blob: the file name itself, or the presigned URL when {@link #hasToken} is set. */
  public final String blobPath;

  /** Whether {@link #blobPath} is a presigned URL with an embedded token. */
  public final Boolean hasToken;

  /** Name of the blob file. */
  public final String fileName;

  // Instances are only created through the static factories below.
  private BlobPath(String name, String path, Boolean tokenEmbedded) {
    this.fileName = name;
    this.blobPath = path;
    this.hasToken = tokenEmbedded;
  }

  /**
   * Creates a path that carries no token; the blob path is the file name itself.
   *
   * @param fileName name of the blob file, also used as the blob path
   * @return a token-less blob path
   */
  public static BlobPath fileNameWithoutToken(String fileName) {
    return new BlobPath(fileName, fileName, Boolean.FALSE);
  }

  /**
   * Creates a path backed by a presigned URL that embeds a token.
   *
   * @param fileName name of the blob file
   * @param url presigned URL used as the blob path
   * @return a blob path flagged as carrying a token
   */
  public static BlobPath presignedUrlWithToken(String fileName, String url) {
    return new BlobPath(fileName, url, Boolean.TRUE);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package net.snowflake.ingest.streaming.internal;

/**
 * Handles uploading files to the Iceberg Table's external volume's table data path.
 *
 * <p>Placeholder implementation: uploading is not implemented yet, so {@link #put} always throws.
 */
class ExternalVolume implements IStorage {
  /**
   * Uploads a blob to the external volume. Not implemented yet.
   *
   * @param blobPath destination path of the blob
   * @param blob blob contents
   * @throws UnsupportedOperationException always, until the upload path is implemented
   */
  @Override
  public void put(BlobPath blobPath, byte[] blob) {
    // UnsupportedOperationException is a RuntimeException, so handlers written against the
    // previous exception type still match.
    throw new UnsupportedOperationException("not implemented");
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,45 +5,37 @@
package net.snowflake.ingest.streaming.internal;

import java.io.IOException;
import java.util.Calendar;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.ingest.connection.IngestResponseException;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;

/** Immutable triple of database, schema and table name identifying a table. */
class ExternalVolumeLocation {
  /** Database name. */
  public final String dbName;

  /** Schema name. */
  public final String schemaName;

  /** Table name. */
  public final String tableName;

  /**
   * Stores the three name components as given; no validation is performed.
   *
   * @param dbName the database name
   * @param schemaName the schema name
   * @param tableName the table name
   */
  public ExternalVolumeLocation(String dbName, String schemaName, String tableName) {
    this.tableName = tableName;
    this.schemaName = schemaName;
    this.dbName = dbName;
  }
}

/** Class to manage multiple external volumes */
class ExternalVolumeManager<T> implements IStorageManager<T, ExternalVolumeLocation> {
class ExternalVolumeManager implements IStorageManager {
// TODO: Rename all logger members to LOGGER and checkin code formatting rules
private static final Logging logger = new Logging(ExternalVolumeManager.class);

// Reference to the external volume per table
private final Map<String, StreamingIngestStorage<T, ExternalVolumeLocation>> externalVolumeMap;
private final Map<String, ExternalVolume> externalVolumeMap;

// name of the owning client
private final String clientName;

// role of the owning client
private final String role;

// Reference to the Snowflake service client used for configure calls
private final SnowflakeServiceClient snowflakeServiceClient;
private final SnowflakeServiceClient serviceClient;

// Client prefix generated by the Snowflake server
private final String clientPrefix;

// Concurrency control to avoid creating multiple ExternalVolume objects for the same table
// (if openChannel is called multiple times concurrently).
private final Object registerTableLock = new Object();

/**
* Constructor for ExternalVolumeManager
*
Expand All @@ -57,20 +49,24 @@ class ExternalVolumeManager<T> implements IStorageManager<T, ExternalVolumeLocat
String role,
String clientName,
SnowflakeServiceClient snowflakeServiceClient) {
this.role = role;
this.clientName = clientName;
this.snowflakeServiceClient = snowflakeServiceClient;
this.role = role;
this.serviceClient = snowflakeServiceClient;
this.externalVolumeMap = new ConcurrentHashMap<>();
try {
this.clientPrefix =
isTestMode
? "testPrefix"
: this.snowflakeServiceClient
: this.serviceClient
.clientConfigure(new ClientConfigureRequest(role))
.getClientPrefix();
} catch (IngestResponseException | IOException e) {
throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage());
}

logger.logDebug(
"Created ExternalVolumeManager with clientName=%s and clientPrefix=%s",
clientName, clientPrefix);
}

/**
Expand All @@ -80,88 +76,55 @@ class ExternalVolumeManager<T> implements IStorageManager<T, ExternalVolumeLocat
* @return target storage
*/
@Override
public StreamingIngestStorage<T, ExternalVolumeLocation> getStorage(
String fullyQualifiedTableName) {
public IStorage getStorage(String fullyQualifiedTableName) {
// Only one chunk per blob in Iceberg mode.
StreamingIngestStorage<T, ExternalVolumeLocation> stage =
this.externalVolumeMap.get(fullyQualifiedTableName);

if (stage == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format("No external volume found for table %s", fullyQualifiedTableName));
}

return stage;
return getVolumeSafe(fullyQualifiedTableName);
}

/**
* Add a storage to the manager by looking up the table name from the open channel response
*
* @param dbName the database name
* @param schemaName the schema name
* @param tableName the table name
* @param fileLocationInfo response from open channel
*/
/** Informs the storage manager about a new table that's being ingested into by the client. */
@Override
public void addStorage(
String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) {
String fullyQualifiedTableName =
Utils.getFullyQualifiedTableName(dbName, schemaName, tableName);

try {
this.externalVolumeMap.put(
fullyQualifiedTableName,
new StreamingIngestStorage<T, ExternalVolumeLocation>(
this,
this.clientName,
fileLocationInfo,
new ExternalVolumeLocation(dbName, schemaName, tableName),
DEFAULT_MAX_UPLOAD_RETRIES));
} catch (SnowflakeSQLException | IOException err) {
throw new SFException(
err,
ErrorCode.UNABLE_TO_CONNECT_TO_STAGE,
String.format("fullyQualifiedTableName=%s", fullyQualifiedTableName));
public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {
if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) {
logger.logInfo(
"Skip registering table since its already been registered with the VolumeManager."
+ " tableRef=%s",
tableRef);
return;
}
}

/**
* Gets the latest file location info (with a renewed short-lived access token) for the specified
* location
*
* @param location A reference to the target location
* @param fileName optional filename for single-file signed URL fetch from server
* @return the new location information
*/
@Override
public FileLocationInfo getRefreshedLocation(
ExternalVolumeLocation location, Optional<String> fileName) {
try {
ChannelConfigureRequest request =
new ChannelConfigureRequest(
this.role, location.dbName, location.schemaName, location.tableName);
fileName.ifPresent(request::setFileName);
ChannelConfigureResponse response = this.snowflakeServiceClient.channelConfigure(request);
return response.getStageLocation();
} catch (IngestResponseException | IOException e) {
throw new SFException(e, ErrorCode.CHANNEL_CONFIGURE_FAILURE, e.getMessage());
// future enhancement - per table locks instead of per-client lock
synchronized (registerTableLock) {
  // Re-check while holding the lock: another thread may have registered this table after the
  // unsynchronized containsKey check performed before entering this block.
  if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) {
    logger.logInfo(
        "Skip registering table since its already been registered with the VolumeManager."
            + " tableRef=%s",
        tableRef);
    return;
  }

  try {
    ExternalVolume externalVolume = new ExternalVolume();
    this.externalVolumeMap.put(tableRef.fullyQualifiedName, externalVolume);
  } catch (SFException ex) {
    // The specifier must be "%s": a bare "%" followed by " f" is read as a float conversion
    // and would make formatting fail, hiding the exception being reported.
    logger.logError(
        "ExtVolManager.registerTable for tableRef=%s failed with exception=%s", tableRef, ex);
    // allow external volume ctor's SFExceptions to bubble up directly
    throw ex;
  } catch (Exception err) {
    logger.logError(
        "ExtVolManager.registerTable for tableRef=%s failed with exception=%s", tableRef, err);

    // Wrap anything else so callers see a single SFException type with the cause preserved.
    throw new SFException(
        err,
        ErrorCode.UNABLE_TO_CONNECT_TO_STAGE,
        String.format("fullyQualifiedTableName=%s", tableRef));
  }
}
}

// TODO: SNOW-1502887 Blob path generation for external volume
@Override
public String generateBlobPath() {
return "snow_dummy_file_name.parquet";
}

// TODO: SNOW-1502887 Blob path generation for iceberg table
@Override
public void decrementBlobSequencer() {}

// TODO: SNOW-1502887 Blob path generation for iceberg table
public String getBlobPath(Calendar calendar, String clientPrefix) {
return "";
public BlobPath generateBlobPath(String fullyQualifiedTableName) {
throw new RuntimeException("not implemented");
}

/**
Expand All @@ -173,4 +136,16 @@ public String getBlobPath(Calendar calendar, String clientPrefix) {
public String getClientPrefix() {
// The prefix is assigned once in the constructor: "testPrefix" in test mode, otherwise the
// value returned by the service client's clientConfigure call.
return this.clientPrefix;
}

/**
 * Looks up the external volume registered for a table, failing loudly when there is none.
 *
 * @param fullyQualifiedTableName key under which the volume was stored in the volume map
 * @return the registered volume, never null
 * @throws SFException with {@code ErrorCode.INTERNAL_ERROR} if no volume is registered
 */
private ExternalVolume getVolumeSafe(String fullyQualifiedTableName) {
  final ExternalVolume registered = this.externalVolumeMap.get(fullyQualifiedTableName);
  if (registered != null) {
    return registered;
  }

  final String details =
      String.format("No external volume found for tableRef=%s", fullyQualifiedTableName);
  throw new SFException(ErrorCode.INTERNAL_ERROR, details);
}
}
Loading

0 comments on commit c063036

Please sign in to comment.