Skip to content

Commit

Permalink
Fix ITs
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Oct 9, 2024
1 parent 756771e commit 7fa0e3a
Show file tree
Hide file tree
Showing 18 changed files with 148 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,8 @@ static <T> Blob constructBlobAndMetadata(
List<List<ChannelData<T>>> blobData,
Constants.BdecVersion bdecVersion,
InternalParameterProvider internalParameterProvider)
throws IOException,
NoSuchPaddingException,
NoSuchAlgorithmException,
InvalidAlgorithmParameterException,
InvalidKeyException,
IllegalBlockSizeException,
throws IOException, NoSuchPaddingException, NoSuchAlgorithmException,
InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException,
BadPaddingException {
List<ChunkMetadata> chunksMetadataList = new ArrayList<>();
List<byte[]> chunksDataList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ private ClientBufferParameters(
this.isIcebergMode = isIcebergMode;
}

/**
* @param clientInternal reference to the client object where the relevant parameters are set
*/
/** @param clientInternal reference to the client object where the relevant parameters are set */
public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal) {
this.maxChunkSizeInBytes =
clientInternal != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1231,12 +1231,8 @@ public void testShutDown() throws Exception {

@Test
public void testEncryptionDecryption()
throws InvalidAlgorithmParameterException,
NoSuchPaddingException,
IllegalBlockSizeException,
NoSuchAlgorithmException,
BadPaddingException,
InvalidKeyException {
throws InvalidAlgorithmParameterException, NoSuchPaddingException, IllegalBlockSizeException,
NoSuchAlgorithmException, BadPaddingException, InvalidKeyException {
byte[] data = "testEncryptionDecryption".getBytes(StandardCharsets.UTF_8);
String encryptionKey =
Base64.getEncoder().encodeToString("encryption_key".getBytes(StandardCharsets.UTF_8));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void setup() throws Exception {
ZoneOffset.UTC,
BDEC_VERSION,
null,
null);
null);
channel2 =
new SnowflakeStreamingIngestChannelInternal<>(
"channel2",
Expand All @@ -150,7 +150,7 @@ public void setup() throws Exception {
ZoneOffset.UTC,
BDEC_VERSION,
null,
null);
null);
channel3 =
new SnowflakeStreamingIngestChannelInternal<>(
"channel3",
Expand All @@ -167,7 +167,7 @@ public void setup() throws Exception {
ZoneOffset.UTC,
BDEC_VERSION,
null,
null);
null);
channel4 =
new SnowflakeStreamingIngestChannelInternal<>(
"channel4",
Expand All @@ -184,7 +184,7 @@ public void setup() throws Exception {
ZoneOffset.UTC,
BDEC_VERSION,
null,
null);
null);
}

@Test
Expand Down Expand Up @@ -382,7 +382,7 @@ public void testGetChannelsStatusWithRequest() throws Exception {
ZoneOffset.UTC,
BDEC_VERSION,
null,
null);
null);

ChannelsStatusRequest.ChannelStatusRequestDTO dto =
new ChannelsStatusRequest.ChannelStatusRequestDTO(channel);
Expand Down Expand Up @@ -445,7 +445,7 @@ public void testGetChannelsStatusWithRequestError() throws Exception {
ZoneOffset.UTC,
BDEC_VERSION,
null,
null);
null);

try {
client.getChannelsStatus(Collections.singletonList(channel));
Expand Down Expand Up @@ -499,7 +499,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception {
ZoneOffset.UTC,
BDEC_VERSION,
null,
null);
null);

ChannelMetadata channelMetadata =
ChannelMetadata.builder()
Expand Down Expand Up @@ -1266,7 +1266,7 @@ public void testGetLatestCommittedOffsetTokens() throws Exception {
ZoneOffset.UTC,
BDEC_VERSION,
null,
null);
null);

ChannelsStatusRequest.ChannelStatusRequestDTO dto =
new ChannelsStatusRequest.ChannelStatusRequestDTO(channel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,52 @@ public abstract class AbstractDataTypeTest {
protected static final ObjectMapper objectMapper = new ObjectMapper();

@Parameters(name = "{index}: {0}")
public static Object[] compressionAlgorithms() {
public static Object[] parameters() {
return new Object[] {"GZIP", "ZSTD"};
}

@Parameter public String compressionAlgorithm;

public void before(boolean isIceberg) throws Exception {
public void before() throws Exception {
setUp(
false /* isIceberg */,
compressionAlgorithm,
Constants.IcebergSerializationPolicy.NON_ICEBERG);
}

public void beforeIceberg(
String compressionAlgorithm, Constants.IcebergSerializationPolicy serializationPolicy)
throws Exception {
setUp(true /* isIceberg */, compressionAlgorithm, serializationPolicy);
}

protected void setUp(
boolean isIceberg,
String compressionAlgorithm,
Constants.IcebergSerializationPolicy serializationPolicy)
throws Exception {
databaseName = String.format("SDK_DATATYPE_COMPATIBILITY_IT_%s", getRandomIdentifier());
conn = TestUtils.getConnection(true);
conn.createStatement().execute(String.format("create or replace database %s;", databaseName));
conn.createStatement().execute(String.format("use database %s;", databaseName));
conn.createStatement().execute(String.format("use schema %s;", schemaName));

switch (serializationPolicy) {
case COMPATIBLE:
conn.createStatement()
.execute(
String.format(
"alter schema %s set STORAGE_SERIALIZATION_POLICY = 'COMPATIBLE';",
schemaName));
break;
case OPTIMIZED:
conn.createStatement()
.execute(
String.format(
"alter schema %s set STORAGE_SERIALIZATION_POLICY = 'OPTIMIZED';", schemaName));
break;
}

conn.createStatement().execute(String.format("use warehouse %s;", TestUtils.getWarehouse()));

Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
public class BinaryIT extends AbstractDataTypeTest {
@Before
public void before() throws Exception {
super.before(false);
super.before();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class DateTimeIT extends AbstractDataTypeTest {

@Before
public void setup() throws Exception {
super.before(false);
super.before();
// Set to a random time zone not to interfere with any of the tests
conn.createStatement().execute("alter session set timezone = 'America/New_York';");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,36 @@
import java.time.OffsetTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runners.Parameterized;

@Ignore("This test can be enabled after server side Iceberg EP support is released")
public class IcebergDateTimeIT extends AbstractDataTypeTest {

@Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}")
public static Object[][] parameters() {
return new Object[][] {
{"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE},
{"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED},
{"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE},
{"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED}
};
}

@Parameterized.Parameter public static String compressionAlgorithm;

@Parameterized.Parameter(1)
public static Constants.IcebergSerializationPolicy icebergSerializationPolicy;

@Before
public void before() throws Exception {
super.before(true);
super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,35 @@
package net.snowflake.ingest.streaming.internal.datatypes;

import java.util.Arrays;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runners.Parameterized;

@Ignore("This test can be enabled after server side Iceberg EP support is released")
public class IcebergLogicalTypesIT extends AbstractDataTypeTest {
@Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}")
public static Object[][] parameters() {
return new Object[][] {
{"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE},
{"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED},
{"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE},
{"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED}
};
}

@Parameterized.Parameter public static String compressionAlgorithm;

@Parameterized.Parameter(1)
public static Constants.IcebergSerializationPolicy icebergSerializationPolicy;

@Before
public void before() throws Exception {
super.before(true);
super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,35 @@

import java.math.BigDecimal;
import java.util.Arrays;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runners.Parameterized;

@Ignore("This test can be enabled after server side Iceberg EP support is released")
public class IcebergNumericTypesIT extends AbstractDataTypeTest {
@Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}")
public static Object[][] parameters() {
return new Object[][] {
{"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE},
{"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED},
{"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE},
{"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED}
};
}

@Parameterized.Parameter public static String compressionAlgorithm;

@Parameterized.Parameter(1)
public static Constants.IcebergSerializationPolicy icebergSerializationPolicy;

@Before
public void before() throws Exception {
super.before(true);
super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,36 @@

import java.math.BigDecimal;
import java.util.Arrays;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.lang3.StringUtils;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runners.Parameterized;

@Ignore("This test can be enabled after server side Iceberg EP support is released")
public class IcebergStringIT extends AbstractDataTypeTest {
@Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}")
public static Object[][] parameters() {
return new Object[][] {
{"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE},
{"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED},
{"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE},
{"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED}
};
}

@Parameterized.Parameter public static String compressionAlgorithm;

@Parameterized.Parameter(1)
public static Constants.IcebergSerializationPolicy icebergSerializationPolicy;

@Before
public void before() throws Exception {
super.before(true);
super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,35 @@
import java.util.UUID;
import net.snowflake.ingest.TestUtils;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runners.Parameterized;

@Ignore("This test can be enabled after server side Iceberg EP support is released")
public class IcebergStructuredIT extends AbstractDataTypeTest {
@Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}")
public static Object[][] parameters() {
return new Object[][] {
{"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE},
{"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED},
{"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE},
{"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED}
};
}

@Parameterized.Parameter public static String compressionAlgorithm;

@Parameterized.Parameter(1)
public static Constants.IcebergSerializationPolicy icebergSerializationPolicy;

@Before
public void before() throws Exception {
super.before(true);
super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
public class LogicalTypesIT extends AbstractDataTypeTest {
@Before
public void before() throws Exception {
super.before(false);
super.before();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
public class NullIT extends AbstractDataTypeTest {
@Before
public void before() throws Exception {
super.before(false);
super.before();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
public class NumericTypesIT extends AbstractDataTypeTest {
@Before
public void before() throws Exception {
super.before(false);
super.before();
}

@Test
Expand Down
Loading

0 comments on commit 7fa0e3a

Please sign in to comment.