From 075e9183e8af2fd47428c1566d6e8e1c4a3f524a Mon Sep 17 00:00:00 2001 From: Hitesh Madan Date: Tue, 19 Nov 2024 16:33:11 -0800 Subject: [PATCH 1/4] Make Channel.getIcebergSchema() API public + Stop NDV tracking (#912) logging fixes make api public for schema evolution in KC stop ndv tracking since its causing ingestion to massively slow down. --- .../ingest/streaming/internal/BlobPath.java | 5 + .../streaming/internal/ColumnProperties.java | 4 +- .../internal/InternalParameterProvider.java | 6 +- ...nowflakeStreamingIngestClientInternal.java | 3 +- .../SubscopedTokenExternalVolumeManager.java | 22 +-- .../streaming/internal/BlobBuilderTest.java | 8 +- .../streaming/internal/ChannelDataTest.java | 36 +++-- .../internal/FileColumnPropertiesTest.java | 4 +- .../streaming/internal/FlushServiceTest.java | 10 +- .../IcebergParquetValueParserTest.java | 99 +++++++++--- .../internal/RowBufferStatsTest.java | 149 ++++++++++++++---- .../streaming/internal/RowBufferTest.java | 52 +++--- .../SnowflakeStreamingIngestClientTest.java | 4 +- 13 files changed, 288 insertions(+), 114 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java index 7a6b23670..e49dc18a4 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java @@ -23,4 +23,9 @@ public BlobPath(String uploadPath, String fileRegistrationPath) { this.uploadPath = uploadPath; this.fileRegistrationPath = fileRegistrationPath; } + + @Override + public String toString() { + return String.format("uploadPath=%s fileRegistrationPath=%s", uploadPath, fileRegistrationPath); + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java index 52a72a5e2..3178c5bf7 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java @@ -70,10 +70,8 @@ public boolean isNullable() { * when this object represents an iceberg table's column, null otherwise. The String returned from * here is meant to conform to the json schema specified here: * https://iceberg.apache.org/spec/#appendix-c-json-serialization - * - *

Make this a public API when the Builder.setEnableIcebergStreaming API is made public. */ - String getIcebergSchema() { + public String getIcebergSchema() { return icebergColumnSchema; } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java index de5cb87eb..4bda0ef58 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java @@ -11,9 +11,11 @@ class InternalParameterProvider { public static final boolean ENABLE_VALUES_COUNT_DEFAULT = false; private final boolean enableIcebergStreaming; + private final boolean enableNDVTracking; - InternalParameterProvider(boolean enableIcebergStreaming) { + InternalParameterProvider(boolean enableIcebergStreaming, boolean enableNDVTracking) { this.enableIcebergStreaming = enableIcebergStreaming; + this.enableNDVTracking = enableNDVTracking; } boolean getEnableChunkEncryption() { @@ -38,7 +40,7 @@ boolean setIcebergSpecificFieldsInEp() { boolean isEnableDistinctValuesCount() { // When in Iceberg mode, we enabled distinct values count in EP metadata. - return enableIcebergStreaming; + return enableIcebergStreaming && enableNDVTracking; } boolean isEnableValuesCount() { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 9b68d924e..4bbcc9bc9 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -167,7 +167,8 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea Map parameterOverrides) { this.parameterProvider = new ParameterProvider(parameterOverrides, prop); this.internalParameterProvider = - new InternalParameterProvider(parameterProvider.isEnableIcebergStreaming()); + new InternalParameterProvider( + parameterProvider.isEnableIcebergStreaming(), false /* enableNDVCount */); this.name = name; String accountName = accountURL == null ? null : accountURL.getAccount(); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java index 8f606b3dd..d15296484 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java @@ -58,8 +58,9 @@ class SubscopedTokenExternalVolumeManager implements IStorageManager { throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); } logger.logDebug( - "Created SubscopedTokenExternalVolumeManager with clientName=%s and clientPrefix=%s", - clientName, clientPrefix); + "Created SubscopedTokenExternalVolumeManager with clientName={} and clientPrefix={}", + clientName, + clientPrefix); } /** @@ -93,12 +94,12 @@ private InternalStage createStageForTable(TableRef tableRef) { this, clientName, getClientPrefix(), tableRef, locationInfo, DEFAULT_MAX_UPLOAD_RETRIES); } catch (SFException ex) { logger.logError( - "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, ex); + "ExtVolManager.registerTable for tableRef={} failed with exception={}", tableRef, ex); // allow external volume ctor's SFExceptions to bubble up directly throw ex; } catch (Exception err) { logger.logError( - "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, err); + "ExtVolManager.registerTable for tableRef={} failed with exception={}", tableRef, err); throw new SFException( err, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE, @@ -127,8 +128,9 @@ static BlobPath generateBlobPathFromLocationInfoPath( String[] parts = filePathRelativeToVolume.split("/"); if (parts.length < 5) { logger.logError( - "Invalid file path returned by server. Table=%s FilePathRelativeToVolume=%s", - fullyQualifiedTableName, filePathRelativeToVolume); + "Invalid file path returned by server. Table={} FilePathRelativeToVolume={}", + fullyQualifiedTableName, + filePathRelativeToVolume); throw new SFException(ErrorCode.INTERNAL_ERROR, "File path returned by server is invalid"); } @@ -166,17 +168,19 @@ public FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional RefreshTableInformationResponse response = this.serviceClient.refreshTableInformation( new RefreshTableInformationRequest(tableRef, this.role, true)); - logger.logDebug("Refreshed tokens for table=%s", tableRef); + logger.logDebug("Refreshed tokens for table={}", tableRef); if (response.getIcebergLocationInfo() == null) { logger.logError( "Did not receive location info, this will cause ingestion to grind to a halt." - + " TableRef=%s"); + + " TableRef={}", + tableRef); } else { Map creds = response.getIcebergLocationInfo().getCredentials(); if (creds == null || creds.isEmpty()) { logger.logError( "Did not receive creds in location info, this will cause ingestion to grind to a" - + " halt. TableRef=%s"); + + " halt. TableRef={}", + tableRef); } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index 627c95e72..81b31bd29 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -59,7 +59,7 @@ public void testSerializationErrors() throws Exception { "a.bdec", Collections.singletonList(createChannelDataPerTable(1)), Constants.BdecVersion.THREE, - new InternalParameterProvider(enableIcebergStreaming), + new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */), encryptionKeysPerTable); // Construction fails if metadata contains 0 rows and data 1 row @@ -68,7 +68,7 @@ public void testSerializationErrors() throws Exception { "a.bdec", Collections.singletonList(createChannelDataPerTable(0)), Constants.BdecVersion.THREE, - new InternalParameterProvider(enableIcebergStreaming), + new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */), encryptionKeysPerTable); } catch (SFException e) { Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode()); @@ -93,7 +93,7 @@ public void testMetadataAndExtendedMetadataSize() throws Exception { "a.parquet", Collections.singletonList(createChannelDataPerTable(1)), Constants.BdecVersion.THREE, - new InternalParameterProvider(enableIcebergStreaming), + new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */), new ConcurrentHashMap<>()); InputFile blobInputFile = new InMemoryInputFile(blob.blobBytes); @@ -188,7 +188,7 @@ private List> createChannelDataPerTable(int metada .as(LogicalTypeAnnotation.stringType()) .id(1) .named("test"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming) : new RowBufferStats(columnName, null, 1, null, null, false, false)); channelData.setChannelContext( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java index d76499c14..440e13b27 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java @@ -27,7 +27,9 @@ public static Object[] enableNDVAndNV() { @Test public void testGetCombinedColumnStatsMapNulls() { Map left = new HashMap<>(); - RowBufferStats leftStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats leftStats1 = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); left.put("one", leftStats1); leftStats1.addIntValue(new BigInteger("10")); @@ -56,12 +58,16 @@ public void testGetCombinedColumnStatsMapNulls() { @Test public void testGetCombinedColumnStatsMapMissingColumn() { Map left = new HashMap<>(); - RowBufferStats leftStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats leftStats1 = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); left.put("one", leftStats1); leftStats1.addIntValue(new BigInteger("10")); Map right = new HashMap<>(); - RowBufferStats rightStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats rightStats1 = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); right.put("foo", rightStats1); rightStats1.addIntValue(new BigInteger("11")); @@ -91,10 +97,18 @@ public void testGetCombinedColumnStatsMap() { Map left = new HashMap<>(); Map right = new HashMap<>(); - RowBufferStats leftStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); - RowBufferStats rightStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); - RowBufferStats leftStats2 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); - RowBufferStats rightStats2 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats leftStats1 = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + RowBufferStats rightStats1 = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + RowBufferStats leftStats2 = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + RowBufferStats rightStats2 = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); left.put("one", leftStats1); left.put("two", leftStats2); @@ -125,7 +139,9 @@ public void testGetCombinedColumnStatsMap() { Assert.assertNull(oneCombined.getCurrentMinRealValue()); Assert.assertNull(oneCombined.getCurrentMaxRealValue()); - Assert.assertEquals(enableNDVAndNV ? 5 : -1, oneCombined.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 5 : -1, + oneCombined.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 5 : -1, oneCombined.getNumberOfValues()); Assert.assertArrayEquals( @@ -137,7 +153,9 @@ public void testGetCombinedColumnStatsMap() { Assert.assertNull(twoCombined.getCurrentMinRealValue()); Assert.assertNull(twoCombined.getCurrentMaxRealValue()); - Assert.assertEquals(enableNDVAndNV ? 5 : -1, oneCombined.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 5 : -1, + oneCombined.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 5 : -1, oneCombined.getNumberOfValues()); } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java index 162ccc63f..82171b687 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java @@ -33,7 +33,7 @@ public void testFileColumnPropertiesConstructor() { .as(LogicalTypeAnnotation.stringType()) .id(1) .named("test"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming) : new RowBufferStats("COL", null, 1, null, null, false, false); stats.addStrValue("bcd"); @@ -58,7 +58,7 @@ public void testFileColumnPropertiesConstructor() { .as(LogicalTypeAnnotation.stringType()) .id(1) .named("test"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming) : new RowBufferStats("COL", null, 1, null, null, false, false); stats.addStrValue("aßßßßßßßßßßßßßßßß"); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index ed697b026..5b5c05809 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -112,7 +112,7 @@ private abstract static class TestContext implements AutoCloseable { storage = Mockito.mock(InternalStage.class); parameterProvider = createParameterProvider(enableIcebergStreaming); InternalParameterProvider internalParameterProvider = - new InternalParameterProvider(enableIcebergStreaming); + new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */); client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); Mockito.when(client.getInternalParameterProvider()).thenReturn(internalParameterProvider); @@ -905,13 +905,13 @@ public void testBuildAndUpload() throws Exception { new RowBufferStats( "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); RowBufferStats stats2 = new RowBufferStats( "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); eps1.put("one", stats1); @@ -1078,7 +1078,7 @@ public void testInvalidateChannels() { ParameterProvider parameterProvider = createParameterProvider(enableIcebergStreaming); ChannelCache channelCache = new ChannelCache<>(); InternalParameterProvider internalParameterProvider = - new InternalParameterProvider(enableIcebergStreaming); + new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */); Mockito.when(client.getChannelCache()).thenReturn(channelCache); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); Mockito.when(client.getInternalParameterProvider()).thenReturn(internalParameterProvider); @@ -1167,7 +1167,7 @@ public void testBlobBuilder() throws Exception { new RowBufferStats( "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); eps1.put("one", stats1); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java index 7fe76f7b1..bdc366ae7 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java @@ -50,7 +50,9 @@ public void parseValueBoolean() { Type type = Types.primitive(PrimitiveTypeName.BOOLEAN, Repetition.OPTIONAL).id(1).named("BOOLEAN_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("BOOLEAN_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "BOOLEAN_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -76,7 +78,9 @@ public void parseValueInt() { Type type = Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL).id(1).named("INT_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("INT_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "INT_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -104,7 +108,9 @@ public void parseValueDecimalToInt() { .as(LogicalTypeAnnotation.decimalType(4, 9)) .named("DECIMAL_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "DECIMAL_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -138,7 +144,9 @@ public void parseValueDateToInt() { .as(LogicalTypeAnnotation.dateType()) .named("DATE_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("DATE_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "DATE_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -163,7 +171,9 @@ public void parseValueLong() { Type type = Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL).id(1).named("LONG_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("LONG_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "LONG_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -191,7 +201,9 @@ public void parseValueDecimalToLong() { .as(LogicalTypeAnnotation.decimalType(9, 18)) .named("DECIMAL_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "DECIMAL_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -225,7 +237,9 @@ public void parseValueTimeToLong() { .as(LogicalTypeAnnotation.timeType(false, LogicalTypeAnnotation.TimeUnit.MICROS)) .named("TIME_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("TIME_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "TIME_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -253,7 +267,9 @@ public void parseValueTimestampToLong() { .as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS)) .named("TIMESTAMP_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "TIMESTAMP_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -287,7 +303,11 @@ public void parseValueTimestampTZToLong() { .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS)) .named("TIMESTAMP_TZ_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_TZ_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "TIMESTAMP_TZ_COL", + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, + true); Map rowBufferStatsMap = new HashMap() { { @@ -318,7 +338,9 @@ public void parseValueFloat() { Type type = Types.primitive(PrimitiveTypeName.FLOAT, Repetition.OPTIONAL).id(1).named("FLOAT_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("FLOAT_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "FLOAT_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -343,7 +365,9 @@ public void parseValueDouble() { Type type = Types.primitive(PrimitiveTypeName.DOUBLE, Repetition.OPTIONAL).id(1).named("DOUBLE_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("DOUBLE_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "DOUBLE_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -368,7 +392,9 @@ public void parseValueBinary() { Type type = Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL).id(1).named("BINARY_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "BINARY_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -398,7 +424,9 @@ public void parseValueStringToBinary() { .as(LogicalTypeAnnotation.stringType()) .named("BINARY_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "BINARY_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -430,7 +458,9 @@ public void parseValueFixed() { .length(4) .named("FIXED_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "FIXED_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -461,7 +491,9 @@ public void parseValueDecimalToFixed() { .as(LogicalTypeAnnotation.decimalType(10, 20)) .named("FIXED_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "FIXED_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -490,7 +522,11 @@ public void parseList() throws JsonProcessingException { .element(Types.optional(PrimitiveTypeName.INT32).id(2).named("element")) .id(1) .named("LIST_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("LIST_COL.list.element", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "LIST_COL.list.element", + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, + true); Map rowBufferStatsMap = new HashMap() { { @@ -576,8 +612,16 @@ public void parseMap() throws JsonProcessingException { .value(Types.optional(PrimitiveTypeName.INT32).id(3).named("value")) .id(1) .named("MAP_COL"); - RowBufferStats rowBufferKeyStats = new RowBufferStats("MAP_COL.key_value.key", true, true); - RowBufferStats rowBufferValueStats = new RowBufferStats("MAP_COL.key_value.value", true, true); + RowBufferStats rowBufferKeyStats = + new RowBufferStats( + "MAP_COL.key_value.key", + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, + true); + RowBufferStats rowBufferValueStats = + new RowBufferStats( + "MAP_COL.key_value.value", + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, + true); Map rowBufferStatsMap = new HashMap() { { @@ -686,8 +730,12 @@ public void parseStruct() throws JsonProcessingException { .id(1) .named("STRUCT_COL"); - RowBufferStats rowBufferAStats = new RowBufferStats("STRUCT_COL.a", true, true); - RowBufferStats rowBufferBStats = new RowBufferStats("STRUCT_COL.b", true, true); + RowBufferStats rowBufferAStats = + new RowBufferStats( + "STRUCT_COL.a", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); + RowBufferStats rowBufferBStats = + new RowBufferStats( + "STRUCT_COL.b", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -819,7 +867,10 @@ public void parseNestedTypes() { private static Type generateNestedTypeAndStats( int depth, String name, Map rowBufferStatsMap, String path) { if (depth == 0) { - rowBufferStatsMap.put("0", new RowBufferStats(path, true, true)); + rowBufferStatsMap.put( + "0", + new RowBufferStats( + path, InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true)); return Types.optional(PrimitiveTypeName.INT32).id(0).named(name); } switch (depth % 3) { @@ -837,7 +888,11 @@ private static Type generateNestedTypeAndStats( .named(name); case 0: rowBufferStatsMap.put( - String.valueOf(depth), new RowBufferStats(path + ".key_value.key", true, true)); + String.valueOf(depth), + new RowBufferStats( + path + ".key_value.key", + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, + true)); return Types.optionalMap() .key(Types.required(PrimitiveTypeName.INT32).id(depth).named("key")) .value( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java index a145c86a8..272cafe0d 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java @@ -22,7 +22,10 @@ public static Object[] enableNDVAndNV() { @Test public void testEmptyState() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats stats = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; Assert.assertNull(stats.getCollationDefinitionString()); Assert.assertNull(stats.getCurrentMinRealValue()); @@ -32,25 +35,34 @@ public void testEmptyState() throws Exception { Assert.assertNull(stats.getCurrentMinIntValue()); Assert.assertNull(stats.getCurrentMaxIntValue()); Assert.assertEquals(0, stats.getCurrentNullCount()); - Assert.assertEquals(enableNDVAndNV ? 0 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 0 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 0 : -1, stats.getNumberOfValues()); } @Test public void testMinMaxStrNonCol() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats stats = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; stats.addStrValue("bob"); Assert.assertArrayEquals("bob".getBytes(StandardCharsets.UTF_8), stats.getCurrentMinStrValue()); Assert.assertArrayEquals("bob".getBytes(StandardCharsets.UTF_8), stats.getCurrentMaxStrValue()); - Assert.assertEquals(enableNDVAndNV ? 1 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 1 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 1 : -1, stats.getNumberOfValues()); stats.addStrValue("charlie"); Assert.assertArrayEquals("bob".getBytes(StandardCharsets.UTF_8), stats.getCurrentMinStrValue()); Assert.assertArrayEquals( "charlie".getBytes(StandardCharsets.UTF_8), stats.getCurrentMaxStrValue()); - Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getNumberOfValues()); stats.addStrValue("alice"); @@ -59,7 +71,9 @@ public void testMinMaxStrNonCol() throws Exception { Assert.assertArrayEquals( "charlie".getBytes(StandardCharsets.UTF_8), stats.getCurrentMaxStrValue()); - Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 3 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getNumberOfValues()); Assert.assertNull(stats.getCurrentMinRealValue()); @@ -72,27 +86,36 @@ public void testMinMaxStrNonCol() throws Exception { @Test public void testMinMaxInt() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats stats = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; stats.addIntValue(BigInteger.valueOf(5)); Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMaxIntValue()); - Assert.assertEquals(enableNDVAndNV ? 1 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 1 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 1 : -1, stats.getNumberOfValues()); stats.addIntValue(BigInteger.valueOf(6)); Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf((6)), stats.getCurrentMaxIntValue()); - Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getNumberOfValues()); stats.addIntValue(BigInteger.valueOf(4)); Assert.assertEquals(BigInteger.valueOf((4)), stats.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf((6)), stats.getCurrentMaxIntValue()); - Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 3 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getNumberOfValues()); Assert.assertNull(stats.getCurrentMinRealValue()); @@ -105,27 +128,36 @@ public void testMinMaxInt() throws Exception { @Test public void testMinMaxReal() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats stats = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; stats.addRealValue(1.0); Assert.assertEquals(Double.valueOf(1), stats.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(1), stats.getCurrentMaxRealValue()); - Assert.assertEquals(enableNDVAndNV ? 1 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 1 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 1 : -1, stats.getNumberOfValues()); stats.addRealValue(1.5); Assert.assertEquals(Double.valueOf(1), stats.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(1.5), stats.getCurrentMaxRealValue()); - Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getNumberOfValues()); stats.addRealValue(.8); Assert.assertEquals(Double.valueOf(.8), stats.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(1.5), stats.getCurrentMaxRealValue()); - Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 3 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getNumberOfValues()); Assert.assertNull(stats.getCurrentMinIntValue()); @@ -138,7 +170,10 @@ public void testMinMaxReal() throws Exception { @Test public void testIncCurrentNullCount() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats stats = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; Assert.assertEquals(0, stats.getCurrentNullCount()); stats.incCurrentNullCount(); @@ -149,7 +184,10 @@ public void testIncCurrentNullCount() throws Exception { @Test public void testMaxLength() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats stats = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; Assert.assertEquals(0, stats.getCurrentMaxLength()); stats.setCurrentMaxLength(100L); @@ -161,8 +199,14 @@ public void testMaxLength() throws Exception { @Test public void testGetCombinedStats() throws Exception { // Test for Integers - RowBufferStats one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); - RowBufferStats two = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats one = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; + RowBufferStats two = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; one.addIntValue(BigInteger.valueOf(2)); one.addIntValue(BigInteger.valueOf(4)); @@ -179,7 +223,9 @@ public void testGetCombinedStats() throws Exception { RowBufferStats result = RowBufferStats.getCombinedStats(one, two); Assert.assertEquals(BigInteger.valueOf(1), result.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf(8), result.getCurrentMaxIntValue()); - Assert.assertEquals(enableNDVAndNV ? 7 : -1, result.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 7 : -1, + result.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 10 : -1, result.getNumberOfValues()); Assert.assertEquals(2, result.getCurrentNullCount()); @@ -189,8 +235,14 @@ public void testGetCombinedStats() throws Exception { Assert.assertNull(result.getCurrentMaxRealValue()); // Test for Reals - one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); - two = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + one = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; + two = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; one.addRealValue(2d); one.addRealValue(4d); @@ -205,7 +257,9 @@ public void testGetCombinedStats() throws Exception { result = RowBufferStats.getCombinedStats(one, two); Assert.assertEquals(Double.valueOf(1), result.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(8), result.getCurrentMaxRealValue()); - Assert.assertEquals(enableNDVAndNV ? 7 : -1, result.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 7 : -1, + result.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 8 : -1, result.getNumberOfValues()); Assert.assertEquals(0, result.getCurrentNullCount()); @@ -216,8 +270,14 @@ public void testGetCombinedStats() throws Exception { Assert.assertNull(result.getCurrentMaxIntValue()); // Test for Strings without collation - one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); - two = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + one = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; + two = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; one.addStrValue("alpha"); one.addStrValue("d"); @@ -238,7 +298,9 @@ public void testGetCombinedStats() throws Exception { Assert.assertArrayEquals("g".getBytes(StandardCharsets.UTF_8), result.getCurrentMaxStrValue()); Assert.assertEquals(2, result.getCurrentNullCount()); Assert.assertEquals(5, result.getCurrentMaxLength()); - Assert.assertEquals(enableNDVAndNV ? 7 : -1, result.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 7 : -1, + result.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 10 : -1, result.getNumberOfValues()); Assert.assertNull(result.getCurrentMinRealValue()); @@ -250,8 +312,14 @@ public void testGetCombinedStats() throws Exception { @Test public void testGetCombinedStatsNull() throws Exception { // Test for Integers - RowBufferStats one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); - RowBufferStats two = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats one = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; + RowBufferStats two = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; one.addIntValue(BigInteger.valueOf(2)); one.addIntValue(BigInteger.valueOf(4)); @@ -263,7 +331,9 @@ public void testGetCombinedStatsNull() throws Exception { RowBufferStats result = RowBufferStats.getCombinedStats(one, two); Assert.assertEquals(BigInteger.valueOf(2), result.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf(8), result.getCurrentMaxIntValue()); - Assert.assertEquals(enableNDVAndNV ? 4 : -1, result.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 4 : -1, + result.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 6 : -1, result.getNumberOfValues()); Assert.assertEquals(2, result.getCurrentNullCount()); @@ -274,7 +344,10 @@ public void testGetCombinedStatsNull() throws Exception { Assert.assertNull(result.getCurrentMaxRealValue()); // Test for Reals - one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + one = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; one.addRealValue(2d); one.addRealValue(4d); @@ -284,7 +357,9 @@ public void testGetCombinedStatsNull() throws Exception { result = RowBufferStats.getCombinedStats(one, two); Assert.assertEquals(Double.valueOf(2), result.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(8), result.getCurrentMaxRealValue()); - Assert.assertEquals(enableNDVAndNV ? 4 : -1, result.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 4 : -1, + result.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 4 : -1, result.getNumberOfValues()); Assert.assertEquals(0, result.getCurrentNullCount()); @@ -294,8 +369,14 @@ public void testGetCombinedStatsNull() throws Exception { Assert.assertNull(result.getCurrentMaxIntValue()); // Test for Strings - one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); - two = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + one = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; + two = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; one.addStrValue("alpha"); one.addStrValue("d"); @@ -308,7 +389,9 @@ public void testGetCombinedStatsNull() throws Exception { "alpha".getBytes(StandardCharsets.UTF_8), result.getCurrentMinStrValue()); Assert.assertArrayEquals("g".getBytes(StandardCharsets.UTF_8), result.getCurrentMaxStrValue()); - Assert.assertEquals(enableNDVAndNV ? 4 : -1, result.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 4 : -1, + result.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 5 : -1, result.getNumberOfValues()); Assert.assertEquals(1, result.getCurrentNullCount()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 8822363a8..57b919b55 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -223,7 +223,7 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT, enableIcebergStreaming ? Optional.of(1) : Optional.empty(), enableIcebergStreaming, - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming), null, enableIcebergStreaming @@ -645,7 +645,7 @@ public void testBuildEpInfoFromStats() { new RowBufferStats( "intColumn", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("intColumn"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); stats1.addIntValue(BigInteger.valueOf(2)); stats1.addIntValue(BigInteger.valueOf(10)); @@ -655,7 +655,7 @@ public void testBuildEpInfoFromStats() { new RowBufferStats( "strColumn", Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).id(2).named("strColumn"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); stats2.addStrValue("alice"); stats2.addStrValue("bob"); @@ -671,7 +671,9 @@ public void testBuildEpInfoFromStats() { Assert.assertEquals(2, columnResults.keySet().size()); FileColumnProperties strColumnResult = columnResults.get("strColumn"); - Assert.assertEquals(enableIcebergStreaming ? 2 : -1, strColumnResult.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1, + strColumnResult.getDistinctValues()); Assert.assertEquals( Hex.encodeHexString("alice".getBytes(StandardCharsets.UTF_8)), strColumnResult.getMinStrValue()); @@ -681,7 +683,9 @@ public void testBuildEpInfoFromStats() { Assert.assertEquals(1, strColumnResult.getNullCount()); FileColumnProperties intColumnResult = columnResults.get("intColumn"); - Assert.assertEquals(enableIcebergStreaming ? 3 : -1, intColumnResult.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 3 : -1, + intColumnResult.getDistinctValues()); Assert.assertEquals(BigInteger.valueOf(1), intColumnResult.getMinIntValue()); Assert.assertEquals(BigInteger.valueOf(10), intColumnResult.getMaxIntValue()); Assert.assertEquals(0, intColumnResult.getNullCount()); @@ -697,13 +701,13 @@ public void testBuildEpInfoFromNullColumnStats() { new RowBufferStats( intColName, Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named(intColName), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); RowBufferStats stats2 = new RowBufferStats( realColName, Types.optional(PrimitiveType.PrimitiveTypeName.DOUBLE).id(2).named(realColName), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); stats1.incCurrentNullCount(); stats2.incCurrentNullCount(); @@ -718,7 +722,9 @@ public void testBuildEpInfoFromNullColumnStats() { Assert.assertEquals(2, columnResults.keySet().size()); FileColumnProperties intColumnResult = columnResults.get(intColName); - Assert.assertEquals(enableIcebergStreaming ? 0 : -1, intColumnResult.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 0 : -1, + intColumnResult.getDistinctValues()); Assert.assertEquals( FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP, intColumnResult.getMinIntValue()); Assert.assertEquals( @@ -727,7 +733,9 @@ public void testBuildEpInfoFromNullColumnStats() { Assert.assertEquals(0, intColumnResult.getMaxLength()); FileColumnProperties realColumnResult = columnResults.get(realColName); - Assert.assertEquals(enableIcebergStreaming ? 0 : -1, intColumnResult.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 0 : -1, + intColumnResult.getDistinctValues()); Assert.assertEquals( FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP, realColumnResult.getMinRealValue()); Assert.assertEquals( @@ -744,7 +752,7 @@ public void testInvalidEPInfo() { new RowBufferStats( "intColumn", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("intColumn"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); stats1.addIntValue(BigInteger.valueOf(2)); stats1.addIntValue(BigInteger.valueOf(10)); @@ -754,7 +762,7 @@ public void testInvalidEPInfo() { new RowBufferStats( "strColumn", Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).id(2).named("strColumn"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); stats2.addStrValue("alice"); stats2.incCurrentNullCount(); @@ -882,7 +890,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals( 0, columnEpStats.get(enableIcebergStreaming ? "1" : "colTinyInt").getCurrentNullCount()); Assert.assertEquals( - enableIcebergStreaming ? 2 : -1, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1, columnEpStats.get(enableIcebergStreaming ? "1" : "colTinyInt").getDistinctValues()); Assert.assertEquals( @@ -894,7 +902,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals( 0, columnEpStats.get(enableIcebergStreaming ? "2" : "COLTINYINT").getCurrentNullCount()); Assert.assertEquals( - enableIcebergStreaming ? 1 : -1, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 1 : -1, columnEpStats.get(enableIcebergStreaming ? "2" : "COLTINYINT").getDistinctValues()); Assert.assertEquals( @@ -906,7 +914,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals( 0, columnEpStats.get(enableIcebergStreaming ? "3" : "COLSMALLINT").getCurrentNullCount()); Assert.assertEquals( - enableIcebergStreaming ? 2 : -1, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1, columnEpStats.get(enableIcebergStreaming ? "3" : "COLSMALLINT").getDistinctValues()); Assert.assertEquals( @@ -918,7 +926,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals( 1L, columnEpStats.get(enableIcebergStreaming ? "4" : "COLINT").getCurrentNullCount()); Assert.assertEquals( - enableIcebergStreaming ? 1 : -1, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 1 : -1, columnEpStats.get(enableIcebergStreaming ? "4" : "COLINT").getDistinctValues()); Assert.assertEquals( @@ -930,7 +938,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals( 0, columnEpStats.get(enableIcebergStreaming ? "5" : "COLBIGINT").getCurrentNullCount()); Assert.assertEquals( - enableIcebergStreaming ? 2 : -1, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1, columnEpStats.get(enableIcebergStreaming ? "5" : "COLBIGINT").getDistinctValues()); Assert.assertArrayEquals( @@ -942,7 +950,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals( 0, columnEpStats.get(enableIcebergStreaming ? "7" : "COLCHAR").getCurrentNullCount()); Assert.assertEquals( - enableIcebergStreaming ? 2 : -1, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1, columnEpStats.get(enableIcebergStreaming ? "7" : "COLCHAR").getDistinctValues()); // Confirm we reset @@ -2419,7 +2427,7 @@ private void testStructuredStatsE2EHelper(AbstractRowBuffer rowBuffer) { columnEpStats.get(enableIcebergStreaming ? "4" : "COLOBJECT.a").getCurrentNullCount()) .isEqualTo(0); assertThat(columnEpStats.get(enableIcebergStreaming ? "4" : "COLOBJECT.a").getDistinctValues()) - .isEqualTo(2); + .isEqualTo(InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1); assertThat(columnEpStats.get(enableIcebergStreaming ? "4" : "COLOBJECT.a").getNumberOfValues()) .isEqualTo(EP_NV_UNKNOWN); @@ -2433,7 +2441,7 @@ private void testStructuredStatsE2EHelper(AbstractRowBuffer rowBuffer) { columnEpStats.get(enableIcebergStreaming ? "5" : "COLOBJECT.b").getCurrentNullCount()) .isEqualTo(1); assertThat(columnEpStats.get(enableIcebergStreaming ? "5" : "COLOBJECT.b").getDistinctValues()) - .isEqualTo(1); + .isEqualTo(InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 1 : -1); assertThat(columnEpStats.get(enableIcebergStreaming ? "5" : "COLOBJECT.b").getNumberOfValues()) .isEqualTo(EP_NV_UNKNOWN); @@ -2456,7 +2464,7 @@ private void testStructuredStatsE2EHelper(AbstractRowBuffer rowBuffer) { columnEpStats .get(enableIcebergStreaming ? "6" : "COLMAP.key_value.key") .getDistinctValues()) - .isEqualTo(2); + .isEqualTo(InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1); assertThat( columnEpStats .get(enableIcebergStreaming ? "6" : "COLMAP.key_value.key") @@ -2482,7 +2490,7 @@ private void testStructuredStatsE2EHelper(AbstractRowBuffer rowBuffer) { columnEpStats .get(enableIcebergStreaming ? "7" : "COLMAP.key_value.value") .getDistinctValues()) - .isEqualTo(1); + .isEqualTo(InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 1 : -1); assertThat( columnEpStats .get(enableIcebergStreaming ? "7" : "COLMAP.key_value.value") @@ -2508,7 +2516,7 @@ private void testStructuredStatsE2EHelper(AbstractRowBuffer rowBuffer) { columnEpStats .get(enableIcebergStreaming ? "8" : "COLARRAY.list.element") .getDistinctValues()) - .isEqualTo(1); + .isEqualTo(InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 1 : -1); assertThat( columnEpStats .get(enableIcebergStreaming ? "8" : "COLARRAY.list.element") diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 5d8ba41a1..17afada70 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -520,7 +520,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { new RowBufferStats( "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming)); EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats( @@ -577,7 +577,7 @@ private Pair, Set> getRetryBlobMetadata( new RowBufferStats( "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming)); EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats( From e853ccbf078a012313ff95b4638c47771ace5b38 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Tue, 19 Nov 2024 22:33:58 -0800 Subject: [PATCH 2/4] Use role SYSADMIN for testing (#907) --- profile.json.gpg | Bin 1580 -> 1574 bytes profile_azure.json.gpg | Bin 1593 -> 1588 bytes profile_gcp.json.gpg | Bin 1594 -> 1592 bytes .../java/net/snowflake/ingest/TestUtils.java | 1 + 4 files changed, 1 insertion(+) diff --git a/profile.json.gpg b/profile.json.gpg index 12955621bd7fae34c150c08d7e7e5e48ff58cd2d..ffce40ab376a54afcc38ea7032be5b02cc5c5f49 100644 GIT binary patch literal 1574 zcmV+>2HE+H4Fm}T2ox6=oXHDE9{)xkbviS(O-FU; z)=GK6Azf>mT;&t|8uIY$=Qc6-Fd^Ssxdax_=Ny1$kcEZX4J%l8~ z6KJ2BbmquOu^jRP`vL5@IlA4Z>O^Fec%Ahg=>#3*5w!pd?Ue<&KMH)oucnCah$8Nd zu(SevViF?o?K%^zx*{nGnCe77DLu(XflzU+D%Cl`%<=K>0I<_Xy6ba0X6$I&7 zsvrh#BMYz+b3pT)`{FG*y!ru(*jWz%Z?VG<|1{Ue1tCygjD17te_KHy6z+c$CjpCR zl0}pXR#B+zw63QejJ~D_-+@0i(OZf@8>VxmO-v$<0`)GtA-%P*aeQ}Y#6s^nu- z<74FF_H@IG{-Wt=_4yar4oZyaI_!0@(!0jmfG`udJgjXTRX}ML(0f6Nk63Q}E(L#k z;gpxuJbK+uggj@2i5V#8>BSXUKI>mY_uLwGi$G;LdM5>0Je|I=E<+Tr>-I&51IX^R zlA?YTYI;t{BUXwO80RT1fqe~2We_*A_6P7Kto&OMl@{&45 zFD6w_AD5?c?|4$c{B+yM(`1lwxM=z?>S+V6h6a?AN4tcI zbJ=mAi-;@#SlgKnmZS}@=VP;D5a4*Ykh&8tKxl~C(V8WLix|^PBJ=4KPvryZ!Bri@ zcP2NqM~Cs3=@1o57M?nq@L4ghpB!Mg@whyZTuNL@^;E}Uk6|7=@Sy{Ze1H&H(BIKg zjGjYdgrVfzy++)PnedZ=xf>r4{IavN12%{&_Y^h9n&T#2vgXd~H3_-4oYI5GteIHa zqY(fQwLt&g{tIs>_@$4jhYV|nr!EPLU|r}Gh!I~^k^P#J-;rJd`B*{ z;P6c3DH8g4+PgqJBya&Faf-*q#jb7d-ah?y9Qi86o;q~nP$&n=g|oBhlcAi;DgM4= zvQluweVg>czTHeP*j1YjSbCF5e=^R`ED>P|9edN~%QR8_=KQ&8jo_k3C5wImcxRIG z62#lL>7di?+1-2CG(z%yOveKNG^HGfP{a$(;E1z+ zTGAE~lQ-kL9U)r3qlrOG0p$**oy7;+*RZDcP0rZ|MCA77m(`J9vbnZAEU%4+nl9K& zomtyyhY`lGeV571SgDNWwn)?~amNMa;q!5_e=6BE&RS2z+Ex9Arr=PO^+@%XFgvt% zY9lwn9oktK1c`S!3TJk1hiCyLTT1+B$)kcw1jxYppiW)|tb0YP2{W|ZrlK?HC?Ypo YjT3bjT6Gy7&qD-5*kx*vFp)SBjCt@DCjbBd literal 1580 zcmV+{2GjYB4Fm}T2+SLhNw~Ey#r@Li0YSn{lKhO!(@m;iRo+V7#ovwRh?`6%-QaN8K zU5xuB9j4U`pOM2#*5(E_trmSHo7XUzWy5zkiH=5z!S<$eSdMTwTIjJd*E|gv!DHsg zMeF24&toS)JnfZQ2T857_@B5uaKzY{(fu}doM^As1{y|Q!vnH!;pR`DBSJ>BvG;#G zS#YuVJn&b5Y7Hay&{t?^+BSc);db*}oT|QgA%;=cgm)>5cSv=-Hx_s(iJ(NK)=mk? zoT{cp45ZUw6ty56@WcwGTVT{fX2;Fue<)5Kj_;XpVB#{lm5-^^s@0~XLd|n09Lcs- zu=SP4@6E)2=oEN_%p5gq@7^zUiM1cH`?XonWGX*6uA4 z1pt~XgRTDtytTERTUqjr>HmE~)Ts79J(#@{+2vDm1_+K`9P79HBhz(ik+`a+?3QEK zH!Ys#qh8;g(8*CyrJJWB+Xt5=Ues!I<^vX?Mh(pAB5t3NCoK*C;_Nq!In%Zd&ReY% zm_On8_OLg4K@!%Fk=ep8Mmx?2&3SwUSUA-$p*e$tNi*QH7$Nu{gZUM3@G&US(EA<2ZrN?5fG6{2*I1Sa#jHz#x zD^Ec>sH$8xH}lmtd4fSG<5sgC6TPCgM#RX`{!41m{$*4p6!JeJJlTtx!-UyOv{lEe zmMd2%gRi~VB0F9+JgMox#qI41x@Ktd6zL!DN^3c_W=-eYKTjIJW}LO`K{ zD@x)J%NgS#Crxux01R^%%t0y;b9l3^pb`uI*6;oa-TFjcEi?jL8JZcSVtZa2+4iaPUmYE~nFg zX@wr&wlryb8{PGe(U&KBJ$j`JR+eMY-JVLilF`B~Zq_ahu`2;!Ul55CzrEc%zD@`T z-S2vJDc(QbW~YEP-8nYOd(Yf#^TqNu0AIQNJyB9Gjcft&^#K}IP3J(Om8c%2hy~Y2MN1% zs2`+oqcScBLJkK)oeiUHf{|UkFh81P#@hT3w@e_`3{Fq?a<~p{8E7q#lO8+b0T=V1 zC+i|-ov|^R4kBb@tSt@F!E&5(N4&T(%+$0DaZW#m`PH5Q9*ORhi{-2{4?blj+UcciIrHgXJ$2C4cY&`;h-KQ80T5x)y^q0c{+e4Ji!_ygE+A2pd#fw39 zLlwzZpxYRQ^_Tw7E7|{yMoZ8_HFU+Tb}q@l#ksGA43B609FfA8k3d2mZ>wmK!Cam8gn$5SZ efEj+kHW5f(W{7KkHnmSiXLM%;jwTgR+#+}3i4r~l diff --git a/profile_azure.json.gpg b/profile_azure.json.gpg index b388626dce176b3d587246bc4c8f9e88cdb9f2be..d7d3af0e52b15344266e908214b05bffed422092 100644 GIT binary patch literal 1588 zcmV-42Fv-34Fm}T2zNlsIk^7kPyf>D0V8w*)^AJxILx7D(}N<10qCuXw!PyxbDfiz z)Q#28TVL1NcOn#WD>&2pgu`N{Tp}HeF$*-v*15hJCN-@SOiccNY+#JfOA2$jAhtBU z5iTjH^6ZYm{Ja$XC41$Wk7FHA#~@-gCWy=hV{nRI%YDbL#DEGVdLz9|Iev7wh-2f= zXi%gzE4LWby(oAaBm>{~0TTdt!HlyWez<$oOV-SMU;k5~B&nzXY!jmc7eAMfLJCKw z_J+0sFye3Mjh9gV{RFXNPrBF46c`}w+In8v&D|&;#O5m{#su~K%pCIBid*|)Gn`}z z2uIcx;;rJ6fs;tF`6a$tCBl%P@5{14+r*+ubT|Hn9LC+#djM!p9ntFEzcES11cQwE z^W8BdG(;f#N4-9__84HnV`fVokQR zhRUE0UTzMOeb1j7t6CP$lIEWiJectZMwlAd5Pj1GXw}C3_e*?rlso1u<3$mTlu*IT zIG{-&EYCMs%Kn4MR~pj!XkV~fs?6fX-5X|vo_6>>M+}fzs*1bIX|BQe8a4<+d36wS z&;Q|Uz#q~)qQV3os{RH#E1wY& zkbbHB8`3jn(-o=*Qn0ZR@0?HG1m}wa7J;dA1`Z=i`TLctUjoU2%|5d`Q!-*edhnT` zk^!mj#5Nrt27!hflRpYEJU{`-Y5va6N1NKpytm3{iO?A(oUbcV{)_KbReLCuJ-f-N zb#hohyY*fBmqg!OtqkOI!z3$cG*~Scnxj$K2Cbd3`jcbT3@X_8MC{Mo&3@JZ;bIk?+y(C z@mRCcAV3o!-Tkn5!JVi*np^DON018a3_fFn!1iAY;ub0m;E+6c5qLYxI9DKL34n5jaT(*8ann49gUcr1@}PX*!IDoii=$Ml#tB^G?^Y{0#k+>zv{RH~-U zBGLlDq_&;`RD6e}L(QjwfBxF8xq^fKJh&|k+Q)+Y zLqeLnEo8N*khFe*p$0ga`0makZ`7IK4@9k3NU$mii_k1+AqgMZNVO`IulsC*npEkN zy64jD^q~q!dK<_$QC~(lPdr4>pHo;w@}(mb_yT$<2xG?os#(YgG6YB#Tf(h8j3+8q zlR3t;dm}jhO~V)@qI7FA z`yJwZUJ50e18Urd+fg4_0&zn!KfD#7R+V}c4`^sc89jyN)jVkD!S^#S0NVc3guI$` zn{TZmmh+>dM!_CJ7NIgeX8Nn}fz&ueqGb8N#JTE0c$b5}bN7yW{akOa=e!AY6m>IW zUL%2NufeB02(9X2ubx2W>bE4)Li!8M?GP!UYJ7Ns!MGqPlZ4Io6)e~NF&90wwVqtI z*-M`ZA)4_L>SkanTc$+5o(L^~@kG#U!K~MEq8$lod!S8zSnfRrffM<^vD8$&NYOin m03lzY6eJ`;GRuElToqB#Hd4D5j*AKCkfSgXHzdKlF3YPSO%sU# literal 1593 zcmV-92FCe}4Fm}T0*51Db!f;jsp-<{0Xmdh$|5ZWjI|JiYM(gAf2HUw3TyR3|90Ra zO0|YxVDpP|)?qK~!%AtkS9A96LiRoV z{AQLg)v#!Ra^+w^3e;U$3(=FzBV8Idqm1VgvJUerVCVSHdc4!+b%I4U;cd){UNvz$ zsu0c9Ppd-=$0T~d4K{6zObR+fN3{U8pP52E7pv?D%U(u$vLNCQM*+C!_9T$_k9;_f z-}CP9%I*AS_7vBAlq42Y3t*b_x8^y%o%8%Q>fj!6Rdk+sZ@%RoFPmjZv}*H^3JPZZ zQO;8SWS=h0 zsELgQC`#A4X_X}p9r=LbYdm5olH=u(yT5Bo?|TC^UL9of2LLN1bZ3nGRWk1jW)W5| zaEWr)N$x5Z_S41hb5k5&_i!4)?%KLz-v}kPCe0;<-ofN~NzixLj6FwKlTn+C6#w?} zmJ2v9o5q{#N>kBPEOY^H<}MW7+NkBw$rf%VGKqvu`J>}OF*)XN__+=Aqq_OfZ^|&0 zl$H{AzuyQ)fu{SG&Mxl-$}isovnC8!#I|HD8v6iE=~OvSwV!EbA-|;@?~L+`{?hi0XY5w#;vb zf8nrp@$6D*{-JXHjD)5eYq(kGeQy6WNl{u!>cbP&NX*3GL>Zll=|~lMW;;xopte*^ijbPkzJs)IwrJYzA@f@ZJsw?AoWHq^lzuAB`( z?#%R0_JplYkfT{-AJ7EeNQ(V_;*`z}_+kBXOm~Ru`#vw7I>j%Tqj{yPS%fz}!;BKp zG`nc>wbTkWU8m3Fk1givQdC~?K9!lg#8!D28fM@qxQr6(G^E+jFacA5LV~c|| z?trp7#2EmEA%mbDi9^ljLE2FQH_K>HMpVgW&sgD1^MbC^eCs(O+uG&N+L^G@A0Atr zk-d37N5~U*0x( zDvhJ-#GcAK>3Q>IA0bbvXwLiqq885Es6S#1+q^kTHeWh_&gJR>MV&)CnMcc0qP4>g z3Zp7uTb>1dbnjs3Pat=q^()B}a1)tHSqT2J0f+s-*y;>hWC$bw0N1eZJgMrD+!K2! zvKMBJtZK=X5M1S#lv~=@N&qwn_*B{ zDeZlNVTCj%IMZQ0J(~BFFnNwi3h{eR>AU?Fp(qxA4Q(P&BQ>GdJfFwnLLIpV3}m~E zoX>GW6%r$4fnXy1a@}_Q-(I(LFRy=paYHuI{|s$xw}Mbk*Ts&P%Zvattdx6rhowWq zLD+_0pah|B``Y!-$$>vo1$b7&t9O+=@e`^x)Pkj`weBJlFbtaV;5{qQb4Bfd%-lT8?^8 z+bej$gbGOFVDW97(8b$UpTcLyG-UZr75Z3{1LQ?~a5`+(QYOoQR+J9B8&AzKyDl*{ rza~I8Wc?5BzT-vEQOLaQE3nI1kbu&IERznw(iAPCgIWTW_U6diAE6vQ diff --git a/profile_gcp.json.gpg b/profile_gcp.json.gpg index cc45b109f82382db03b6abd53593bac7fbb727a9..cb9eb73f42eaa2dbbfea28e5bb4046c0e9263214 100644 GIT binary patch literal 1592 zcmV-82FLk~4Fm}T2;)|=Giihi>;KZ~0Xiu=Z$9HYne0Kg(0??QN58G!ZTrwuH-aJ?aK}x_~gXGG9|72z3U;3h7ljO=DVHsg?!QGNimVn4trVCej0G!SiyP zsmlU(`nI7Mn>q2okblMo=DCc3@Eqc!RKaKv!LSKJ3GNHH78|F-f_OEXucJc5O@Fj_ymS@6+p8un*FLz-h*r@ z(mVOHsI)#G0;6JXk&Zc<;vK66QOmu9d?WIuVs_EFmND8h)<^axKJIU3AN5Ux6IZ{5 zBLilEF5X!q#5$#lK^25fp*^RgOOac}#Ppx)jueE>vsoQXwzdd-HC9W6VE`cCp0rz) z3Oi!Xnr6|ubw0=`F$~#nGf9X5w7G0wA+g1$vgM(#w@5^UDKjbL9DJg$k-DkfT&c^$ zFKZ>HL`zd;*9EUjw8{vD(aK3hA~(3Qv6t$auBoFy>-jA!O;pr3d(;biy+Kf4x;W8x zTlv#_{I*K{$F%SM4z>V~PypjS5I-p<`1%RZ4+TKZVea39^e&9ZMNa zBLX=U0-d6LwQMJDV8tS}&Q1@kXby)p*!PlOaUdwi?VxZTpp+aa@D%}_Ft>0b$vpXk z@8sFW3qE`Hjb7MAU_~Wv(&p0&B$KH1a0{uk@7ZpfID7iTIdy8nxbgwl6+^BPLA!d= zG-j{ga3jjxQmpxnF!+lU$cT+~NTE=dx!OCA&Q1nAh1^42@v*7pEbwpwI2`y6XGDe~ zL*;Z9=4wu?POzYd3KY|pC(3<0?CzA|`+GO8v+7~;yqAY#1%7aRJB>>;&(pyuhop-R zblhdO5!or3%1GXo;OBJGSR*@UcZuk)wC67k&x|$Nf|npZ-5D?%TF=4!gIO|-ShRg~ z+{d6*y(bh;RGYYIb0&2JZ!fEA9m+8kHMeJMpE=^=U|JS!z89Sko84uK@ti{UV!{*U zdg$x2j_-?J=~5|i>;4VQ?l=7*+;j|XFonm$$hC%=n*xAOjk8!W{0HfBoHsWR;8c`H zkdnsL{c1EAJl{(=omjC6bn*AIHNT2R5wYo@{T~8Q4$;vB`IW~qAT#sZ2i}@1 zgDOOQsD6wIB#QHYCAT-TDdVOt*iG6bRPHTqptRBx6E0el_sAfIO9s#NZ zxfOrP>$B@taY2)C&QEJrL;oi&Ud_=(Y5U>GbN(oS&0Uy>+Du8;K4~NB9~`zPF6kocyb6Mpe#MEJai9lo8sEgWHuNPC!8?BVagFVoh0db84|TM9pz=!?cq13|CBg8h*Un@;8=S$C6}<4kJprFxq~yiR+ht>_JMSV#G-yjh#F&M0 z?eeq7mWt}`0%hu!j<{A<-F0bH)K6=`7)BTyFF-Pi*et1!iq$8XS#*!mZek^#9%5 z2-c7(a?Yt7ROi*4_Hi#lOe^ezWp0dEm#s01!?HyTP(zqJdL|jELhpa}TdwG$@GwiT zjA*xZyK65!-M{^MguoxYfLqCorfS-z4dLYI6tje&ln7FUci*Zd5X~DZA)OT`-SYRL z9gq?@n~K%oQc=Opkw5Ny!gSi>p}`itwm3}>TG>?p(vk5cUru-K4PAOpab^HXg1XaI zSc1`Cb`K=B=vh?g#0`Q~YS5GZO)CE`G_Ci+<@`G6OPHy`7|(JpRgo2@3HYQ3q|amS z%pxwZP;x@DkybEvT)oL|OTQxhLs#3crjK*ZGyB#@8J}*;kN# zh#H0;Hlcn^KOrrasRmoprrkPsOn#qNYOO>RtItAv;K-I%D(sw8`SQVN8)U7~{21Ln z{_G%G^z_Cn>}^}232qsTjMj8x8MFE6D1Z@82vTZ8T(+LE%CX*@Ri1?Qd$xfq3}&|O zoTvVE(7V~=z&nKvpooj>7RlLiN;Rb48z(5}Z+rb95na98I~4QO3sf(o>fY;d4KUh! zh7;Uw2disxz%iaXeNn?q1Psf9i`g3!BPS}85a6>bQ*9w(&r1y3eqkv3Ic(!molrboY)X4xBe z>x={nF5+UFB2Y2y4s31~jr0r`F!EA#?wc>m3^2b}D9zE96f&^wu1-YkzVgjr3CGFw zpU(KMSc0|;x1n1M``NM=tGSLTV#GKu=#PrOb(x#DubFv*m4Vn&EEm{8bCx0yqPl%t9!P4Cgwfdc#U43MUZ`gb`oRH&Ng(J6*cFn2a|b_T++A+h}l|_r)9x>C_cso~>V7rgWv0d6Y zCH$F-$7G6`_L1q)gTx}C~@%CLwER$^5*L%ixkjK0% zYA%8r7I9tdC~I=Dt(gPkh)e?UaUnmJfd-O}N<=m)iu{FbS!MC^Gi`TZ7Rt&j`jLFx zHDjvIlflMS*;!FrUa7$JVb;u!W<^>QCI^dVHkaI{w=)1KDDh1*Pf6h+Z#}R^4T>Gj zAmpnkK)F{K4q{_Yu*JZo#o9*e3HmhtE}=pMb$p&JVEF1DZJqS?_p1@T7UFSw7wF$7 zc0IAKzcl!6bwVbQhqqH&lsc?AYOCNajxvI`I-gOBTce_YMqDZl2`#I^si0)Ulte)l zjBxTZ@!!bBD^=P=d-((++o(Y&TmjUrL{JVY?oRlOp{Cxo{FvxkPdb6IkO5tt-G5!S sk|*3Rtr}laMms=KHn_L?LNG(pk1BH+f+BTKHhO39q^?}NG*_(>+g3Iko&W#< diff --git a/src/test/java/net/snowflake/ingest/TestUtils.java b/src/test/java/net/snowflake/ingest/TestUtils.java index 83ad3983b..2fc8d3f5d 100644 --- a/src/test/java/net/snowflake/ingest/TestUtils.java +++ b/src/test/java/net/snowflake/ingest/TestUtils.java @@ -302,6 +302,7 @@ public static Connection getConnection(boolean isStreamingConnection) throws Exc props.put("warehouse", warehouse); props.put("client_session_keep_alive", "true"); props.put("privateKey", privateKey); + props.put("role", role); if (isStreamingConnection) { streamingConn = DriverManager.getConnection(connectString, props); From ea32aa6eb15a7b8e0bbf843df17be9e1abb68975 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Bobowski?= <145468486+sfc-gh-mbobowski@users.noreply.github.com> Date: Wed, 20 Nov 2024 10:58:13 +0100 Subject: [PATCH 3/4] NO-SNOW Fix logging (#909) From 1f6fa8e5693292ffcf763c3a67a8cf3e07f65b3a Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Wed, 20 Nov 2024 12:44:38 -0800 Subject: [PATCH 4/4] SNOW-1813763 Add IT for token expiry and renewal of Iceberg table (#913) --- ...nowflakeStreamingIngestClientInternal.java | 23 +-- .../internal/it/SubscopedTokenRefreshIT.java | 160 ++++++++++++++++++ 2 files changed, 173 insertions(+), 10 deletions(-) create mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/it/SubscopedTokenRefreshIT.java diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 4bbcc9bc9..f10f683c3 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -108,7 +108,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea private final ChannelCache channelCache; // Reference to the flush service - private final FlushService flushService; + private FlushService flushService; // Reference to storage manager private IStorageManager storageManager; @@ -157,7 +157,8 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param requestBuilder http request builder * @param parameterOverrides parameters we override in case we want to set different values */ - SnowflakeStreamingIngestClientInternal( + @VisibleForTesting + public SnowflakeStreamingIngestClientInternal( String name, SnowflakeURL accountURL, Properties prop, @@ -219,14 +220,16 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea prop.getProperty(Constants.OAUTH_REFRESH_TOKEN), oAuthTokenEndpoint); } - this.requestBuilder = - new RequestBuilder( - accountURL, - prop.get(USER).toString(), - credential, - this.httpClient, - parameterProvider.isEnableIcebergStreaming(), - String.format("%s_%s", this.name, System.currentTimeMillis())); + if (this.requestBuilder == null) { + this.requestBuilder = + new RequestBuilder( + accountURL, + prop.get(USER).toString(), + credential, + this.httpClient, + parameterProvider.isEnableIcebergStreaming(), + String.format("%s_%s", this.name, System.currentTimeMillis())); + } logger.logInfo("Using {} for authorization", this.requestBuilder.getAuthType()); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/SubscopedTokenRefreshIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/SubscopedTokenRefreshIT.java new file mode 100644 index 000000000..7b9319985 --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/SubscopedTokenRefreshIT.java @@ -0,0 +1,160 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal.it; + +import static net.snowflake.ingest.utils.Constants.REFRESH_TABLE_INFORMATION_ENDPOINT; +import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING; +import static net.snowflake.ingest.utils.ParameterProvider.MAX_CLIENT_LAG; + +import com.google.common.collect.ImmutableMap; +import java.sql.Connection; +import java.util.Properties; +import net.snowflake.ingest.IcebergIT; +import net.snowflake.ingest.TestUtils; +import net.snowflake.ingest.connection.RequestBuilder; +import net.snowflake.ingest.streaming.OpenChannelRequest; +import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; +import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal; +import net.snowflake.ingest.utils.Constants; +import net.snowflake.ingest.utils.HttpUtil; +import net.snowflake.ingest.utils.SnowflakeURL; +import net.snowflake.ingest.utils.Utils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category(IcebergIT.class) +public class SubscopedTokenRefreshIT { + + private String database; + private String schema; + private Connection conn; + private SnowflakeStreamingIngestClientInternal client; + private RequestBuilder requestBuilder; + + @Before + public void before() throws Exception { + database = String.format("SDK_TOKEN_EXPIRE_IT_DB_%d", System.nanoTime()); + schema = "PUBLIC"; + + conn = TestUtils.getConnection(true); + + conn.createStatement().execute(String.format("create or replace database %s;", database)); + conn.createStatement().execute(String.format("use database %s;", database)); + conn.createStatement().execute(String.format("use schema %s;", schema)); + conn.createStatement().execute(String.format("use warehouse %s;", TestUtils.getWarehouse())); + + SnowflakeURL url = new SnowflakeURL(TestUtils.getAccountURL()); + Properties properties = TestUtils.getProperties(Constants.BdecVersion.THREE, false); + properties.setProperty(ENABLE_ICEBERG_STREAMING, "true"); + properties.setProperty(MAX_CLIENT_LAG, "1000"); + requestBuilder = + Mockito.spy( + new RequestBuilder( + url, + TestUtils.getUser(), + TestUtils.getKeyPair(), + HttpUtil.getHttpClient(url.getAccount()), + true /* enableIcebergStreaming */, + "client1")); + this.client = + new SnowflakeStreamingIngestClientInternal<>( + "client1", + url, + Utils.createProperties(properties), + HttpUtil.getHttpClient(url.getAccount()), + false /* isTestMode */, + requestBuilder, + null /* parameterOverrides */); + } + + @After + public void after() throws Exception { + conn.createStatement().execute(String.format("drop database if exists %s;", database)); + } + + @Test + public void testTokenExpire() throws Exception { + String tableName = "test_token_expire_table"; + + /* + * Minimum duration of token for each cloud storage: + * - S3: 900 seconds + * - GCS: 600 seconds + * - Azure: 300 seconds + */ + int duration = 900; + createIcebergTable(tableName); + conn.createStatement() + .execute( + String.format( + "alter iceberg table %s set" + + " STREAMING_ICEBERG_INGESTION_SUBSCOPED_TOKEN_DURATION_SECONDS_S3=%s", + tableName, duration)); + conn.createStatement() + .execute( + String.format( + "alter iceberg table %s set" + + " STREAMING_ICEBERG_INGESTION_SUBSCOPED_TOKEN_DURATION_SECONDS_GCS=%s", + tableName, duration)); + conn.createStatement() + .execute( + String.format( + "alter iceberg table %s set" + + " STREAMING_ICEBERG_INGESTION_SUBSCOPED_TOKEN_DURATION_SECONDS_AZURE=%s", + tableName, duration)); + conn.createStatement() + .execute( + String.format( + "alter iceberg table %s set" + + " STREAMING_ICEBERG_INGESTION_SUBSCOPED_TOKEN_DURATION_SECONDS_DEFAULT=%s", + tableName, duration)); + + SnowflakeStreamingIngestChannel channel = + client.openChannel( + OpenChannelRequest.builder("CHANNEL") + .setDBName(database) + .setSchemaName(schema) + .setTableName(tableName) + .setOnErrorOption(OpenChannelRequest.OnErrorOption.ABORT) + .build()); + + /* Refresh table information should be called once channel is opened */ + Mockito.verify(requestBuilder, Mockito.times(1)) + .generateStreamingIngestPostRequest( + Mockito.anyString(), + Mockito.eq(REFRESH_TABLE_INFORMATION_ENDPOINT), + Mockito.eq("refresh table information")); + + channel.insertRow(ImmutableMap.of("int_col", 1), "1"); + TestUtils.waitForOffset(channel, "1"); + + /* Wait for token to expire */ + + Thread.sleep((duration + 1) * 1000); + + /* Insert a row to trigger token generation */ + channel.insertRow(ImmutableMap.of("int_col", 2), "2"); + TestUtils.waitForOffset(channel, "2"); + Mockito.verify(requestBuilder, Mockito.times(2)) + .generateStreamingIngestPostRequest( + Mockito.anyString(), + Mockito.eq(REFRESH_TABLE_INFORMATION_ENDPOINT), + Mockito.eq("refresh table information")); + } + + private void createIcebergTable(String tableName) throws Exception { + conn.createStatement() + .execute( + String.format( + "create or replace iceberg table %s(int_col int)" + + "catalog = 'SNOWFLAKE' " + + "external_volume = 'streaming_ingest' " + + "base_location = 'SDK_IT/%s/%s'", + tableName, database, tableName)); + } +}