diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index d4fe97ec6..6f5296209 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -192,12 +192,12 @@ private CompletableFuture distributeFlush( /** If tracing is enabled, print always else, check if it needs flush or is forceful. */ private void logFlushTask(boolean isForce, Set tablesToFlush, long flushStartTime) { boolean isNeedFlush = - this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1 + this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1 ? tablesToFlush.stream().anyMatch(channelCache::getNeedFlush) : this.isNeedFlush; long currentTime = System.currentTimeMillis(); final String logInfo; - if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) { + if (this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1) { logInfo = String.format( "Tables=[%s]", @@ -272,7 +272,7 @@ CompletableFuture flush(boolean isForce) { this.owningClient.getParameterProvider().getCachedMaxClientLagInMs(); final Set tablesToFlush; - if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) { + if (this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1) { tablesToFlush = this.channelCache.keySet().stream() .filter( @@ -694,7 +694,7 @@ void shutdown() throws InterruptedException { */ void setNeedFlush(String fullyQualifiedTableName) { this.isNeedFlush = true; - if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) { + if (this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1) { this.channelCache.setNeedFlush(fullyQualifiedTableName, true); } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 8048020d5..76cf7551d 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -122,7 +122,7 @@ private abstract static class TestContext implements AutoCloseable { } void setParameterOverride(Map parameterOverride) { - this.parameterProvider = new ParameterProvider(parameterOverride, null); + this.parameterProvider = new ParameterProvider(parameterOverride, null, isIcebergMode); } ChannelData flushChannel(String name) { @@ -455,10 +455,17 @@ public void testGetFilePath() { } @Test - public void testFlush() throws Exception { + public void testInterleaveFlush() throws Exception { + if (isIcebergMode) { + // Interleaved blob is not supported in iceberg mode + return; + } int numChannels = 4; Long maxLastFlushTime = Long.MAX_VALUE - 1000L; // -1000L to avoid jitter overflow TestContext>> testContext = testContextFactory.create(); + testContext.setParameterOverride( + Collections.singletonMap( + ParameterProvider.MAX_CHUNKS_IN_BLOB, ParameterProvider.MAX_CHUNKS_IN_BLOB_DEFAULT)); addChannel1(testContext); FlushService flushService = testContext.flushService; ChannelCache channelCache = testContext.channelCache; @@ -523,7 +530,7 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedExcep ChannelCache channelCache = testContext.channelCache; Mockito.when(flushService.isTestMode()).thenReturn(false); testContext.setParameterOverride( - Collections.singletonMap(ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, 1)); + Collections.singletonMap(ParameterProvider.MAX_CHUNKS_IN_BLOB, 1)); // Test need flush IntStream.range(0, numChannels)