Skip to content

Commit

Permalink
split ColumnNamesIt
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-hmadan committed Nov 8, 2024
1 parent 9f46c2b commit bb26539
Show file tree
Hide file tree
Showing 19 changed files with 246 additions and 176 deletions.
56 changes: 56 additions & 0 deletions src/test/java/net/snowflake/ingest/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static net.snowflake.ingest.utils.Constants.SSL;
import static net.snowflake.ingest.utils.Constants.USER;
import static net.snowflake.ingest.utils.Constants.WAREHOUSE;
import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;
import static net.snowflake.ingest.utils.ParameterProvider.BLOB_FORMAT_VERSION;
import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING;

Expand Down Expand Up @@ -50,8 +51,12 @@
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.Constants.IcebergSerializationPolicy;
import net.snowflake.ingest.utils.ParameterProvider;
import net.snowflake.ingest.utils.SnowflakeURL;
import net.snowflake.ingest.utils.Utils;
import org.apache.commons.codec.binary.Base64;
import org.junit.Assert;
Expand Down Expand Up @@ -522,6 +527,57 @@ public static Properties createProps(boolean enableIcebergStreaming) {
return prop;
}

/**
 * Creates a fresh database/schema context on {@code conn} and returns a streaming ingest client
 * configured for the given test parameters.
 *
 * @param conn JDBC connection used to execute the setup DDL
 * @param databaseName database to (re)create and switch to
 * @param schemaName schema to switch to (and, in Iceberg mode, to configure)
 * @param enableIcebergStreaming whether to enable Iceberg streaming on the client
 * @param compressionAlgorithm BDEC Parquet compression algorithm to set on the client properties
 * @param serializationPolicy Iceberg storage serialization policy; only consulted when
 *     {@code enableIcebergStreaming} is true, may be null otherwise
 * @return a configured {@link SnowflakeStreamingIngestClient}
 * @throws Exception if any setup statement fails or the client cannot be constructed
 */
public static SnowflakeStreamingIngestClient setUp(
    Connection conn,
    String databaseName,
    String schemaName,
    boolean enableIcebergStreaming,
    String compressionAlgorithm,
    IcebergSerializationPolicy serializationPolicy)
    throws Exception {
  conn.createStatement().execute(String.format("create or replace database %s;", databaseName));
  conn.createStatement().execute(String.format("use database %s;", databaseName));
  conn.createStatement().execute(String.format("use schema %s;", schemaName));

  // Iceberg tables need an explicit storage serialization policy on the schema. Guard against
  // null since non-Iceberg callers pass serializationPolicy = null.
  if (enableIcebergStreaming && serializationPolicy != null) {
    switch (serializationPolicy) {
      case COMPATIBLE:
      case OPTIMIZED:
        // Both handled policies differ only in the literal, so emit it from the enum name.
        conn.createStatement()
            .execute(
                String.format(
                    "alter schema %s set STORAGE_SERIALIZATION_POLICY = '%s';",
                    schemaName, serializationPolicy.name()));
        break;
    }
  }

  conn.createStatement().execute(String.format("use warehouse %s;", TestUtils.getWarehouse()));

  Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false);
  props.setProperty(
      ParameterProvider.ENABLE_ICEBERG_STREAMING, String.valueOf(enableIcebergStreaming));
  // Null-safe comparison: ROLE may be absent from the loaded test properties.
  if ("DEFAULT_ROLE".equals(props.getProperty(ROLE))) {
    props.setProperty(ROLE, "ACCOUNTADMIN");
  }
  props.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);

  // Override Iceberg mode client lag to 1 second for faster test execution
  Map<String, Object> parameterMap = new HashMap<>();
  parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 1000L);

  Properties prop = Utils.createProperties(props);
  SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));
  return new SnowflakeStreamingIngestClientInternal<>(
      "client1", accountURL, prop, parameterMap, false);
}

/** Returns the supplied value, or — when {@code nullable} — randomly either the value or null. */
private static <T> T nullOrIfNullable(boolean nullable, Random r, Supplier<T> value) {
  if (!nullable) {
    return value.get();
  }
  // Nullable columns get a 50/50 chance of null versus a generated value.
  return r.nextBoolean() ? value.get() : null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@

package net.snowflake.ingest.streaming.internal.datatypes;

import static net.snowflake.ingest.utils.Constants.ROLE;
import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;

import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
Expand All @@ -21,28 +18,18 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Predicate;
import net.snowflake.ingest.TestUtils;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.SnowflakeURL;
import net.snowflake.ingest.utils.Utils;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public abstract class AbstractDataTypeTest {
private static final String SOURCE_COLUMN_NAME = "source";
private static final String VALUE_COLUMN_NAME = "value";
Expand All @@ -58,6 +45,8 @@ public abstract class AbstractDataTypeTest {

protected static BigDecimal MAX_ALLOWED_BIG_DECIMAL = new BigDecimal(MAX_ALLOWED_BIG_INTEGER);
protected static BigDecimal MIN_ALLOWED_BIG_DECIMAL = new BigDecimal(MIN_ALLOWED_BIG_INTEGER);
protected static final ObjectMapper objectMapper =
JsonMapper.builder().enable(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER).build();

protected Connection conn;
private String databaseName;
Expand All @@ -69,79 +58,29 @@ public abstract class AbstractDataTypeTest {
private Optional<ZoneId> defaultTimezone = Optional.empty();

private String schemaName = "PUBLIC";
private SnowflakeStreamingIngestClient client;
protected static final ObjectMapper objectMapper =
JsonMapper.builder().enable(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER).build();

@Parameters(name = "{index}: {0}")
public static Object[] parameters() {
return new Object[] {"GZIP", "ZSTD"};
}

@Parameter public String compressionAlgorithm;

public void before() throws Exception {
setUp(
false /* enableIcebergStreaming */,
compressionAlgorithm,
Constants.IcebergSerializationPolicy.COMPATIBLE);
}

public void beforeIceberg(
String compressionAlgorithm, Constants.IcebergSerializationPolicy serializationPolicy)
throws Exception {
setUp(true /* enableIcebergStreaming */, compressionAlgorithm, serializationPolicy);
}
protected SnowflakeStreamingIngestClient client = null;
protected Boolean enableIcebergStreaming = null;
protected String compressionAlgorithm = null;
protected Constants.IcebergSerializationPolicy serializationPolicy = null;

protected void setUp(
boolean enableIcebergStreaming,
String compressionAlgorithm,
Constants.IcebergSerializationPolicy serializationPolicy)
throws Exception {
databaseName = String.format("SDK_DATATYPE_COMPATIBILITY_IT_%s", getRandomIdentifier());
conn = TestUtils.getConnection(true);
conn.createStatement().execute(String.format("create or replace database %s;", databaseName));
conn.createStatement().execute(String.format("use database %s;", databaseName));
conn.createStatement().execute(String.format("use schema %s;", schemaName));

if (enableIcebergStreaming) {
switch (serializationPolicy) {
case COMPATIBLE:
conn.createStatement()
.execute(
String.format(
"alter schema %s set STORAGE_SERIALIZATION_POLICY = 'COMPATIBLE';",
schemaName));
break;
case OPTIMIZED:
conn.createStatement()
.execute(
String.format(
"alter schema %s set STORAGE_SERIALIZATION_POLICY = 'OPTIMIZED';",
schemaName));
break;
}
}

conn.createStatement().execute(String.format("use warehouse %s;", TestUtils.getWarehouse()));

Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false);
props.setProperty(
ParameterProvider.ENABLE_ICEBERG_STREAMING, String.valueOf(enableIcebergStreaming));
if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) {
props.setProperty(ROLE, "ACCOUNTADMIN");
}
props.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);

// Override Iceberg mode client lag to 1 second for faster test execution
Map<String, Object> parameterMap = new HashMap<>();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 1000L);

Properties prop = Utils.createProperties(props);
SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));
client =
new SnowflakeStreamingIngestClientInternal<>(
"client1", accountURL, prop, parameterMap, false);
this.enableIcebergStreaming = enableIcebergStreaming;
this.compressionAlgorithm = compressionAlgorithm;
this.serializationPolicy = serializationPolicy;
this.databaseName = String.format("SDK_DATATYPE_COMPATIBILITY_IT_%s", getRandomIdentifier());
this.conn = TestUtils.getConnection(true);
this.client =
TestUtils.setUp(
conn,
databaseName,
schemaName,
enableIcebergStreaming,
compressionAlgorithm,
serializationPolicy);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,23 @@
import org.bouncycastle.util.encoders.Hex;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class BinaryIT extends AbstractDataTypeTest {
@Parameters(name = "{index}: {0}")
public static Object[] parameters() {
  // Parquet compression algorithms this parameterized suite runs against.
  Object[] compressionAlgorithms = {"GZIP", "ZSTD"};
  return compressionAlgorithms;
}

@Parameter public String compressionAlgorithm;

@Before
public void before() throws Exception {
  // Non-Iceberg mode; the serialization policy argument is unused here, so null is passed.
  super.setUp(false, compressionAlgorithm, null);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,27 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class DateTimeIT extends AbstractDataTypeTest {

private static final ZoneId TZ_LOS_ANGELES = ZoneId.of("America/Los_Angeles");
private static final ZoneId TZ_BERLIN = ZoneId.of("Europe/Berlin");
private static final ZoneId TZ_TOKYO = ZoneId.of("Asia/Tokyo");

@Parameters(name = "{index}: {0}")
public static Object[] parameters() {
  // Each entry is one compression algorithm the suite is repeated with.
  Object[] algorithms = {"GZIP", "ZSTD"};
  return algorithms;
}

@Parameter public String compressionAlgorithm;

@Before
public void setup() throws Exception {
  super.setUp(false, compressionAlgorithm, null);
  // Pin the session to a fixed, arbitrary zone so results don't depend on the
  // runner's default time zone or interfere with any of the tests.
  String timezoneSql = "alter session set timezone = 'America/New_York';";
  conn.createStatement().execute(timezoneSql);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Ignore("This test can be enabled after server side Iceberg EP support is released")
@RunWith(Parameterized.class)
public class IcebergDateTimeIT extends AbstractDataTypeTest {

@Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}")
public static Object[][] parameters() {
return new Object[][] {
Expand All @@ -34,14 +35,15 @@ public static Object[][] parameters() {
};
}

@Parameterized.Parameter public static String compressionAlgorithm;
@Parameterized.Parameter(0)
public static String compressionAlgorithm;

@Parameterized.Parameter(1)
public static Constants.IcebergSerializationPolicy icebergSerializationPolicy;

@Before
public void before() throws Exception {
  // Iceberg mode with the parameterized compression algorithm and serialization policy.
  super.setUp(true, compressionAlgorithm, icebergSerializationPolicy);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Ignore("This test can be enabled after server side Iceberg EP support is released")
@RunWith(Parameterized.class)
public class IcebergLogicalTypesIT extends AbstractDataTypeTest {
@Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}")
public static Object[][] parameters() {
Expand All @@ -20,14 +22,15 @@ public static Object[][] parameters() {
};
}

@Parameterized.Parameter public static String compressionAlgorithm;
@Parameterized.Parameter(0)
public static String compressionAlgorithm;

@Parameterized.Parameter(1)
public static Constants.IcebergSerializationPolicy icebergSerializationPolicy;

@Before
public void before() throws Exception {
  // Iceberg mode with the parameterized compression algorithm and serialization policy.
  super.setUp(true, compressionAlgorithm, icebergSerializationPolicy);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Ignore("This test can be enabled after server side Iceberg EP support is released")
@RunWith(Parameterized.class)
public class IcebergNumericTypesIT extends AbstractDataTypeTest {
@Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}")
public static Object[][] parameters() {
Expand All @@ -31,7 +33,8 @@ public static Object[][] parameters() {
};
}

@Parameterized.Parameter public static String compressionAlgorithm;
@Parameterized.Parameter(0)
public static String compressionAlgorithm;

@Parameterized.Parameter(1)
public static Constants.IcebergSerializationPolicy icebergSerializationPolicy;
Expand All @@ -42,7 +45,7 @@ public static Object[][] parameters() {

@Before
public void before() throws Exception {
super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy);
super.setUp(true, compressionAlgorithm, icebergSerializationPolicy);
long seed = System.currentTimeMillis();
logger.info("Random seed: {}", seed);
generator = new Random(seed);
Expand Down
Loading

0 comments on commit bb26539

Please sign in to comment.