diff --git a/profile.json.gpg b/profile.json.gpg index 12955621b..ffce40ab3 100644 Binary files a/profile.json.gpg and b/profile.json.gpg differ diff --git a/profile_azure.json.gpg b/profile_azure.json.gpg index b388626dc..d7d3af0e5 100644 Binary files a/profile_azure.json.gpg and b/profile_azure.json.gpg differ diff --git a/profile_gcp.json.gpg b/profile_gcp.json.gpg index cc45b109f..cb9eb73f4 100644 Binary files a/profile_gcp.json.gpg and b/profile_gcp.json.gpg differ diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java index 7a6b23670..e49dc18a4 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java @@ -23,4 +23,9 @@ public BlobPath(String uploadPath, String fileRegistrationPath) { this.uploadPath = uploadPath; this.fileRegistrationPath = fileRegistrationPath; } + + @Override + public String toString() { + return String.format("uploadPath=%s fileRegistrationPath=%s", uploadPath, fileRegistrationPath); + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java index 52a72a5e2..3178c5bf7 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java @@ -70,10 +70,8 @@ public boolean isNullable() { * when this object represents an iceberg table's column, null otherwise. The String returned from * here is meant to conform to the json schema specified here: * https://iceberg.apache.org/spec/#appendix-c-json-serialization - * - *

Make this a public API when the Builder.setEnableIcebergStreaming API is made public. */ - String getIcebergSchema() { + public String getIcebergSchema() { return icebergColumnSchema; } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java index de5cb87eb..4bda0ef58 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java @@ -11,9 +11,11 @@ class InternalParameterProvider { public static final boolean ENABLE_VALUES_COUNT_DEFAULT = false; private final boolean enableIcebergStreaming; + private final boolean enableNDVTracking; - InternalParameterProvider(boolean enableIcebergStreaming) { + InternalParameterProvider(boolean enableIcebergStreaming, boolean enableNDVTracking) { this.enableIcebergStreaming = enableIcebergStreaming; + this.enableNDVTracking = enableNDVTracking; } boolean getEnableChunkEncryption() { @@ -38,7 +40,7 @@ boolean setIcebergSpecificFieldsInEp() { boolean isEnableDistinctValuesCount() { // When in Iceberg mode, we enabled distinct values count in EP metadata. - return enableIcebergStreaming; + return enableIcebergStreaming && enableNDVTracking; } boolean isEnableValuesCount() { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 9b68d924e..f10f683c3 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -108,7 +108,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea private final ChannelCache channelCache; // Reference to the flush service - private final FlushService flushService; + private FlushService flushService; // Reference to storage manager private IStorageManager storageManager; @@ -157,7 +157,8 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param requestBuilder http request builder * @param parameterOverrides parameters we override in case we want to set different values */ - SnowflakeStreamingIngestClientInternal( + @VisibleForTesting + public SnowflakeStreamingIngestClientInternal( String name, SnowflakeURL accountURL, Properties prop, @@ -167,7 +168,8 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea Map parameterOverrides) { this.parameterProvider = new ParameterProvider(parameterOverrides, prop); this.internalParameterProvider = - new InternalParameterProvider(parameterProvider.isEnableIcebergStreaming()); + new InternalParameterProvider( + parameterProvider.isEnableIcebergStreaming(), false /* enableNDVCount */); this.name = name; String accountName = accountURL == null ? null : accountURL.getAccount(); @@ -218,14 +220,16 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea prop.getProperty(Constants.OAUTH_REFRESH_TOKEN), oAuthTokenEndpoint); } - this.requestBuilder = - new RequestBuilder( - accountURL, - prop.get(USER).toString(), - credential, - this.httpClient, - parameterProvider.isEnableIcebergStreaming(), - String.format("%s_%s", this.name, System.currentTimeMillis())); + if (this.requestBuilder == null) { + this.requestBuilder = + new RequestBuilder( + accountURL, + prop.get(USER).toString(), + credential, + this.httpClient, + parameterProvider.isEnableIcebergStreaming(), + String.format("%s_%s", this.name, System.currentTimeMillis())); + } logger.logInfo("Using {} for authorization", this.requestBuilder.getAuthType()); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java index 8f606b3dd..d15296484 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java @@ -58,8 +58,9 @@ class SubscopedTokenExternalVolumeManager implements IStorageManager { throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); } logger.logDebug( - "Created SubscopedTokenExternalVolumeManager with clientName=%s and clientPrefix=%s", - clientName, clientPrefix); + "Created SubscopedTokenExternalVolumeManager with clientName={} and clientPrefix={}", + clientName, + clientPrefix); } /** @@ -93,12 +94,12 @@ private InternalStage createStageForTable(TableRef tableRef) { this, clientName, getClientPrefix(), tableRef, locationInfo, DEFAULT_MAX_UPLOAD_RETRIES); } catch (SFException ex) { logger.logError( - "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, ex); + "ExtVolManager.registerTable for tableRef={} failed with exception={}", tableRef, ex); // allow external volume ctor's SFExceptions to bubble up directly throw ex; } catch (Exception err) { logger.logError( - "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, err); + "ExtVolManager.registerTable for tableRef={} failed with exception={}", tableRef, err); throw new SFException( err, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE, @@ -127,8 +128,9 @@ static BlobPath generateBlobPathFromLocationInfoPath( String[] parts = filePathRelativeToVolume.split("/"); if (parts.length < 5) { logger.logError( - "Invalid file path returned by server. Table=%s FilePathRelativeToVolume=%s", - fullyQualifiedTableName, filePathRelativeToVolume); + "Invalid file path returned by server. Table={} FilePathRelativeToVolume={}", + fullyQualifiedTableName, + filePathRelativeToVolume); throw new SFException(ErrorCode.INTERNAL_ERROR, "File path returned by server is invalid"); } @@ -166,17 +168,19 @@ public FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional RefreshTableInformationResponse response = this.serviceClient.refreshTableInformation( new RefreshTableInformationRequest(tableRef, this.role, true)); - logger.logDebug("Refreshed tokens for table=%s", tableRef); + logger.logDebug("Refreshed tokens for table={}", tableRef); if (response.getIcebergLocationInfo() == null) { logger.logError( "Did not receive location info, this will cause ingestion to grind to a halt." - + " TableRef=%s"); + + " TableRef={}", + tableRef); } else { Map creds = response.getIcebergLocationInfo().getCredentials(); if (creds == null || creds.isEmpty()) { logger.logError( "Did not receive creds in location info, this will cause ingestion to grind to a" - + " halt. TableRef=%s"); + + " halt. TableRef={}", + tableRef); } } diff --git a/src/test/java/net/snowflake/ingest/TestUtils.java b/src/test/java/net/snowflake/ingest/TestUtils.java index 83ad3983b..2fc8d3f5d 100644 --- a/src/test/java/net/snowflake/ingest/TestUtils.java +++ b/src/test/java/net/snowflake/ingest/TestUtils.java @@ -302,6 +302,7 @@ public static Connection getConnection(boolean isStreamingConnection) throws Exc props.put("warehouse", warehouse); props.put("client_session_keep_alive", "true"); props.put("privateKey", privateKey); + props.put("role", role); if (isStreamingConnection) { streamingConn = DriverManager.getConnection(connectString, props); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index 627c95e72..81b31bd29 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -59,7 +59,7 @@ public void testSerializationErrors() throws Exception { "a.bdec", Collections.singletonList(createChannelDataPerTable(1)), Constants.BdecVersion.THREE, - new InternalParameterProvider(enableIcebergStreaming), + new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */), encryptionKeysPerTable); // Construction fails if metadata contains 0 rows and data 1 row @@ -68,7 +68,7 @@ public void testSerializationErrors() throws Exception { "a.bdec", Collections.singletonList(createChannelDataPerTable(0)), Constants.BdecVersion.THREE, - new InternalParameterProvider(enableIcebergStreaming), + new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */), encryptionKeysPerTable); } catch (SFException e) { Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode()); @@ -93,7 +93,7 @@ public void testMetadataAndExtendedMetadataSize() throws Exception { "a.parquet", Collections.singletonList(createChannelDataPerTable(1)), Constants.BdecVersion.THREE, - new InternalParameterProvider(enableIcebergStreaming), + new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */), new ConcurrentHashMap<>()); InputFile blobInputFile = new InMemoryInputFile(blob.blobBytes); @@ -188,7 +188,7 @@ private List> createChannelDataPerTable(int metada .as(LogicalTypeAnnotation.stringType()) .id(1) .named("test"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming) : new RowBufferStats(columnName, null, 1, null, null, false, false)); channelData.setChannelContext( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java index d76499c14..440e13b27 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java @@ -27,7 +27,9 @@ public static Object[] enableNDVAndNV() { @Test public void testGetCombinedColumnStatsMapNulls() { Map left = new HashMap<>(); - RowBufferStats leftStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats leftStats1 = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); left.put("one", leftStats1); leftStats1.addIntValue(new BigInteger("10")); @@ -56,12 +58,16 @@ public void testGetCombinedColumnStatsMapNulls() { @Test public void testGetCombinedColumnStatsMapMissingColumn() { Map left = new HashMap<>(); - RowBufferStats leftStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats leftStats1 = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); left.put("one", leftStats1); leftStats1.addIntValue(new BigInteger("10")); Map right = new HashMap<>(); - RowBufferStats rightStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats rightStats1 = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); right.put("foo", rightStats1); rightStats1.addIntValue(new BigInteger("11")); @@ -91,10 +97,18 @@ public void testGetCombinedColumnStatsMap() { Map left = new HashMap<>(); Map right = new HashMap<>(); - RowBufferStats leftStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); - RowBufferStats rightStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); - RowBufferStats leftStats2 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); - RowBufferStats rightStats2 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats leftStats1 = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + RowBufferStats rightStats1 = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + RowBufferStats leftStats2 = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + RowBufferStats rightStats2 = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); left.put("one", leftStats1); left.put("two", leftStats2); @@ -125,7 +139,9 @@ public void testGetCombinedColumnStatsMap() { Assert.assertNull(oneCombined.getCurrentMinRealValue()); Assert.assertNull(oneCombined.getCurrentMaxRealValue()); - Assert.assertEquals(enableNDVAndNV ? 5 : -1, oneCombined.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 5 : -1, + oneCombined.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 5 : -1, oneCombined.getNumberOfValues()); Assert.assertArrayEquals( @@ -137,7 +153,9 @@ public void testGetCombinedColumnStatsMap() { Assert.assertNull(twoCombined.getCurrentMinRealValue()); Assert.assertNull(twoCombined.getCurrentMaxRealValue()); - Assert.assertEquals(enableNDVAndNV ? 5 : -1, oneCombined.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 5 : -1, + oneCombined.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 5 : -1, oneCombined.getNumberOfValues()); } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java index 162ccc63f..82171b687 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java @@ -33,7 +33,7 @@ public void testFileColumnPropertiesConstructor() { .as(LogicalTypeAnnotation.stringType()) .id(1) .named("test"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming) : new RowBufferStats("COL", null, 1, null, null, false, false); stats.addStrValue("bcd"); @@ -58,7 +58,7 @@ public void testFileColumnPropertiesConstructor() { .as(LogicalTypeAnnotation.stringType()) .id(1) .named("test"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming) : new RowBufferStats("COL", null, 1, null, null, false, false); stats.addStrValue("aßßßßßßßßßßßßßßßß"); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index ed697b026..5b5c05809 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -112,7 +112,7 @@ private abstract static class TestContext implements AutoCloseable { storage = Mockito.mock(InternalStage.class); parameterProvider = createParameterProvider(enableIcebergStreaming); InternalParameterProvider internalParameterProvider = - new InternalParameterProvider(enableIcebergStreaming); + new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */); client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); Mockito.when(client.getInternalParameterProvider()).thenReturn(internalParameterProvider); @@ -905,13 +905,13 @@ public void testBuildAndUpload() throws Exception { new RowBufferStats( "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); RowBufferStats stats2 = new RowBufferStats( "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); eps1.put("one", stats1); @@ -1078,7 +1078,7 @@ public void testInvalidateChannels() { ParameterProvider parameterProvider = createParameterProvider(enableIcebergStreaming); ChannelCache channelCache = new ChannelCache<>(); InternalParameterProvider internalParameterProvider = - new InternalParameterProvider(enableIcebergStreaming); + new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */); Mockito.when(client.getChannelCache()).thenReturn(channelCache); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); Mockito.when(client.getInternalParameterProvider()).thenReturn(internalParameterProvider); @@ -1167,7 +1167,7 @@ public void testBlobBuilder() throws Exception { new RowBufferStats( "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); eps1.put("one", stats1); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java index 7fe76f7b1..bdc366ae7 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java @@ -50,7 +50,9 @@ public void parseValueBoolean() { Type type = Types.primitive(PrimitiveTypeName.BOOLEAN, Repetition.OPTIONAL).id(1).named("BOOLEAN_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("BOOLEAN_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "BOOLEAN_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -76,7 +78,9 @@ public void parseValueInt() { Type type = Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL).id(1).named("INT_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("INT_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "INT_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -104,7 +108,9 @@ public void parseValueDecimalToInt() { .as(LogicalTypeAnnotation.decimalType(4, 9)) .named("DECIMAL_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "DECIMAL_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -138,7 +144,9 @@ public void parseValueDateToInt() { .as(LogicalTypeAnnotation.dateType()) .named("DATE_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("DATE_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "DATE_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -163,7 +171,9 @@ public void parseValueLong() { Type type = Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL).id(1).named("LONG_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("LONG_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "LONG_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -191,7 +201,9 @@ public void parseValueDecimalToLong() { .as(LogicalTypeAnnotation.decimalType(9, 18)) .named("DECIMAL_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "DECIMAL_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -225,7 +237,9 @@ public void parseValueTimeToLong() { .as(LogicalTypeAnnotation.timeType(false, LogicalTypeAnnotation.TimeUnit.MICROS)) .named("TIME_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("TIME_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "TIME_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -253,7 +267,9 @@ public void parseValueTimestampToLong() { .as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS)) .named("TIMESTAMP_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "TIMESTAMP_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -287,7 +303,11 @@ public void parseValueTimestampTZToLong() { .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS)) .named("TIMESTAMP_TZ_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_TZ_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "TIMESTAMP_TZ_COL", + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, + true); Map rowBufferStatsMap = new HashMap() { { @@ -318,7 +338,9 @@ public void parseValueFloat() { Type type = Types.primitive(PrimitiveTypeName.FLOAT, Repetition.OPTIONAL).id(1).named("FLOAT_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("FLOAT_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "FLOAT_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -343,7 +365,9 @@ public void parseValueDouble() { Type type = Types.primitive(PrimitiveTypeName.DOUBLE, Repetition.OPTIONAL).id(1).named("DOUBLE_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("DOUBLE_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "DOUBLE_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -368,7 +392,9 @@ public void parseValueBinary() { Type type = Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL).id(1).named("BINARY_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "BINARY_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -398,7 +424,9 @@ public void parseValueStringToBinary() { .as(LogicalTypeAnnotation.stringType()) .named("BINARY_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "BINARY_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -430,7 +458,9 @@ public void parseValueFixed() { .length(4) .named("FIXED_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "FIXED_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -461,7 +491,9 @@ public void parseValueDecimalToFixed() { .as(LogicalTypeAnnotation.decimalType(10, 20)) .named("FIXED_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "FIXED_COL", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -490,7 +522,11 @@ public void parseList() throws JsonProcessingException { .element(Types.optional(PrimitiveTypeName.INT32).id(2).named("element")) .id(1) .named("LIST_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("LIST_COL.list.element", true, true); + RowBufferStats rowBufferStats = + new RowBufferStats( + "LIST_COL.list.element", + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, + true); Map rowBufferStatsMap = new HashMap() { { @@ -576,8 +612,16 @@ public void parseMap() throws JsonProcessingException { .value(Types.optional(PrimitiveTypeName.INT32).id(3).named("value")) .id(1) .named("MAP_COL"); - RowBufferStats rowBufferKeyStats = new RowBufferStats("MAP_COL.key_value.key", true, true); - RowBufferStats rowBufferValueStats = new RowBufferStats("MAP_COL.key_value.value", true, true); + RowBufferStats rowBufferKeyStats = + new RowBufferStats( + "MAP_COL.key_value.key", + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, + true); + RowBufferStats rowBufferValueStats = + new RowBufferStats( + "MAP_COL.key_value.value", + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, + true); Map rowBufferStatsMap = new HashMap() { { @@ -686,8 +730,12 @@ public void parseStruct() throws JsonProcessingException { .id(1) .named("STRUCT_COL"); - RowBufferStats rowBufferAStats = new RowBufferStats("STRUCT_COL.a", true, true); - RowBufferStats rowBufferBStats = new RowBufferStats("STRUCT_COL.b", true, true); + RowBufferStats rowBufferAStats = + new RowBufferStats( + "STRUCT_COL.a", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); + RowBufferStats rowBufferBStats = + new RowBufferStats( + "STRUCT_COL.b", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true); Map rowBufferStatsMap = new HashMap() { { @@ -819,7 +867,10 @@ public void parseNestedTypes() { private static Type generateNestedTypeAndStats( int depth, String name, Map rowBufferStatsMap, String path) { if (depth == 0) { - rowBufferStatsMap.put("0", new RowBufferStats(path, true, true)); + rowBufferStatsMap.put( + "0", + new RowBufferStats( + path, InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, true)); return Types.optional(PrimitiveTypeName.INT32).id(0).named(name); } switch (depth % 3) { @@ -837,7 +888,11 @@ private static Type generateNestedTypeAndStats( .named(name); case 0: rowBufferStatsMap.put( - String.valueOf(depth), new RowBufferStats(path + ".key_value.key", true, true)); + String.valueOf(depth), + new RowBufferStats( + path + ".key_value.key", + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, + true)); return Types.optionalMap() .key(Types.required(PrimitiveTypeName.INT32).id(depth).named("key")) .value( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java index a145c86a8..272cafe0d 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java @@ -22,7 +22,10 @@ public static Object[] enableNDVAndNV() { @Test public void testEmptyState() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats stats = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; Assert.assertNull(stats.getCollationDefinitionString()); Assert.assertNull(stats.getCurrentMinRealValue()); @@ -32,25 +35,34 @@ public void testEmptyState() throws Exception { Assert.assertNull(stats.getCurrentMinIntValue()); Assert.assertNull(stats.getCurrentMaxIntValue()); Assert.assertEquals(0, stats.getCurrentNullCount()); - Assert.assertEquals(enableNDVAndNV ? 0 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 0 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 0 : -1, stats.getNumberOfValues()); } @Test public void testMinMaxStrNonCol() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats stats = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; stats.addStrValue("bob"); Assert.assertArrayEquals("bob".getBytes(StandardCharsets.UTF_8), stats.getCurrentMinStrValue()); Assert.assertArrayEquals("bob".getBytes(StandardCharsets.UTF_8), stats.getCurrentMaxStrValue()); - Assert.assertEquals(enableNDVAndNV ? 1 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 1 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 1 : -1, stats.getNumberOfValues()); stats.addStrValue("charlie"); Assert.assertArrayEquals("bob".getBytes(StandardCharsets.UTF_8), stats.getCurrentMinStrValue()); Assert.assertArrayEquals( "charlie".getBytes(StandardCharsets.UTF_8), stats.getCurrentMaxStrValue()); - Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getNumberOfValues()); stats.addStrValue("alice"); @@ -59,7 +71,9 @@ public void testMinMaxStrNonCol() throws Exception { Assert.assertArrayEquals( "charlie".getBytes(StandardCharsets.UTF_8), stats.getCurrentMaxStrValue()); - Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 3 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getNumberOfValues()); Assert.assertNull(stats.getCurrentMinRealValue()); @@ -72,27 +86,36 @@ public void testMinMaxStrNonCol() throws Exception { @Test public void testMinMaxInt() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats stats = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; stats.addIntValue(BigInteger.valueOf(5)); Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMaxIntValue()); - Assert.assertEquals(enableNDVAndNV ? 1 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 1 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 1 : -1, stats.getNumberOfValues()); stats.addIntValue(BigInteger.valueOf(6)); Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf((6)), stats.getCurrentMaxIntValue()); - Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getNumberOfValues()); stats.addIntValue(BigInteger.valueOf(4)); Assert.assertEquals(BigInteger.valueOf((4)), stats.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf((6)), stats.getCurrentMaxIntValue()); - Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 3 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getNumberOfValues()); Assert.assertNull(stats.getCurrentMinRealValue()); @@ -105,27 +128,36 @@ public void testMinMaxInt() throws Exception { @Test public void testMinMaxReal() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats stats = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; stats.addRealValue(1.0); Assert.assertEquals(Double.valueOf(1), stats.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(1), stats.getCurrentMaxRealValue()); - Assert.assertEquals(enableNDVAndNV ? 1 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 1 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 1 : -1, stats.getNumberOfValues()); stats.addRealValue(1.5); Assert.assertEquals(Double.valueOf(1), stats.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(1.5), stats.getCurrentMaxRealValue()); - Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getNumberOfValues()); stats.addRealValue(.8); Assert.assertEquals(Double.valueOf(.8), stats.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(1.5), stats.getCurrentMaxRealValue()); - Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 3 : -1, + stats.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getNumberOfValues()); Assert.assertNull(stats.getCurrentMinIntValue()); @@ -138,7 +170,10 @@ public void testMinMaxReal() throws Exception { @Test public void testIncCurrentNullCount() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats stats = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; Assert.assertEquals(0, stats.getCurrentNullCount()); stats.incCurrentNullCount(); @@ -149,7 +184,10 @@ public void testIncCurrentNullCount() throws Exception { @Test public void testMaxLength() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats stats = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; Assert.assertEquals(0, stats.getCurrentMaxLength()); stats.setCurrentMaxLength(100L); @@ -161,8 +199,14 @@ public void testMaxLength() throws Exception { @Test public void testGetCombinedStats() throws Exception { // Test for Integers - RowBufferStats one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); - RowBufferStats two = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats one = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; + RowBufferStats two = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; one.addIntValue(BigInteger.valueOf(2)); one.addIntValue(BigInteger.valueOf(4)); @@ -179,7 +223,9 @@ public void testGetCombinedStats() throws Exception { RowBufferStats result = RowBufferStats.getCombinedStats(one, two); Assert.assertEquals(BigInteger.valueOf(1), result.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf(8), result.getCurrentMaxIntValue()); - Assert.assertEquals(enableNDVAndNV ? 7 : -1, result.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 7 : -1, + result.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 10 : -1, result.getNumberOfValues()); Assert.assertEquals(2, result.getCurrentNullCount()); @@ -189,8 +235,14 @@ public void testGetCombinedStats() throws Exception { Assert.assertNull(result.getCurrentMaxRealValue()); // Test for Reals - one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); - two = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + one = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; + two = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; one.addRealValue(2d); one.addRealValue(4d); @@ -205,7 +257,9 @@ public void testGetCombinedStats() throws Exception { result = RowBufferStats.getCombinedStats(one, two); Assert.assertEquals(Double.valueOf(1), result.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(8), result.getCurrentMaxRealValue()); - Assert.assertEquals(enableNDVAndNV ? 7 : -1, result.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 7 : -1, + result.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 8 : -1, result.getNumberOfValues()); Assert.assertEquals(0, result.getCurrentNullCount()); @@ -216,8 +270,14 @@ public void testGetCombinedStats() throws Exception { Assert.assertNull(result.getCurrentMaxIntValue()); // Test for Strings without collation - one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); - two = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + one = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; + two = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; one.addStrValue("alpha"); one.addStrValue("d"); @@ -238,7 +298,9 @@ public void testGetCombinedStats() throws Exception { Assert.assertArrayEquals("g".getBytes(StandardCharsets.UTF_8), result.getCurrentMaxStrValue()); Assert.assertEquals(2, result.getCurrentNullCount()); Assert.assertEquals(5, result.getCurrentMaxLength()); - Assert.assertEquals(enableNDVAndNV ? 7 : -1, result.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 7 : -1, + result.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 10 : -1, result.getNumberOfValues()); Assert.assertNull(result.getCurrentMinRealValue()); @@ -250,8 +312,14 @@ public void testGetCombinedStats() throws Exception { @Test public void testGetCombinedStatsNull() throws Exception { // Test for Integers - RowBufferStats one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); - RowBufferStats two = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats one = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; + RowBufferStats two = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; one.addIntValue(BigInteger.valueOf(2)); one.addIntValue(BigInteger.valueOf(4)); @@ -263,7 +331,9 @@ public void testGetCombinedStatsNull() throws Exception { RowBufferStats result = RowBufferStats.getCombinedStats(one, two); Assert.assertEquals(BigInteger.valueOf(2), result.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf(8), result.getCurrentMaxIntValue()); - Assert.assertEquals(enableNDVAndNV ? 4 : -1, result.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 4 : -1, + result.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 6 : -1, result.getNumberOfValues()); Assert.assertEquals(2, result.getCurrentNullCount()); @@ -274,7 +344,10 @@ public void testGetCombinedStatsNull() throws Exception { Assert.assertNull(result.getCurrentMaxRealValue()); // Test for Reals - one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + one = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; one.addRealValue(2d); one.addRealValue(4d); @@ -284,7 +357,9 @@ public void testGetCombinedStatsNull() throws Exception { result = RowBufferStats.getCombinedStats(one, two); Assert.assertEquals(Double.valueOf(2), result.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(8), result.getCurrentMaxRealValue()); - Assert.assertEquals(enableNDVAndNV ? 4 : -1, result.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 4 : -1, + result.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 4 : -1, result.getNumberOfValues()); Assert.assertEquals(0, result.getCurrentNullCount()); @@ -294,8 +369,14 @@ public void testGetCombinedStatsNull() throws Exception { Assert.assertNull(result.getCurrentMaxIntValue()); // Test for Strings - one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); - two = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + one = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; + two = + new RowBufferStats( + "COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV); + ; one.addStrValue("alpha"); one.addStrValue("d"); @@ -308,7 +389,9 @@ public void testGetCombinedStatsNull() throws Exception { "alpha".getBytes(StandardCharsets.UTF_8), result.getCurrentMinStrValue()); Assert.assertArrayEquals("g".getBytes(StandardCharsets.UTF_8), result.getCurrentMaxStrValue()); - Assert.assertEquals(enableNDVAndNV ? 4 : -1, result.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 4 : -1, + result.getDistinctValues()); Assert.assertEquals(enableNDVAndNV ? 5 : -1, result.getNumberOfValues()); Assert.assertEquals(1, result.getCurrentNullCount()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 8822363a8..57b919b55 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -223,7 +223,7 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT, enableIcebergStreaming ? Optional.of(1) : Optional.empty(), enableIcebergStreaming, - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming), null, enableIcebergStreaming @@ -645,7 +645,7 @@ public void testBuildEpInfoFromStats() { new RowBufferStats( "intColumn", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("intColumn"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); stats1.addIntValue(BigInteger.valueOf(2)); stats1.addIntValue(BigInteger.valueOf(10)); @@ -655,7 +655,7 @@ public void testBuildEpInfoFromStats() { new RowBufferStats( "strColumn", Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).id(2).named("strColumn"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); stats2.addStrValue("alice"); stats2.addStrValue("bob"); @@ -671,7 +671,9 @@ public void testBuildEpInfoFromStats() { Assert.assertEquals(2, columnResults.keySet().size()); FileColumnProperties strColumnResult = columnResults.get("strColumn"); - Assert.assertEquals(enableIcebergStreaming ? 2 : -1, strColumnResult.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1, + strColumnResult.getDistinctValues()); Assert.assertEquals( Hex.encodeHexString("alice".getBytes(StandardCharsets.UTF_8)), strColumnResult.getMinStrValue()); @@ -681,7 +683,9 @@ public void testBuildEpInfoFromStats() { Assert.assertEquals(1, strColumnResult.getNullCount()); FileColumnProperties intColumnResult = columnResults.get("intColumn"); - Assert.assertEquals(enableIcebergStreaming ? 3 : -1, intColumnResult.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 3 : -1, + intColumnResult.getDistinctValues()); Assert.assertEquals(BigInteger.valueOf(1), intColumnResult.getMinIntValue()); Assert.assertEquals(BigInteger.valueOf(10), intColumnResult.getMaxIntValue()); Assert.assertEquals(0, intColumnResult.getNullCount()); @@ -697,13 +701,13 @@ public void testBuildEpInfoFromNullColumnStats() { new RowBufferStats( intColName, Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named(intColName), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); RowBufferStats stats2 = new RowBufferStats( realColName, Types.optional(PrimitiveType.PrimitiveTypeName.DOUBLE).id(2).named(realColName), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); stats1.incCurrentNullCount(); stats2.incCurrentNullCount(); @@ -718,7 +722,9 @@ public void testBuildEpInfoFromNullColumnStats() { Assert.assertEquals(2, columnResults.keySet().size()); FileColumnProperties intColumnResult = columnResults.get(intColName); - Assert.assertEquals(enableIcebergStreaming ? 0 : -1, intColumnResult.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 0 : -1, + intColumnResult.getDistinctValues()); Assert.assertEquals( FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP, intColumnResult.getMinIntValue()); Assert.assertEquals( @@ -727,7 +733,9 @@ public void testBuildEpInfoFromNullColumnStats() { Assert.assertEquals(0, intColumnResult.getMaxLength()); FileColumnProperties realColumnResult = columnResults.get(realColName); - Assert.assertEquals(enableIcebergStreaming ? 0 : -1, intColumnResult.getDistinctValues()); + Assert.assertEquals( + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 0 : -1, + intColumnResult.getDistinctValues()); Assert.assertEquals( FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP, realColumnResult.getMinRealValue()); Assert.assertEquals( @@ -744,7 +752,7 @@ public void testInvalidEPInfo() { new RowBufferStats( "intColumn", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("intColumn"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); stats1.addIntValue(BigInteger.valueOf(2)); stats1.addIntValue(BigInteger.valueOf(10)); @@ -754,7 +762,7 @@ public void testInvalidEPInfo() { new RowBufferStats( "strColumn", Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).id(2).named("strColumn"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming); stats2.addStrValue("alice"); stats2.incCurrentNullCount(); @@ -882,7 +890,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals( 0, columnEpStats.get(enableIcebergStreaming ? "1" : "colTinyInt").getCurrentNullCount()); Assert.assertEquals( - enableIcebergStreaming ? 2 : -1, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1, columnEpStats.get(enableIcebergStreaming ? "1" : "colTinyInt").getDistinctValues()); Assert.assertEquals( @@ -894,7 +902,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals( 0, columnEpStats.get(enableIcebergStreaming ? "2" : "COLTINYINT").getCurrentNullCount()); Assert.assertEquals( - enableIcebergStreaming ? 1 : -1, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 1 : -1, columnEpStats.get(enableIcebergStreaming ? "2" : "COLTINYINT").getDistinctValues()); Assert.assertEquals( @@ -906,7 +914,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals( 0, columnEpStats.get(enableIcebergStreaming ? "3" : "COLSMALLINT").getCurrentNullCount()); Assert.assertEquals( - enableIcebergStreaming ? 2 : -1, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1, columnEpStats.get(enableIcebergStreaming ? "3" : "COLSMALLINT").getDistinctValues()); Assert.assertEquals( @@ -918,7 +926,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals( 1L, columnEpStats.get(enableIcebergStreaming ? "4" : "COLINT").getCurrentNullCount()); Assert.assertEquals( - enableIcebergStreaming ? 1 : -1, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 1 : -1, columnEpStats.get(enableIcebergStreaming ? "4" : "COLINT").getDistinctValues()); Assert.assertEquals( @@ -930,7 +938,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals( 0, columnEpStats.get(enableIcebergStreaming ? "5" : "COLBIGINT").getCurrentNullCount()); Assert.assertEquals( - enableIcebergStreaming ? 2 : -1, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1, columnEpStats.get(enableIcebergStreaming ? "5" : "COLBIGINT").getDistinctValues()); Assert.assertArrayEquals( @@ -942,7 +950,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals( 0, columnEpStats.get(enableIcebergStreaming ? "7" : "COLCHAR").getCurrentNullCount()); Assert.assertEquals( - enableIcebergStreaming ? 2 : -1, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1, columnEpStats.get(enableIcebergStreaming ? "7" : "COLCHAR").getDistinctValues()); // Confirm we reset @@ -2419,7 +2427,7 @@ private void testStructuredStatsE2EHelper(AbstractRowBuffer rowBuffer) { columnEpStats.get(enableIcebergStreaming ? "4" : "COLOBJECT.a").getCurrentNullCount()) .isEqualTo(0); assertThat(columnEpStats.get(enableIcebergStreaming ? "4" : "COLOBJECT.a").getDistinctValues()) - .isEqualTo(2); + .isEqualTo(InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1); assertThat(columnEpStats.get(enableIcebergStreaming ? "4" : "COLOBJECT.a").getNumberOfValues()) .isEqualTo(EP_NV_UNKNOWN); @@ -2433,7 +2441,7 @@ private void testStructuredStatsE2EHelper(AbstractRowBuffer rowBuffer) { columnEpStats.get(enableIcebergStreaming ? "5" : "COLOBJECT.b").getCurrentNullCount()) .isEqualTo(1); assertThat(columnEpStats.get(enableIcebergStreaming ? "5" : "COLOBJECT.b").getDistinctValues()) - .isEqualTo(1); + .isEqualTo(InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 1 : -1); assertThat(columnEpStats.get(enableIcebergStreaming ? "5" : "COLOBJECT.b").getNumberOfValues()) .isEqualTo(EP_NV_UNKNOWN); @@ -2456,7 +2464,7 @@ private void testStructuredStatsE2EHelper(AbstractRowBuffer rowBuffer) { columnEpStats .get(enableIcebergStreaming ? "6" : "COLMAP.key_value.key") .getDistinctValues()) - .isEqualTo(2); + .isEqualTo(InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 2 : -1); assertThat( columnEpStats .get(enableIcebergStreaming ? "6" : "COLMAP.key_value.key") @@ -2482,7 +2490,7 @@ private void testStructuredStatsE2EHelper(AbstractRowBuffer rowBuffer) { columnEpStats .get(enableIcebergStreaming ? "7" : "COLMAP.key_value.value") .getDistinctValues()) - .isEqualTo(1); + .isEqualTo(InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 1 : -1); assertThat( columnEpStats .get(enableIcebergStreaming ? "7" : "COLMAP.key_value.value") @@ -2508,7 +2516,7 @@ private void testStructuredStatsE2EHelper(AbstractRowBuffer rowBuffer) { columnEpStats .get(enableIcebergStreaming ? "8" : "COLARRAY.list.element") .getDistinctValues()) - .isEqualTo(1); + .isEqualTo(InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 1 : -1); assertThat( columnEpStats .get(enableIcebergStreaming ? "8" : "COLARRAY.list.element") diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 5d8ba41a1..17afada70 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -520,7 +520,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { new RowBufferStats( "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming)); EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats( @@ -577,7 +577,7 @@ private Pair, Set> getRetryBlobMetadata( new RowBufferStats( "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"), - enableIcebergStreaming, + InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableIcebergStreaming)); EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/SubscopedTokenRefreshIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/SubscopedTokenRefreshIT.java new file mode 100644 index 000000000..7b9319985 --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/SubscopedTokenRefreshIT.java @@ -0,0 +1,160 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal.it; + +import static net.snowflake.ingest.utils.Constants.REFRESH_TABLE_INFORMATION_ENDPOINT; +import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING; +import static net.snowflake.ingest.utils.ParameterProvider.MAX_CLIENT_LAG; + +import com.google.common.collect.ImmutableMap; +import java.sql.Connection; +import java.util.Properties; +import net.snowflake.ingest.IcebergIT; +import net.snowflake.ingest.TestUtils; +import net.snowflake.ingest.connection.RequestBuilder; +import net.snowflake.ingest.streaming.OpenChannelRequest; +import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; +import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal; +import net.snowflake.ingest.utils.Constants; +import net.snowflake.ingest.utils.HttpUtil; +import net.snowflake.ingest.utils.SnowflakeURL; +import net.snowflake.ingest.utils.Utils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category(IcebergIT.class) +public class SubscopedTokenRefreshIT { + + private String database; + private String schema; + private Connection conn; + private SnowflakeStreamingIngestClientInternal client; + private RequestBuilder requestBuilder; + + @Before + public void before() throws Exception { + database = String.format("SDK_TOKEN_EXPIRE_IT_DB_%d", System.nanoTime()); + schema = "PUBLIC"; + + conn = TestUtils.getConnection(true); + + conn.createStatement().execute(String.format("create or replace database %s;", database)); + conn.createStatement().execute(String.format("use database %s;", database)); + conn.createStatement().execute(String.format("use schema %s;", schema)); + conn.createStatement().execute(String.format("use warehouse %s;", TestUtils.getWarehouse())); + + SnowflakeURL url = new SnowflakeURL(TestUtils.getAccountURL()); + Properties properties = TestUtils.getProperties(Constants.BdecVersion.THREE, false); + properties.setProperty(ENABLE_ICEBERG_STREAMING, "true"); + properties.setProperty(MAX_CLIENT_LAG, "1000"); + requestBuilder = + Mockito.spy( + new RequestBuilder( + url, + TestUtils.getUser(), + TestUtils.getKeyPair(), + HttpUtil.getHttpClient(url.getAccount()), + true /* enableIcebergStreaming */, + "client1")); + this.client = + new SnowflakeStreamingIngestClientInternal<>( + "client1", + url, + Utils.createProperties(properties), + HttpUtil.getHttpClient(url.getAccount()), + false /* isTestMode */, + requestBuilder, + null /* parameterOverrides */); + } + + @After + public void after() throws Exception { + conn.createStatement().execute(String.format("drop database if exists %s;", database)); + } + + @Test + public void testTokenExpire() throws Exception { + String tableName = "test_token_expire_table"; + + /* + * Minimum duration of token for each cloud storage: + * - S3: 900 seconds + * - GCS: 600 seconds + * - Azure: 300 seconds + */ + int duration = 900; + createIcebergTable(tableName); + conn.createStatement() + .execute( + String.format( + "alter iceberg table %s set" + + " STREAMING_ICEBERG_INGESTION_SUBSCOPED_TOKEN_DURATION_SECONDS_S3=%s", + tableName, duration)); + conn.createStatement() + .execute( + String.format( + "alter iceberg table %s set" + + " STREAMING_ICEBERG_INGESTION_SUBSCOPED_TOKEN_DURATION_SECONDS_GCS=%s", + tableName, duration)); + conn.createStatement() + .execute( + String.format( + "alter iceberg table %s set" + + " STREAMING_ICEBERG_INGESTION_SUBSCOPED_TOKEN_DURATION_SECONDS_AZURE=%s", + tableName, duration)); + conn.createStatement() + .execute( + String.format( + "alter iceberg table %s set" + + " STREAMING_ICEBERG_INGESTION_SUBSCOPED_TOKEN_DURATION_SECONDS_DEFAULT=%s", + tableName, duration)); + + SnowflakeStreamingIngestChannel channel = + client.openChannel( + OpenChannelRequest.builder("CHANNEL") + .setDBName(database) + .setSchemaName(schema) + .setTableName(tableName) + .setOnErrorOption(OpenChannelRequest.OnErrorOption.ABORT) + .build()); + + /* Refresh table information should be called once channel is opened */ + Mockito.verify(requestBuilder, Mockito.times(1)) + .generateStreamingIngestPostRequest( + Mockito.anyString(), + Mockito.eq(REFRESH_TABLE_INFORMATION_ENDPOINT), + Mockito.eq("refresh table information")); + + channel.insertRow(ImmutableMap.of("int_col", 1), "1"); + TestUtils.waitForOffset(channel, "1"); + + /* Wait for token to expire */ + + Thread.sleep((duration + 1) * 1000); + + /* Insert a row to trigger token generation */ + channel.insertRow(ImmutableMap.of("int_col", 2), "2"); + TestUtils.waitForOffset(channel, "2"); + Mockito.verify(requestBuilder, Mockito.times(2)) + .generateStreamingIngestPostRequest( + Mockito.anyString(), + Mockito.eq(REFRESH_TABLE_INFORMATION_ENDPOINT), + Mockito.eq("refresh table information")); + } + + private void createIcebergTable(String tableName) throws Exception { + conn.createStatement() + .execute( + String.format( + "create or replace iceberg table %s(int_col int)" + + "catalog = 'SNOWFLAKE' " + + "external_volume = 'streaming_ingest' " + + "base_location = 'SDK_IT/%s/%s'", + tableName, database, tableName)); + } +}