diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java index cd6d78787..63faa9e19 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java @@ -28,6 +28,9 @@ public static class Builder { // Allows client to override some default parameter values private Map parameterOverrides; + // Indicates whether it's streaming to Iceberg tables + private boolean isIcebergMode; + // Indicates whether it's under test mode private boolean isTestMode; @@ -45,6 +48,11 @@ public Builder setParameterOverrides(Map parameterOverrides) { return this; } + public Builder setIcebergMode(boolean isIcebergMode) { + this.isIcebergMode = isIcebergMode; + return this; + } + public Builder setIsTestMode(boolean isTestMode) { this.isTestMode = isTestMode; return this; @@ -58,7 +66,12 @@ public SnowflakeStreamingIngestClient build() { SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL)); return new SnowflakeStreamingIngestClientInternal<>( - this.name, accountURL, prop, this.parameterOverrides, this.isTestMode); + this.name, + accountURL, + prop, + this.parameterOverrides, + this.isIcebergMode, + this.isTestMode); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index f08196477..404e26fab 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -123,6 +123,10 @@ List>> getData() { // blob encoding version private final Constants.BdecVersion bdecVersion; + // Indicates if it's flushing to Iceberg tables, a blob could only contain one chunk under Iceberg + // mode + private final 
boolean isIcebergMode; + /** * Constructor for TESTING that takes (usually mocked) StreamingIngestStage * @@ -134,6 +138,7 @@ List>> getData() { SnowflakeStreamingIngestClientInternal client, ChannelCache cache, StreamingIngestStage targetStage, // For testing + boolean isIcebergMode, boolean isTestMode) { this.owningClient = client; this.channelCache = cache; @@ -142,6 +147,7 @@ List>> getData() { this.registerService = new RegisterService<>(client, isTestMode); this.isNeedFlush = false; this.lastFlushTime = System.currentTimeMillis(); + this.isIcebergMode = isIcebergMode; this.isTestMode = isTestMode; this.latencyTimerContextMap = new ConcurrentHashMap<>(); this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion(); @@ -156,7 +162,10 @@ List>> getData() { * @param isTestMode */ FlushService( - SnowflakeStreamingIngestClientInternal client, ChannelCache cache, boolean isTestMode) { + SnowflakeStreamingIngestClientInternal client, + ChannelCache cache, + boolean isIcebergMode, + boolean isTestMode) { this.owningClient = client; this.channelCache = cache; try { @@ -176,6 +185,7 @@ List>> getData() { this.counter = new AtomicLong(0); this.isNeedFlush = false; this.lastFlushTime = System.currentTimeMillis(); + this.isIcebergMode = isIcebergMode; this.isTestMode = isTestMode; this.latencyTimerContextMap = new ConcurrentHashMap<>(); this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion(); @@ -437,7 +447,7 @@ && shouldStopProcessing( } // Add processed channels to the current blob, stop if we need to create a new blob blobData.add(channelsDataPerTable.subList(0, idx)); - if (idx != channelsDataPerTable.size()) { + if (idx != channelsDataPerTable.size() || isIcebergMode) { break; } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 2990b49d8..9faaf5d72 
100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -121,6 +121,9 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea // Indicates whether the client has closed private volatile boolean isClosed; + // Indicates whether the client is streaming to Iceberg tables + private final boolean isIcebergMode; + // Indicates whether the client is under test mode private final boolean isTestMode; @@ -152,6 +155,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param accountURL Snowflake account url * @param prop connection properties * @param httpClient http client for sending request + * @param isIcebergMode whether we're streaming to iceberg tables * @param isTestMode whether we're under test mode * @param requestBuilder http request builder * @param parameterOverrides parameters we override in case we want to set different values @@ -161,6 +165,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea SnowflakeURL accountURL, Properties prop, CloseableHttpClient httpClient, + boolean isIcebergMode, boolean isTestMode, RequestBuilder requestBuilder, Map parameterOverrides) { @@ -168,6 +173,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea this.name = name; String accountName = accountURL == null ? null : accountURL.getAccount(); + this.isIcebergMode = isIcebergMode; this.isTestMode = isTestMode; this.httpClient = httpClient == null ? 
HttpUtil.getHttpClient(accountName) : httpClient; this.channelCache = new ChannelCache<>(); @@ -229,7 +235,8 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea } try { - this.flushService = new FlushService<>(this, this.channelCache, this.isTestMode); + this.flushService = + new FlushService<>(this, this.channelCache, this.isIcebergMode, this.isTestMode); } catch (Exception e) { // Need to clean up the resources before throwing any exceptions cleanUpResources(); @@ -258,8 +265,9 @@ public SnowflakeStreamingIngestClientInternal( SnowflakeURL accountURL, Properties prop, Map parameterOverrides, + boolean isIcebergMode, boolean isTestMode) { - this(name, accountURL, prop, null, isTestMode, null, parameterOverrides); + this(name, accountURL, prop, null, isIcebergMode, isTestMode, null, parameterOverrides); } /*** Constructor for TEST ONLY @@ -267,7 +275,7 @@ public SnowflakeStreamingIngestClientInternal( * @param name the name of the client */ SnowflakeStreamingIngestClientInternal(String name) { - this(name, null, null, null, true, null, new HashMap<>()); + this(name, null, null, null, false, true, null, new HashMap<>()); } // TESTING ONLY - inject the request builder diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index f200c7177..8c90150f0 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -64,7 +64,7 @@ private abstract static class TestContextFactory { this.name = name; } - abstract TestContext create(); + abstract TestContext create(boolean isIcebergMode); @Override public String toString() { @@ -83,7 +83,7 @@ private abstract static class TestContext implements AutoCloseable { final List> channelData = new ArrayList<>(); - TestContext() { + TestContext(boolean isIcebergMode) { stage = 
Mockito.mock(StreamingIngestStage.class); Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix"); parameterProvider = new ParameterProvider(); @@ -92,7 +92,8 @@ private abstract static class TestContext implements AutoCloseable { channelCache = new ChannelCache<>(); Mockito.when(client.getChannelCache()).thenReturn(channelCache); registerService = Mockito.spy(new RegisterService(client, client.isTestMode())); - flushService = Mockito.spy(new FlushService<>(client, channelCache, stage, true)); + flushService = + Mockito.spy(new FlushService<>(client, channelCache, stage, isIcebergMode, true)); } ChannelData flushChannel(String name) { @@ -233,6 +234,10 @@ static RowSetBuilder newBuilder() { private static class ParquetTestContext extends TestContext>> { + ParquetTestContext(boolean isIcebergMode) { + super(isIcebergMode); + } + SnowflakeStreamingIngestChannelInternal>> createChannel( String name, String dbName, @@ -268,8 +273,8 @@ public void close() {} static TestContextFactory>> createFactory() { return new TestContextFactory>>("Parquet") { @Override - TestContext>> create() { - return new ParquetTestContext(); + TestContext>> create(boolean isIcebergMode) { + return new ParquetTestContext(isIcebergMode); } }; } @@ -388,7 +393,7 @@ private static ColumnMetadata createLargeTestTextColumn(String name) { @Test public void testGetFilePath() { - TestContext testContext = testContextFactory.create(); + TestContext testContext = testContextFactory.create(false); FlushService flushService = testContext.flushService; Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); String clientPrefix = "honk"; @@ -422,7 +427,7 @@ public void testGetFilePath() { @Test public void testFlush() throws Exception { - TestContext testContext = testContextFactory.create(); + TestContext testContext = testContextFactory.create(false); FlushService flushService = testContext.flushService; Mockito.when(flushService.isTestMode()).thenReturn(false); @@ -450,7 +455,7 @@ 
public void testFlush() throws Exception { @Test public void testBlobCreation() throws Exception { - TestContext testContext = testContextFactory.create(); + TestContext testContext = testContextFactory.create(false); SnowflakeStreamingIngestChannelInternal channel1 = addChannel1(testContext); SnowflakeStreamingIngestChannelInternal channel2 = addChannel2(testContext); SnowflakeStreamingIngestChannelInternal channel4 = addChannel4(testContext); @@ -485,7 +490,7 @@ public void testBlobCreation() throws Exception { @Test public void testBlobSplitDueToDifferentSchema() throws Exception { - TestContext testContext = testContextFactory.create(); + TestContext testContext = testContextFactory.create(false); SnowflakeStreamingIngestChannelInternal channel1 = addChannel1(testContext); SnowflakeStreamingIngestChannelInternal channel2 = addChannel2(testContext); String colName1 = "testBlobSplitDueToDifferentSchema1"; @@ -534,7 +539,7 @@ public void testBlobSplitDueToDifferentSchema() throws Exception { @Test public void testBlobSplitDueToChunkSizeLimit() throws Exception { - TestContext testContext = testContextFactory.create(); + TestContext testContext = testContextFactory.create(false); SnowflakeStreamingIngestChannelInternal channel1 = addChannel1(testContext); SnowflakeStreamingIngestChannelInternal channel2 = addChannel2(testContext); String colName1 = "testBlobSplitDueToChunkSizeLimit1"; @@ -588,7 +593,7 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti / channelsPerTable / ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT); - final TestContext>> testContext = testContextFactory.create(); + final TestContext>> testContext = testContextFactory.create(false); for (int i = 0; i < numberOfRows; i++) { SnowflakeStreamingIngestChannelInternal>> channel = @@ -614,7 +619,7 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti @Test public void 
testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Exception { - final TestContext>> testContext = testContextFactory.create(); + final TestContext>> testContext = testContextFactory.create(false); for (int i = 0; i < 99; i++) { // 19 simple chunks SnowflakeStreamingIngestChannelInternal>> channel = @@ -655,6 +660,32 @@ public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Except Assert.assertEquals(102, getRows(allUploadedBlobs).size()); } + @Test + public void testBlobSplitDueToIcebergMode() throws Exception { + int numberOfTables = 5; + int channelsPerTable = 5; + final TestContext>> testContext = testContextFactory.create(true); + + for (int i = 0; i < numberOfTables * channelsPerTable; i++) { + SnowflakeStreamingIngestChannelInternal>> channel = + addChannel(testContext, i % numberOfTables, 1); + channel.setupSchema(Collections.singletonList(createTestTextColumn("C1"))); + channel.insertRow(Collections.singletonMap("C1", i), ""); + } + + FlushService>> flushService = testContext.flushService; + flushService.flush(true).get(); + + ArgumentCaptor>>>>> blobDataCaptor = + ArgumentCaptor.forClass(List.class); + Mockito.verify(flushService, Mockito.times(numberOfTables)) + .buildAndUpload(Mockito.any(), blobDataCaptor.capture()); + + List>>>>> allUploadedBlobs = + blobDataCaptor.getAllValues(); + allUploadedBlobs.forEach(chunks -> Assert.assertEquals(1, chunks.size())); + } + private List> getRows(List>>>>> blobs) { List> result = new ArrayList<>(); blobs.forEach( @@ -672,7 +703,7 @@ public void testBuildAndUpload() throws Exception { long expectedBuildLatencyMs = 100; long expectedUploadLatencyMs = 200; - TestContext testContext = testContextFactory.create(); + TestContext testContext = testContextFactory.create(false); SnowflakeStreamingIngestChannelInternal channel1 = addChannel1(testContext); SnowflakeStreamingIngestChannelInternal channel2 = addChannel2(testContext); String colName1 = "testBuildAndUpload1"; @@ -820,7 +851,7 @@ 
public void testBuildAndUpload() throws Exception { @Test public void testBuildErrors() throws Exception { - TestContext testContext = testContextFactory.create(); + TestContext testContext = testContextFactory.create(false); SnowflakeStreamingIngestChannelInternal channel1 = addChannel1(testContext); SnowflakeStreamingIngestChannelInternal channel3 = addChannel3(testContext); String colName1 = "testBuildErrors1"; @@ -915,7 +946,7 @@ public void testInvalidateChannels() { StreamingIngestStage stage = Mockito.mock(StreamingIngestStage.class); Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix"); FlushService flushService = - new FlushService<>(client, channelCache, stage, false); + new FlushService<>(client, channelCache, stage, false, false); flushService.invalidateAllChannelsInBlob(blobData, "Invalidated by test"); Assert.assertFalse(channel1.isValid()); @@ -924,7 +955,7 @@ public void testInvalidateChannels() { @Test public void testBlobBuilder() throws Exception { - TestContext testContext = testContextFactory.create(); + TestContext testContext = testContextFactory.create(false); SnowflakeStreamingIngestChannelInternal channel1 = addChannel1(testContext); ObjectMapper mapper = new ObjectMapper(); @@ -1026,7 +1057,7 @@ public void testBlobBuilder() throws Exception { @Test public void testShutDown() throws Exception { - TestContext testContext = testContextFactory.create(); + TestContext testContext = testContextFactory.create(false); FlushService flushService = testContext.flushService; Assert.assertFalse(flushService.buildUploadWorkers.isShutdown()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 87e3f8f11..5644723c1 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ 
b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -347,6 +347,7 @@ public void testOpenChannelErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + false, true, requestBuilder, null); @@ -417,6 +418,7 @@ public void testOpenChannelSnowflakeInternalErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + false, true, requestBuilder, null); @@ -499,6 +501,7 @@ public void testOpenChannelSuccessResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + false, true, requestBuilder, null); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 1693e1520..770856b18 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -344,6 +344,7 @@ public void testGetChannelsStatusWithRequest() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + false, true, requestBuilder, null); @@ -404,6 +405,7 @@ public void testDropChannel() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + false, true, requestBuilder, null); @@ -449,6 +451,7 @@ public void testGetChannelsStatusWithRequestError() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + false, true, requestBuilder, null); @@ -677,6 +680,7 @@ public void testGetRetryBlobs() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + false, true, requestBuilder, null); @@ -718,6 +722,7 @@ public void testRegisterBlobErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + false, true, 
requestBuilder, null); @@ -765,6 +770,7 @@ public void testRegisterBlobSnowflakeInternalErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + false, true, requestBuilder, null); @@ -821,6 +827,7 @@ public void testRegisterBlobSuccessResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + false, true, requestBuilder, null); @@ -905,6 +912,7 @@ public void testRegisterBlobsRetries() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + false, true, requestBuilder, null); @@ -934,6 +942,7 @@ public void testRegisterBlobChunkLimit() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + false, true, requestBuilder, null)); @@ -1106,6 +1115,7 @@ public void testRegisterBlobsRetriesSucceeds() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + false, true, requestBuilder, null); @@ -1180,6 +1190,7 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + false, true, requestBuilder, null); @@ -1372,6 +1383,7 @@ public void testFlushServiceException() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + false, true, requestBuilder, parameterMap); @@ -1408,6 +1420,7 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + false, true, requestBuilder, null);