From 965653766117a3dbb47711d6af8fa16f04cb7ed7 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Fri, 26 Jul 2024 15:17:03 -0700 Subject: [PATCH] SNOW-1512047 Introduce independent per-table flushes when interleaving is disabled (#788) --- .../streaming/internal/ChannelCache.java | 107 ++++++++++++- .../streaming/internal/FlushService.java | 150 ++++++++++++++---- ...owflakeStreamingIngestChannelInternal.java | 4 +- ...nowflakeStreamingIngestClientInternal.java | 4 +- .../streaming/internal/ChannelCacheTest.java | 12 +- .../streaming/internal/FlushServiceTest.java | 128 ++++++++++++++- .../streaming/internal/StreamingIngestIT.java | 2 +- 7 files changed, 350 insertions(+), 57 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java index 989be0fa1..90c0f2ac9 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java @@ -1,12 +1,15 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; -import java.util.Iterator; +import java.util.Collections; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.SFException; /** * In-memory cache that stores the active channels for a given Streaming Ingest client, and the @@ -23,6 +26,20 @@ class ChannelCache { String, ConcurrentHashMap>> cache = new ConcurrentHashMap<>(); + /** Flush information for each table including last flush time and if flush is needed */ + static class FlushInfo { + final long lastFlushTime; + final boolean needFlush; + + FlushInfo(long lastFlushTime, boolean needFlush) { + this.lastFlushTime = lastFlushTime; + this.needFlush = needFlush; + } + } + + /** Flush information for each table, only used when max chunks in blob is 1 */ + private final ConcurrentHashMap tableFlushInfo = new ConcurrentHashMap<>(); + /** * Add a channel to the channel cache * @@ -33,6 +50,11 @@ void addChannel(SnowflakeStreamingIngestChannelInternal channel) { this.cache.computeIfAbsent( channel.getFullyQualifiedTableName(), v -> new ConcurrentHashMap<>()); + // Update the last flush time for the table, add jitter to avoid all channels flush at the same + // time when the blobs are not interleaved + this.tableFlushInfo.putIfAbsent( + channel.getFullyQualifiedTableName(), new FlushInfo(System.currentTimeMillis(), false)); + SnowflakeStreamingIngestChannelInternal oldChannel = channels.put(channel.getName(), channel); // Invalidate old channel if it exits to block new inserts and return error to users earlier @@ -44,13 +66,84 @@ void addChannel(SnowflakeStreamingIngestChannelInternal channel) { } /** - * Returns an iterator over the (table, channels) in this map. + * Get the last flush time for a table + * + * @param fullyQualifiedTableName fully qualified table name + * @return last flush time in milliseconds + */ + Long getLastFlushTime(String fullyQualifiedTableName) { + FlushInfo tableFlushInfo = this.tableFlushInfo.get(fullyQualifiedTableName); + if (tableFlushInfo == null) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + String.format("Last flush time for table %s not found", fullyQualifiedTableName)); + } + return tableFlushInfo.lastFlushTime; + } + + /** + * Set the last flush time for a table as the current time * - * @return + * @param fullyQualifiedTableName fully qualified table name + * @param lastFlushTime last flush time in milliseconds */ - Iterator>>> - iterator() { - return this.cache.entrySet().iterator(); + void setLastFlushTime(String fullyQualifiedTableName, Long lastFlushTime) { + this.tableFlushInfo.compute( + fullyQualifiedTableName, + (k, v) -> { + if (v == null) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + String.format("Last flush time for table %s not found", fullyQualifiedTableName)); + } + return new FlushInfo(lastFlushTime, v.needFlush); + }); + } + + /** + * Get need flush flag for a table + * + * @param fullyQualifiedTableName fully qualified table name + * @return need flush flag + */ + boolean getNeedFlush(String fullyQualifiedTableName) { + FlushInfo tableFlushInfo = this.tableFlushInfo.get(fullyQualifiedTableName); + if (tableFlushInfo == null) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + String.format("Need flush flag for table %s not found", fullyQualifiedTableName)); + } + return tableFlushInfo.needFlush; + } + + /** + * Set need flush flag for a table + * + * @param fullyQualifiedTableName fully qualified table name + * @param needFlush need flush flag + */ + void setNeedFlush(String fullyQualifiedTableName, boolean needFlush) { + this.tableFlushInfo.compute( + fullyQualifiedTableName, + (k, v) -> { + if (v == null) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + String.format("Need flush flag for table %s not found", fullyQualifiedTableName)); + } + return new FlushInfo(v.lastFlushTime, needFlush); + }); + } + + /** Returns an immutable set view of the mappings contained in the channel cache. */ + Set>>> + entrySet() { + return Collections.unmodifiableSet(cache.entrySet()); + } + + /** Returns an immutable set view of the keys contained in the channel cache. */ + Set keySet() { + return Collections.unmodifiableSet(cache.keySet()); } /** Close all channels in the channel cache */ diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 954abfc4a..84e1a2561 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -31,6 +32,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import javax.crypto.BadPaddingException; import javax.crypto.IllegalBlockSizeException; import javax.crypto.NoSuchPaddingException; @@ -100,12 +102,15 @@ List>> getData() { // Reference to register service private final RegisterService registerService; - // Indicates whether we need to schedule a flush - @VisibleForTesting volatile boolean isNeedFlush; - - // Latest flush time + /** + * Client level last flush time and need flush flag. This two variables are used when max chunk in + * blob is not 1. When max chunk in blob is 1, flush service ignores these variables and uses + * table level last flush time and need flush flag. See {@link ChannelCache.FlushInfo}. + */ @VisibleForTesting volatile long lastFlushTime; + @VisibleForTesting volatile boolean isNeedFlush; + // Indicates whether it's running as part of the test private final boolean isTestMode; @@ -162,36 +167,65 @@ private CompletableFuture statsFuture() { /** * @param isForce if true will flush regardless of other conditions - * @param timeDiffMillis Time in milliseconds since the last flush + * @param tablesToFlush list of tables to flush + * @param flushStartTime the time when the flush started * @return */ - private CompletableFuture distributeFlush(boolean isForce, long timeDiffMillis) { + private CompletableFuture distributeFlush( + boolean isForce, Set tablesToFlush, Long flushStartTime) { return CompletableFuture.runAsync( () -> { - logFlushTask(isForce, timeDiffMillis); - distributeFlushTasks(); + logFlushTask(isForce, tablesToFlush, flushStartTime); + distributeFlushTasks(tablesToFlush); + long prevFlushEndTime = System.currentTimeMillis(); + this.lastFlushTime = prevFlushEndTime; this.isNeedFlush = false; - this.lastFlushTime = System.currentTimeMillis(); - return; + tablesToFlush.forEach( + table -> { + this.channelCache.setLastFlushTime(table, prevFlushEndTime); + this.channelCache.setNeedFlush(table, false); + }); }, this.flushWorker); } /** If tracing is enabled, print always else, check if it needs flush or is forceful. */ - private void logFlushTask(boolean isForce, long timeDiffMillis) { + private void logFlushTask(boolean isForce, Set tablesToFlush, long flushStartTime) { + boolean isNeedFlush = + this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1 + ? tablesToFlush.stream().anyMatch(channelCache::getNeedFlush) + : this.isNeedFlush; + long currentTime = System.currentTimeMillis(); + final String logInfo; + if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) { + logInfo = + String.format( + "Tables=[%s]", + tablesToFlush.stream() + .map( + table -> + String.format( + "(name=%s, isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s)", + table, + channelCache.getNeedFlush(table), + flushStartTime - channelCache.getLastFlushTime(table), + currentTime - channelCache.getLastFlushTime(table))) + .collect(Collectors.joining(", "))); + } else { + logInfo = + String.format( + "isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s", + isNeedFlush, flushStartTime - this.lastFlushTime, currentTime - this.lastFlushTime); + } + final String flushTaskLogFormat = String.format( - "Submit forced or ad-hoc flush task on client=%s, isForce=%s," - + " isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s", - this.owningClient.getName(), - isForce, - this.isNeedFlush, - timeDiffMillis, - System.currentTimeMillis() - this.lastFlushTime); + "Submit forced or ad-hoc flush task on client=%s, isForce=%s, %s", + this.owningClient.getName(), isForce, logInfo); if (logger.isTraceEnabled()) { logger.logTrace(flushTaskLogFormat); } - if (!logger.isTraceEnabled() && (this.isNeedFlush || isForce)) { + if (!logger.isTraceEnabled() && (isNeedFlush || isForce)) { logger.logDebug(flushTaskLogFormat); } } @@ -207,27 +241,65 @@ private CompletableFuture registerFuture() { } /** - * Kick off a flush job and distribute the tasks if one of the following conditions is met: - *
  • Flush is forced by the users - *
  • One or more buffers have reached the flush size - *
  • Periodical background flush when a time interval has reached + * Kick off a flush job and distribute the tasks. The flush service behaves differently based on + * the max chunks in blob: + * + *
      + *
    • The max chunks in blob is not 1 (interleaving is allowed), every channel will be flushed + * together if one of the following conditions is met: + *
        + *
      • Flush is forced by the users + *
      • One or more buffers have reached the flush size + *
      • Periodical background flush when a time interval has reached + *
      + *
    • The max chunks in blob is 1 (interleaving is not allowed), a channel will be flushed if + * one of the following conditions is met: + *
        + *
      • Flush is forced by the users + *
      • One or more buffers with the same target table as the channel have reached the + * flush size + *
      • Periodical background flush of the target table when a time interval has reached + *
      + *
    * * @param isForce * @return Completable future that will return when the blobs are registered successfully, or null * if none of the conditions is met above */ CompletableFuture flush(boolean isForce) { - long timeDiffMillis = System.currentTimeMillis() - this.lastFlushTime; + final long flushStartTime = System.currentTimeMillis(); + final long flushingInterval = + this.owningClient.getParameterProvider().getCachedMaxClientLagInMs(); + + final Set tablesToFlush; + if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) { + tablesToFlush = + this.channelCache.keySet().stream() + .filter( + key -> + isForce + || flushStartTime - this.channelCache.getLastFlushTime(key) + >= flushingInterval + || this.channelCache.getNeedFlush(key)) + .collect(Collectors.toSet()); + } else { + if (isForce + || (!DISABLE_BACKGROUND_FLUSH + && !isTestMode() + && (this.isNeedFlush || flushStartTime - this.lastFlushTime >= flushingInterval))) { + tablesToFlush = this.channelCache.keySet(); + } else { + tablesToFlush = null; + } + } if (isForce || (!DISABLE_BACKGROUND_FLUSH && !isTestMode() - && (this.isNeedFlush - || timeDiffMillis - >= this.owningClient.getParameterProvider().getCachedMaxClientLagInMs()))) { - + && tablesToFlush != null + && !tablesToFlush.isEmpty())) { return this.statsFuture() - .thenCompose((v) -> this.distributeFlush(isForce, timeDiffMillis)) + .thenCompose((v) -> this.distributeFlush(isForce, tablesToFlush, flushStartTime)) .thenCompose((v) -> this.registerFuture()); } return this.statsFuture(); @@ -310,12 +382,17 @@ private void createWorkers() { /** * Distribute the flush tasks by iterating through all the channels in the channel cache and kick * off a build blob work when certain size has reached or we have reached the end + * + * @param tablesToFlush list of tables to flush */ - void distributeFlushTasks() { + void distributeFlushTasks(Set tablesToFlush) { Iterator< Map.Entry< String, ConcurrentHashMap>>> - itr = this.channelCache.iterator(); + itr = + this.channelCache.entrySet().stream() + .filter(e -> tablesToFlush.contains(e.getKey())) + .iterator(); List, CompletableFuture>> blobs = new ArrayList<>(); List> leftoverChannelsDataPerTable = new ArrayList<>(); @@ -607,9 +684,16 @@ void shutdown() throws InterruptedException { } } - /** Set the flag to indicate that a flush is needed */ - void setNeedFlush() { + /** + * Set the flag to indicate that a flush is needed + * + * @param fullyQualifiedTableName the fully qualified table name + */ + void setNeedFlush(String fullyQualifiedTableName) { this.isNeedFlush = true; + if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) { + this.channelCache.setNeedFlush(fullyQualifiedTableName, true); + } } /** diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index 8ebc23ca1..ca0bbe782 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -413,7 +413,7 @@ public InsertValidationResponse insertRows( // if a large number of rows are inserted if (this.rowBuffer.getSize() >= this.owningClient.getParameterProvider().getMaxChannelSizeInBytes()) { - this.owningClient.setNeedFlush(); + this.owningClient.setNeedFlush(this.channelFlushContext.getFullyQualifiedTableName()); } return response; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 6331a4045..080b4f87d 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -740,8 +740,8 @@ CompletableFuture flush(boolean closing) { } /** Set the flag to indicate that a flush is needed */ - void setNeedFlush() { - this.flushService.setNeedFlush(); + void setNeedFlush(String fullyQualifiedTableName) { + this.flushService.setNeedFlush(fullyQualifiedTableName); } /** Remove the channel in the channel cache if the channel sequencer matches */ diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java index 947908ef9..db1d737ba 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; @@ -95,7 +99,7 @@ public void testAddChannel() { UTC); cache.addChannel(channel); Assert.assertEquals(1, cache.getSize()); - Assert.assertTrue(channel == cache.iterator().next().getValue().get(channelName)); + Assert.assertTrue(channel == cache.entrySet().iterator().next().getValue().get(channelName)); SnowflakeStreamingIngestChannelInternal channelDup = new SnowflakeStreamingIngestChannelInternal<>( @@ -117,7 +121,7 @@ public void testAddChannel() { Assert.assertTrue(channelDup.isValid()); Assert.assertEquals(1, cache.getSize()); ConcurrentHashMap> channels = - cache.iterator().next().getValue(); + cache.entrySet().iterator().next().getValue(); Assert.assertEquals(1, channels.size()); Assert.assertTrue(channelDup == channels.get(channelName)); Assert.assertFalse(channel == channelDup); @@ -130,7 +134,7 @@ public void testIterator() { Map.Entry< String, ConcurrentHashMap>>> - iter = cache.iterator(); + iter = cache.entrySet().iterator(); Map.Entry< String, ConcurrentHashMap>> @@ -160,7 +164,7 @@ public void testCloseAllChannels() { Map.Entry< String, ConcurrentHashMap>>> - iter = cache.iterator(); + iter = cache.entrySet().iterator(); while (iter.hasNext()) { for (SnowflakeStreamingIngestChannelInternal channel : iter.next().getValue().values()) { Assert.assertTrue(channel.isClosed()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 8ac1d2b85..5ab500f9e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -36,7 +36,9 @@ import java.util.Map; import java.util.TimeZone; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import javax.crypto.BadPaddingException; import javax.crypto.IllegalBlockSizeException; import javax.crypto.NoSuchPaddingException; @@ -51,6 +53,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; +import org.mockito.stubbing.Answer; public class FlushServiceTest { public FlushServiceTest() { @@ -92,12 +95,18 @@ private abstract static class TestContext implements AutoCloseable { storageManager = Mockito.spy(new InternalStageManager<>(true, "role", "client", null)); Mockito.doReturn(storage).when(storageManager).getStorage(ArgumentMatchers.any()); Mockito.when(storageManager.getClientPrefix()).thenReturn("client_prefix"); + Mockito.when(client.getParameterProvider()) + .thenAnswer((Answer) (i) -> parameterProvider); channelCache = new ChannelCache<>(); Mockito.when(client.getChannelCache()).thenReturn(channelCache); registerService = Mockito.spy(new RegisterService(client, client.isTestMode())); flushService = Mockito.spy(new FlushService<>(client, channelCache, storageManager, true)); } + void setParameterOverride(Map parameterOverride) { + this.parameterProvider = new ParameterProvider(parameterOverride, null); + } + ChannelData flushChannel(String name) { SnowflakeStreamingIngestChannelInternal channel = channels.get(name); ChannelData channelData = channel.getRowBuffer().flush(name + "_snowpipe_streaming.bdec"); @@ -429,30 +438,125 @@ public void testGetFilePath() { @Test public void testFlush() throws Exception { - TestContext testContext = testContextFactory.create(); + int numChannels = 4; + Long maxLastFlushTime = Long.MAX_VALUE - 1000L; // -1000L to avoid jitter overflow + TestContext>> testContext = testContextFactory.create(); + addChannel1(testContext); FlushService flushService = testContext.flushService; + ChannelCache channelCache = testContext.channelCache; Mockito.when(flushService.isTestMode()).thenReturn(false); // Nothing to flush flushService.flush(false).get(); - Mockito.verify(flushService, Mockito.times(0)).distributeFlushTasks(); + Mockito.verify(flushService, Mockito.times(0)).distributeFlushTasks(Mockito.any()); // Force = true flushes flushService.flush(true).get(); - Mockito.verify(flushService).distributeFlushTasks(); - Mockito.verify(flushService, Mockito.times(1)).distributeFlushTasks(); + Mockito.verify(flushService, Mockito.times(1)).distributeFlushTasks(Mockito.any()); + + IntStream.range(0, numChannels) + .forEach( + i -> { + addChannel(testContext, i, 1L); + channelCache.setLastFlushTime(getFullyQualifiedTableName(i), maxLastFlushTime); + }); // isNeedFlush = true flushes - flushService.isNeedFlush = true; + flushService.setNeedFlush(getFullyQualifiedTableName(0)); flushService.flush(false).get(); - Mockito.verify(flushService, Mockito.times(2)).distributeFlushTasks(); + Mockito.verify(flushService, Mockito.times(2)).distributeFlushTasks(Mockito.any()); Assert.assertFalse(flushService.isNeedFlush); + Assert.assertNotEquals( + maxLastFlushTime, channelCache.getLastFlushTime(getFullyQualifiedTableName(0))); + IntStream.range(0, numChannels) + .forEach( + i -> { + Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i))); + assertTimeDiffwithinThreshold( + channelCache.getLastFlushTime(getFullyQualifiedTableName(0)), + channelCache.getLastFlushTime(getFullyQualifiedTableName(i)), + 1000L); + }); // lastFlushTime causes flush - flushService.lastFlushTime = 0; + flushService.lastFlushTime = 0L; flushService.flush(false).get(); - Mockito.verify(flushService, Mockito.times(3)).distributeFlushTasks(); + Mockito.verify(flushService, Mockito.times(3)).distributeFlushTasks(Mockito.any()); Assert.assertTrue(flushService.lastFlushTime > 0); + Assert.assertNotEquals( + maxLastFlushTime, channelCache.getLastFlushTime(getFullyQualifiedTableName(0))); + IntStream.range(0, numChannels) + .forEach( + i -> { + Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i))); + assertTimeDiffwithinThreshold( + channelCache.getLastFlushTime(getFullyQualifiedTableName(0)), + channelCache.getLastFlushTime(getFullyQualifiedTableName(i)), + 1000L); + }); + } + + @Test + public void testNonInterleaveFlush() throws ExecutionException, InterruptedException { + int numChannels = 4; + Long maxLastFlushTime = Long.MAX_VALUE - 1000L; // -1000L to avoid jitter overflow + TestContext>> testContext = testContextFactory.create(); + FlushService flushService = testContext.flushService; + ChannelCache channelCache = testContext.channelCache; + Mockito.when(flushService.isTestMode()).thenReturn(false); + testContext.setParameterOverride( + Collections.singletonMap(ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, 1)); + + // Test need flush + IntStream.range(0, numChannels) + .forEach( + i -> { + addChannel(testContext, i, 1L); + channelCache.setLastFlushTime(getFullyQualifiedTableName(i), maxLastFlushTime); + if (i % 2 == 0) { + flushService.setNeedFlush(getFullyQualifiedTableName(i)); + } + }); + flushService.flush(false).get(); + Mockito.verify(flushService, Mockito.times(1)).distributeFlushTasks(Mockito.any()); + IntStream.range(0, numChannels) + .forEach( + i -> { + Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i))); + if (i % 2 == 0) { + Assert.assertNotEquals( + maxLastFlushTime, channelCache.getLastFlushTime(getFullyQualifiedTableName(i))); + } else { + assertTimeDiffwithinThreshold( + maxLastFlushTime, + channelCache.getLastFlushTime(getFullyQualifiedTableName(i)), + 1000L); + } + }); + + // Test time based flush + IntStream.range(0, numChannels) + .forEach( + i -> { + channelCache.setLastFlushTime( + getFullyQualifiedTableName(i), i % 2 == 0 ? 0L : maxLastFlushTime); + }); + flushService.flush(false).get(); + Mockito.verify(flushService, Mockito.times(2)).distributeFlushTasks(Mockito.any()); + IntStream.range(0, numChannels) + .forEach( + i -> { + Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i))); + if (i % 2 == 0) { + Assert.assertNotEquals( + 0L, channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue()); + } else { + assertTimeDiffwithinThreshold( + maxLastFlushTime, + channelCache.getLastFlushTime(getFullyQualifiedTableName(i)), + 1000L); + } + }); } @Test @@ -1076,4 +1180,12 @@ private Timer setupTimer(long expectedLatencyMs) { return timer; } + + private String getFullyQualifiedTableName(int tableId) { + return String.format("db1.PUBLIC.table%d", tableId); + } + + private void assertTimeDiffwithinThreshold(Long time1, Long time2, long threshold) { + Assert.assertTrue(Math.abs(time1 - time2) <= threshold); + } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java index 9426d4dc1..1941a48f5 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java @@ -276,7 +276,7 @@ public void testDropChannel() throws Exception { @Test public void testParameterOverrides() throws Exception { Map parameterMap = new HashMap<>(); - parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "3 sec"); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "3 seconds"); parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 50L); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 1); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024);