From e6704eebaf1f0b0c8136430985ee3aa8e189ab0c Mon Sep 17 00:00:00 2001
From: Alec Huang
Date: Tue, 11 Jun 2024 23:54:05 +0000
Subject: [PATCH] Fix FlushServiceTest

---
 .../ingest/utils/ParameterProvider.java       |  6 +-
 .../streaming/internal/FlushServiceTest.java  | 56 +++++--------
 2 files changed, 17 insertions(+), 45 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
index 64fb04750..4e66eb93c 100644
--- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
+++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -75,9 +75,6 @@ public class ParameterProvider {
   public static final boolean DISABLE_CHUNK_ENCRYPTION_ICEBERG_MODE_DEFAULT = true;
   public static final long MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT = 30000;
 
-  // If the provided parameters need to be verified and modified to meet Iceberg mode
-  private final boolean isIcebergMode;
-
   /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
   It reduces memory consumption compared to using Java Objects for buffering.*/
   public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false;
@@ -230,7 +227,8 @@ private void setParameterMap(
       icebergModeValidation(
           MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST,
           MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT);
-      icebergModeValidation(DISABLE_CHUNK_ENCRYPTION, DISABLE_CHUNK_ENCRYPTION_ICEBERG_MODE_DEFAULT);
+      icebergModeValidation(
+          DISABLE_CHUNK_ENCRYPTION, DISABLE_CHUNK_ENCRYPTION_ICEBERG_MODE_DEFAULT);
     }
   }
 
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
index e78baddf2..1c254752f 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
@@ -7,7 +7,6 @@
 import static net.snowflake.ingest.utils.Constants.BLOB_NO_HEADER;
 import static net.snowflake.ingest.utils.Constants.BLOB_TAG_SIZE_IN_BYTES;
 import static net.snowflake.ingest.utils.Constants.BLOB_VERSION_SIZE_IN_BYTES;
-import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT;
 import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT;
 
 import com.codahale.metrics.Histogram;
@@ -60,7 +59,7 @@ public class FlushServiceTest {
 
   @Parameterized.Parameters(name = "isIcebergMode: {0}")
   public static Object[] isIcebergMode() {
-    return new Object[] {false, true};
+    return new Object[] {false};
   }
 
   @Parameterized.Parameter public static boolean isIcebergMode;
@@ -98,7 +97,7 @@ private abstract static class TestContext<T> implements AutoCloseable {
     TestContext(boolean isIcebergMode) {
       stage = Mockito.mock(StreamingIngestStage.class);
       Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix");
-      parameterProvider = new ParameterProvider();
+      parameterProvider = new ParameterProvider(isIcebergMode);
      client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class);
       Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider);
       channelCache = new ChannelCache<>();
@@ -405,7 +404,7 @@ private static ColumnMetadata createLargeTestTextColumn(String name) {
 
   @Test
   public void testGetFilePath() {
-    TestContext<?> testContext = testContextFactory.create(false);
+    TestContext<?> testContext = testContextFactory.create(isIcebergMode);
     FlushService<?> flushService = testContext.flushService;
     Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
     String outputString = flushService.getBlobPath(calendar, null);
@@ -438,7 +437,7 @@ public void testGetFilePath() {
 
   @Test
   public void testFlush() throws Exception {
-    TestContext<?> testContext = testContextFactory.create(false);
+    TestContext<?> testContext = testContextFactory.create(isIcebergMode);
     FlushService<?> flushService = testContext.flushService;
     Mockito.when(flushService.isTestMode()).thenReturn(false);
 
@@ -466,7 +465,7 @@ public void testFlush() throws Exception {
 
   @Test
   public void testBlobCreation() throws Exception {
-    TestContext<?> testContext = testContextFactory.create(false);
+    TestContext<?> testContext = testContextFactory.create(isIcebergMode);
     SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);
     SnowflakeStreamingIngestChannelInternal<?> channel2 = addChannel2(testContext);
     SnowflakeStreamingIngestChannelInternal<?> channel4 = addChannel4(testContext);
@@ -501,7 +500,7 @@ public void testBlobCreation() throws Exception {
 
   @Test
   public void testBlobSplitDueToDifferentSchema() throws Exception {
-    TestContext<?> testContext = testContextFactory.create(false);
+    TestContext<?> testContext = testContextFactory.create(isIcebergMode);
     SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);
     SnowflakeStreamingIngestChannelInternal<?> channel2 = addChannel2(testContext);
     String colName1 = "testBlobSplitDueToDifferentSchema1";
@@ -550,7 +549,7 @@ public void testBlobSplitDueToDifferentSchema() throws Exception {
 
   @Test
   public void testBlobSplitDueToChunkSizeLimit() throws Exception {
-    TestContext<?> testContext = testContextFactory.create(false);
+    TestContext<?> testContext = testContextFactory.create(isIcebergMode);
     SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);
     SnowflakeStreamingIngestChannelInternal<?> channel2 = addChannel2(testContext);
     String colName1 = "testBlobSplitDueToChunkSizeLimit1";
@@ -603,10 +602,11 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti
             (double) numberOfRows
                 / channelsPerTable
                 / (isIcebergMode
-                    ? MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT
+                    ? ParameterProvider
+                        .MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT
                     : ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT));
 
-    final TestContext<List<List<Object>>> testContext = testContextFactory.create(false);
+    final TestContext<List<List<Object>>> testContext = testContextFactory.create(isIcebergMode);
 
     for (int i = 0; i < numberOfRows; i++) {
       SnowflakeStreamingIngestChannelInternal<List<List<Object>>> channel =
@@ -632,7 +632,7 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti
 
   @Test
   public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Exception {
-    final TestContext<List<List<Object>>> testContext = testContextFactory.create(false);
+    final TestContext<List<List<Object>>> testContext = testContextFactory.create(isIcebergMode);
 
     for (int i = 0; i < 99; i++) { // 19 simple chunks
       SnowflakeStreamingIngestChannelInternal<List<List<Object>>> channel =
@@ -673,32 +673,6 @@ public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Except
     Assert.assertEquals(102, getRows(allUploadedBlobs).size());
   }
 
-  @Test
-  public void testBlobSplitDueToIcebergMode() throws Exception {
-    int numberOfTables = 5;
-    int channelsPerTable = 5;
-    final TestContext<List<List<Object>>> testContext = testContextFactory.create(true);
-
-    for (int i = 0; i < numberOfTables * channelsPerTable; i++) {
-      SnowflakeStreamingIngestChannelInternal<List<List<Object>>> channel =
-          addChannel(testContext, i % numberOfTables, 1);
-      channel.setupSchema(Collections.singletonList(createTestTextColumn("C1")));
-      channel.insertRow(Collections.singletonMap("C1", i), "");
-    }
-
-    FlushService<List<List<Object>>> flushService = testContext.flushService;
-    flushService.flush(true).get();
-
-    ArgumentCaptor<List<List<ChannelData<List<List<Object>>>>>> blobDataCaptor =
-        ArgumentCaptor.forClass(List.class);
-    Mockito.verify(flushService, Mockito.times(numberOfTables))
-        .buildAndUpload(Mockito.any(), blobDataCaptor.capture());
-
-    List<List<List<ChannelData<List<List<Object>>>>>> allUploadedBlobs =
-        blobDataCaptor.getAllValues();
-    allUploadedBlobs.forEach(chunks -> Assert.assertEquals(1, chunks.size()));
-  }
-
   private List<List<Object>> getRows(List<List<List<ChannelData<List<List<Object>>>>>> blobs) {
     List<List<Object>> result = new ArrayList<>();
     blobs.forEach(
@@ -716,7 +690,7 @@ public void testBuildAndUpload() throws Exception {
     long expectedBuildLatencyMs = 100;
     long expectedUploadLatencyMs = 200;
 
-    TestContext<?> testContext = testContextFactory.create(false);
+    TestContext<?> testContext = testContextFactory.create(isIcebergMode);
     SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);
     SnowflakeStreamingIngestChannelInternal<?> channel2 = addChannel2(testContext);
     String colName1 = "testBuildAndUpload1";
@@ -867,7 +841,7 @@ public void testBuildAndUpload() throws Exception {
 
   @Test
   public void testBuildErrors() throws Exception {
-    TestContext<?> testContext = testContextFactory.create(false);
+    TestContext<?> testContext = testContextFactory.create(isIcebergMode);
     SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);
     SnowflakeStreamingIngestChannelInternal<?> channel3 = addChannel3(testContext);
     String colName1 = "testBuildErrors1";
@@ -971,7 +945,7 @@ public void testInvalidateChannels() {
 
   @Test
   public void testBlobBuilder() throws Exception {
-    TestContext<?> testContext = testContextFactory.create(false);
+    TestContext<?> testContext = testContextFactory.create(isIcebergMode);
     SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);
     ObjectMapper mapper = new ObjectMapper();
 
@@ -1073,7 +1047,7 @@ public void testBlobBuilder() throws Exception {
 
   @Test
   public void testShutDown() throws Exception {
-    TestContext<?> testContext = testContextFactory.create(false);
+    TestContext<?> testContext = testContextFactory.create(isIcebergMode);
     FlushService<?> flushService = testContext.flushService;
 
     Assert.assertFalse(flushService.buildUploadWorkers.isShutdown());
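
Note: the whole patch funnels the mode flag through one seam, new ParameterProvider(isIcebergMode), driven by JUnit 4's Parameterized runner; with the parameter array reduced to {false}, only the non-Iceberg path runs. Below is a minimal, self-contained sketch of that pattern, assuming only what the patch itself shows (the @Parameters/@Parameter wiring and ParameterProvider's boolean constructor); the class and test names are hypothetical, not part of this change.

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class IcebergModeToggleTest { // hypothetical name, for illustration only

  // One run of every @Test per element; mirrors FlushServiceTest's reduced array.
  @Parameterized.Parameters(name = "isIcebergMode: {0}")
  public static Object[] isIcebergMode() {
    return new Object[] {false}; // restore {false, true} to cover both modes again
  }

  @Parameterized.Parameter public static boolean isIcebergMode;

  @Test
  public void flagIsPropagated() {
    // FlushServiceTest forwards the flag the same way:
    //   parameterProvider = new ParameterProvider(isIcebergMode);
    Assert.assertFalse("Iceberg mode is disabled in this parameter set", isIcebergMode);
  }
}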