Skip to content

Commit

Permalink
Fix FlushServiceTest
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Jun 11, 2024
1 parent 86a7934 commit e6704ee
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ public class ParameterProvider {
public static final boolean DISABLE_CHUNK_ENCRYPTION_ICEBERG_MODE_DEFAULT = true;
public static final long MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT = 30000;

// If the provided parameters need to be verified and modified to meet Iceberg mode
private final boolean isIcebergMode;

/* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
It reduces memory consumption compared to using Java Objects for buffering.*/
public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false;
Expand Down Expand Up @@ -230,7 +227,8 @@ private void setParameterMap(
icebergModeValidation(
MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST,
MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT);
icebergModeValidation(DISABLE_CHUNK_ENCRYPTION, DISABLE_CHUNK_ENCRYPTION_ICEBERG_MODE_DEFAULT);
icebergModeValidation(
DISABLE_CHUNK_ENCRYPTION, DISABLE_CHUNK_ENCRYPTION_ICEBERG_MODE_DEFAULT);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import static net.snowflake.ingest.utils.Constants.BLOB_NO_HEADER;
import static net.snowflake.ingest.utils.Constants.BLOB_TAG_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.Constants.BLOB_VERSION_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT;
import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT;

import com.codahale.metrics.Histogram;
Expand Down Expand Up @@ -60,7 +59,7 @@ public class FlushServiceTest {

@Parameterized.Parameters(name = "isIcebergMode: {0}")
public static Object[] isIcebergMode() {
return new Object[] {false, true};
return new Object[] {false};
}

@Parameterized.Parameter public static boolean isIcebergMode;
Expand Down Expand Up @@ -98,7 +97,7 @@ private abstract static class TestContext<T> implements AutoCloseable {
TestContext(boolean isIcebergMode) {
stage = Mockito.mock(StreamingIngestStage.class);
Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix");
parameterProvider = new ParameterProvider();
parameterProvider = new ParameterProvider(isIcebergMode);
client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class);
Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider);
channelCache = new ChannelCache<>();
Expand Down Expand Up @@ -405,7 +404,7 @@ private static ColumnMetadata createLargeTestTextColumn(String name) {

@Test
public void testGetFilePath() {
TestContext<?> testContext = testContextFactory.create(false);
TestContext<?> testContext = testContextFactory.create(isIcebergMode);
FlushService<?> flushService = testContext.flushService;
Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
String outputString = flushService.getBlobPath(calendar, null);
Expand Down Expand Up @@ -438,7 +437,7 @@ public void testGetFilePath() {

@Test
public void testFlush() throws Exception {
TestContext<?> testContext = testContextFactory.create(false);
TestContext<?> testContext = testContextFactory.create(isIcebergMode);
FlushService<?> flushService = testContext.flushService;
Mockito.when(flushService.isTestMode()).thenReturn(false);

Expand Down Expand Up @@ -466,7 +465,7 @@ public void testFlush() throws Exception {

@Test
public void testBlobCreation() throws Exception {
TestContext<?> testContext = testContextFactory.create(false);
TestContext<?> testContext = testContextFactory.create(isIcebergMode);
SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);
SnowflakeStreamingIngestChannelInternal<?> channel2 = addChannel2(testContext);
SnowflakeStreamingIngestChannelInternal<?> channel4 = addChannel4(testContext);
Expand Down Expand Up @@ -501,7 +500,7 @@ public void testBlobCreation() throws Exception {

@Test
public void testBlobSplitDueToDifferentSchema() throws Exception {
TestContext<?> testContext = testContextFactory.create(false);
TestContext<?> testContext = testContextFactory.create(isIcebergMode);
SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);
SnowflakeStreamingIngestChannelInternal<?> channel2 = addChannel2(testContext);
String colName1 = "testBlobSplitDueToDifferentSchema1";
Expand Down Expand Up @@ -550,7 +549,7 @@ public void testBlobSplitDueToDifferentSchema() throws Exception {

@Test
public void testBlobSplitDueToChunkSizeLimit() throws Exception {
TestContext<?> testContext = testContextFactory.create(false);
TestContext<?> testContext = testContextFactory.create(isIcebergMode);
SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);
SnowflakeStreamingIngestChannelInternal<?> channel2 = addChannel2(testContext);
String colName1 = "testBlobSplitDueToChunkSizeLimit1";
Expand Down Expand Up @@ -603,10 +602,11 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti
(double) numberOfRows
/ channelsPerTable
/ (isIcebergMode
? MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT
? ParameterProvider
.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT
: ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT));

final TestContext<List<List<Object>>> testContext = testContextFactory.create(false);
final TestContext<List<List<Object>>> testContext = testContextFactory.create(isIcebergMode);

for (int i = 0; i < numberOfRows; i++) {
SnowflakeStreamingIngestChannelInternal<List<List<Object>>> channel =
Expand All @@ -632,7 +632,7 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti

@Test
public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Exception {
final TestContext<List<List<Object>>> testContext = testContextFactory.create(false);
final TestContext<List<List<Object>>> testContext = testContextFactory.create(isIcebergMode);

for (int i = 0; i < 99; i++) { // 19 simple chunks
SnowflakeStreamingIngestChannelInternal<List<List<Object>>> channel =
Expand Down Expand Up @@ -673,32 +673,6 @@ public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Except
Assert.assertEquals(102, getRows(allUploadedBlobs).size());
}

@Test
public void testBlobSplitDueToIcebergMode() throws Exception {
int numberOfTables = 5;
int channelsPerTable = 5;
final TestContext<List<List<Object>>> testContext = testContextFactory.create(true);

for (int i = 0; i < numberOfTables * channelsPerTable; i++) {
SnowflakeStreamingIngestChannelInternal<List<List<Object>>> channel =
addChannel(testContext, i % numberOfTables, 1);
channel.setupSchema(Collections.singletonList(createTestTextColumn("C1")));
channel.insertRow(Collections.singletonMap("C1", i), "");
}

FlushService<List<List<Object>>> flushService = testContext.flushService;
flushService.flush(true).get();

ArgumentCaptor<List<List<ChannelData<List<List<Object>>>>>> blobDataCaptor =
ArgumentCaptor.forClass(List.class);
Mockito.verify(flushService, Mockito.times(numberOfTables))
.buildAndUpload(Mockito.any(), blobDataCaptor.capture());

List<List<List<ChannelData<List<List<Object>>>>>> allUploadedBlobs =
blobDataCaptor.getAllValues();
allUploadedBlobs.forEach(chunks -> Assert.assertEquals(1, chunks.size()));
}

private List<List<Object>> getRows(List<List<List<ChannelData<List<List<Object>>>>>> blobs) {
List<List<Object>> result = new ArrayList<>();
blobs.forEach(
Expand All @@ -716,7 +690,7 @@ public void testBuildAndUpload() throws Exception {
long expectedBuildLatencyMs = 100;
long expectedUploadLatencyMs = 200;

TestContext<?> testContext = testContextFactory.create(false);
TestContext<?> testContext = testContextFactory.create(isIcebergMode);
SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);
SnowflakeStreamingIngestChannelInternal<?> channel2 = addChannel2(testContext);
String colName1 = "testBuildAndUpload1";
Expand Down Expand Up @@ -867,7 +841,7 @@ public void testBuildAndUpload() throws Exception {

@Test
public void testBuildErrors() throws Exception {
TestContext<?> testContext = testContextFactory.create(false);
TestContext<?> testContext = testContextFactory.create(isIcebergMode);
SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);
SnowflakeStreamingIngestChannelInternal<?> channel3 = addChannel3(testContext);
String colName1 = "testBuildErrors1";
Expand Down Expand Up @@ -971,7 +945,7 @@ public void testInvalidateChannels() {

@Test
public void testBlobBuilder() throws Exception {
TestContext<?> testContext = testContextFactory.create(false);
TestContext<?> testContext = testContextFactory.create(isIcebergMode);
SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);

ObjectMapper mapper = new ObjectMapper();
Expand Down Expand Up @@ -1073,7 +1047,7 @@ public void testBlobBuilder() throws Exception {

@Test
public void testShutDown() throws Exception {
TestContext<?> testContext = testContextFactory.create(false);
TestContext<?> testContext = testContextFactory.create(isIcebergMode);
FlushService<?> flushService = testContext.flushService;

Assert.assertFalse(flushService.buildUploadWorkers.isShutdown());
Expand Down

0 comments on commit e6704ee

Please sign in to comment.