Commit 2ecd4c9

done

sfc-gh-alhuang committed Oct 23, 2024
1 parent 513b589 commit 2ecd4c9
Showing 15 changed files with 62 additions and 45 deletions.
@@ -31,9 +31,6 @@ public static class Builder {
     // Indicates whether it's under test mode
     private boolean isTestMode;
 
-    // Whether we are going to ingest into iceberg tables
-    private boolean isIceberg;
-
     private Builder(String name) {
       this.name = name;
     }
@@ -53,12 +50,6 @@ public Builder setIsTestMode(boolean isTestMode) {
       return this;
     }
 
-    // do not make public until the feature is ready
-    Builder setIsIceberg(boolean isIceberg) {
-      this.isIceberg = isIceberg;
-      return this;
-    }
-
     public SnowflakeStreamingIngestClient build() {
       Utils.assertStringNotNullOrEmpty("client name", this.name);
       Utils.assertNotNull("connection properties", this.prop);
@@ -67,7 +58,7 @@ public SnowflakeStreamingIngestClient build() {
       SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));
 
       return new SnowflakeStreamingIngestClientInternal<>(
-          this.name, accountURL, prop, this.parameterOverrides, this.isIceberg, this.isTestMode);
+          this.name, accountURL, prop, this.parameterOverrides, this.isTestMode);
     }
   }
 }
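With the package-private setIsIceberg builder method gone, callers now opt into Iceberg ingestion through the connection properties instead. A minimal sketch of the new call pattern, assuming the usual key-pair auth properties are filled in elsewhere; the client name and account URL below are placeholders:

import java.util.Properties;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import net.snowflake.ingest.utils.Constants;

Properties props = new Properties();
props.setProperty(Constants.ACCOUNT_URL, "https://my_account.snowflakecomputing.com"); // placeholder
// Iceberg mode is now derived from this connection property (see the
// STREAMING_ICEBERG constant added in Constants.java below), not a builder flag.
props.setProperty(Constants.STREAMING_ICEBERG, "true");

SnowflakeStreamingIngestClient client =
    SnowflakeStreamingIngestClientFactory.builder("my_client").setProperties(props).build();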
@@ -33,6 +33,7 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
   private final Constants.BdecParquetCompression bdecParquetCompression;
   private final ParquetProperties.WriterVersion parquetWriterVersion;
   private final boolean enableDictionaryEncoding;
+  private final boolean isIcebergMode;
 
   /** Construct parquet flusher from its schema. */
   public ParquetFlusher(
@@ -41,13 +42,15 @@ public ParquetFlusher(
       Optional<Integer> maxRowGroups,
       Constants.BdecParquetCompression bdecParquetCompression,
       ParquetProperties.WriterVersion parquetWriterVersion,
-      boolean enableDictionaryEncoding) {
+      boolean enableDictionaryEncoding,
+      boolean isIcebergMode) {
     this.schema = schema;
     this.maxChunkSizeInBytes = maxChunkSizeInBytes;
     this.maxRowGroups = maxRowGroups;
     this.bdecParquetCompression = bdecParquetCompression;
     this.parquetWriterVersion = parquetWriterVersion;
     this.enableDictionaryEncoding = enableDictionaryEncoding;
+    this.isIcebergMode = isIcebergMode;
   }
 
   @Override
@@ -127,7 +130,9 @@ private SerializationResult serializeFromJavaObjects(
     // We insert the filename in the file itself as metadata so that streams can work on replicated
     // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
     // http://go/streams-on-replicated-mixed-tables
-    metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));
+    metadata.put(
+        isIcebergMode ? Constants.FULL_FILL_NAME_KEY : Constants.PRIMARY_FILE_ID_KEY,
+        StreamingIngestUtils.getShortname(filePath));
     parquetWriter =
         new SnowflakeParquetWriter(
             mergedData,
@@ -456,6 +456,9 @@ public Flusher<ParquetChunkData> createFlusher() {
         clientBufferParameters.getMaxRowGroups(),
         clientBufferParameters.getBdecParquetCompression(),
         parquetWriterVersion,
-        clientBufferParameters.isEnableDictionaryEncoding());
+        parquetWriterVersion == ParquetProperties.WriterVersion.PARQUET_2_0
+            && clientBufferParameters
+                .isEnableDictionaryEncoding() /* writer 1.0 does not support dictionary encoding */,
+        clientBufferParameters.getIsIcebergMode());
   }
 }
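For callers constructing the flusher directly, the new trailing parameter follows the dictionary-encoding flag, which is now pre-gated on the writer version. A minimal sketch with placeholder values (schema, maxChunkSizeInBytes, and maxRowGroups are supplied by the caller), assuming the ParquetFlusher signature introduced above:

Flusher<ParquetChunkData> flusher =
    new ParquetFlusher(
        schema,
        maxChunkSizeInBytes,
        maxRowGroups,
        Constants.BdecParquetCompression.ZSTD,
        ParquetProperties.WriterVersion.PARQUET_2_0,
        true /* enableDictionaryEncoding; only meaningful for writer v2, per the gate above */,
        true /* isIcebergMode */);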
@@ -153,7 +153,6 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStreamingIngestClient
    * @param prop connection properties
    * @param httpClient http client for sending request
    * @param isTestMode whether we're under test mode
-   * @param isIcebergMode whether we're streaming to Iceberg tables
    * @param requestBuilder http request builder
    * @param parameterOverrides parameters we override in case we want to set different values
    */
@@ -162,16 +161,16 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStreamingIngestClient
       SnowflakeURL accountURL,
       Properties prop,
       CloseableHttpClient httpClient,
-      boolean isIcebergMode,
       boolean isTestMode,
       RequestBuilder requestBuilder,
       Map<String, Object> parameterOverrides) {
+    this.isIcebergMode =
+        prop != null && Boolean.parseBoolean(prop.getProperty(Constants.STREAMING_ICEBERG));
     this.parameterProvider = new ParameterProvider(parameterOverrides, prop, isIcebergMode);
     this.internalParameterProvider = new InternalParameterProvider(isIcebergMode);
 
     this.name = name;
     String accountName = accountURL == null ? null : accountURL.getAccount();
-    this.isIcebergMode = isIcebergMode;
     this.isTestMode = isTestMode;
     this.httpClient = httpClient == null ? HttpUtil.getHttpClient(accountName) : httpClient;
     this.channelCache = new ChannelCache<>();
@@ -267,25 +266,23 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStreamingIngestClient
    * @param accountURL Snowflake account url
    * @param prop connection properties
    * @param parameterOverrides map of parameters to override for this client
-   * @param isIcebergMode whether we're streaming to Iceberg tables
    * @param isTestMode indicates whether it's under test mode
    */
   public SnowflakeStreamingIngestClientInternal(
       String name,
       SnowflakeURL accountURL,
       Properties prop,
       Map<String, Object> parameterOverrides,
-      boolean isIcebergMode,
       boolean isTestMode) {
-    this(name, accountURL, prop, null, isIcebergMode, isTestMode, null, parameterOverrides);
+    this(name, accountURL, prop, null, isTestMode, null, parameterOverrides);
   }
 
   /*** Constructor for TEST ONLY
    *
    * @param name the name of the client
    */
-  SnowflakeStreamingIngestClientInternal(String name, boolean isIcebergMode) {
-    this(name, null, null, null, isIcebergMode, true, null, new HashMap<>());
+  SnowflakeStreamingIngestClientInternal(String name) {
+    this(name, null, null, null, true, null, new HashMap<>());
   }
 
   // TESTING ONLY - inject the request builder
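The derivation above means the property is optional: a null Properties object or an absent or non-"true" value leaves the client in classic BDEC mode. A minimal sketch of the same logic in isolation:

// Matches the constructor body above; Boolean.parseBoolean(null) returns false,
// so Iceberg mode is off unless streaming_iceberg is explicitly set to "true".
static boolean resolveIcebergMode(Properties prop) {
  return prop != null && Boolean.parseBoolean(prop.getProperty(Constants.STREAMING_ICEBERG));
}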
3 changes: 3 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -33,9 +33,12 @@ public class Constants {
   public static final String OAUTH_CLIENT_SECRET = "oauth_client_secret";
   public static final String OAUTH_REFRESH_TOKEN = "oauth_refresh_token";
   public static final String OAUTH_TOKEN_ENDPOINT = "oauth_token_endpoint";
+  public static final String STREAMING_ICEBERG = "streaming_iceberg";
   public static final String SNOWFLAKE_OAUTH_TOKEN_ENDPOINT = "/oauth/token-request";
   public static final String PRIMARY_FILE_ID_KEY =
       "primaryFileId"; // Don't change, should match Parquet Scanner
+  public static final String FULL_FILL_NAME_KEY =
+      "fullFillNameKey"; // Don't change, should match Parquet Scanner
   public static final long RESPONSE_SUCCESS = 0L; // Don't change, should match server side
   public static final long RESPONSE_ERR_GENERAL_EXCEPTION_RETRY_REQUEST =
       10L; // Don't change, should match server side
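A reader of the serialized chunk can recover the embedded file name with the mode-appropriate key. A minimal sketch mirroring the flusher change above and the assertion in the row-buffer test further down (readFileNameMetadata is a hypothetical helper, not part of this commit):

// Iceberg files carry the full file name; BDEC files carry the short primary file id.
static String readFileNameMetadata(BdecParquetReader reader, boolean isIcebergMode) {
  String key = isIcebergMode ? Constants.FULL_FILL_NAME_KEY : Constants.PRIMARY_FILE_ID_KEY;
  return reader.getKeyValueMetadata().get(key);
}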
@@ -73,6 +73,8 @@ public class ParameterProvider {
 
   public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT =
       Constants.BdecParquetCompression.GZIP;
+  public static final Constants.BdecParquetCompression
+      PARQUET_COMPRESSION_ALGORITHM_ICEBERG_MODE_DEFAULT = Constants.BdecParquetCompression.ZSTD;
 
   /* Iceberg mode parameters: when streaming to Iceberg tables, different default parameters are required because the client generates Parquet files instead of BDEC files. */
   public static final int MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT = 1;
@@ -253,7 +255,9 @@ private void setParameterMap(
 
     this.checkAndUpdate(
         BDEC_PARQUET_COMPRESSION_ALGORITHM,
-        BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
+        isIcebergMode
+            ? PARQUET_COMPRESSION_ALGORITHM_ICEBERG_MODE_DEFAULT
+            : BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
         parameterOverrides,
         props,
         false /* enforceDefault */);
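Because checkAndUpdate is called with enforceDefault = false, the mode-dependent value is only a default; an entry in parameterOverrides would still win. A minimal sketch of how the default now resolves, using the three-argument ParameterProvider constructor shown in the client diff above:

// With no explicit override, the compression default depends on the ingestion mode:
Map<String, Object> overrides = new HashMap<>();
Properties props = new Properties();
ParameterProvider bdec = new ParameterProvider(overrides, props, false /* isIcebergMode */); // -> GZIP
ParameterProvider iceberg = new ParameterProvider(overrides, props, true /* isIcebergMode */); // -> ZSTD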
@@ -136,6 +136,7 @@ private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(int metada
             isIceberg
                 ? ParquetProperties.WriterVersion.PARQUET_2_0
                 : ParquetProperties.WriterVersion.PARQUET_1_0,
+            isIceberg,
             isIceberg))
         .when(channelData)
         .createFlusher();
@@ -9,10 +9,12 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
 import net.snowflake.ingest.connection.RequestBuilder;
 import net.snowflake.ingest.streaming.OpenChannelRequest;
+import net.snowflake.ingest.utils.Constants;
 import org.apache.parquet.column.ParquetProperties;
 import org.junit.Assert;
 import org.junit.Before;
@@ -45,9 +47,11 @@ public void setup() {
     cache = new ChannelCache<>();
     CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient();
     RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient);
+    Properties prop = new Properties();
+    prop.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode));
     client =
         new SnowflakeStreamingIngestClientInternal<>(
-            "client", null, null, httpClient, isIcebergMode, true, requestBuilder, new HashMap<>());
+            "client", null, prop, httpClient, true, requestBuilder, new HashMap<>());
 
     channel1 =
         new SnowflakeStreamingIngestChannelInternal<>(
@@ -9,11 +9,13 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
 import net.snowflake.ingest.connection.RequestBuilder;
 import net.snowflake.ingest.streaming.InsertValidationResponse;
 import net.snowflake.ingest.streaming.OpenChannelRequest;
+import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.Utils;
 import org.apache.parquet.column.ParquetProperties;
 import org.junit.Assert;
@@ -55,16 +57,11 @@ public void setUpBeforeAll() {
     // SNOW-1490151: Testing gaps
     CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient();
     RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient);
+    Properties prop = new Properties();
+    prop.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode));
     client =
         new SnowflakeStreamingIngestClientInternal<>(
-            "client_PARQUET",
-            null,
-            null,
-            httpClient,
-            isIcebergMode,
-            true,
-            requestBuilder,
-            new HashMap<>());
+            "client_PARQUET", null, prop, httpClient, true, requestBuilder, new HashMap<>());
 
     channel =
         new SnowflakeStreamingIngestChannelInternal<>(
@@ -119,7 +119,7 @@ public void testCreateOAuthClient() throws Exception {
   public void testSetRefreshToken() throws Exception {
     // SNOW-1490151: Testing gaps
     SnowflakeStreamingIngestClientInternal<StubChunkData> client =
-        new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT", false);
+        new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT");
     MockOAuthClient mockOAuthClient = new MockOAuthClient();
 
     OAuthManager oAuthManager =
@@ -11,12 +11,14 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
 import net.snowflake.ingest.connection.RequestBuilder;
+import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.Pair;
 import org.junit.After;
 import org.junit.Assert;
@@ -41,9 +43,11 @@ public static Object[] isIcebergMode() {
   public void setup() {
     CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient();
     RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient);
+    Properties prop = new Properties();
+    prop.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode));
     client =
         new SnowflakeStreamingIngestClientInternal<>(
-            "client", null, null, httpClient, isIcebergMode, true, requestBuilder, new HashMap<>());
+            "client", null, prop, httpClient, true, requestBuilder, new HashMap<>());
   }
 
   @After
@@ -2026,7 +2026,11 @@ public void testParquetFileNameMetadata() throws IOException {
     flusher.serialize(Collections.singletonList(data), filePath);
 
     BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
-    Assert.assertEquals(filePath, reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY));
+    Assert.assertEquals(
+        filePath,
+        reader
+            .getKeyValueMetadata()
+            .get(isIcebergMode ? Constants.FULL_FILL_NAME_KEY : Constants.PRIMARY_FILE_ID_KEY));
   }
 
   @Test
@@ -89,14 +89,18 @@ public static Object[] isIcebergMode() {
   private SnowflakeStreamingIngestClientInternal<StubChunkData> client;
   private MockSnowflakeServiceClient.ApiOverride apiOverride;
 
+  private Properties prop;
+
   @Before
   public void setup() {
     apiOverride = new MockSnowflakeServiceClient.ApiOverride();
     CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient(apiOverride);
     RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient);
+    prop = new Properties();
+    prop.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode));
     client =
         new SnowflakeStreamingIngestClientInternal<>(
-            "client", null, null, httpClient, isIcebergMode, true, requestBuilder, new HashMap<>());
+            "client", null, prop, httpClient, true, requestBuilder, new HashMap<>());
 
     // some tests assume client is a mock object, just do it for everyone.
     client = Mockito.spy(client);
@@ -501,9 +505,8 @@ public void testOpenChannelSuccessResponse() throws Exception {
         new SnowflakeStreamingIngestClientInternal<>(
             "client",
             new SnowflakeURL("snowflake.dev.local:8082"),
-            null,
+            prop,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null);
@@ -100,6 +100,7 @@ public static Object[] isIcebergMode() {
   SnowflakeStreamingIngestClientInternal<StubChunkData> client;
   private MockSnowflakeServiceClient.ApiOverride apiOverride;
   RequestBuilder requestBuilder;
+  private Properties isIcebergProp;
 
   @Before
   public void setup() throws Exception {
@@ -110,13 +111,15 @@ public void setup() throws Exception {
     prop.put(ACCOUNT_URL, TestUtils.getHost());
     prop.put(PRIVATE_KEY, TestUtils.getPrivateKey());
     prop.put(ROLE, TestUtils.getRole());
+    isIcebergProp = new Properties();
+    isIcebergProp.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode));
 
     apiOverride = new MockSnowflakeServiceClient.ApiOverride();
     CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient(apiOverride);
     requestBuilder = Mockito.spy(MockSnowflakeServiceClient.createRequestBuilder(httpClient));
     client =
         new SnowflakeStreamingIngestClientInternal<>(
-            "client", null, null, httpClient, isIcebergMode, true, requestBuilder, new HashMap<>());
+            "client", null, isIcebergProp, httpClient, true, requestBuilder, new HashMap<>());
 
     channel1 =
         new SnowflakeStreamingIngestChannelInternal<>(
@@ -472,9 +475,8 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception {
         new SnowflakeStreamingIngestClientInternal<>(
             "client",
             new SnowflakeURL("snowflake.dev.local:8082"),
-            null,
+            isIcebergProp,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null);
@@ -1001,9 +1003,8 @@ public void testRegisterBlobsRetriesSucceeds() throws Exception {
         new SnowflakeStreamingIngestClientInternal<>(
             "client",
             new SnowflakeURL("snowflake.dev.local:8082"),
-            null,
+            isIcebergProp,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null);
@@ -1241,9 +1242,8 @@ public void testFlushServiceException() throws Exception {
         new SnowflakeStreamingIngestClientInternal<>(
             "client",
             new SnowflakeURL("snowflake.dev.local:8082"),
-            null,
+            isIcebergProp,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             parameterMap);
@@ -122,6 +122,7 @@ protected void setUp(
     conn.createStatement().execute(String.format("use warehouse %s;", TestUtils.getWarehouse()));
 
     Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false);
+    props.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIceberg));
     if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) {
       props.setProperty(ROLE, "ACCOUNTADMIN");
     }
@@ -135,7 +136,7 @@
     SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));
     client =
         new SnowflakeStreamingIngestClientInternal<>(
-            "client1", accountURL, prop, parameterMap, isIceberg, false);
+            "client1", accountURL, prop, parameterMap, false);
   }
 
   @After
