From 8f95d6404edc8e022c68ad9ac4a537dc2320aea3 Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Sat, 7 Sep 2024 05:11:38 +0000 Subject: [PATCH 01/10] fix corruption --- .../snowflake/ingest/streaming/internal/ParquetFlusher.java | 4 ++++ .../ingest/streaming/internal/ParquetRowBuffer.java | 6 ------ 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java index 3ad3db5f4..d338a6a7b 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java @@ -212,6 +212,10 @@ private SerializationResult serializeFromJavaObjects( } Map metadata = channelsDataPerTable.get(0).getVectors().metadata; + // We insert the filename in the file itself as metadata so that streams can work on replicated + // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and + // http://go/streams-on-replicated-mixed-tables + metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath)); parquetWriter = new BdecParquetWriter( mergedData, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index 627478bca..526574d7f 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -22,7 +22,6 @@ import net.snowflake.ingest.connection.TelemetryService; import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction; import net.snowflake.ingest.streaming.OpenChannelRequest; -import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.apache.parquet.hadoop.BdecParquetWriter; @@ -263,11 +262,6 @@ boolean hasColumns() { @Override Optional getSnapshot(final String filePath) { - // We insert the filename in the file itself as metadata so that streams can work on replicated - // mixed tables. 
For a more detailed discussion on the topic see SNOW-561447 and - // http://go/streams-on-replicated-mixed-tables - metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath)); - List> oldData = new ArrayList<>(); if (!clientBufferParameters.getEnableParquetInternalBuffering()) { data.forEach(r -> oldData.add(new ArrayList<>(r))); From df57b6e4f71ecfed339896d9d04be8a876601e3e Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Sat, 7 Sep 2024 05:43:18 +0000 Subject: [PATCH 02/10] fix tests: --- .../ingest/streaming/internal/RowBufferTest.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 41739c267..eb84e363d 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -505,11 +505,6 @@ private void testFlushHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals(startOffsetToken, data.getStartOffsetToken()); Assert.assertEquals(endOffsetToken, data.getEndOffsetToken()); Assert.assertEquals(bufferSize, data.getBufferSize(), 0); - - final ParquetChunkData chunkData = (ParquetChunkData) data.getVectors(); - Assert.assertEquals( - StreamingIngestUtils.getShortname(filename), - chunkData.metadata.get(Constants.PRIMARY_FILE_ID_KEY)); } @Test @@ -783,9 +778,6 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals(0, columnEpStats.get("COLCHAR").getCurrentNullCount()); Assert.assertEquals(-1, columnEpStats.get("COLCHAR").getDistinctValues()); - final ParquetChunkData chunkData = (ParquetChunkData) result.getVectors(); - Assert.assertEquals(filename, chunkData.metadata.get(Constants.PRIMARY_FILE_ID_KEY)); - // Confirm we reset ChannelData resetResults = rowBuffer.flush("my_snowpipe_streaming.bdec"); Assert.assertNull(resetResults); From 22773bbc85e0824836d7b4be87b4920fcc031f07 Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Sat, 7 Sep 2024 05:43:26 +0000 Subject: [PATCH 03/10] Revert "SNOW-1618257 Fix PRIMARY_FILE_ID_KEY (#807)" This reverts commit 412ad3df6dfdba09f586e450cf4be797449cf725. --- .../streaming/internal/ParquetChunkData.java | 13 +-- .../streaming/internal/RowBufferTest.java | 95 ++----------------- 2 files changed, 10 insertions(+), 98 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java index 9950c44aa..16b1ededa 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java @@ -5,7 +5,6 @@ package net.snowflake.ingest.streaming.internal; import java.io.ByteArrayOutputStream; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.parquet.hadoop.BdecParquetWriter; @@ -35,16 +34,6 @@ public ParquetChunkData( this.rows = rows; this.parquetWriter = parquetWriter; this.output = output; - // create a defensive copy of the parameter map because the argument map passed here - // may currently be shared across multiple threads. 
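(Illustrative sketch — not part of the patch.) The hunks above move the PRIMARY_FILE_ID_KEY write out of ParquetRowBuffer.getSnapshot() and into ParquetFlusher.serializeFromJavaObjects(), next to where the BdecParquetWriter is constructed. The corruption being fixed comes from the fact that a channel's flushes share one metadata map, so stamping a file-specific value while snapshotting lets a later flush overwrite it before the earlier file has been serialized. A minimal, dependency-free sketch of the hazard and of the fix follows; the class, method, and key names in it are illustrative only, not SDK APIs.

import java.util.HashMap;
import java.util.Map;

public class SharedMetadataSketch {
  // One map shared by every flush of a channel, as the buffer's metadata map effectively is.
  private static final Map<String, String> sharedMetadata = new HashMap<>();

  // Broken pattern: stamp the target file name while taking the snapshot. If a second snapshot
  // is taken before the first file is serialized, the entry is overwritten and file A ends up
  // carrying file B's name in its footer.
  static Map<String, String> snapshotThenStamp(String fileName) {
    sharedMetadata.put("primaryFileId", fileName);
    return sharedMetadata; // the same instance is handed to every pending flush
  }

  // Fixed pattern: stamp the name only at serialization time, where the file is actually built,
  // so the value written always matches the file being produced.
  static String serializeWithName(Map<String, String> metadata, String fileName) {
    metadata.put("primaryFileId", fileName);
    return "footer:" + metadata; // stand-in for writing the Parquet footer key/value metadata
  }

  public static void main(String[] args) {
    Map<String, String> chunkA = snapshotThenStamp("fileA.bdec");
    snapshotThenStamp("fileB.bdec"); // overwrites the entry chunkA still points at
    System.out.println(chunkA.get("primaryFileId")); // prints fileB.bdec: corrupted metadata
    System.out.println(serializeWithName(new HashMap<>(), "fileA.bdec")); // correct name per file
  }
}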
- this.metadata = createDefensiveCopy(metadata); - } - - private Map createDefensiveCopy(final Map metadata) { - final Map copy = new HashMap<>(metadata); - for (String k : metadata.keySet()) { - copy.put(k, metadata.get(k)); - } - return copy; + this.metadata = metadata; } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index eb84e363d..60f711700 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -4,7 +4,6 @@ import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT; import static net.snowflake.ingest.utils.ParameterProvider.MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT; -import static org.junit.Assert.fail; import java.math.BigDecimal; import java.math.BigInteger; @@ -16,8 +15,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.utils.Constants; @@ -149,7 +146,7 @@ public void testCollatedColumnsAreRejected() { collatedColumn.setCollation("en-ci"); try { this.rowBufferOnErrorAbort.setupSchema(Collections.singletonList(collatedColumn)); - fail("Collated columns are not supported"); + Assert.fail("Collated columns are not supported"); } catch (SFException e) { Assert.assertEquals(ErrorCode.UNSUPPORTED_DATA_TYPE.getMessageCode(), e.getVendorCode()); } @@ -169,7 +166,7 @@ public void buildFieldErrorStates() { testCol.setPrecision(4); try { this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(testCol)); - fail("Expected error"); + Assert.fail("Expected error"); } catch (SFException e) { Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode()); } @@ -181,7 +178,7 @@ public void buildFieldErrorStates() { testCol.setLogicalType("FIXED"); try { this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(testCol)); - fail("Expected error"); + Assert.fail("Expected error"); } catch (SFException e) { Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode()); } @@ -193,7 +190,7 @@ public void buildFieldErrorStates() { testCol.setLogicalType("TIMESTAMP_NTZ"); try { this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(testCol)); - fail("Expected error"); + Assert.fail("Expected error"); } catch (SFException e) { Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode()); } @@ -205,7 +202,7 @@ public void buildFieldErrorStates() { testCol.setLogicalType("TIMESTAMP_TZ"); try { this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(testCol)); - fail("Expected error"); + Assert.fail("Expected error"); } catch (SFException e) { Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode()); } @@ -217,7 +214,7 @@ public void buildFieldErrorStates() { testCol.setLogicalType("TIME"); try { this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(testCol)); - fail("Expected error"); + Assert.fail("Expected error"); } catch (SFException e) { Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode()); } @@ 
-249,7 +246,7 @@ public void testInvalidLogicalType() { try { this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(colInvalidLogical)); - fail("Setup should fail if invalid column metadata is provided"); + Assert.fail("Setup should fail if invalid column metadata is provided"); } catch (SFException e) { Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode()); // Do nothing @@ -269,7 +266,7 @@ public void testInvalidPhysicalType() { try { this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(colInvalidPhysical)); - fail("Setup should fail if invalid column metadata is provided"); + Assert.fail("Setup should fail if invalid column metadata is provided"); } catch (SFException e) { Assert.assertEquals(e.getVendorCode(), ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode()); } @@ -630,7 +627,7 @@ public void testInvalidEPInfo() { try { AbstractRowBuffer.buildEpInfoFromStats(1, colStats); - fail("should fail when row count is smaller than null count."); + Assert.fail("should fail when row count is smaller than null count."); } catch (SFException e) { Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode()); } @@ -1722,78 +1719,4 @@ public void testOnErrorAbortRowsWithError() { Assert.assertEquals(1, snapshotAbortParquet.size()); Assert.assertEquals(Arrays.asList("a"), snapshotAbortParquet.get(0)); } - - @Test - public void testParquetChunkMetadataCreationIsThreadSafe() throws InterruptedException { - final String testFileA = "testFileA"; - final String testFileB = "testFileB"; - - final ParquetRowBuffer bufferUnderTest = - (ParquetRowBuffer) createTestBuffer(OpenChannelRequest.OnErrorOption.CONTINUE); - - final ColumnMetadata colChar = new ColumnMetadata(); - colChar.setOrdinal(1); - colChar.setName("COLCHAR"); - colChar.setPhysicalType("LOB"); - colChar.setNullable(true); - colChar.setLogicalType("TEXT"); - colChar.setByteLength(14); - colChar.setLength(11); - colChar.setScale(0); - - bufferUnderTest.setupSchema(Collections.singletonList(colChar)); - - loadData(bufferUnderTest, Collections.singletonMap("colChar", "a")); - - final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference> firstFlushResult = new AtomicReference<>(); - final Thread t = - getThreadThatWaitsForLockReleaseAndFlushes( - bufferUnderTest, testFileA, latch, firstFlushResult); - t.start(); - - final ChannelData secondFlushResult = bufferUnderTest.flush(testFileB); - Assert.assertEquals(testFileB, getPrimaryFileId(secondFlushResult)); - - latch.countDown(); - t.join(); - - Assert.assertNotNull(firstFlushResult.get()); - Assert.assertEquals(testFileA, getPrimaryFileId(firstFlushResult.get())); - Assert.assertEquals(testFileB, getPrimaryFileId(secondFlushResult)); - } - - private static Thread getThreadThatWaitsForLockReleaseAndFlushes( - final ParquetRowBuffer bufferUnderTest, - final String filenameToFlush, - final CountDownLatch latch, - final AtomicReference> flushResult) { - return new Thread( - () -> { - try { - latch.await(); - } catch (InterruptedException e) { - fail("Thread was unexpectedly interrupted"); - } - - final ChannelData flush = - loadData(bufferUnderTest, Collections.singletonMap("colChar", "b")) - .flush(filenameToFlush); - flushResult.set(flush); - }); - } - - private static ParquetRowBuffer loadData( - final ParquetRowBuffer bufferToLoad, final Map data) { - final List> validRows = new ArrayList<>(); - validRows.add(data); - - final InsertValidationResponse nResponse = bufferToLoad.insertRows(validRows, "1", "1"); - 
Assert.assertFalse(nResponse.hasErrors()); - return bufferToLoad; - } - - private static String getPrimaryFileId(final ChannelData chunkData) { - return chunkData.getVectors().metadata.get(Constants.PRIMARY_FILE_ID_KEY); - } } From a3f5a3b2806c8614d807ef07746e21f0361ff5b8 Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Mon, 9 Sep 2024 05:57:49 +0000 Subject: [PATCH 04/10] Reapply "SNOW-1618257 Fix PRIMARY_FILE_ID_KEY (#807)" This reverts commit 22773bbc85e0824836d7b4be87b4920fcc031f07. --- .../streaming/internal/ParquetChunkData.java | 13 ++- .../streaming/internal/RowBufferTest.java | 95 +++++++++++++++++-- 2 files changed, 98 insertions(+), 10 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java index 16b1ededa..9950c44aa 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java @@ -5,6 +5,7 @@ package net.snowflake.ingest.streaming.internal; import java.io.ByteArrayOutputStream; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.parquet.hadoop.BdecParquetWriter; @@ -34,6 +35,16 @@ public ParquetChunkData( this.rows = rows; this.parquetWriter = parquetWriter; this.output = output; - this.metadata = metadata; + // create a defensive copy of the parameter map because the argument map passed here + // may currently be shared across multiple threads. + this.metadata = createDefensiveCopy(metadata); + } + + private Map createDefensiveCopy(final Map metadata) { + final Map copy = new HashMap<>(metadata); + for (String k : metadata.keySet()) { + copy.put(k, metadata.get(k)); + } + return copy; } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 60f711700..eb84e363d 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -4,6 +4,7 @@ import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT; import static net.snowflake.ingest.utils.ParameterProvider.MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT; +import static org.junit.Assert.fail; import java.math.BigDecimal; import java.math.BigInteger; @@ -15,6 +16,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.utils.Constants; @@ -146,7 +149,7 @@ public void testCollatedColumnsAreRejected() { collatedColumn.setCollation("en-ci"); try { this.rowBufferOnErrorAbort.setupSchema(Collections.singletonList(collatedColumn)); - Assert.fail("Collated columns are not supported"); + fail("Collated columns are not supported"); } catch (SFException e) { Assert.assertEquals(ErrorCode.UNSUPPORTED_DATA_TYPE.getMessageCode(), e.getVendorCode()); } @@ -166,7 +169,7 @@ public void buildFieldErrorStates() { testCol.setPrecision(4); try { this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(testCol)); - Assert.fail("Expected error"); + fail("Expected error"); } 
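(Illustrative sketch — not part of the patch.) PATCH 04 reinstates the defensive copy in the ParquetChunkData constructor: the metadata map passed in may still be shared across threads, so each chunk copies it, and later put() calls on the shared map cannot reach a chunk that has already been handed to the flusher. Reduced to a self-contained form, the idiom looks roughly like the following; the generic element types are inferred from the surrounding code, and new HashMap<>(metadata) alone already performs the copy that createDefensiveCopy spells out.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ChunkDataSketch {
  final List<List<Object>> rows;
  final Map<String, String> metadata;

  ChunkDataSketch(List<List<Object>> rows, Map<String, String> metadata) {
    this.rows = rows;
    // Defensive copy: the caller's map is potentially shared with other flushes/threads,
    // so mutations made after this constructor returns must not be visible here.
    this.metadata = new HashMap<>(metadata);
  }
}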
catch (SFException e) { Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode()); } @@ -178,7 +181,7 @@ public void buildFieldErrorStates() { testCol.setLogicalType("FIXED"); try { this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(testCol)); - Assert.fail("Expected error"); + fail("Expected error"); } catch (SFException e) { Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode()); } @@ -190,7 +193,7 @@ public void buildFieldErrorStates() { testCol.setLogicalType("TIMESTAMP_NTZ"); try { this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(testCol)); - Assert.fail("Expected error"); + fail("Expected error"); } catch (SFException e) { Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode()); } @@ -202,7 +205,7 @@ public void buildFieldErrorStates() { testCol.setLogicalType("TIMESTAMP_TZ"); try { this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(testCol)); - Assert.fail("Expected error"); + fail("Expected error"); } catch (SFException e) { Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode()); } @@ -214,7 +217,7 @@ public void buildFieldErrorStates() { testCol.setLogicalType("TIME"); try { this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(testCol)); - Assert.fail("Expected error"); + fail("Expected error"); } catch (SFException e) { Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode()); } @@ -246,7 +249,7 @@ public void testInvalidLogicalType() { try { this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(colInvalidLogical)); - Assert.fail("Setup should fail if invalid column metadata is provided"); + fail("Setup should fail if invalid column metadata is provided"); } catch (SFException e) { Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode()); // Do nothing @@ -266,7 +269,7 @@ public void testInvalidPhysicalType() { try { this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(colInvalidPhysical)); - Assert.fail("Setup should fail if invalid column metadata is provided"); + fail("Setup should fail if invalid column metadata is provided"); } catch (SFException e) { Assert.assertEquals(e.getVendorCode(), ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode()); } @@ -627,7 +630,7 @@ public void testInvalidEPInfo() { try { AbstractRowBuffer.buildEpInfoFromStats(1, colStats); - Assert.fail("should fail when row count is smaller than null count."); + fail("should fail when row count is smaller than null count."); } catch (SFException e) { Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode()); } @@ -1719,4 +1722,78 @@ public void testOnErrorAbortRowsWithError() { Assert.assertEquals(1, snapshotAbortParquet.size()); Assert.assertEquals(Arrays.asList("a"), snapshotAbortParquet.get(0)); } + + @Test + public void testParquetChunkMetadataCreationIsThreadSafe() throws InterruptedException { + final String testFileA = "testFileA"; + final String testFileB = "testFileB"; + + final ParquetRowBuffer bufferUnderTest = + (ParquetRowBuffer) createTestBuffer(OpenChannelRequest.OnErrorOption.CONTINUE); + + final ColumnMetadata colChar = new ColumnMetadata(); + colChar.setOrdinal(1); + colChar.setName("COLCHAR"); + colChar.setPhysicalType("LOB"); + colChar.setNullable(true); + colChar.setLogicalType("TEXT"); + colChar.setByteLength(14); + colChar.setLength(11); + colChar.setScale(0); + + 
bufferUnderTest.setupSchema(Collections.singletonList(colChar)); + + loadData(bufferUnderTest, Collections.singletonMap("colChar", "a")); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference> firstFlushResult = new AtomicReference<>(); + final Thread t = + getThreadThatWaitsForLockReleaseAndFlushes( + bufferUnderTest, testFileA, latch, firstFlushResult); + t.start(); + + final ChannelData secondFlushResult = bufferUnderTest.flush(testFileB); + Assert.assertEquals(testFileB, getPrimaryFileId(secondFlushResult)); + + latch.countDown(); + t.join(); + + Assert.assertNotNull(firstFlushResult.get()); + Assert.assertEquals(testFileA, getPrimaryFileId(firstFlushResult.get())); + Assert.assertEquals(testFileB, getPrimaryFileId(secondFlushResult)); + } + + private static Thread getThreadThatWaitsForLockReleaseAndFlushes( + final ParquetRowBuffer bufferUnderTest, + final String filenameToFlush, + final CountDownLatch latch, + final AtomicReference> flushResult) { + return new Thread( + () -> { + try { + latch.await(); + } catch (InterruptedException e) { + fail("Thread was unexpectedly interrupted"); + } + + final ChannelData flush = + loadData(bufferUnderTest, Collections.singletonMap("colChar", "b")) + .flush(filenameToFlush); + flushResult.set(flush); + }); + } + + private static ParquetRowBuffer loadData( + final ParquetRowBuffer bufferToLoad, final Map data) { + final List> validRows = new ArrayList<>(); + validRows.add(data); + + final InsertValidationResponse nResponse = bufferToLoad.insertRows(validRows, "1", "1"); + Assert.assertFalse(nResponse.hasErrors()); + return bufferToLoad; + } + + private static String getPrimaryFileId(final ChannelData chunkData) { + return chunkData.getVectors().metadata.get(Constants.PRIMARY_FILE_ID_KEY); + } } From 3d49a915a2ef45edb0e2a6f7665350959dcbb149 Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Mon, 9 Sep 2024 06:33:47 +0000 Subject: [PATCH 05/10] address comments --- .../streaming/internal/AbstractRowBuffer.java | 9 ++-- .../streaming/internal/FlushService.java | 2 +- .../streaming/internal/ParquetRowBuffer.java | 2 +- .../ingest/streaming/internal/RowBuffer.java | 3 +- ...owflakeStreamingIngestChannelInternal.java | 5 +- .../streaming/internal/FlushServiceTest.java | 2 +- .../streaming/internal/RowBufferTest.java | 50 ++++++++----------- .../SnowflakeStreamingIngestChannelTest.java | 4 +- 8 files changed, 33 insertions(+), 44 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index 71a9d501e..df64af430 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -496,11 +496,10 @@ public InsertValidationResponse insertRows( * Flush the data in the row buffer by taking the ownership of the old vectors and pass all the * required info back to the flush service to build the blob * - * @param filePath the name of the file the data will be written in * @return A ChannelData object that contains the info needed by the flush service to build a blob */ @Override - public ChannelData flush(final String filePath) { + public ChannelData flush() { logger.logDebug("Start get data for channel={}", channelFullyQualifiedName); if (this.bufferedRowCount > 0) { Optional oldData = Optional.empty(); @@ -518,7 +517,7 @@ public ChannelData flush(final String filePath) { try { if (this.bufferedRowCount > 0) 
{ // Transfer the ownership of the vectors - oldData = getSnapshot(filePath); + oldData = getSnapshot(); oldRowCount = this.bufferedRowCount; oldBufferSize = this.bufferSize; oldRowSequencer = this.channelState.incrementAndGetRowSequencer(); @@ -617,10 +616,8 @@ void reset() { /** * Get buffered data snapshot for later flushing. - * - * @param filePath the name of the file the data will be written in */ - abstract Optional getSnapshot(final String filePath); + abstract Optional getSnapshot(); @VisibleForTesting abstract Object getVectorValueAt(String column, int index); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 6f5296209..2e7c6507c 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -430,7 +430,7 @@ void distributeFlushTasks(Set tablesToFlush) { .forEach( channel -> { if (channel.isValid()) { - ChannelData data = channel.getData(blobPath); + ChannelData data = channel.getData(); if (data != null) { channelsDataPerTable.add(data); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index 526574d7f..30851c274 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -261,7 +261,7 @@ boolean hasColumns() { } @Override - Optional getSnapshot(final String filePath) { + Optional getSnapshot() { List> oldData = new ArrayList<>(); if (!clientBufferParameters.getEnableParquetInternalBuffering()) { data.forEach(r -> oldData.add(new ArrayList<>(r))); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/RowBuffer.java index 02905c02e..6bb2f43b9 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/RowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RowBuffer.java @@ -37,10 +37,9 @@ InsertValidationResponse insertRows( * Flush the data in the row buffer by taking the ownership of the old vectors and pass all the * required info back to the flush service to build the blob * - * @param filePath the name of the file the data will be written in * @return A ChannelData object that contains the info needed by the flush service to build a blob */ - ChannelData flush(final String filePath); + ChannelData flush(); /** * Close the row buffer and release resources. 
Note that the caller needs to handle diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index ca0bbe782..4e884387b 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -220,11 +220,10 @@ public String getFullyQualifiedTableName() { /** * Get all the data needed to build the blob during flush * - * @param filePath the name of the file the data will be written in * @return a ChannelData object */ - ChannelData getData(final String filePath) { - ChannelData data = this.rowBuffer.flush(filePath); + ChannelData getData() { + ChannelData data = this.rowBuffer.flush(); if (data != null) { data.setChannelContext(channelFlushContext); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 4cc5cbe30..303893bf0 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -131,7 +131,7 @@ void setParameterOverride(Map parameterOverride) { ChannelData flushChannel(String name) { SnowflakeStreamingIngestChannelInternal channel = channels.get(name); - ChannelData channelData = channel.getRowBuffer().flush(name + "_snowpipe_streaming.bdec"); + ChannelData channelData = channel.getRowBuffer().flush(); channelData.setChannelContext(channel.getChannelContext()); this.channelData.add(channelData); return channelData; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index eb84e363d..81eab494a 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -499,7 +499,7 @@ private void testFlushHelper(AbstractRowBuffer rowBuffer) { float bufferSize = rowBuffer.getSize(); final String filename = "2022/7/13/16/56/testFlushHelper_streaming.bdec"; - ChannelData data = rowBuffer.flush(filename); + ChannelData data = rowBuffer.flush(); Assert.assertEquals(2, data.getRowCount()); Assert.assertEquals((Long) 1L, data.getRowSequencer()); Assert.assertEquals(startOffsetToken, data.getStartOffsetToken()); @@ -734,7 +734,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { final String filename = "testStatsE2EHelper_streaming.bdec"; InsertValidationResponse response = rowBuffer.insertRows(Arrays.asList(row1, row2), null, null); Assert.assertFalse(response.hasErrors()); - ChannelData result = rowBuffer.flush(filename); + ChannelData result = rowBuffer.flush(); Map columnEpStats = result.getColumnEps(); Assert.assertEquals( @@ -779,7 +779,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals(-1, columnEpStats.get("COLCHAR").getDistinctValues()); // Confirm we reset - ChannelData resetResults = rowBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData resetResults = rowBuffer.flush(); Assert.assertNull(resetResults); } @@ -838,7 +838,7 @@ private void testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption onErro InsertValidationResponse response = innerBuffer.insertRows(Arrays.asList(row1, row2, row3), 
null, null); Assert.assertFalse(response.hasErrors()); - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals( @@ -907,7 +907,7 @@ private void testE2EDateHelper(OpenChannelRequest.OnErrorOption onErrorOption) { Assert.assertNull(innerBuffer.getVectorValueAt("COLDATE", 2)); // Check stats generation - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals( @@ -973,7 +973,7 @@ private void testE2ETimeHelper(OpenChannelRequest.OnErrorOption onErrorOption) { Assert.assertNull(innerBuffer.getVectorValueAt("COLTIMESB8", 2)); // Check stats generation - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals( @@ -1204,7 +1204,7 @@ private void doTestFailureHalfwayThroughColumnProcessing( } } - ChannelData channelData = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData channelData = innerBuffer.flush(); RowBufferStats statsCol1 = channelData.getColumnEps().get("COLVARCHAR1"); RowBufferStats statsCol2 = channelData.getColumnEps().get("COLVARCHAR2"); RowBufferStats statsCol3 = channelData.getColumnEps().get("COLBOOLEAN1"); @@ -1264,7 +1264,7 @@ private void testE2EBooleanHelper(OpenChannelRequest.OnErrorOption onErrorOption Assert.assertNull(innerBuffer.getVectorValueAt("COLBOOLEAN", 2)); // Check stats generation - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals( @@ -1319,7 +1319,7 @@ private void testE2EBinaryHelper(OpenChannelRequest.OnErrorOption onErrorOption) Assert.assertNull(innerBuffer.getVectorValueAt("COLBINARY", 2)); // Check stats generation - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals(11L, result.getColumnEps().get("COLBINARY").getCurrentMaxLength()); @@ -1371,7 +1371,7 @@ private void testE2ERealHelper(OpenChannelRequest.OnErrorOption onErrorOption) { Assert.assertNull(innerBuffer.getVectorValueAt("COLREAL", 2)); // Check stats generation - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals( @@ -1454,7 +1454,7 @@ public void testOnErrorAbortFailures() { Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMaxIntValue()); Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMinIntValue()); - ChannelData data = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData data = innerBuffer.flush(); Assert.assertEquals(3, data.getRowCount()); Assert.assertEquals(0, innerBuffer.bufferedRowCount); } @@ -1528,7 +1528,7 @@ public void testOnErrorAbortSkipBatch() { Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMaxIntValue()); Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMinIntValue()); - ChannelData data = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData data = innerBuffer.flush(); Assert.assertEquals(3, data.getRowCount()); Assert.assertEquals(0, innerBuffer.bufferedRowCount); } @@ -1579,7 +1579,7 @@ private void 
testE2EVariantHelper(OpenChannelRequest.OnErrorOption onErrorOption Assert.assertEquals("3", innerBuffer.getVectorValueAt("COLVARIANT", 4)); // Check stats generation - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(5, result.getRowCount()); Assert.assertEquals(2, result.getColumnEps().get("COLVARIANT").getCurrentNullCount()); } @@ -1613,7 +1613,7 @@ private void testE2EObjectHelper(OpenChannelRequest.OnErrorOption onErrorOption) Assert.assertEquals("{\"key\":1}", innerBuffer.getVectorValueAt("COLOBJECT", 0)); // Check stats generation - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(1, result.getRowCount()); } @@ -1663,7 +1663,7 @@ private void testE2EArrayHelper(OpenChannelRequest.OnErrorOption onErrorOption) Assert.assertEquals("[1,2,3]", innerBuffer.getVectorValueAt("COLARRAY", 4)); // Check stats generation - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(5, result.getRowCount()); } @@ -1710,14 +1710,14 @@ public void testOnErrorAbortRowsWithError() { SFException.class, () -> innerBufferOnErrorAbort.insertRows(mixedRows, "1", "3")); List> snapshotContinueParquet = - ((ParquetChunkData) innerBufferOnErrorContinue.getSnapshot("fake/filePath").get()).rows; + ((ParquetChunkData) innerBufferOnErrorContinue.getSnapshot().get()).rows; // validRows and only the good row from mixedRows are in the buffer Assert.assertEquals(2, snapshotContinueParquet.size()); Assert.assertEquals(Arrays.asList("a"), snapshotContinueParquet.get(0)); Assert.assertEquals(Arrays.asList("b"), snapshotContinueParquet.get(1)); List> snapshotAbortParquet = - ((ParquetChunkData) innerBufferOnErrorAbort.getSnapshot("fake/filePath").get()).rows; + ((ParquetChunkData) innerBufferOnErrorAbort.getSnapshot().get()).rows; // only validRows and none of the mixedRows are in the buffer Assert.assertEquals(1, snapshotAbortParquet.size()); Assert.assertEquals(Arrays.asList("a"), snapshotAbortParquet.get(0)); @@ -1725,9 +1725,6 @@ public void testOnErrorAbortRowsWithError() { @Test public void testParquetChunkMetadataCreationIsThreadSafe() throws InterruptedException { - final String testFileA = "testFileA"; - final String testFileB = "testFileB"; - final ParquetRowBuffer bufferUnderTest = (ParquetRowBuffer) createTestBuffer(OpenChannelRequest.OnErrorOption.CONTINUE); @@ -1749,23 +1746,20 @@ public void testParquetChunkMetadataCreationIsThreadSafe() throws InterruptedExc final AtomicReference> firstFlushResult = new AtomicReference<>(); final Thread t = getThreadThatWaitsForLockReleaseAndFlushes( - bufferUnderTest, testFileA, latch, firstFlushResult); + bufferUnderTest, latch, firstFlushResult); t.start(); - final ChannelData secondFlushResult = bufferUnderTest.flush(testFileB); - Assert.assertEquals(testFileB, getPrimaryFileId(secondFlushResult)); + final ChannelData secondFlushResult = bufferUnderTest.flush(); + // TODO: need to verify other fields latch.countDown(); t.join(); Assert.assertNotNull(firstFlushResult.get()); - Assert.assertEquals(testFileA, getPrimaryFileId(firstFlushResult.get())); - Assert.assertEquals(testFileB, getPrimaryFileId(secondFlushResult)); } private static Thread getThreadThatWaitsForLockReleaseAndFlushes( final ParquetRowBuffer bufferUnderTest, - final String filenameToFlush, final CountDownLatch latch, final AtomicReference> flushResult) { return 
new Thread( @@ -1775,10 +1769,10 @@ private static Thread getThreadThatWaitsForLockReleaseAndFlushes( } catch (InterruptedException e) { fail("Thread was unexpectedly interrupted"); } - + final ChannelData flush = loadData(bufferUnderTest, Collections.singletonMap("colChar", "b")) - .flush(filenameToFlush); + .flush(); flushResult.set(flush); }); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index dc3285df8..9495b133e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -592,7 +592,7 @@ public void testInsertRow() { row.put("col", 1); // Get data before insert to verify that there is no row (data should be null) - ChannelData data = channel.getData("my_snowpipe_streaming.bdec"); + ChannelData data = channel.getData(); Assert.assertNull(data); long insertStartTimeInMs = System.currentTimeMillis(); @@ -605,7 +605,7 @@ public void testInsertRow() { long insertEndTimeInMs = System.currentTimeMillis(); // Get data again to verify the row is inserted - data = channel.getData("my_snowpipe_streaming.bdec"); + data = channel.getData(); Assert.assertEquals(3, data.getRowCount()); Assert.assertEquals((Long) 1L, data.getRowSequencer()); Assert.assertEquals(1, ((ChannelData) data).getVectors().rows.get(0).size()); From 3bfc9378c9621b2e4975be84cc048a15549ba018 Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Mon, 9 Sep 2024 06:42:28 +0000 Subject: [PATCH 06/10] format --- .../ingest/streaming/internal/AbstractRowBuffer.java | 4 +--- .../ingest/streaming/internal/RowBufferTest.java | 8 +++----- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index df64af430..9172c4328 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -614,9 +614,7 @@ void reset() { this.statsMap.replaceAll((key, value) -> value.forkEmpty()); } - /** - * Get buffered data snapshot for later flushing. - */ + /** Get buffered data snapshot for later flushing. 
*/ abstract Optional getSnapshot(); @VisibleForTesting diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 81eab494a..d0123f389 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -1745,8 +1745,7 @@ public void testParquetChunkMetadataCreationIsThreadSafe() throws InterruptedExc final CountDownLatch latch = new CountDownLatch(1); final AtomicReference> firstFlushResult = new AtomicReference<>(); final Thread t = - getThreadThatWaitsForLockReleaseAndFlushes( - bufferUnderTest, latch, firstFlushResult); + getThreadThatWaitsForLockReleaseAndFlushes(bufferUnderTest, latch, firstFlushResult); t.start(); final ChannelData secondFlushResult = bufferUnderTest.flush(); @@ -1769,10 +1768,9 @@ private static Thread getThreadThatWaitsForLockReleaseAndFlushes( } catch (InterruptedException e) { fail("Thread was unexpectedly interrupted"); } - + final ChannelData flush = - loadData(bufferUnderTest, Collections.singletonMap("colChar", "b")) - .flush(); + loadData(bufferUnderTest, Collections.singletonMap("colChar", "b")).flush(); flushResult.set(flush); }); } From 86de156bc1a43d18689416aa8fd14444f4583e4a Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Tue, 10 Sep 2024 04:42:32 +0000 Subject: [PATCH 07/10] add tests --- .../parquet/hadoop/BdecParquetReader.java | 8 +- .../streaming/internal/RowBufferTest.java | 75 +++++++++++++++---- 2 files changed, 67 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/apache/parquet/hadoop/BdecParquetReader.java b/src/main/java/org/apache/parquet/hadoop/BdecParquetReader.java index 1a92a8cd4..ef95fab14 100644 --- a/src/main/java/org/apache/parquet/hadoop/BdecParquetReader.java +++ b/src/main/java/org/apache/parquet/hadoop/BdecParquetReader.java @@ -34,6 +34,7 @@ */ public class BdecParquetReader implements AutoCloseable { private final InternalParquetRecordReader> reader; + private final ParquetFileReader fileReader; /** * @param data buffer where the data that has to be read resides. @@ -41,7 +42,7 @@ public class BdecParquetReader implements AutoCloseable { */ public BdecParquetReader(byte[] data) throws IOException { ParquetReadOptions options = ParquetReadOptions.builder().build(); - ParquetFileReader fileReader = ParquetFileReader.open(new BdecInputFile(data), options); + fileReader = ParquetFileReader.open(new BdecInputFile(data), options); reader = new InternalParquetRecordReader<>(new BdecReadSupport(), options.getRecordFilter()); reader.initialize(fileReader, options); } @@ -60,6 +61,11 @@ public List read() throws IOException { } } + /** Get the key value metadata in the file */ + public Map getKeyValueMetadata() { + return fileReader.getFileMetaData().getKeyValueMetaData(); + } + /** * Close the reader. 
* diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index d0123f389..3f8e927a4 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -6,6 +6,7 @@ import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT; import static org.junit.Assert.fail; +import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.charset.StandardCharsets; @@ -24,6 +25,7 @@ import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.apache.commons.codec.binary.Hex; +import org.apache.parquet.hadoop.BdecParquetReader; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -1728,17 +1730,28 @@ public void testParquetChunkMetadataCreationIsThreadSafe() throws InterruptedExc final ParquetRowBuffer bufferUnderTest = (ParquetRowBuffer) createTestBuffer(OpenChannelRequest.OnErrorOption.CONTINUE); - final ColumnMetadata colChar = new ColumnMetadata(); - colChar.setOrdinal(1); - colChar.setName("COLCHAR"); - colChar.setPhysicalType("LOB"); - colChar.setNullable(true); - colChar.setLogicalType("TEXT"); - colChar.setByteLength(14); - colChar.setLength(11); - colChar.setScale(0); - - bufferUnderTest.setupSchema(Collections.singletonList(colChar)); + final int columnOrdinal = 1; + final ColumnMetadata colChar1 = new ColumnMetadata(); + colChar1.setOrdinal(columnOrdinal); + colChar1.setName("COLCHAR"); + colChar1.setPhysicalType("LOB"); + colChar1.setNullable(true); + colChar1.setLogicalType("TEXT"); + colChar1.setByteLength(14); + colChar1.setLength(11); + colChar1.setScale(0); + + final ColumnMetadata colChar2 = new ColumnMetadata(); + colChar2.setOrdinal(columnOrdinal); + colChar2.setName("COLCHAR"); + colChar2.setPhysicalType("SB1"); + colChar2.setNullable(true); + colChar2.setLogicalType("TEXT"); + colChar2.setByteLength(14); + colChar2.setLength(11); + colChar2.setScale(0); + + bufferUnderTest.setupSchema(Collections.singletonList(colChar1)); loadData(bufferUnderTest, Collections.singletonMap("colChar", "a")); @@ -1749,12 +1762,44 @@ public void testParquetChunkMetadataCreationIsThreadSafe() throws InterruptedExc t.start(); final ChannelData secondFlushResult = bufferUnderTest.flush(); - // TODO: need to verify other fields + bufferUnderTest.setupSchema(Collections.singletonList(colChar2)); latch.countDown(); t.join(); - Assert.assertNotNull(firstFlushResult.get()); + // The logical and physical types should be different + Assert.assertNotEquals( + getColumnType(firstFlushResult.get(), columnOrdinal), + getColumnType(secondFlushResult, columnOrdinal)); + } + + @Test + public void testParquetFileNameMetadata() throws IOException { + String filePath = "testParquetFileNameMetadata.bdec"; + final ParquetRowBuffer bufferUnderTest = + (ParquetRowBuffer) createTestBuffer(OpenChannelRequest.OnErrorOption.CONTINUE); + + final ColumnMetadata colChar = new ColumnMetadata(); + colChar.setOrdinal(1); + colChar.setName("COLCHAR"); + colChar.setPhysicalType("LOB"); + colChar.setNullable(true); + colChar.setLogicalType("TEXT"); + colChar.setByteLength(14); + colChar.setLength(11); + colChar.setScale(0); + + bufferUnderTest.setupSchema(Collections.singletonList(colChar)); + loadData(bufferUnderTest, Collections.singletonMap("colChar", "a")); + ChannelData data = bufferUnderTest.flush(); 
+ data.setChannelContext(new ChannelFlushContext("name", "db", "schema", "table", 1L, "key", 0L)); + + ParquetFlusher flusher = (ParquetFlusher) bufferUnderTest.createFlusher(); + Flusher.SerializationResult result = + flusher.serialize(Collections.singletonList(data), filePath); + + BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray()); + Assert.assertEquals(filePath, reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY)); } private static Thread getThreadThatWaitsForLockReleaseAndFlushes( @@ -1785,7 +1830,7 @@ private static ParquetRowBuffer loadData( return bufferToLoad; } - private static String getPrimaryFileId(final ChannelData chunkData) { - return chunkData.getVectors().metadata.get(Constants.PRIMARY_FILE_ID_KEY); + private static String getColumnType(final ChannelData chunkData, int columnId) { + return chunkData.getVectors().metadata.get(Integer.toString(columnId)); } } From dcf884133497708df52877c842d54f1ce498ca99 Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Tue, 10 Sep 2024 04:48:18 +0000 Subject: [PATCH 08/10] add dependency --- pom.xml | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index cb4b2ed0b..8365b5b9f 100644 --- a/pom.xml +++ b/pom.xml @@ -352,6 +352,12 @@ 1.14.9 test + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + test + org.mockito mockito-core @@ -478,6 +484,10 @@ org.apache.hadoop hadoop-common + + org.apache.hadoop + hadoop-mapreduce-client-core + org.apache.parquet @@ -756,8 +766,8 @@ true + to workaround https://issues.apache.org/jira/browse/MNG-7982. Now the dependency analyzer complains that + the dependency is unused, so we ignore it here--> org.apache.commons:commons-compress org.apache.commons:commons-configuration2 @@ -852,9 +862,9 @@ failFast + The list of allowed licenses. If you see the build failing due to "There are some forbidden licenses used, please + check your dependencies", verify the conditions of the license and add the reference to it here. + --> Apache License 2.0 BSD 2-Clause License @@ -1167,9 +1177,9 @@ + Plugin executes license processing Python script, which copies third party license files into the directory + target/generated-licenses-info/META-INF/third-party-licenses, which is then included in the shaded JAR. + --> org.codehaus.mojo exec-maven-plugin From 504d781671cd356adad7778bc11a57f63f2167d6 Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Tue, 10 Sep 2024 06:36:04 +0000 Subject: [PATCH 09/10] fix failure --- pom.xml | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index 8365b5b9f..b7763631a 100644 --- a/pom.xml +++ b/pom.xml @@ -357,6 +357,24 @@ hadoop-mapreduce-client-core ${hadoop.version} test + + + jakarta.xml.bind + jakarta.xml.bind-api + + + javax.xml.bind + jaxb-api + + + org.slf4j + slf4j-reload4j + + + org.slf4j + slf4j-simple + + org.mockito @@ -766,8 +784,8 @@ true + to workaround https://issues.apache.org/jira/browse/MNG-7982. Now the dependency analyzer complains that + the dependency is unused, so we ignore it here--> org.apache.commons:commons-compress org.apache.commons:commons-configuration2 @@ -862,9 +880,9 @@ failFast + The list of allowed licenses. If you see the build failing due to "There are some forbidden licenses used, please + check your dependencies", verify the conditions of the license and add the reference to it here. 
+ --> Apache License 2.0 BSD 2-Clause License @@ -1177,9 +1195,9 @@ + Plugin executes license processing Python script, which copies third party license files into the directory + target/generated-licenses-info/META-INF/third-party-licenses, which is then included in the shaded JAR. + --> org.codehaus.mojo exec-maven-plugin From 2831b28c64104aed75205babe82c5771c6ecb058 Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Tue, 10 Sep 2024 16:36:11 +0000 Subject: [PATCH 10/10] fix test failure --- .../ingest/streaming/internal/StreamingIngestUtilsIT.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtilsIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtilsIT.java index b40cdad82..67a70f6fc 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtilsIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestUtilsIT.java @@ -27,7 +27,10 @@ @RunWith(PowerMockRunner.class) @PrepareForTest({JWTManager.class}) -@PowerMockIgnore({"javax.net.ssl.*"}) /* Avoid ssl related exception from power mockito*/ +@PowerMockIgnore({ + "javax.net.ssl.*", + "javax.security.*" +}) /* Avoid ssl related exception from power mockito*/ public class StreamingIngestUtilsIT { @Test public void testJWTRetries() throws Exception {
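(Illustrative usage — not part of the patch.) With the series applied end to end, the file name stamped by ParquetFlusher can be verified from the serialized chunk itself, which is what the new testParquetFileNameMetadata does via the getKeyValueMetadata() accessor added to BdecParquetReader in PATCH 07. A usage sketch, where serializedChunkBytes stands in for the byte array produced by Flusher.SerializationResult:

// serializedChunkBytes: placeholder for result.chunkData.toByteArray() from a flushed chunk
BdecParquetReader reader = new BdecParquetReader(serializedChunkBytes);
String primaryFileId = reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY);
// In the test above, this is asserted to equal the filePath passed to flusher.serialize(...)
Assert.assertEquals(filePath, primaryFileId);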