Skip to content

Commit

Permalink
Fix PRIMARY_FILE_ID_KEY
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-kkloudas committed Aug 8, 2024
1 parent 164b030 commit 8c3e739
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package net.snowflake.ingest.streaming.internal;

import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.parquet.hadoop.BdecParquetWriter;
Expand Down Expand Up @@ -34,6 +35,15 @@ public ParquetChunkData(
this.rows = rows;
this.parquetWriter = parquetWriter;
this.output = output;
this.metadata = metadata;
// create a defensive copy of the parameter map.
this.metadata = createDefensiveCopy(metadata);
}

private Map<String, String> createDefensiveCopy(final Map<String, String> metadata) {
final Map<String, String> copy = new HashMap<>(metadata);
for (String k: metadata.keySet()) {
copy.put(k, metadata.get(k));
}
return copy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@
import static java.time.ZoneOffset.UTC;
import static net.snowflake.ingest.utils.ParameterProvider.MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT;
import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT;
import static org.junit.Assert.fail;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.utils.Constants;
Expand Down Expand Up @@ -144,7 +142,7 @@ public void testCollatedColumnsAreRejected() {
collatedColumn.setCollation("en-ci");
try {
this.rowBufferOnErrorAbort.setupSchema(Collections.singletonList(collatedColumn));
Assert.fail("Collated columns are not supported");
fail("Collated columns are not supported");
} catch (SFException e) {
Assert.assertEquals(ErrorCode.UNSUPPORTED_DATA_TYPE.getMessageCode(), e.getVendorCode());
}
Expand All @@ -164,7 +162,7 @@ public void buildFieldErrorStates() {
testCol.setPrecision(4);
try {
this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(testCol));
Assert.fail("Expected error");
fail("Expected error");
} catch (SFException e) {
Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode());
}
Expand All @@ -176,7 +174,7 @@ public void buildFieldErrorStates() {
testCol.setLogicalType("FIXED");
try {
this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(testCol));
Assert.fail("Expected error");
fail("Expected error");
} catch (SFException e) {
Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode());
}
Expand All @@ -188,7 +186,7 @@ public void buildFieldErrorStates() {
testCol.setLogicalType("TIMESTAMP_NTZ");
try {
this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(testCol));
Assert.fail("Expected error");
fail("Expected error");
} catch (SFException e) {
Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode());
}
Expand All @@ -200,7 +198,7 @@ public void buildFieldErrorStates() {
testCol.setLogicalType("TIMESTAMP_TZ");
try {
this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(testCol));
Assert.fail("Expected error");
fail("Expected error");
} catch (SFException e) {
Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode());
}
Expand All @@ -212,7 +210,7 @@ public void buildFieldErrorStates() {
testCol.setLogicalType("TIME");
try {
this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(testCol));
Assert.fail("Expected error");
fail("Expected error");
} catch (SFException e) {
Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode());
}
Expand Down Expand Up @@ -244,7 +242,7 @@ public void testInvalidLogicalType() {

try {
this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(colInvalidLogical));
Assert.fail("Setup should fail if invalid column metadata is provided");
fail("Setup should fail if invalid column metadata is provided");
} catch (SFException e) {
Assert.assertEquals(ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode(), e.getVendorCode());
// Do nothing
Expand All @@ -264,7 +262,7 @@ public void testInvalidPhysicalType() {

try {
this.rowBufferOnErrorContinue.setupSchema(Collections.singletonList(colInvalidPhysical));
Assert.fail("Setup should fail if invalid column metadata is provided");
fail("Setup should fail if invalid column metadata is provided");
} catch (SFException e) {
Assert.assertEquals(e.getVendorCode(), ErrorCode.UNKNOWN_DATA_TYPE.getMessageCode());
}
Expand Down Expand Up @@ -630,7 +628,7 @@ public void testInvalidEPInfo() {

try {
AbstractRowBuffer.buildEpInfoFromStats(1, colStats);
Assert.fail("should fail when row count is smaller than null count.");
fail("should fail when row count is smaller than null count.");
} catch (SFException e) {
Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode());
}
Expand Down Expand Up @@ -1725,4 +1723,74 @@ public void testOnErrorAbortRowsWithError() {
Assert.assertEquals(1, snapshotAbortParquet.size());
Assert.assertEquals(Arrays.asList("a"), snapshotAbortParquet.get(0));
}

@Test
public void testParquetChunkMetadataCreationIsThreadSafe() throws InterruptedException {
final String testFileA = "testFileA";
final String testFileB = "testFileB";

final ParquetRowBuffer bufferUnderTest =
(ParquetRowBuffer) createTestBuffer(OpenChannelRequest.OnErrorOption.CONTINUE);

final ColumnMetadata colChar = new ColumnMetadata();
colChar.setOrdinal(1);
colChar.setName("COLCHAR");
colChar.setPhysicalType("LOB");
colChar.setNullable(true);
colChar.setLogicalType("TEXT");
colChar.setByteLength(14);
colChar.setLength(11);
colChar.setScale(0);

bufferUnderTest.setupSchema(Collections.singletonList(colChar));

loadData(bufferUnderTest, Collections.singletonMap("colChar", "a"));

final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<ChannelData<ParquetChunkData>> firstFlushResult = new AtomicReference<>();
final Thread t = getThreadThatWaitsForLockReleaseAndFlushes(bufferUnderTest, testFileA, latch, firstFlushResult);
t.start();

final ChannelData<ParquetChunkData> secondFlushResult = bufferUnderTest.flush(testFileB);
Assert.assertEquals(testFileB, getPrimaryFileId(secondFlushResult));

latch.countDown();
t.join();

Assert.assertNotNull(firstFlushResult.get());
Assert.assertEquals(testFileA, getPrimaryFileId(firstFlushResult.get()));
Assert.assertEquals(testFileB, getPrimaryFileId(secondFlushResult));
}

private static Thread getThreadThatWaitsForLockReleaseAndFlushes(
final ParquetRowBuffer bufferUnderTest,
final String filenameToFlush,
final CountDownLatch latch,
final AtomicReference<ChannelData<ParquetChunkData>> flushResult) {
return new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
fail("Thread was unexpectedly interrupted");
}

final ChannelData<ParquetChunkData> flush =
loadData(bufferUnderTest, Collections.singletonMap("colChar", "b"))
.flush(filenameToFlush);
flushResult.set(flush);
});
}

private static ParquetRowBuffer loadData(final ParquetRowBuffer bufferToLoad, final Map<String, Object> data) {
final List<Map<String, Object>> validRows = new ArrayList<>();
validRows.add(data);

final InsertValidationResponse nResponse = bufferToLoad.insertRows(validRows, "1", "1");
Assert.assertFalse(nResponse.hasErrors());
return bufferToLoad;
}

private static String getPrimaryFileId(final ChannelData<ParquetChunkData> chunkData) {
return chunkData.getVectors().metadata.get(Constants.PRIMARY_FILE_ID_KEY);
}
}

0 comments on commit 8c3e739

Please sign in to comment.