Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Oct 28, 2024
1 parent dbe021a commit 422ac86
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 51 deletions.
31 changes: 16 additions & 15 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public class ParameterProvider {
// Cached buffer flush interval - avoid parsing each time for quick lookup
private Long cachedBufferFlushIntervalMs = -1L;

// Cached isIcebergMode - avoid parsing each time for quick lookup
private Boolean cachedIsIcebergMode = null;

/**
* Constructor. Takes properties from profile file and properties from client constructor and
* resolves final parameter value
Expand All @@ -100,20 +103,6 @@ public ParameterProvider(Map<String, Object> parameterOverrides, Properties prop
this.setParameterMap(parameterOverrides, props);
}

/** Constructor for tests */
public ParameterProvider(
Map<String, Object> parameterOverrides, Properties props, boolean isIcebergMode) {
if (parameterOverrides != null) {
parameterOverrides.put(STREAMING_ICEBERG, isIcebergMode);
}
this.setParameterMap(parameterOverrides, props);
}

/** Empty constructor for tests */
public ParameterProvider(boolean isIcebergMode) {
this(new HashMap<>(), null, isIcebergMode);
}

private void checkAndUpdate(
String key,
Object defaultValue,
Expand Down Expand Up @@ -153,6 +142,7 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties
BUFFER_FLUSH_INTERVAL_IN_MILLIS, MAX_CLIENT_LAG));
}

/* STREAMING_ICEBERG should be the first thing to set as it affects other parameters */
this.checkAndUpdate(
STREAMING_ICEBERG,
STREAMING_ICEBERG_DEFAULT,
Expand Down Expand Up @@ -509,8 +499,19 @@ public boolean isEnableNewJsonParsingLogic() {

/** @return Whether the client is in Iceberg mode */
public boolean isIcebergMode() {
if (cachedIsIcebergMode != null) {
return cachedIsIcebergMode;
}
Object val = this.parameterMap.getOrDefault(STREAMING_ICEBERG, STREAMING_ICEBERG_DEFAULT);
return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val;

try {
cachedIsIcebergMode =
(val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val;
} catch (Throwable t) {
throw new IllegalArgumentException(
String.format("Failed to parse STREAMING_ICEBERG = '%s'", val), t);
}
return cachedIsIcebergMode;
}

@Override
Expand Down
24 changes: 24 additions & 0 deletions src/test/java/net/snowflake/ingest/TestUtils.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest;

import static net.snowflake.ingest.utils.Constants.ACCOUNT;
Expand All @@ -14,6 +18,7 @@
import static net.snowflake.ingest.utils.Constants.USER;
import static net.snowflake.ingest.utils.Constants.WAREHOUSE;
import static net.snowflake.ingest.utils.ParameterProvider.BLOB_FORMAT_VERSION;
import static net.snowflake.ingest.utils.ParameterProvider.STREAMING_ICEBERG;

import java.io.IOException;
import java.math.BigDecimal;
Expand Down Expand Up @@ -46,6 +51,7 @@
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;
import net.snowflake.ingest.utils.Utils;
import org.apache.commons.codec.binary.Base64;
import org.junit.Assert;
Expand Down Expand Up @@ -497,6 +503,24 @@ public static URI getTokenRequestURI() {
return tokenRequestURI;
}

public static ParameterProvider createParameterProvider(
Map<String, Object> parameterOverrides, Properties props, boolean isIcebergMode) {
if (parameterOverrides != null) {
parameterOverrides.put(STREAMING_ICEBERG, isIcebergMode);
}
return new ParameterProvider(parameterOverrides, props);
}

public static ParameterProvider createParameterProvider(boolean isIcebergMode) {
return createParameterProvider(new HashMap<>(), null, isIcebergMode);
}

public static Properties createProps(boolean isIcebergMode) {
Properties prop = new Properties();
prop.setProperty(ParameterProvider.STREAMING_ICEBERG, String.valueOf(isIcebergMode));
return prop;
}

private static <T> T nullOrIfNullable(boolean nullable, Random r, Supplier<T> value) {
return !nullable ? value.get() : (r.nextBoolean() ? value.get() : null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.TestUtils.createParameterProvider;
import static net.snowflake.ingest.utils.Constants.BLOB_CHECKSUM_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.Constants.BLOB_CHUNK_METADATA_LENGTH_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.Constants.BLOB_EXTENSION_TYPE;
Expand Down Expand Up @@ -107,7 +108,7 @@ private abstract static class TestContext<T> implements AutoCloseable {

TestContext() {
storage = Mockito.mock(InternalStage.class);
parameterProvider = new ParameterProvider(isIcebergMode);
parameterProvider = createParameterProvider(isIcebergMode);
InternalParameterProvider internalParameterProvider =
new InternalParameterProvider(isIcebergMode);
client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class);
Expand Down Expand Up @@ -1049,7 +1050,7 @@ public void testInvalidateChannels() {
// Create a new Client in order to not interfere with other tests
SnowflakeStreamingIngestClientInternal<StubChunkData> client =
Mockito.mock(SnowflakeStreamingIngestClientInternal.class);
ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode);
ParameterProvider parameterProvider = createParameterProvider(isIcebergMode);
ChannelCache<StubChunkData> channelCache = new ChannelCache<>();
InternalParameterProvider internalParameterProvider =
new InternalParameterProvider(isIcebergMode);
Expand Down
Loading

0 comments on commit 422ac86

Please sign in to comment.