Skip to content

Commit

Permalink
Merge branch 'master' into ggeng_SNOW-1814204-lock
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-ggeng authored Nov 21, 2024
2 parents 97e7d13 + 1f6fa8e commit 385fd24
Show file tree
Hide file tree
Showing 18 changed files with 462 additions and 124 deletions.
Binary file modified profile.json.gpg
Binary file not shown.
Binary file modified profile_azure.json.gpg
Binary file not shown.
Binary file modified profile_gcp.json.gpg
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,9 @@ public BlobPath(String uploadPath, String fileRegistrationPath) {
this.uploadPath = uploadPath;
this.fileRegistrationPath = fileRegistrationPath;
}

@Override
public String toString() {
return String.format("uploadPath=%s fileRegistrationPath=%s", uploadPath, fileRegistrationPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,8 @@ public boolean isNullable() {
* when this object represents an iceberg table's column, null otherwise. The String returned from
* here is meant to conform to the json schema specified here:
* https://iceberg.apache.org/spec/#appendix-c-json-serialization
*
* <p>Make this a public API when the Builder.setEnableIcebergStreaming API is made public.
*/
String getIcebergSchema() {
public String getIcebergSchema() {
return icebergColumnSchema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ class InternalParameterProvider {
public static final boolean ENABLE_VALUES_COUNT_DEFAULT = false;

private final boolean enableIcebergStreaming;
private final boolean enableNDVTracking;

InternalParameterProvider(boolean enableIcebergStreaming) {
InternalParameterProvider(boolean enableIcebergStreaming, boolean enableNDVTracking) {
this.enableIcebergStreaming = enableIcebergStreaming;
this.enableNDVTracking = enableNDVTracking;
}

boolean getEnableChunkEncryption() {
Expand All @@ -38,7 +40,7 @@ boolean setIcebergSpecificFieldsInEp() {

boolean isEnableDistinctValuesCount() {
// When in Iceberg mode, we enabled distinct values count in EP metadata.
return enableIcebergStreaming;
return enableIcebergStreaming && enableNDVTracking;
}

boolean isEnableValuesCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
private final ChannelCache<T> channelCache;

// Reference to the flush service
private final FlushService<T> flushService;
private FlushService<T> flushService;

// Reference to storage manager
private IStorageManager storageManager;
Expand Down Expand Up @@ -157,7 +157,8 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
* @param requestBuilder http request builder
* @param parameterOverrides parameters we override in case we want to set different values
*/
SnowflakeStreamingIngestClientInternal(
@VisibleForTesting
public SnowflakeStreamingIngestClientInternal(
String name,
SnowflakeURL accountURL,
Properties prop,
Expand All @@ -167,7 +168,8 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
Map<String, Object> parameterOverrides) {
this.parameterProvider = new ParameterProvider(parameterOverrides, prop);
this.internalParameterProvider =
new InternalParameterProvider(parameterProvider.isEnableIcebergStreaming());
new InternalParameterProvider(
parameterProvider.isEnableIcebergStreaming(), false /* enableNDVCount */);

this.name = name;
String accountName = accountURL == null ? null : accountURL.getAccount();
Expand Down Expand Up @@ -218,14 +220,16 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
prop.getProperty(Constants.OAUTH_REFRESH_TOKEN),
oAuthTokenEndpoint);
}
this.requestBuilder =
new RequestBuilder(
accountURL,
prop.get(USER).toString(),
credential,
this.httpClient,
parameterProvider.isEnableIcebergStreaming(),
String.format("%s_%s", this.name, System.currentTimeMillis()));
if (this.requestBuilder == null) {
this.requestBuilder =
new RequestBuilder(
accountURL,
prop.get(USER).toString(),
credential,
this.httpClient,
parameterProvider.isEnableIcebergStreaming(),
String.format("%s_%s", this.name, System.currentTimeMillis()));
}

logger.logInfo("Using {} for authorization", this.requestBuilder.getAuthType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ class SubscopedTokenExternalVolumeManager implements IStorageManager {
throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage());
}
logger.logDebug(
"Created SubscopedTokenExternalVolumeManager with clientName=%s and clientPrefix=%s",
clientName, clientPrefix);
"Created SubscopedTokenExternalVolumeManager with clientName={} and clientPrefix={}",
clientName,
clientPrefix);
}

/**
Expand Down Expand Up @@ -93,12 +94,12 @@ private InternalStage createStageForTable(TableRef tableRef) {
this, clientName, getClientPrefix(), tableRef, locationInfo, DEFAULT_MAX_UPLOAD_RETRIES);
} catch (SFException ex) {
logger.logError(
"ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, ex);
"ExtVolManager.registerTable for tableRef={} failed with exception={}", tableRef, ex);
// allow external volume ctor's SFExceptions to bubble up directly
throw ex;
} catch (Exception err) {
logger.logError(
"ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, err);
"ExtVolManager.registerTable for tableRef={} failed with exception={}", tableRef, err);
throw new SFException(
err,
ErrorCode.UNABLE_TO_CONNECT_TO_STAGE,
Expand Down Expand Up @@ -127,8 +128,9 @@ static BlobPath generateBlobPathFromLocationInfoPath(
String[] parts = filePathRelativeToVolume.split("/");
if (parts.length < 5) {
logger.logError(
"Invalid file path returned by server. Table=%s FilePathRelativeToVolume=%s",
fullyQualifiedTableName, filePathRelativeToVolume);
"Invalid file path returned by server. Table={} FilePathRelativeToVolume={}",
fullyQualifiedTableName,
filePathRelativeToVolume);
throw new SFException(ErrorCode.INTERNAL_ERROR, "File path returned by server is invalid");
}

Expand Down Expand Up @@ -166,17 +168,19 @@ public FileLocationInfo getRefreshedLocation(TableRef tableRef, Optional<String>
RefreshTableInformationResponse response =
this.serviceClient.refreshTableInformation(
new RefreshTableInformationRequest(tableRef, this.role, true));
logger.logDebug("Refreshed tokens for table=%s", tableRef);
logger.logDebug("Refreshed tokens for table={}", tableRef);
if (response.getIcebergLocationInfo() == null) {
logger.logError(
"Did not receive location info, this will cause ingestion to grind to a halt."
+ " TableRef=%s");
+ " TableRef={}",
tableRef);
} else {
Map<String, String> creds = response.getIcebergLocationInfo().getCredentials();
if (creds == null || creds.isEmpty()) {
logger.logError(
"Did not receive creds in location info, this will cause ingestion to grind to a"
+ " halt. TableRef=%s");
+ " halt. TableRef={}",
tableRef);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/test/java/net/snowflake/ingest/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ public static Connection getConnection(boolean isStreamingConnection) throws Exc
props.put("warehouse", warehouse);
props.put("client_session_keep_alive", "true");
props.put("privateKey", privateKey);
props.put("role", role);

if (isStreamingConnection) {
streamingConn = DriverManager.getConnection(connectString, props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testSerializationErrors() throws Exception {
"a.bdec",
Collections.singletonList(createChannelDataPerTable(1)),
Constants.BdecVersion.THREE,
new InternalParameterProvider(enableIcebergStreaming),
new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */),
encryptionKeysPerTable);

// Construction fails if metadata contains 0 rows and data 1 row
Expand All @@ -68,7 +68,7 @@ public void testSerializationErrors() throws Exception {
"a.bdec",
Collections.singletonList(createChannelDataPerTable(0)),
Constants.BdecVersion.THREE,
new InternalParameterProvider(enableIcebergStreaming),
new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */),
encryptionKeysPerTable);
} catch (SFException e) {
Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode());
Expand All @@ -93,7 +93,7 @@ public void testMetadataAndExtendedMetadataSize() throws Exception {
"a.parquet",
Collections.singletonList(createChannelDataPerTable(1)),
Constants.BdecVersion.THREE,
new InternalParameterProvider(enableIcebergStreaming),
new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */),
new ConcurrentHashMap<>());

InputFile blobInputFile = new InMemoryInputFile(blob.blobBytes);
Expand Down Expand Up @@ -188,7 +188,7 @@ private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(int metada
.as(LogicalTypeAnnotation.stringType())
.id(1)
.named("test"),
enableIcebergStreaming,
InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT,
enableIcebergStreaming)
: new RowBufferStats(columnName, null, 1, null, null, false, false));
channelData.setChannelContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ public static Object[] enableNDVAndNV() {
@Test
public void testGetCombinedColumnStatsMapNulls() {
Map<String, RowBufferStats> left = new HashMap<>();
RowBufferStats leftStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
RowBufferStats leftStats1 =
new RowBufferStats(
"COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV);
left.put("one", leftStats1);
leftStats1.addIntValue(new BigInteger("10"));

Expand Down Expand Up @@ -56,12 +58,16 @@ public void testGetCombinedColumnStatsMapNulls() {
@Test
public void testGetCombinedColumnStatsMapMissingColumn() {
Map<String, RowBufferStats> left = new HashMap<>();
RowBufferStats leftStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
RowBufferStats leftStats1 =
new RowBufferStats(
"COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV);
left.put("one", leftStats1);
leftStats1.addIntValue(new BigInteger("10"));

Map<String, RowBufferStats> right = new HashMap<>();
RowBufferStats rightStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
RowBufferStats rightStats1 =
new RowBufferStats(
"COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV);
right.put("foo", rightStats1);
rightStats1.addIntValue(new BigInteger("11"));

Expand Down Expand Up @@ -91,10 +97,18 @@ public void testGetCombinedColumnStatsMap() {
Map<String, RowBufferStats> left = new HashMap<>();
Map<String, RowBufferStats> right = new HashMap<>();

RowBufferStats leftStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
RowBufferStats rightStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
RowBufferStats leftStats2 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
RowBufferStats rightStats2 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
RowBufferStats leftStats1 =
new RowBufferStats(
"COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV);
RowBufferStats rightStats1 =
new RowBufferStats(
"COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV);
RowBufferStats leftStats2 =
new RowBufferStats(
"COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV);
RowBufferStats rightStats2 =
new RowBufferStats(
"COL1", InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, enableNDVAndNV);

left.put("one", leftStats1);
left.put("two", leftStats2);
Expand Down Expand Up @@ -125,7 +139,9 @@ public void testGetCombinedColumnStatsMap() {
Assert.assertNull(oneCombined.getCurrentMinRealValue());
Assert.assertNull(oneCombined.getCurrentMaxRealValue());

Assert.assertEquals(enableNDVAndNV ? 5 : -1, oneCombined.getDistinctValues());
Assert.assertEquals(
InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 5 : -1,
oneCombined.getDistinctValues());
Assert.assertEquals(enableNDVAndNV ? 5 : -1, oneCombined.getNumberOfValues());

Assert.assertArrayEquals(
Expand All @@ -137,7 +153,9 @@ public void testGetCombinedColumnStatsMap() {
Assert.assertNull(twoCombined.getCurrentMinRealValue());
Assert.assertNull(twoCombined.getCurrentMaxRealValue());

Assert.assertEquals(enableNDVAndNV ? 5 : -1, oneCombined.getDistinctValues());
Assert.assertEquals(
InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT ? 5 : -1,
oneCombined.getDistinctValues());
Assert.assertEquals(enableNDVAndNV ? 5 : -1, oneCombined.getNumberOfValues());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void testFileColumnPropertiesConstructor() {
.as(LogicalTypeAnnotation.stringType())
.id(1)
.named("test"),
enableIcebergStreaming,
InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT,
enableIcebergStreaming)
: new RowBufferStats("COL", null, 1, null, null, false, false);
stats.addStrValue("bcd");
Expand All @@ -58,7 +58,7 @@ public void testFileColumnPropertiesConstructor() {
.as(LogicalTypeAnnotation.stringType())
.id(1)
.named("test"),
enableIcebergStreaming,
InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT,
enableIcebergStreaming)
: new RowBufferStats("COL", null, 1, null, null, false, false);
stats.addStrValue("aßßßßßßßßßßßßßßßß");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private abstract static class TestContext<T> implements AutoCloseable {
storage = Mockito.mock(InternalStage.class);
parameterProvider = createParameterProvider(enableIcebergStreaming);
InternalParameterProvider internalParameterProvider =
new InternalParameterProvider(enableIcebergStreaming);
new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */);
client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class);
Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider);
Mockito.when(client.getInternalParameterProvider()).thenReturn(internalParameterProvider);
Expand Down Expand Up @@ -905,13 +905,13 @@ public void testBuildAndUpload() throws Exception {
new RowBufferStats(
"COL1",
Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"),
enableIcebergStreaming,
InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT,
enableIcebergStreaming);
RowBufferStats stats2 =
new RowBufferStats(
"COL1",
Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"),
enableIcebergStreaming,
InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT,
enableIcebergStreaming);

eps1.put("one", stats1);
Expand Down Expand Up @@ -1078,7 +1078,7 @@ public void testInvalidateChannels() {
ParameterProvider parameterProvider = createParameterProvider(enableIcebergStreaming);
ChannelCache<StubChunkData> channelCache = new ChannelCache<>();
InternalParameterProvider internalParameterProvider =
new InternalParameterProvider(enableIcebergStreaming);
new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */);
Mockito.when(client.getChannelCache()).thenReturn(channelCache);
Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider);
Mockito.when(client.getInternalParameterProvider()).thenReturn(internalParameterProvider);
Expand Down Expand Up @@ -1167,7 +1167,7 @@ public void testBlobBuilder() throws Exception {
new RowBufferStats(
"COL1",
Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"),
enableIcebergStreaming,
InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT,
enableIcebergStreaming);

eps1.put("one", stats1);
Expand Down
Loading

0 comments on commit 385fd24

Please sign in to comment.