From 3d49a915a2ef45edb0e2a6f7665350959dcbb149 Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Mon, 9 Sep 2024 06:33:47 +0000 Subject: [PATCH] address comments --- .../streaming/internal/AbstractRowBuffer.java | 9 ++-- .../streaming/internal/FlushService.java | 2 +- .../streaming/internal/ParquetRowBuffer.java | 2 +- .../ingest/streaming/internal/RowBuffer.java | 3 +- ...owflakeStreamingIngestChannelInternal.java | 5 +- .../streaming/internal/FlushServiceTest.java | 2 +- .../streaming/internal/RowBufferTest.java | 50 ++++++++----------- .../SnowflakeStreamingIngestChannelTest.java | 4 +- 8 files changed, 33 insertions(+), 44 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index 71a9d501e..df64af430 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -496,11 +496,10 @@ public InsertValidationResponse insertRows( * Flush the data in the row buffer by taking the ownership of the old vectors and pass all the * required info back to the flush service to build the blob * - * @param filePath the name of the file the data will be written in * @return A ChannelData object that contains the info needed by the flush service to build a blob */ @Override - public ChannelData flush(final String filePath) { + public ChannelData flush() { logger.logDebug("Start get data for channel={}", channelFullyQualifiedName); if (this.bufferedRowCount > 0) { Optional oldData = Optional.empty(); @@ -518,7 +517,7 @@ public ChannelData flush(final String filePath) { try { if (this.bufferedRowCount > 0) { // Transfer the ownership of the vectors - oldData = getSnapshot(filePath); + oldData = getSnapshot(); oldRowCount = this.bufferedRowCount; oldBufferSize = this.bufferSize; oldRowSequencer = this.channelState.incrementAndGetRowSequencer(); @@ -617,10 +616,8 @@ void reset() { /** * Get buffered data snapshot for later flushing. 
- * - * @param filePath the name of the file the data will be written in */ - abstract Optional getSnapshot(final String filePath); + abstract Optional getSnapshot(); @VisibleForTesting abstract Object getVectorValueAt(String column, int index); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 6f5296209..2e7c6507c 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -430,7 +430,7 @@ void distributeFlushTasks(Set tablesToFlush) { .forEach( channel -> { if (channel.isValid()) { - ChannelData data = channel.getData(blobPath); + ChannelData data = channel.getData(); if (data != null) { channelsDataPerTable.add(data); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index 526574d7f..30851c274 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -261,7 +261,7 @@ boolean hasColumns() { } @Override - Optional getSnapshot(final String filePath) { + Optional getSnapshot() { List> oldData = new ArrayList<>(); if (!clientBufferParameters.getEnableParquetInternalBuffering()) { data.forEach(r -> oldData.add(new ArrayList<>(r))); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/RowBuffer.java index 02905c02e..6bb2f43b9 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/RowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RowBuffer.java @@ -37,10 +37,9 @@ InsertValidationResponse insertRows( * Flush the data in the row buffer by taking the ownership of the old vectors and pass all the * required info back to the flush service to build the blob * - * @param filePath the name of the file the data will be written in * @return A ChannelData object that contains the info needed by the flush service to build a blob */ - ChannelData flush(final String filePath); + ChannelData flush(); /** * Close the row buffer and release resources. 
Note that the caller needs to handle diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index ca0bbe782..4e884387b 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -220,11 +220,10 @@ public String getFullyQualifiedTableName() { /** * Get all the data needed to build the blob during flush * - * @param filePath the name of the file the data will be written in * @return a ChannelData object */ - ChannelData getData(final String filePath) { - ChannelData data = this.rowBuffer.flush(filePath); + ChannelData getData() { + ChannelData data = this.rowBuffer.flush(); if (data != null) { data.setChannelContext(channelFlushContext); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 4cc5cbe30..303893bf0 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -131,7 +131,7 @@ void setParameterOverride(Map parameterOverride) { ChannelData flushChannel(String name) { SnowflakeStreamingIngestChannelInternal channel = channels.get(name); - ChannelData channelData = channel.getRowBuffer().flush(name + "_snowpipe_streaming.bdec"); + ChannelData channelData = channel.getRowBuffer().flush(); channelData.setChannelContext(channel.getChannelContext()); this.channelData.add(channelData); return channelData; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index eb84e363d..81eab494a 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -499,7 +499,7 @@ private void testFlushHelper(AbstractRowBuffer rowBuffer) { float bufferSize = rowBuffer.getSize(); final String filename = "2022/7/13/16/56/testFlushHelper_streaming.bdec"; - ChannelData data = rowBuffer.flush(filename); + ChannelData data = rowBuffer.flush(); Assert.assertEquals(2, data.getRowCount()); Assert.assertEquals((Long) 1L, data.getRowSequencer()); Assert.assertEquals(startOffsetToken, data.getStartOffsetToken()); @@ -734,7 +734,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { final String filename = "testStatsE2EHelper_streaming.bdec"; InsertValidationResponse response = rowBuffer.insertRows(Arrays.asList(row1, row2), null, null); Assert.assertFalse(response.hasErrors()); - ChannelData result = rowBuffer.flush(filename); + ChannelData result = rowBuffer.flush(); Map columnEpStats = result.getColumnEps(); Assert.assertEquals( @@ -779,7 +779,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals(-1, columnEpStats.get("COLCHAR").getDistinctValues()); // Confirm we reset - ChannelData resetResults = rowBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData resetResults = rowBuffer.flush(); Assert.assertNull(resetResults); } @@ -838,7 +838,7 @@ private void testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption onErro InsertValidationResponse response = innerBuffer.insertRows(Arrays.asList(row1, row2, row3), 
null, null); Assert.assertFalse(response.hasErrors()); - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals( @@ -907,7 +907,7 @@ private void testE2EDateHelper(OpenChannelRequest.OnErrorOption onErrorOption) { Assert.assertNull(innerBuffer.getVectorValueAt("COLDATE", 2)); // Check stats generation - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals( @@ -973,7 +973,7 @@ private void testE2ETimeHelper(OpenChannelRequest.OnErrorOption onErrorOption) { Assert.assertNull(innerBuffer.getVectorValueAt("COLTIMESB8", 2)); // Check stats generation - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals( @@ -1204,7 +1204,7 @@ private void doTestFailureHalfwayThroughColumnProcessing( } } - ChannelData channelData = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData channelData = innerBuffer.flush(); RowBufferStats statsCol1 = channelData.getColumnEps().get("COLVARCHAR1"); RowBufferStats statsCol2 = channelData.getColumnEps().get("COLVARCHAR2"); RowBufferStats statsCol3 = channelData.getColumnEps().get("COLBOOLEAN1"); @@ -1264,7 +1264,7 @@ private void testE2EBooleanHelper(OpenChannelRequest.OnErrorOption onErrorOption Assert.assertNull(innerBuffer.getVectorValueAt("COLBOOLEAN", 2)); // Check stats generation - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals( @@ -1319,7 +1319,7 @@ private void testE2EBinaryHelper(OpenChannelRequest.OnErrorOption onErrorOption) Assert.assertNull(innerBuffer.getVectorValueAt("COLBINARY", 2)); // Check stats generation - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals(11L, result.getColumnEps().get("COLBINARY").getCurrentMaxLength()); @@ -1371,7 +1371,7 @@ private void testE2ERealHelper(OpenChannelRequest.OnErrorOption onErrorOption) { Assert.assertNull(innerBuffer.getVectorValueAt("COLREAL", 2)); // Check stats generation - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals( @@ -1454,7 +1454,7 @@ public void testOnErrorAbortFailures() { Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMaxIntValue()); Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMinIntValue()); - ChannelData data = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData data = innerBuffer.flush(); Assert.assertEquals(3, data.getRowCount()); Assert.assertEquals(0, innerBuffer.bufferedRowCount); } @@ -1528,7 +1528,7 @@ public void testOnErrorAbortSkipBatch() { Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMaxIntValue()); Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMinIntValue()); - ChannelData data = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData data = innerBuffer.flush(); Assert.assertEquals(3, data.getRowCount()); Assert.assertEquals(0, innerBuffer.bufferedRowCount); } @@ -1579,7 +1579,7 @@ private void 
testE2EVariantHelper(OpenChannelRequest.OnErrorOption onErrorOption Assert.assertEquals("3", innerBuffer.getVectorValueAt("COLVARIANT", 4)); // Check stats generation - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(5, result.getRowCount()); Assert.assertEquals(2, result.getColumnEps().get("COLVARIANT").getCurrentNullCount()); } @@ -1613,7 +1613,7 @@ private void testE2EObjectHelper(OpenChannelRequest.OnErrorOption onErrorOption) Assert.assertEquals("{\"key\":1}", innerBuffer.getVectorValueAt("COLOBJECT", 0)); // Check stats generation - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(1, result.getRowCount()); } @@ -1663,7 +1663,7 @@ private void testE2EArrayHelper(OpenChannelRequest.OnErrorOption onErrorOption) Assert.assertEquals("[1,2,3]", innerBuffer.getVectorValueAt("COLARRAY", 4)); // Check stats generation - ChannelData result = innerBuffer.flush("my_snowpipe_streaming.bdec"); + ChannelData result = innerBuffer.flush(); Assert.assertEquals(5, result.getRowCount()); } @@ -1710,14 +1710,14 @@ public void testOnErrorAbortRowsWithError() { SFException.class, () -> innerBufferOnErrorAbort.insertRows(mixedRows, "1", "3")); List> snapshotContinueParquet = - ((ParquetChunkData) innerBufferOnErrorContinue.getSnapshot("fake/filePath").get()).rows; + ((ParquetChunkData) innerBufferOnErrorContinue.getSnapshot().get()).rows; // validRows and only the good row from mixedRows are in the buffer Assert.assertEquals(2, snapshotContinueParquet.size()); Assert.assertEquals(Arrays.asList("a"), snapshotContinueParquet.get(0)); Assert.assertEquals(Arrays.asList("b"), snapshotContinueParquet.get(1)); List> snapshotAbortParquet = - ((ParquetChunkData) innerBufferOnErrorAbort.getSnapshot("fake/filePath").get()).rows; + ((ParquetChunkData) innerBufferOnErrorAbort.getSnapshot().get()).rows; // only validRows and none of the mixedRows are in the buffer Assert.assertEquals(1, snapshotAbortParquet.size()); Assert.assertEquals(Arrays.asList("a"), snapshotAbortParquet.get(0)); @@ -1725,9 +1725,6 @@ public void testOnErrorAbortRowsWithError() { @Test public void testParquetChunkMetadataCreationIsThreadSafe() throws InterruptedException { - final String testFileA = "testFileA"; - final String testFileB = "testFileB"; - final ParquetRowBuffer bufferUnderTest = (ParquetRowBuffer) createTestBuffer(OpenChannelRequest.OnErrorOption.CONTINUE); @@ -1749,23 +1746,20 @@ public void testParquetChunkMetadataCreationIsThreadSafe() throws InterruptedExc final AtomicReference> firstFlushResult = new AtomicReference<>(); final Thread t = getThreadThatWaitsForLockReleaseAndFlushes( - bufferUnderTest, testFileA, latch, firstFlushResult); + bufferUnderTest, latch, firstFlushResult); t.start(); - final ChannelData secondFlushResult = bufferUnderTest.flush(testFileB); - Assert.assertEquals(testFileB, getPrimaryFileId(secondFlushResult)); + final ChannelData secondFlushResult = bufferUnderTest.flush(); + // TODO: need to verify other fields latch.countDown(); t.join(); Assert.assertNotNull(firstFlushResult.get()); - Assert.assertEquals(testFileA, getPrimaryFileId(firstFlushResult.get())); - Assert.assertEquals(testFileB, getPrimaryFileId(secondFlushResult)); } private static Thread getThreadThatWaitsForLockReleaseAndFlushes( final ParquetRowBuffer bufferUnderTest, - final String filenameToFlush, final CountDownLatch latch, final AtomicReference> flushResult) { return 
new Thread( @@ -1775,10 +1769,10 @@ private static Thread getThreadThatWaitsForLockReleaseAndFlushes( } catch (InterruptedException e) { fail("Thread was unexpectedly interrupted"); } - + final ChannelData flush = loadData(bufferUnderTest, Collections.singletonMap("colChar", "b")) - .flush(filenameToFlush); + .flush(); flushResult.set(flush); }); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index dc3285df8..9495b133e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -592,7 +592,7 @@ public void testInsertRow() { row.put("col", 1); // Get data before insert to verify that there is no row (data should be null) - ChannelData data = channel.getData("my_snowpipe_streaming.bdec"); + ChannelData data = channel.getData(); Assert.assertNull(data); long insertStartTimeInMs = System.currentTimeMillis(); @@ -605,7 +605,7 @@ public void testInsertRow() { long insertEndTimeInMs = System.currentTimeMillis(); // Get data again to verify the row is inserted - data = channel.getData("my_snowpipe_streaming.bdec"); + data = channel.getData(); Assert.assertEquals(3, data.getRowCount()); Assert.assertEquals((Long) 1L, data.getRowSequencer()); Assert.assertEquals(1, ((ChannelData) data).getVectors().rows.get(0).size());
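
For readers skimming the diff, the net effect of this change is that the row buffer no longer needs to know the target file name at flush time: RowBuffer.flush(), AbstractRowBuffer.getSnapshot(), and SnowflakeStreamingIngestChannelInternal.getData() drop their filePath/blobPath parameter, and FlushService.distributeFlushTasks() now collects per-channel data before any blob naming happens. The sketch below illustrates that calling pattern in isolation. It is a minimal, hedged example, not SDK code: DemoRowBuffer, DemoChannelData, DemoFlushService, and FlushPathSketch are hypothetical stand-ins, and only the shape of the flush() call and the null check mirror the patch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for illustration only; the real SDK types are
// generic (e.g. ChannelData<T>, RowBuffer<T>) and carry much more state.
final class DemoChannelData {
  final int rowCount;

  DemoChannelData(int rowCount) {
    this.rowCount = rowCount;
  }
}

interface DemoRowBuffer {
  // After this patch, flush() takes no file path: the buffer only snapshots and
  // resets its own state, returning null when there is nothing buffered.
  DemoChannelData flush();
}

final class DemoFlushService {
  // Mirrors the distributeFlushTasks() shape: collect per-channel data first,
  // then decide on blob naming/writing afterwards, outside the row buffer.
  List<DemoChannelData> collect(List<DemoRowBuffer> buffers) {
    List<DemoChannelData> channelsDataPerTable = new ArrayList<>();
    for (DemoRowBuffer buffer : buffers) {
      DemoChannelData data = buffer.flush(); // no blobPath passed down anymore
      if (data != null) {
        channelsDataPerTable.add(data);
      }
    }
    return channelsDataPerTable;
  }
}

public class FlushPathSketch {
  public static void main(String[] args) {
    DemoRowBuffer nonEmpty = () -> new DemoChannelData(3);
    DemoRowBuffer empty = () -> null;
    List<DemoChannelData> collected =
        new DemoFlushService().collect(List.of(nonEmpty, empty));
    System.out.println("chunks collected: " + collected.size()); // prints 1
  }
}
```

One consequence visible in the tests: with the file name gone from the buffer API, testParquetChunkMetadataCreationIsThreadSafe can no longer assert on a per-flush primary file id; it now only asserts that the background thread's flush produced data, and the patch's own TODO marks the remaining verification of the main-thread flush result.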