Commit
Add parameters & disable interleaving mode
sfc-gh-alhuang committed May 23, 2024
1 parent 5e93b3d commit 1cb85e9
Showing 6 changed files with 101 additions and 23 deletions.
@@ -28,6 +28,9 @@ public static class Builder {
// Allows client to override some default parameter values
private Map<String, Object> parameterOverrides;

// Indicates whether it's streaming to Iceberg tables
private boolean isIcebergMode;

// Indicates whether it's under test mode
private boolean isTestMode;

@@ -45,6 +48,11 @@ public Builder setParameterOverrides(Map<String, Object> parameterOverrides) {
return this;
}

public Builder setIcebergMode(boolean isIcebergMode) {
this.isIcebergMode = isIcebergMode;
return this;
}

public Builder setIsTestMode(boolean isTestMode) {
this.isTestMode = isTestMode;
return this;
@@ -58,7 +66,12 @@ public SnowflakeStreamingIngestClient build() {
SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));

return new SnowflakeStreamingIngestClientInternal<>(
this.name, accountURL, prop, this.parameterOverrides, this.isTestMode);
this.name,
accountURL,
prop,
this.parameterOverrides,
this.isIcebergMode,
this.isTestMode);
}
}
}
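
As a usage sketch (not part of this commit): with the new setter, a caller could opt a client into Iceberg mode through the factory builder. This assumes the enclosing SnowflakeStreamingIngestClientFactory class and its existing builder(String) and setProperties(Properties) methods, which sit outside this diff; the property keys and values are illustrative placeholders.

    import java.util.Properties;
    import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
    import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;

    public class IcebergModeExample {
      public static void main(String[] args) throws Exception {
        // Illustrative connection properties; key names follow the SDK's profile format.
        Properties props = new Properties();
        props.put("url", "https://<account>.snowflakecomputing.com:443");
        props.put("user", "<user>");
        props.put("private_key", "<private_key>");

        try (SnowflakeStreamingIngestClient client =
            SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT")
                .setProperties(props)
                .setIcebergMode(true) // new in this commit: stream to Iceberg tables
                .build()) {
          // open channels and ingest rows as usual
        }
      }
    }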
@@ -123,6 +123,10 @@ List<List<ChannelData<T>>> getData() {
// blob encoding version
private final Constants.BdecVersion bdecVersion;

// Indicates whether it's flushing to Iceberg tables; a blob may contain at most one chunk in
// Iceberg mode
private final boolean isIcebergMode;

/**
* Constructor for TESTING that takes (usually mocked) StreamingIngestStage
*
@@ -134,6 +138,7 @@ List<List<ChannelData<T>>> getData() {
SnowflakeStreamingIngestClientInternal<T> client,
ChannelCache<T> cache,
StreamingIngestStage targetStage, // For testing
boolean isIcebergMode,
boolean isTestMode) {
this.owningClient = client;
this.channelCache = cache;
@@ -142,6 +147,7 @@ List<List<ChannelData<T>>> getData() {
this.registerService = new RegisterService<>(client, isTestMode);
this.isNeedFlush = false;
this.lastFlushTime = System.currentTimeMillis();
this.isIcebergMode = isIcebergMode;
this.isTestMode = isTestMode;
this.latencyTimerContextMap = new ConcurrentHashMap<>();
this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion();
@@ -156,7 +162,10 @@ List<List<ChannelData<T>>> getData() {
* @param isTestMode
*/
FlushService(
SnowflakeStreamingIngestClientInternal<T> client, ChannelCache<T> cache, boolean isTestMode) {
SnowflakeStreamingIngestClientInternal<T> client,
ChannelCache<T> cache,
boolean isIcebergMode,
boolean isTestMode) {
this.owningClient = client;
this.channelCache = cache;
try {
@@ -176,6 +185,7 @@ List<List<ChannelData<T>>> getData() {
this.counter = new AtomicLong(0);
this.isNeedFlush = false;
this.lastFlushTime = System.currentTimeMillis();
this.isIcebergMode = isIcebergMode;
this.isTestMode = isTestMode;
this.latencyTimerContextMap = new ConcurrentHashMap<>();
this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion();
@@ -437,7 +447,7 @@ && shouldStopProcessing(
}
// Add processed channels to the current blob, stop if we need to create a new blob
blobData.add(channelsDataPerTable.subList(0, idx));
if (idx != channelsDataPerTable.size()) {
if (idx != channelsDataPerTable.size() || isIcebergMode) {
break;
}
}
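The functional core of this change is the new || isIcebergMode condition in the blob-building loop: a blob normally keeps accumulating per-table chunks until shouldStopProcessing forces a split, but in Iceberg mode the loop now breaks unconditionally after the first table's data is added, so every blob carries exactly one chunk. A simplified sketch of that packing logic (hypothetical names; the real code operates on List<ChannelData<T>> per table and also honors size and chunk-count limits):

    import java.util.ArrayList;
    import java.util.List;

    final class BlobPacker {
      // Packs per-table chunks into blobs. In Iceberg mode each blob holds exactly one chunk.
      static <C> List<List<C>> packIntoBlobs(List<C> chunksPerTable, boolean isIcebergMode) {
        List<List<C>> blobs = new ArrayList<>();
        List<C> currentBlob = new ArrayList<>();
        for (C chunk : chunksPerTable) {
          currentBlob.add(chunk);
          // The real loop also breaks when shouldStopProcessing(...) reports a size or
          // chunk-count limit; the commit adds isIcebergMode to force a break every time.
          if (isIcebergMode) {
            blobs.add(currentBlob);
            currentBlob = new ArrayList<>();
          }
        }
        if (!currentBlob.isEmpty()) {
          blobs.add(currentBlob);
        }
        return blobs;
      }
    }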
@@ -121,6 +121,9 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStreamingIngestClient
// Indicates whether the client has closed
private volatile boolean isClosed;

// Indicates whether the client is streaming to Iceberg tables
private final boolean isIcebergMode;

// Indicates whether the client is under test mode
private final boolean isTestMode;

@@ -152,6 +155,7 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStreamingIngestClient
* @param accountURL Snowflake account url
* @param prop connection properties
* @param httpClient http client for sending request
* @param isIcebergMode whether we're streaming to Iceberg tables
* @param isTestMode whether we're under test mode
* @param requestBuilder http request builder
* @param parameterOverrides parameters we override in case we want to set different values
@@ -161,13 +165,15 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStreamingIngestClient
SnowflakeURL accountURL,
Properties prop,
CloseableHttpClient httpClient,
boolean isIcebergMode,
boolean isTestMode,
RequestBuilder requestBuilder,
Map<String, Object> parameterOverrides) {
this.parameterProvider = new ParameterProvider(parameterOverrides, prop);

this.name = name;
String accountName = accountURL == null ? null : accountURL.getAccount();
this.isIcebergMode = isIcebergMode;
this.isTestMode = isTestMode;
this.httpClient = httpClient == null ? HttpUtil.getHttpClient(accountName) : httpClient;
this.channelCache = new ChannelCache<>();
@@ -229,7 +235,8 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStreamingIngestClient
}

try {
this.flushService = new FlushService<>(this, this.channelCache, this.isTestMode);
this.flushService =
new FlushService<>(this, this.channelCache, this.isIcebergMode, this.isTestMode);
} catch (Exception e) {
// Need to clean up the resources before throwing any exceptions
cleanUpResources();
@@ -258,16 +265,17 @@ public SnowflakeStreamingIngestClientInternal(
SnowflakeURL accountURL,
Properties prop,
Map<String, Object> parameterOverrides,
boolean isIcebergMode,
boolean isTestMode) {
this(name, accountURL, prop, null, isTestMode, null, parameterOverrides);
this(name, accountURL, prop, null, isIcebergMode, isTestMode, null, parameterOverrides);
}

/*** Constructor for TEST ONLY
*
* @param name the name of the client
*/
SnowflakeStreamingIngestClientInternal(String name) {
this(name, null, null, null, true, null, new HashMap<>());
this(name, null, null, null, false, true, null, new HashMap<>());
}

// TESTING ONLY - inject the request builder
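Note that the TEST ONLY constructor now passes false for isIcebergMode (alongside true for isTestMode), so existing tests keep exercising the default interleaving path unless they opt in.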
@@ -64,7 +64,7 @@ private abstract static class TestContextFactory<T> {
this.name = name;
}

abstract TestContext<T> create();
abstract TestContext<T> create(boolean isIcebergMode);

@Override
public String toString() {
@@ -83,7 +83,7 @@ private abstract static class TestContext<T> implements AutoCloseable {

final List<ChannelData<T>> channelData = new ArrayList<>();

TestContext() {
TestContext(boolean isIcebergMode) {
stage = Mockito.mock(StreamingIngestStage.class);
Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix");
parameterProvider = new ParameterProvider();
@@ -92,7 +92,8 @@ private abstract static class TestContext<T> implements AutoCloseable {
channelCache = new ChannelCache<>();
Mockito.when(client.getChannelCache()).thenReturn(channelCache);
registerService = Mockito.spy(new RegisterService(client, client.isTestMode()));
flushService = Mockito.spy(new FlushService<>(client, channelCache, stage, true));
flushService =
Mockito.spy(new FlushService<>(client, channelCache, stage, isIcebergMode, true));
}

ChannelData<T> flushChannel(String name) {
@@ -233,6 +234,10 @@ static RowSetBuilder newBuilder() {

private static class ParquetTestContext extends TestContext<List<List<Object>>> {

ParquetTestContext(boolean isIcebergMode) {
super(isIcebergMode);
}

SnowflakeStreamingIngestChannelInternal<List<List<Object>>> createChannel(
String name,
String dbName,
@@ -268,8 +273,8 @@ public void close() {}
static TestContextFactory<List<List<Object>>> createFactory() {
return new TestContextFactory<List<List<Object>>>("Parquet") {
@Override
TestContext<List<List<Object>>> create() {
return new ParquetTestContext();
TestContext<List<List<Object>>> create(boolean isIcebergMode) {
return new ParquetTestContext(isIcebergMode);
}
};
}
@@ -388,7 +393,7 @@ private static ColumnMetadata createLargeTestTextColumn(String name) {

@Test
public void testGetFilePath() {
TestContext<?> testContext = testContextFactory.create();
TestContext<?> testContext = testContextFactory.create(false);
FlushService<?> flushService = testContext.flushService;
Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
String clientPrefix = "honk";
@@ -422,7 +427,7 @@ public void testGetFilePath() {

@Test
public void testFlush() throws Exception {
TestContext<?> testContext = testContextFactory.create();
TestContext<?> testContext = testContextFactory.create(false);
FlushService<?> flushService = testContext.flushService;
Mockito.when(flushService.isTestMode()).thenReturn(false);

@@ -450,7 +455,7 @@ public void testFlush() throws Exception {

@Test
public void testBlobCreation() throws Exception {
TestContext<?> testContext = testContextFactory.create();
TestContext<?> testContext = testContextFactory.create(false);
SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);
SnowflakeStreamingIngestChannelInternal<?> channel2 = addChannel2(testContext);
SnowflakeStreamingIngestChannelInternal<?> channel4 = addChannel4(testContext);
@@ -485,7 +490,7 @@ public void testBlobCreation() throws Exception {

@Test
public void testBlobSplitDueToDifferentSchema() throws Exception {
TestContext<?> testContext = testContextFactory.create();
TestContext<?> testContext = testContextFactory.create(false);
SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);
SnowflakeStreamingIngestChannelInternal<?> channel2 = addChannel2(testContext);
String colName1 = "testBlobSplitDueToDifferentSchema1";
@@ -534,7 +539,7 @@ public void testBlobSplitDueToDifferentSchema() throws Exception {

@Test
public void testBlobSplitDueToChunkSizeLimit() throws Exception {
TestContext<?> testContext = testContextFactory.create();
TestContext<?> testContext = testContextFactory.create(false);
SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);
SnowflakeStreamingIngestChannelInternal<?> channel2 = addChannel2(testContext);
String colName1 = "testBlobSplitDueToChunkSizeLimit1";
@@ -588,7 +593,7 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Exception {
/ channelsPerTable
/ ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT);

final TestContext<List<List<Object>>> testContext = testContextFactory.create();
final TestContext<List<List<Object>>> testContext = testContextFactory.create(false);

for (int i = 0; i < numberOfRows; i++) {
SnowflakeStreamingIngestChannelInternal<List<List<Object>>> channel =
@@ -614,7 +619,7 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Exception {

@Test
public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Exception {
final TestContext<List<List<Object>>> testContext = testContextFactory.create();
final TestContext<List<List<Object>>> testContext = testContextFactory.create(false);

for (int i = 0; i < 99; i++) { // 19 simple chunks
SnowflakeStreamingIngestChannelInternal<List<List<Object>>> channel =
@@ -655,6 +660,32 @@ public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Exception {
Assert.assertEquals(102, getRows(allUploadedBlobs).size());
}

@Test
public void testBlobSplitDueToIcebergMode() throws Exception {
int numberOfTables = 5;
int channelsPerTable = 5;
final TestContext<List<List<Object>>> testContext = testContextFactory.create(true);

for (int i = 0; i < numberOfTables * channelsPerTable; i++) {
SnowflakeStreamingIngestChannelInternal<List<List<Object>>> channel =
addChannel(testContext, i % numberOfTables, 1);
channel.setupSchema(Collections.singletonList(createTestTextColumn("C1")));
channel.insertRow(Collections.singletonMap("C1", i), "");
}

FlushService<List<List<Object>>> flushService = testContext.flushService;
flushService.flush(true).get();

ArgumentCaptor<List<List<ChannelData<List<List<Object>>>>>> blobDataCaptor =
ArgumentCaptor.forClass(List.class);
Mockito.verify(flushService, Mockito.times(numberOfTables))
.buildAndUpload(Mockito.any(), blobDataCaptor.capture());

List<List<List<ChannelData<List<List<Object>>>>>> allUploadedBlobs =
blobDataCaptor.getAllValues();
allUploadedBlobs.forEach(chunks -> Assert.assertEquals(1, chunks.size()));
}
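
In plain terms: with numberOfTables = 5 and channelsPerTable = 5, all 25 channels' data would previously have been eligible to interleave into shared blobs; under Iceberg mode the test expects buildAndUpload to run once per table and asserts that every captured blob contains exactly one chunk.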

private List<List<Object>> getRows(List<List<List<ChannelData<List<List<Object>>>>>> blobs) {
List<List<Object>> result = new ArrayList<>();
blobs.forEach(
@@ -672,7 +703,7 @@ public void testBuildAndUpload() throws Exception {
long expectedBuildLatencyMs = 100;
long expectedUploadLatencyMs = 200;

TestContext<?> testContext = testContextFactory.create();
TestContext<?> testContext = testContextFactory.create(false);
SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);
SnowflakeStreamingIngestChannelInternal<?> channel2 = addChannel2(testContext);
String colName1 = "testBuildAndUpload1";
@@ -820,7 +851,7 @@ public void testBuildAndUpload() throws Exception {

@Test
public void testBuildErrors() throws Exception {
TestContext<?> testContext = testContextFactory.create();
TestContext<?> testContext = testContextFactory.create(false);
SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);
SnowflakeStreamingIngestChannelInternal<?> channel3 = addChannel3(testContext);
String colName1 = "testBuildErrors1";
@@ -915,7 +946,7 @@ public void testInvalidateChannels() {
StreamingIngestStage stage = Mockito.mock(StreamingIngestStage.class);
Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix");
FlushService<StubChunkData> flushService =
new FlushService<>(client, channelCache, stage, false);
new FlushService<>(client, channelCache, stage, false, false);
flushService.invalidateAllChannelsInBlob(blobData, "Invalidated by test");

Assert.assertFalse(channel1.isValid());
@@ -924,7 +955,7 @@ public void testInvalidateChannels() {

@Test
public void testBlobBuilder() throws Exception {
TestContext<?> testContext = testContextFactory.create();
TestContext<?> testContext = testContextFactory.create(false);
SnowflakeStreamingIngestChannelInternal<?> channel1 = addChannel1(testContext);

ObjectMapper mapper = new ObjectMapper();
@@ -1026,7 +1057,7 @@ public void testBlobBuilder() throws Exception {

@Test
public void testShutDown() throws Exception {
TestContext<?> testContext = testContextFactory.create();
TestContext<?> testContext = testContextFactory.create(false);
FlushService<?> flushService = testContext.flushService;

Assert.assertFalse(flushService.buildUploadWorkers.isShutdown());
@@ -347,6 +347,7 @@ public void testOpenChannelErrorResponse() throws Exception {
new SnowflakeURL("snowflake.dev.local:8082"),
null,
httpClient,
false,
true,
requestBuilder,
null);
@@ -417,6 +418,7 @@ public void testOpenChannelSnowflakeInternalErrorResponse() throws Exception {
new SnowflakeURL("snowflake.dev.local:8082"),
null,
httpClient,
false,
true,
requestBuilder,
null);
@@ -499,6 +501,7 @@ public void testOpenChannelSuccessResponse() throws Exception {
new SnowflakeURL("snowflake.dev.local:8082"),
null,
httpClient,
false,
true,
requestBuilder,
null);
(Diff for the sixth changed file was not loaded on this page.)
