From be25b1ea78c4f3a9960e0c0d8a9c794e33254864 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Wed, 3 Jul 2024 15:53:32 -0700 Subject: [PATCH 1/6] Per table flush --- .../streaming/internal/ChannelCache.java | 75 ++++++++++- .../streaming/internal/FlushService.java | 104 ++++++++++----- ...owflakeStreamingIngestChannelInternal.java | 4 +- ...nowflakeStreamingIngestClientInternal.java | 6 +- .../streaming/internal/FlushServiceTest.java | 126 ++++++++++++++++-- 5 files changed, 266 insertions(+), 49 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java index 989be0fa1..782c5e7cf 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java @@ -1,11 +1,12 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** @@ -23,6 +24,12 @@ class ChannelCache { String, ConcurrentHashMap>> cache = new ConcurrentHashMap<>(); + // Last flush time for each table, the key is FullyQualifiedTableName. + private final ConcurrentHashMap lastFlushTime = new ConcurrentHashMap<>(); + + // Need flush flag for each table, the key is FullyQualifiedTableName. + private final ConcurrentHashMap needFlush = new ConcurrentHashMap<>(); + /** * Add a channel to the channel cache * @@ -33,6 +40,12 @@ void addChannel(SnowflakeStreamingIngestChannelInternal channel) { this.cache.computeIfAbsent( channel.getFullyQualifiedTableName(), v -> new ConcurrentHashMap<>()); + // Update the last flush time for the table, add jitter to avoid all channels flush at the same + // time when the blobs are not interleaved + this.lastFlushTime.putIfAbsent( + channel.getFullyQualifiedTableName(), + System.currentTimeMillis() + (long) (Math.random() * 1000)); + SnowflakeStreamingIngestChannelInternal oldChannel = channels.put(channel.getName(), channel); // Invalidate old channel if it exits to block new inserts and return error to users earlier @@ -43,6 +56,46 @@ void addChannel(SnowflakeStreamingIngestChannelInternal channel) { } } + /** + * Get the last flush time for a table + * + * @param fullyQualifiedTableName fully qualified table name + * @return last flush time in milliseconds + */ + Long getLastFlushTime(String fullyQualifiedTableName) { + return this.lastFlushTime.get(fullyQualifiedTableName); + } + + /** + * Set the last flush time for a table as the current time + * + * @param fullyQualifiedTableName fully qualified table name + * @param lastFlushTime last flush time in milliseconds + */ + void setLastFlushTime(String fullyQualifiedTableName, Long lastFlushTime) { + this.lastFlushTime.put(fullyQualifiedTableName, lastFlushTime); + } + + /** + * Get need flush flag for a table + * + * @param fullyQualifiedTableName fully qualified table name + * @return need flush flag + */ + Boolean getNeedFlush(String fullyQualifiedTableName) { + return this.needFlush.getOrDefault(fullyQualifiedTableName, false); + } + + /** + * Set need flush flag for a table + * + * @param fullyQualifiedTableName fully qualified table name + * @param needFlush need flush flag + */ + void setNeedFlush(String fullyQualifiedTableName, Boolean needFlush) { + this.needFlush.put(fullyQualifiedTableName, needFlush); + } + /** * Returns an iterator over the (table, channels) in this map. * @@ -53,6 +106,20 @@ void addChannel(SnowflakeStreamingIngestChannelInternal channel) { return this.cache.entrySet().iterator(); } + /** + * Returns an iterator over the (table, channels) in this map, filtered by the given table name + * set + * + * @param tableNames the set of table names to filter + * @return + */ + Iterator>>> + iterator(Set tableNames) { + return this.cache.entrySet().stream() + .filter(entry -> tableNames.contains(entry.getKey())) + .iterator(); + } + /** Close all channels in the channel cache */ void closeAllChannels() { this.cache @@ -101,4 +168,10 @@ void invalidateChannelIfSequencersMatch( int getSize() { return cache.size(); } + + public Set< + Map.Entry>>> + entrySet() { + return cache.entrySet(); + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 76e43ff4d..894f513d0 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.TimeZone; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -35,6 +36,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import javax.crypto.BadPaddingException; import javax.crypto.IllegalBlockSizeException; import javax.crypto.NoSuchPaddingException; @@ -108,9 +110,6 @@ List>> getData() { // Reference to register service private final RegisterService registerService; - // Indicates whether we need to schedule a flush - @VisibleForTesting volatile boolean isNeedFlush; - // Latest flush time @VisibleForTesting volatile long lastFlushTime; @@ -141,7 +140,6 @@ List>> getData() { this.targetStage = targetStage; this.counter = new AtomicLong(0); this.registerService = new RegisterService<>(client, isTestMode); - this.isNeedFlush = false; this.lastFlushTime = System.currentTimeMillis(); this.isTestMode = isTestMode; this.latencyTimerContextMap = new ConcurrentHashMap<>(); @@ -175,7 +173,6 @@ List>> getData() { this.registerService = new RegisterService<>(client, isTestMode); this.counter = new AtomicLong(0); - this.isNeedFlush = false; this.lastFlushTime = System.currentTimeMillis(); this.isTestMode = isTestMode; this.latencyTimerContextMap = new ConcurrentHashMap<>(); @@ -204,36 +201,43 @@ private CompletableFuture statsFuture() { /** * @param isForce if true will flush regardless of other conditions - * @param timeDiffMillis Time in milliseconds since the last flush + * @param tablesToFlush list of tables to flush + * @param timeDiffMillis time difference in milliseconds * @return */ - private CompletableFuture distributeFlush(boolean isForce, long timeDiffMillis) { + private CompletableFuture distributeFlush( + boolean isForce, Set tablesToFlush, Long timeDiffMillis) { return CompletableFuture.runAsync( () -> { - logFlushTask(isForce, timeDiffMillis); - distributeFlushTasks(); - this.isNeedFlush = false; + logFlushTask(isForce, tablesToFlush, timeDiffMillis); + distributeFlushTasks(tablesToFlush); this.lastFlushTime = System.currentTimeMillis(); - return; + tablesToFlush.forEach( + table -> { + this.channelCache.setLastFlushTime(table, this.lastFlushTime); + this.channelCache.setNeedFlush(table, false); + }); }, this.flushWorker); } /** If tracing is enabled, print always else, check if it needs flush or is forceful. */ - private void logFlushTask(boolean isForce, long timeDiffMillis) { + private void logFlushTask(boolean isForce, Set tablesToFlush, long timeDiffMillis) { + boolean isNeedFlush = tablesToFlush.stream().anyMatch(channelCache::getNeedFlush); + final String flushTaskLogFormat = String.format( "Submit forced or ad-hoc flush task on client=%s, isForce=%s," + " isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s", this.owningClient.getName(), isForce, - this.isNeedFlush, + isNeedFlush, timeDiffMillis, System.currentTimeMillis() - this.lastFlushTime); if (logger.isTraceEnabled()) { logger.logTrace(flushTaskLogFormat); } - if (!logger.isTraceEnabled() && (this.isNeedFlush || isForce)) { + if (!logger.isTraceEnabled() && (isNeedFlush || isForce)) { logger.logDebug(flushTaskLogFormat); } } @@ -249,27 +253,57 @@ private CompletableFuture registerFuture() { } /** - * Kick off a flush job and distribute the tasks if one of the following conditions is met: - *
  • Flush is forced by the users - *
  • One or more buffers have reached the flush size - *
  • Periodical background flush when a time interval has reached + * Kick off a flush job and distribute the tasks. The flush service behaves differently based on + * the max chunks in blob: + * + *
      + *
    • The max chunks in blob is not 1 (interleaving is allowed), every channel will be flushed + * together if one of the following conditions is met: + *
        + *
      • Flush is forced by the users + *
      • One or more buffers have reached the flush size + *
      • Periodical background flush when a time interval has reached + *
      + *
    • The max chunks in blob is 1 (interleaving is not allowed), a channel will be flushed if + * one of the following conditions is met: + *
        + *
      • Flush is forced by the users + *
      • One or more buffers with the same target table as the channel have reached the + * flush size + *
      • Periodical background flush of the target table when a time interval has reached + *
      + *
    * * @param isForce * @return Completable future that will return when the blobs are registered successfully, or null * if none of the conditions is met above */ CompletableFuture flush(boolean isForce) { - long timeDiffMillis = System.currentTimeMillis() - this.lastFlushTime; + long currentTime = System.currentTimeMillis(); + long timeDiffMillis = currentTime - this.lastFlushTime; + + Set tablesToFlush = + this.channelCache.entrySet().stream() + .filter( + entry -> + isForce + || currentTime - this.channelCache.getLastFlushTime(entry.getKey()) + >= this.owningClient.getParameterProvider().getCachedMaxClientLagInMs() + || this.channelCache.getNeedFlush(entry.getKey())) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + + // Flush every table together when it's interleaving chunk is allowed + if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() != 1 + && !tablesToFlush.isEmpty()) { + tablesToFlush.addAll( + this.channelCache.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toSet())); + } - if (isForce - || (!DISABLE_BACKGROUND_FLUSH - && !isTestMode() - && (this.isNeedFlush - || timeDiffMillis - >= this.owningClient.getParameterProvider().getCachedMaxClientLagInMs()))) { + if (isForce || (!DISABLE_BACKGROUND_FLUSH && !isTestMode() && !tablesToFlush.isEmpty())) { return this.statsFuture() - .thenCompose((v) -> this.distributeFlush(isForce, timeDiffMillis)) + .thenCompose((v) -> this.distributeFlush(isForce, tablesToFlush, timeDiffMillis)) .thenCompose((v) -> this.registerFuture()); } return this.statsFuture(); @@ -352,12 +386,14 @@ private void createWorkers() { /** * Distribute the flush tasks by iterating through all the channels in the channel cache and kick * off a build blob work when certain size has reached or we have reached the end + * + * @param tablesToFlush list of tables to flush */ - void distributeFlushTasks() { + void distributeFlushTasks(Set tablesToFlush) { Iterator< Map.Entry< String, ConcurrentHashMap>>> - itr = this.channelCache.iterator(); + itr = this.channelCache.iterator(tablesToFlush); List, CompletableFuture>> blobs = new ArrayList<>(); List> leftoverChannelsDataPerTable = new ArrayList<>(); @@ -630,9 +666,13 @@ void shutdown() throws InterruptedException { } } - /** Set the flag to indicate that a flush is needed */ - void setNeedFlush() { - this.isNeedFlush = true; + /** + * Set the flag to indicate that a flush is needed + * + * @param fullyQualifiedTableName the fully qualified table name + */ + void setNeedFlush(String fullyQualifiedTableName) { + this.channelCache.setNeedFlush(fullyQualifiedTableName, true); } /** diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index 8ebc23ca1..ca0bbe782 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -413,7 +413,7 @@ public InsertValidationResponse insertRows( // if a large number of rows are inserted if (this.rowBuffer.getSize() >= this.owningClient.getParameterProvider().getMaxChannelSizeInBytes()) { - this.owningClient.setNeedFlush(); + this.owningClient.setNeedFlush(this.channelFlushContext.getFullyQualifiedTableName()); } return response; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 75eb4f717..b9d4c23d0 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -820,8 +820,8 @@ CompletableFuture flush(boolean closing) { } /** Set the flag to indicate that a flush is needed */ - void setNeedFlush() { - this.flushService.setNeedFlush(); + void setNeedFlush(String fullyQualifiedTableName) { + this.flushService.setNeedFlush(fullyQualifiedTableName); } /** Remove the channel in the channel cache if the channel sequencer matches */ diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index f200c7177..eaefb8cb4 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.utils.Constants.BLOB_CHECKSUM_SIZE_IN_BYTES; @@ -36,7 +40,9 @@ import java.util.Map; import java.util.TimeZone; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import javax.crypto.BadPaddingException; import javax.crypto.IllegalBlockSizeException; import javax.crypto.NoSuchPaddingException; @@ -51,6 +57,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; +import org.mockito.stubbing.Answer; public class FlushServiceTest { public FlushServiceTest() { @@ -88,13 +95,18 @@ private abstract static class TestContext implements AutoCloseable { Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix"); parameterProvider = new ParameterProvider(); client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); - Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); + Mockito.when(client.getParameterProvider()) + .thenAnswer((Answer) (i) -> parameterProvider); channelCache = new ChannelCache<>(); Mockito.when(client.getChannelCache()).thenReturn(channelCache); registerService = Mockito.spy(new RegisterService(client, client.isTestMode())); flushService = Mockito.spy(new FlushService<>(client, channelCache, stage, true)); } + void setParameterOverride(Map parameterOverride) { + this.parameterProvider = new ParameterProvider(parameterOverride, null); + } + ChannelData flushChannel(String name) { SnowflakeStreamingIngestChannelInternal channel = channels.get(name); ChannelData channelData = channel.getRowBuffer().flush(name + "_snowpipe_streaming.bdec"); @@ -422,30 +434,118 @@ public void testGetFilePath() { @Test public void testFlush() throws Exception { - TestContext testContext = testContextFactory.create(); + int numChannels = 4; + TestContext>> testContext = testContextFactory.create(); + addChannel1(testContext); FlushService flushService = testContext.flushService; + ChannelCache channelCache = testContext.channelCache; Mockito.when(flushService.isTestMode()).thenReturn(false); // Nothing to flush flushService.flush(false).get(); - Mockito.verify(flushService, Mockito.times(0)).distributeFlushTasks(); + Mockito.verify(flushService, Mockito.times(0)).distributeFlushTasks(Mockito.any()); // Force = true flushes flushService.flush(true).get(); - Mockito.verify(flushService).distributeFlushTasks(); - Mockito.verify(flushService, Mockito.times(1)).distributeFlushTasks(); + Mockito.verify(flushService, Mockito.times(1)).distributeFlushTasks(Mockito.any()); + + IntStream.range(0, numChannels) + .forEach( + i -> { + addChannel(testContext, i, 1L); + channelCache.setLastFlushTime(getFullyQualifiedTableName(i), Long.MAX_VALUE); + }); // isNeedFlush = true flushes - flushService.isNeedFlush = true; + flushService.setNeedFlush(getFullyQualifiedTableName(0)); flushService.flush(false).get(); - Mockito.verify(flushService, Mockito.times(2)).distributeFlushTasks(); - Assert.assertFalse(flushService.isNeedFlush); + Mockito.verify(flushService, Mockito.times(2)).distributeFlushTasks(Mockito.any()); + Assert.assertNotEquals( + Long.MAX_VALUE, channelCache.getLastFlushTime(getFullyQualifiedTableName(0)).longValue()); + IntStream.range(0, numChannels) + .forEach( + i -> { + Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i))); + Assert.assertEquals( + channelCache.getLastFlushTime(getFullyQualifiedTableName(0)), + channelCache.getLastFlushTime(getFullyQualifiedTableName(i))); + }); // lastFlushTime causes flush - flushService.lastFlushTime = 0; + channelCache.setLastFlushTime(getFullyQualifiedTableName(0), 0L); + flushService.flush(false).get(); + Mockito.verify(flushService, Mockito.times(3)).distributeFlushTasks(Mockito.any()); + Assert.assertNotEquals( + Long.MAX_VALUE, channelCache.getLastFlushTime(getFullyQualifiedTableName(0)).longValue()); + IntStream.range(0, numChannels) + .forEach( + i -> { + Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i))); + Assert.assertEquals( + channelCache.getLastFlushTime(getFullyQualifiedTableName(0)), + channelCache.getLastFlushTime(getFullyQualifiedTableName(i))); + }); + } + + @Test + public void testNonInterleaveFlush() throws ExecutionException, InterruptedException { + int numChannels = 4; + TestContext>> testContext = testContextFactory.create(); + FlushService flushService = testContext.flushService; + ChannelCache channelCache = testContext.channelCache; + Mockito.when(flushService.isTestMode()).thenReturn(false); + testContext.setParameterOverride( + Collections.singletonMap(ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, 1)); + + // Test need flush + IntStream.range(0, numChannels) + .forEach( + i -> { + addChannel(testContext, i, 1L); + channelCache.setLastFlushTime(getFullyQualifiedTableName(i), Long.MAX_VALUE); + if (i % 2 == 0) { + flushService.setNeedFlush(getFullyQualifiedTableName(i)); + } + }); flushService.flush(false).get(); - Mockito.verify(flushService, Mockito.times(3)).distributeFlushTasks(); - Assert.assertTrue(flushService.lastFlushTime > 0); + Mockito.verify(flushService, Mockito.times(1)).distributeFlushTasks(Mockito.any()); + IntStream.range(0, numChannels) + .forEach( + i -> { + Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i))); + if (i % 2 == 0) { + Assert.assertNotEquals( + Long.MAX_VALUE, + channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue()); + } else { + Assert.assertEquals( + Long.MAX_VALUE, + channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue()); + } + }); + + // Test time based flush + IntStream.range(0, numChannels) + .forEach( + i -> { + channelCache.setLastFlushTime( + getFullyQualifiedTableName(i), i % 2 == 0 ? 0L : Long.MAX_VALUE); + }); + flushService.flush(false).get(); + Mockito.verify(flushService, Mockito.times(2)).distributeFlushTasks(Mockito.any()); + IntStream.range(0, numChannels) + .forEach( + i -> { + Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i))); + if (i % 2 == 0) { + Assert.assertNotEquals( + 0L, channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue()); + } else { + Assert.assertEquals( + Long.MAX_VALUE, + channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue()); + } + }); } @Test @@ -1063,4 +1163,8 @@ private Timer setupTimer(long expectedLatencyMs) { return timer; } + + private String getFullyQualifiedTableName(int tableId) { + return String.format("db1.PUBLIC.table%d", tableId); + } } From 27e1bcb8f124762bade650d579b7c07896dc4e63 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Tue, 9 Jul 2024 15:55:50 -0700 Subject: [PATCH 2/6] Modify log --- .../streaming/internal/ChannelCache.java | 45 +++++-------- .../streaming/internal/FlushService.java | 66 +++++++++++-------- .../streaming/internal/ChannelCacheTest.java | 12 ++-- .../streaming/internal/FlushServiceTest.java | 51 +++++++++----- 4 files changed, 96 insertions(+), 78 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java index 782c5e7cf..14bfd43c8 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java @@ -4,8 +4,9 @@ package net.snowflake.ingest.streaming.internal; -import java.util.Iterator; +import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -43,8 +44,7 @@ void addChannel(SnowflakeStreamingIngestChannelInternal channel) { // Update the last flush time for the table, add jitter to avoid all channels flush at the same // time when the blobs are not interleaved this.lastFlushTime.putIfAbsent( - channel.getFullyQualifiedTableName(), - System.currentTimeMillis() + (long) (Math.random() * 1000)); + channel.getFullyQualifiedTableName(), System.currentTimeMillis() + getRandomFlushJitter()); SnowflakeStreamingIngestChannelInternal oldChannel = channels.put(channel.getName(), channel); @@ -62,8 +62,8 @@ void addChannel(SnowflakeStreamingIngestChannelInternal channel) { * @param fullyQualifiedTableName fully qualified table name * @return last flush time in milliseconds */ - Long getLastFlushTime(String fullyQualifiedTableName) { - return this.lastFlushTime.get(fullyQualifiedTableName); + Optional getLastFlushTime(String fullyQualifiedTableName) { + return Optional.ofNullable(this.lastFlushTime.get(fullyQualifiedTableName)); } /** @@ -73,7 +73,7 @@ Long getLastFlushTime(String fullyQualifiedTableName) { * @param lastFlushTime last flush time in milliseconds */ void setLastFlushTime(String fullyQualifiedTableName, Long lastFlushTime) { - this.lastFlushTime.put(fullyQualifiedTableName, lastFlushTime); + this.lastFlushTime.put(fullyQualifiedTableName, lastFlushTime + getRandomFlushJitter()); } /** @@ -96,28 +96,15 @@ void setNeedFlush(String fullyQualifiedTableName, Boolean needFlush) { this.needFlush.put(fullyQualifiedTableName, needFlush); } - /** - * Returns an iterator over the (table, channels) in this map. - * - * @return - */ - Iterator>>> - iterator() { - return this.cache.entrySet().iterator(); + /** Returns an immutable set view of the mappings contained in the channel cache. */ + Set>>> + entrySet() { + return Collections.unmodifiableSet(cache.entrySet()); } - /** - * Returns an iterator over the (table, channels) in this map, filtered by the given table name - * set - * - * @param tableNames the set of table names to filter - * @return - */ - Iterator>>> - iterator(Set tableNames) { - return this.cache.entrySet().stream() - .filter(entry -> tableNames.contains(entry.getKey())) - .iterator(); + /** Returns an immutable set view of the keys contained in the channel cache. */ + Set keySet() { + return Collections.unmodifiableSet(cache.keySet()); } /** Close all channels in the channel cache */ @@ -169,9 +156,7 @@ int getSize() { return cache.size(); } - public Set< - Map.Entry>>> - entrySet() { - return cache.entrySet(); + private long getRandomFlushJitter() { + return (long) (Math.random() * 1000); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 894f513d0..0128e0a76 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -202,19 +202,19 @@ private CompletableFuture statsFuture() { /** * @param isForce if true will flush regardless of other conditions * @param tablesToFlush list of tables to flush - * @param timeDiffMillis time difference in milliseconds + * @param flushStartTime the time when the flush started * @return */ private CompletableFuture distributeFlush( - boolean isForce, Set tablesToFlush, Long timeDiffMillis) { + boolean isForce, Set tablesToFlush, Long flushStartTime) { return CompletableFuture.runAsync( () -> { - logFlushTask(isForce, tablesToFlush, timeDiffMillis); + logFlushTask(isForce, tablesToFlush, flushStartTime); distributeFlushTasks(tablesToFlush); - this.lastFlushTime = System.currentTimeMillis(); + long flushEndTime = System.currentTimeMillis(); tablesToFlush.forEach( table -> { - this.channelCache.setLastFlushTime(table, this.lastFlushTime); + this.channelCache.setLastFlushTime(table, flushEndTime); this.channelCache.setNeedFlush(table, false); }); }, @@ -222,18 +222,30 @@ private CompletableFuture distributeFlush( } /** If tracing is enabled, print always else, check if it needs flush or is forceful. */ - private void logFlushTask(boolean isForce, Set tablesToFlush, long timeDiffMillis) { + private void logFlushTask(boolean isForce, Set tablesToFlush, long flushStartTime) { boolean isNeedFlush = tablesToFlush.stream().anyMatch(channelCache::getNeedFlush); + long currentTime = System.currentTimeMillis(); + + final String tablesToFlushLogFormat = + tablesToFlush.stream() + .map( + table -> + String.format( + "(name=%s, isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s)", + table, + channelCache.getNeedFlush(table), + channelCache.getLastFlushTime(table).isPresent() + ? flushStartTime - channelCache.getLastFlushTime(table).get() + : "N/A", + channelCache.getLastFlushTime(table).isPresent() + ? currentTime - channelCache.getLastFlushTime(table).get() + : "N/A")) + .collect(Collectors.joining(", ")); final String flushTaskLogFormat = String.format( - "Submit forced or ad-hoc flush task on client=%s, isForce=%s," - + " isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s", - this.owningClient.getName(), - isForce, - isNeedFlush, - timeDiffMillis, - System.currentTimeMillis() - this.lastFlushTime); + "Submit forced or ad-hoc flush task on client=%s, isForce=%s," + " Tables=[%s]", + this.owningClient.getName(), isForce, tablesToFlushLogFormat); if (logger.isTraceEnabled()) { logger.logTrace(flushTaskLogFormat); } @@ -279,31 +291,30 @@ private CompletableFuture registerFuture() { * if none of the conditions is met above */ CompletableFuture flush(boolean isForce) { - long currentTime = System.currentTimeMillis(); - long timeDiffMillis = currentTime - this.lastFlushTime; + long flushStartTime = System.currentTimeMillis(); Set tablesToFlush = - this.channelCache.entrySet().stream() + this.channelCache.keySet().stream() .filter( - entry -> + key -> isForce - || currentTime - this.channelCache.getLastFlushTime(entry.getKey()) - >= this.owningClient.getParameterProvider().getCachedMaxClientLagInMs() - || this.channelCache.getNeedFlush(entry.getKey())) - .map(Map.Entry::getKey) + || (this.channelCache.getLastFlushTime(key).isPresent() + && flushStartTime - this.channelCache.getLastFlushTime(key).get() + >= this.owningClient + .getParameterProvider() + .getCachedMaxClientLagInMs()) + || this.channelCache.getNeedFlush(key)) .collect(Collectors.toSet()); // Flush every table together when it's interleaving chunk is allowed if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() != 1 && !tablesToFlush.isEmpty()) { - tablesToFlush.addAll( - this.channelCache.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toSet())); + tablesToFlush.addAll(this.channelCache.keySet()); } if (isForce || (!DISABLE_BACKGROUND_FLUSH && !isTestMode() && !tablesToFlush.isEmpty())) { - return this.statsFuture() - .thenCompose((v) -> this.distributeFlush(isForce, tablesToFlush, timeDiffMillis)) + .thenCompose((v) -> this.distributeFlush(isForce, tablesToFlush, flushStartTime)) .thenCompose((v) -> this.registerFuture()); } return this.statsFuture(); @@ -393,7 +404,10 @@ void distributeFlushTasks(Set tablesToFlush) { Iterator< Map.Entry< String, ConcurrentHashMap>>> - itr = this.channelCache.iterator(tablesToFlush); + itr = + this.channelCache.entrySet().stream() + .filter(e -> tablesToFlush.contains(e.getKey())) + .iterator(); List, CompletableFuture>> blobs = new ArrayList<>(); List> leftoverChannelsDataPerTable = new ArrayList<>(); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java index 947908ef9..db1d737ba 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; @@ -95,7 +99,7 @@ public void testAddChannel() { UTC); cache.addChannel(channel); Assert.assertEquals(1, cache.getSize()); - Assert.assertTrue(channel == cache.iterator().next().getValue().get(channelName)); + Assert.assertTrue(channel == cache.entrySet().iterator().next().getValue().get(channelName)); SnowflakeStreamingIngestChannelInternal channelDup = new SnowflakeStreamingIngestChannelInternal<>( @@ -117,7 +121,7 @@ public void testAddChannel() { Assert.assertTrue(channelDup.isValid()); Assert.assertEquals(1, cache.getSize()); ConcurrentHashMap> channels = - cache.iterator().next().getValue(); + cache.entrySet().iterator().next().getValue(); Assert.assertEquals(1, channels.size()); Assert.assertTrue(channelDup == channels.get(channelName)); Assert.assertFalse(channel == channelDup); @@ -130,7 +134,7 @@ public void testIterator() { Map.Entry< String, ConcurrentHashMap>>> - iter = cache.iterator(); + iter = cache.entrySet().iterator(); Map.Entry< String, ConcurrentHashMap>> @@ -160,7 +164,7 @@ public void testCloseAllChannels() { Map.Entry< String, ConcurrentHashMap>>> - iter = cache.iterator(); + iter = cache.entrySet().iterator(); while (iter.hasNext()) { for (SnowflakeStreamingIngestChannelInternal channel : iter.next().getValue().values()) { Assert.assertTrue(channel.isClosed()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index eaefb8cb4..906a52625 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -38,6 +38,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.TimeZone; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -435,6 +436,7 @@ public void testGetFilePath() { @Test public void testFlush() throws Exception { int numChannels = 4; + Long maxLastFlushTime = Long.MAX_VALUE - 1000L; // -1000L to avoid jitter overflow TestContext>> testContext = testContextFactory.create(); addChannel1(testContext); FlushService flushService = testContext.flushService; @@ -453,7 +455,7 @@ public void testFlush() throws Exception { .forEach( i -> { addChannel(testContext, i, 1L); - channelCache.setLastFlushTime(getFullyQualifiedTableName(i), Long.MAX_VALUE); + channelCache.setLastFlushTime(getFullyQualifiedTableName(i), maxLastFlushTime); }); // isNeedFlush = true flushes @@ -461,14 +463,16 @@ public void testFlush() throws Exception { flushService.flush(false).get(); Mockito.verify(flushService, Mockito.times(2)).distributeFlushTasks(Mockito.any()); Assert.assertNotEquals( - Long.MAX_VALUE, channelCache.getLastFlushTime(getFullyQualifiedTableName(0)).longValue()); + Optional.of(maxLastFlushTime), + channelCache.getLastFlushTime(getFullyQualifiedTableName(0))); IntStream.range(0, numChannels) .forEach( i -> { Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i))); - Assert.assertEquals( + assertTimeDiffwithinThreshold( channelCache.getLastFlushTime(getFullyQualifiedTableName(0)), - channelCache.getLastFlushTime(getFullyQualifiedTableName(i))); + channelCache.getLastFlushTime(getFullyQualifiedTableName(i)), + 1000L); }); // lastFlushTime causes flush @@ -476,20 +480,23 @@ public void testFlush() throws Exception { flushService.flush(false).get(); Mockito.verify(flushService, Mockito.times(3)).distributeFlushTasks(Mockito.any()); Assert.assertNotEquals( - Long.MAX_VALUE, channelCache.getLastFlushTime(getFullyQualifiedTableName(0)).longValue()); + Optional.of(maxLastFlushTime), + channelCache.getLastFlushTime(getFullyQualifiedTableName(0))); IntStream.range(0, numChannels) .forEach( i -> { Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i))); - Assert.assertEquals( + assertTimeDiffwithinThreshold( channelCache.getLastFlushTime(getFullyQualifiedTableName(0)), - channelCache.getLastFlushTime(getFullyQualifiedTableName(i))); + channelCache.getLastFlushTime(getFullyQualifiedTableName(i)), + 1000L); }); } @Test public void testNonInterleaveFlush() throws ExecutionException, InterruptedException { int numChannels = 4; + Long maxLastFlushTime = Long.MAX_VALUE - 1000L; // -1000L to avoid jitter overflow TestContext>> testContext = testContextFactory.create(); FlushService flushService = testContext.flushService; ChannelCache channelCache = testContext.channelCache; @@ -502,7 +509,7 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedExcep .forEach( i -> { addChannel(testContext, i, 1L); - channelCache.setLastFlushTime(getFullyQualifiedTableName(i), Long.MAX_VALUE); + channelCache.setLastFlushTime(getFullyQualifiedTableName(i), maxLastFlushTime); if (i % 2 == 0) { flushService.setNeedFlush(getFullyQualifiedTableName(i)); } @@ -515,12 +522,13 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedExcep Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i))); if (i % 2 == 0) { Assert.assertNotEquals( - Long.MAX_VALUE, - channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue()); + Optional.of(maxLastFlushTime), + channelCache.getLastFlushTime(getFullyQualifiedTableName(i))); } else { - Assert.assertEquals( - Long.MAX_VALUE, - channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue()); + assertTimeDiffwithinThreshold( + Optional.of(maxLastFlushTime), + channelCache.getLastFlushTime(getFullyQualifiedTableName(i)), + 1000L); } }); @@ -529,7 +537,7 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedExcep .forEach( i -> { channelCache.setLastFlushTime( - getFullyQualifiedTableName(i), i % 2 == 0 ? 0L : Long.MAX_VALUE); + getFullyQualifiedTableName(i), i % 2 == 0 ? 0L : maxLastFlushTime); }); flushService.flush(false).get(); Mockito.verify(flushService, Mockito.times(2)).distributeFlushTasks(Mockito.any()); @@ -539,11 +547,12 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedExcep Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i))); if (i % 2 == 0) { Assert.assertNotEquals( - 0L, channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue()); + Optional.of(0L), channelCache.getLastFlushTime(getFullyQualifiedTableName(i))); } else { - Assert.assertEquals( - Long.MAX_VALUE, - channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue()); + assertTimeDiffwithinThreshold( + Optional.of(maxLastFlushTime), + channelCache.getLastFlushTime(getFullyQualifiedTableName(i)), + 1000L); } }); } @@ -1167,4 +1176,10 @@ private Timer setupTimer(long expectedLatencyMs) { private String getFullyQualifiedTableName(int tableId) { return String.format("db1.PUBLIC.table%d", tableId); } + + private void assertTimeDiffwithinThreshold( + Optional time1, Optional time2, long threshold) { + Assert.assertTrue( + time1.isPresent() && time2.isPresent() && Math.abs(time1.get() - time2.get()) <= threshold); + } } From 91f546ea56fd8c487640a7442af5e068af661436 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Tue, 9 Jul 2024 16:29:28 -0700 Subject: [PATCH 3/6] remove client level last flush time --- .../snowflake/ingest/streaming/internal/FlushService.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 0128e0a76..30e5b2300 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -110,9 +110,6 @@ List>> getData() { // Reference to register service private final RegisterService registerService; - // Latest flush time - @VisibleForTesting volatile long lastFlushTime; - // Indicates whether it's running as part of the test private final boolean isTestMode; @@ -140,7 +137,6 @@ List>> getData() { this.targetStage = targetStage; this.counter = new AtomicLong(0); this.registerService = new RegisterService<>(client, isTestMode); - this.lastFlushTime = System.currentTimeMillis(); this.isTestMode = isTestMode; this.latencyTimerContextMap = new ConcurrentHashMap<>(); this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion(); @@ -173,7 +169,6 @@ List>> getData() { this.registerService = new RegisterService<>(client, isTestMode); this.counter = new AtomicLong(0); - this.lastFlushTime = System.currentTimeMillis(); this.isTestMode = isTestMode; this.latencyTimerContextMap = new ConcurrentHashMap<>(); this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion(); From 89e2a4a14b11bf917aa38390f008f8aa7d4b17e8 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Mon, 15 Jul 2024 15:54:39 -0700 Subject: [PATCH 4/6] fix client level flush --- .../streaming/internal/ChannelCache.java | 54 ++++++-- .../streaming/internal/FlushService.java | 119 +++++++++++------- .../streaming/internal/FlushServiceTest.java | 26 ++-- 3 files changed, 130 insertions(+), 69 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java index 14bfd43c8..75e56a01c 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java @@ -6,9 +6,10 @@ import java.util.Collections; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.SFException; /** * In-memory cache that stores the active channels for a given Streaming Ingest client, and the @@ -25,11 +26,19 @@ class ChannelCache { String, ConcurrentHashMap>> cache = new ConcurrentHashMap<>(); - // Last flush time for each table, the key is FullyQualifiedTableName. - private final ConcurrentHashMap lastFlushTime = new ConcurrentHashMap<>(); + /** Flush information for each table including last flush time and if flush is needed */ + static class FlushInfo { + long lastFlushTime; + boolean needFlush; - // Need flush flag for each table, the key is FullyQualifiedTableName. - private final ConcurrentHashMap needFlush = new ConcurrentHashMap<>(); + FlushInfo(long lastFlushTime, boolean needFlush) { + this.lastFlushTime = lastFlushTime; + this.needFlush = needFlush; + } + } + + /** Flush information for each table, only used when max chunks in blob is 1 */ + private final ConcurrentHashMap tableFlushInfo = new ConcurrentHashMap<>(); /** * Add a channel to the channel cache @@ -43,8 +52,9 @@ void addChannel(SnowflakeStreamingIngestChannelInternal channel) { // Update the last flush time for the table, add jitter to avoid all channels flush at the same // time when the blobs are not interleaved - this.lastFlushTime.putIfAbsent( - channel.getFullyQualifiedTableName(), System.currentTimeMillis() + getRandomFlushJitter()); + this.tableFlushInfo.putIfAbsent( + channel.getFullyQualifiedTableName(), + new FlushInfo(System.currentTimeMillis() + getRandomFlushJitter(), false)); SnowflakeStreamingIngestChannelInternal oldChannel = channels.put(channel.getName(), channel); @@ -62,8 +72,14 @@ void addChannel(SnowflakeStreamingIngestChannelInternal channel) { * @param fullyQualifiedTableName fully qualified table name * @return last flush time in milliseconds */ - Optional getLastFlushTime(String fullyQualifiedTableName) { - return Optional.ofNullable(this.lastFlushTime.get(fullyQualifiedTableName)); + Long getLastFlushTime(String fullyQualifiedTableName) { + FlushInfo tableFlushInfo = this.tableFlushInfo.get(fullyQualifiedTableName); + if (tableFlushInfo == null) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + String.format("Last flush time for table %s not found", fullyQualifiedTableName)); + } + return tableFlushInfo.lastFlushTime; } /** @@ -73,7 +89,9 @@ Optional getLastFlushTime(String fullyQualifiedTableName) { * @param lastFlushTime last flush time in milliseconds */ void setLastFlushTime(String fullyQualifiedTableName, Long lastFlushTime) { - this.lastFlushTime.put(fullyQualifiedTableName, lastFlushTime + getRandomFlushJitter()); + this.tableFlushInfo.compute( + fullyQualifiedTableName, + (k, v) -> new FlushInfo(lastFlushTime + getRandomFlushJitter(), v != null && v.needFlush)); } /** @@ -83,7 +101,13 @@ void setLastFlushTime(String fullyQualifiedTableName, Long lastFlushTime) { * @return need flush flag */ Boolean getNeedFlush(String fullyQualifiedTableName) { - return this.needFlush.getOrDefault(fullyQualifiedTableName, false); + FlushInfo tableFlushInfo = this.tableFlushInfo.get(fullyQualifiedTableName); + if (tableFlushInfo == null) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + String.format("Need flush flag for table %s not found", fullyQualifiedTableName)); + } + return tableFlushInfo.needFlush; } /** @@ -93,7 +117,12 @@ Boolean getNeedFlush(String fullyQualifiedTableName) { * @param needFlush need flush flag */ void setNeedFlush(String fullyQualifiedTableName, Boolean needFlush) { - this.needFlush.put(fullyQualifiedTableName, needFlush); + this.tableFlushInfo.compute( + fullyQualifiedTableName, + (k, v) -> + new FlushInfo( + v != null ? v.lastFlushTime : System.currentTimeMillis() + getRandomFlushJitter(), + needFlush)); } /** Returns an immutable set view of the mappings contained in the channel cache. */ @@ -156,6 +185,7 @@ int getSize() { return cache.size(); } + /** Generate a random flush jitter to avoid all channels flush at the same time */ private long getRandomFlushJitter() { return (long) (Math.random() * 1000); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 30e5b2300..a2f32d5fb 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -110,6 +110,15 @@ List>> getData() { // Reference to register service private final RegisterService registerService; + /** + * Client level last flush time and need flush flag. This two variables are used when max chunk in + * blob is not 1. When max chunk in blob is 1, flush service ignores these variables and uses + * table level last flush time and need flush flag. See {@link ChannelCache.FlushInfo}. + */ + @VisibleForTesting volatile long lastFlushTime; + + @VisibleForTesting volatile boolean isNeedFlush; + // Indicates whether it's running as part of the test private final boolean isTestMode; @@ -137,6 +146,8 @@ List>> getData() { this.targetStage = targetStage; this.counter = new AtomicLong(0); this.registerService = new RegisterService<>(client, isTestMode); + this.isNeedFlush = false; + this.lastFlushTime = System.currentTimeMillis(); this.isTestMode = isTestMode; this.latencyTimerContextMap = new ConcurrentHashMap<>(); this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion(); @@ -169,6 +180,8 @@ List>> getData() { this.registerService = new RegisterService<>(client, isTestMode); this.counter = new AtomicLong(0); + this.isNeedFlush = false; + this.lastFlushTime = System.currentTimeMillis(); this.isTestMode = isTestMode; this.latencyTimerContextMap = new ConcurrentHashMap<>(); this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion(); @@ -206,10 +219,12 @@ private CompletableFuture distributeFlush( () -> { logFlushTask(isForce, tablesToFlush, flushStartTime); distributeFlushTasks(tablesToFlush); - long flushEndTime = System.currentTimeMillis(); + long prevFlushEndTime = System.currentTimeMillis(); tablesToFlush.forEach( table -> { - this.channelCache.setLastFlushTime(table, flushEndTime); + this.lastFlushTime = prevFlushEndTime; + this.channelCache.setLastFlushTime(table, prevFlushEndTime); + this.isNeedFlush = false; this.channelCache.setNeedFlush(table, false); }); }, @@ -218,29 +233,37 @@ private CompletableFuture distributeFlush( /** If tracing is enabled, print always else, check if it needs flush or is forceful. */ private void logFlushTask(boolean isForce, Set tablesToFlush, long flushStartTime) { - boolean isNeedFlush = tablesToFlush.stream().anyMatch(channelCache::getNeedFlush); + boolean isNeedFlush = + this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1 + ? tablesToFlush.stream().anyMatch(channelCache::getNeedFlush) + : this.isNeedFlush; long currentTime = System.currentTimeMillis(); - - final String tablesToFlushLogFormat = - tablesToFlush.stream() - .map( - table -> - String.format( - "(name=%s, isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s)", - table, - channelCache.getNeedFlush(table), - channelCache.getLastFlushTime(table).isPresent() - ? flushStartTime - channelCache.getLastFlushTime(table).get() - : "N/A", - channelCache.getLastFlushTime(table).isPresent() - ? currentTime - channelCache.getLastFlushTime(table).get() - : "N/A")) - .collect(Collectors.joining(", ")); + final String logInfo; + if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) { + logInfo = + String.format( + "Tables=[%s]", + tablesToFlush.stream() + .map( + table -> + String.format( + "(name=%s, isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s)", + table, + channelCache.getNeedFlush(table), + flushStartTime - channelCache.getLastFlushTime(table), + currentTime - channelCache.getLastFlushTime(table))) + .collect(Collectors.joining(", "))); + } else { + logInfo = + String.format( + "isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s", + isNeedFlush, flushStartTime - this.lastFlushTime, currentTime - this.lastFlushTime); + } final String flushTaskLogFormat = String.format( - "Submit forced or ad-hoc flush task on client=%s, isForce=%s," + " Tables=[%s]", - this.owningClient.getName(), isForce, tablesToFlushLogFormat); + "Submit forced or ad-hoc flush task on client=%s, isForce=%s, %s", + this.owningClient.getName(), isForce, logInfo); if (logger.isTraceEnabled()) { logger.logTrace(flushTaskLogFormat); } @@ -286,28 +309,37 @@ private CompletableFuture registerFuture() { * if none of the conditions is met above */ CompletableFuture flush(boolean isForce) { - long flushStartTime = System.currentTimeMillis(); - - Set tablesToFlush = - this.channelCache.keySet().stream() - .filter( - key -> - isForce - || (this.channelCache.getLastFlushTime(key).isPresent() - && flushStartTime - this.channelCache.getLastFlushTime(key).get() - >= this.owningClient - .getParameterProvider() - .getCachedMaxClientLagInMs()) - || this.channelCache.getNeedFlush(key)) - .collect(Collectors.toSet()); - - // Flush every table together when it's interleaving chunk is allowed - if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() != 1 - && !tablesToFlush.isEmpty()) { - tablesToFlush.addAll(this.channelCache.keySet()); + final long flushStartTime = System.currentTimeMillis(); + final long flushingInterval = + this.owningClient.getParameterProvider().getCachedMaxClientLagInMs(); + + final Set tablesToFlush; + if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) { + tablesToFlush = + this.channelCache.keySet().stream() + .filter( + key -> + isForce + || flushStartTime - this.channelCache.getLastFlushTime(key) + >= flushingInterval + || this.channelCache.getNeedFlush(key)) + .collect(Collectors.toSet()); + } else { + if (isForce + || (!DISABLE_BACKGROUND_FLUSH + && !isTestMode() + && (this.isNeedFlush || flushStartTime - this.lastFlushTime >= flushingInterval))) { + tablesToFlush = this.channelCache.keySet(); + } else { + tablesToFlush = null; + } } - if (isForce || (!DISABLE_BACKGROUND_FLUSH && !isTestMode() && !tablesToFlush.isEmpty())) { + if (isForce + || (!DISABLE_BACKGROUND_FLUSH + && !isTestMode() + && tablesToFlush != null + && !tablesToFlush.isEmpty())) { return this.statsFuture() .thenCompose((v) -> this.distributeFlush(isForce, tablesToFlush, flushStartTime)) .thenCompose((v) -> this.registerFuture()); @@ -681,7 +713,10 @@ void shutdown() throws InterruptedException { * @param fullyQualifiedTableName the fully qualified table name */ void setNeedFlush(String fullyQualifiedTableName) { - this.channelCache.setNeedFlush(fullyQualifiedTableName, true); + this.isNeedFlush = true; + if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) { + this.channelCache.setNeedFlush(fullyQualifiedTableName, true); + } } /** diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 906a52625..1ab6c94ac 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -38,7 +38,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.TimeZone; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -462,9 +461,9 @@ public void testFlush() throws Exception { flushService.setNeedFlush(getFullyQualifiedTableName(0)); flushService.flush(false).get(); Mockito.verify(flushService, Mockito.times(2)).distributeFlushTasks(Mockito.any()); + Assert.assertFalse(flushService.isNeedFlush); Assert.assertNotEquals( - Optional.of(maxLastFlushTime), - channelCache.getLastFlushTime(getFullyQualifiedTableName(0))); + maxLastFlushTime, channelCache.getLastFlushTime(getFullyQualifiedTableName(0))); IntStream.range(0, numChannels) .forEach( i -> { @@ -476,12 +475,12 @@ public void testFlush() throws Exception { }); // lastFlushTime causes flush - channelCache.setLastFlushTime(getFullyQualifiedTableName(0), 0L); + flushService.lastFlushTime = 0L; flushService.flush(false).get(); Mockito.verify(flushService, Mockito.times(3)).distributeFlushTasks(Mockito.any()); + Assert.assertTrue(flushService.lastFlushTime > 0); Assert.assertNotEquals( - Optional.of(maxLastFlushTime), - channelCache.getLastFlushTime(getFullyQualifiedTableName(0))); + maxLastFlushTime, channelCache.getLastFlushTime(getFullyQualifiedTableName(0))); IntStream.range(0, numChannels) .forEach( i -> { @@ -522,11 +521,10 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedExcep Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i))); if (i % 2 == 0) { Assert.assertNotEquals( - Optional.of(maxLastFlushTime), - channelCache.getLastFlushTime(getFullyQualifiedTableName(i))); + maxLastFlushTime, channelCache.getLastFlushTime(getFullyQualifiedTableName(i))); } else { assertTimeDiffwithinThreshold( - Optional.of(maxLastFlushTime), + maxLastFlushTime, channelCache.getLastFlushTime(getFullyQualifiedTableName(i)), 1000L); } @@ -547,10 +545,10 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedExcep Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i))); if (i % 2 == 0) { Assert.assertNotEquals( - Optional.of(0L), channelCache.getLastFlushTime(getFullyQualifiedTableName(i))); + 0L, channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue()); } else { assertTimeDiffwithinThreshold( - Optional.of(maxLastFlushTime), + maxLastFlushTime, channelCache.getLastFlushTime(getFullyQualifiedTableName(i)), 1000L); } @@ -1177,9 +1175,7 @@ private String getFullyQualifiedTableName(int tableId) { return String.format("db1.PUBLIC.table%d", tableId); } - private void assertTimeDiffwithinThreshold( - Optional time1, Optional time2, long threshold) { - Assert.assertTrue( - time1.isPresent() && time2.isPresent() && Math.abs(time1.get() - time2.get()) <= threshold); + private void assertTimeDiffwithinThreshold(Long time1, Long time2, long threshold) { + Assert.assertTrue(Math.abs(time1 - time2) <= threshold); } } From 5f57e88eae426b67465a1cda65f35b6451c0d611 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Mon, 15 Jul 2024 17:15:28 -0700 Subject: [PATCH 5/6] fix test --- .../snowflake/ingest/streaming/internal/StreamingIngestIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java index 2ecaa4e01..8d6c1d5c0 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java @@ -275,7 +275,7 @@ public void testDropChannel() throws Exception { @Test public void testParameterOverrides() throws Exception { Map parameterMap = new HashMap<>(); - parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "3 sec"); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "3 seconds"); parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 50L); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 1); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); From cdc53858eafa5ca41d2ee373b5551ad1282a3431 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Fri, 26 Jul 2024 13:24:47 -0700 Subject: [PATCH 6/6] fix null assertions. --- .../streaming/internal/ChannelCache.java | 37 +++++++++++-------- .../streaming/internal/FlushService.java | 4 +- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java index 75e56a01c..90c0f2ac9 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java @@ -28,8 +28,8 @@ class ChannelCache { /** Flush information for each table including last flush time and if flush is needed */ static class FlushInfo { - long lastFlushTime; - boolean needFlush; + final long lastFlushTime; + final boolean needFlush; FlushInfo(long lastFlushTime, boolean needFlush) { this.lastFlushTime = lastFlushTime; @@ -53,8 +53,7 @@ void addChannel(SnowflakeStreamingIngestChannelInternal channel) { // Update the last flush time for the table, add jitter to avoid all channels flush at the same // time when the blobs are not interleaved this.tableFlushInfo.putIfAbsent( - channel.getFullyQualifiedTableName(), - new FlushInfo(System.currentTimeMillis() + getRandomFlushJitter(), false)); + channel.getFullyQualifiedTableName(), new FlushInfo(System.currentTimeMillis(), false)); SnowflakeStreamingIngestChannelInternal oldChannel = channels.put(channel.getName(), channel); @@ -91,7 +90,14 @@ Long getLastFlushTime(String fullyQualifiedTableName) { void setLastFlushTime(String fullyQualifiedTableName, Long lastFlushTime) { this.tableFlushInfo.compute( fullyQualifiedTableName, - (k, v) -> new FlushInfo(lastFlushTime + getRandomFlushJitter(), v != null && v.needFlush)); + (k, v) -> { + if (v == null) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + String.format("Last flush time for table %s not found", fullyQualifiedTableName)); + } + return new FlushInfo(lastFlushTime, v.needFlush); + }); } /** @@ -100,7 +106,7 @@ void setLastFlushTime(String fullyQualifiedTableName, Long lastFlushTime) { * @param fullyQualifiedTableName fully qualified table name * @return need flush flag */ - Boolean getNeedFlush(String fullyQualifiedTableName) { + boolean getNeedFlush(String fullyQualifiedTableName) { FlushInfo tableFlushInfo = this.tableFlushInfo.get(fullyQualifiedTableName); if (tableFlushInfo == null) { throw new SFException( @@ -116,13 +122,17 @@ Boolean getNeedFlush(String fullyQualifiedTableName) { * @param fullyQualifiedTableName fully qualified table name * @param needFlush need flush flag */ - void setNeedFlush(String fullyQualifiedTableName, Boolean needFlush) { + void setNeedFlush(String fullyQualifiedTableName, boolean needFlush) { this.tableFlushInfo.compute( fullyQualifiedTableName, - (k, v) -> - new FlushInfo( - v != null ? v.lastFlushTime : System.currentTimeMillis() + getRandomFlushJitter(), - needFlush)); + (k, v) -> { + if (v == null) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + String.format("Need flush flag for table %s not found", fullyQualifiedTableName)); + } + return new FlushInfo(v.lastFlushTime, needFlush); + }); } /** Returns an immutable set view of the mappings contained in the channel cache. */ @@ -184,9 +194,4 @@ void invalidateChannelIfSequencersMatch( int getSize() { return cache.size(); } - - /** Generate a random flush jitter to avoid all channels flush at the same time */ - private long getRandomFlushJitter() { - return (long) (Math.random() * 1000); - } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 3afd23314..84e1a2561 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -178,11 +178,11 @@ private CompletableFuture distributeFlush( logFlushTask(isForce, tablesToFlush, flushStartTime); distributeFlushTasks(tablesToFlush); long prevFlushEndTime = System.currentTimeMillis(); + this.lastFlushTime = prevFlushEndTime; + this.isNeedFlush = false; tablesToFlush.forEach( table -> { - this.lastFlushTime = prevFlushEndTime; this.channelCache.setLastFlushTime(table, prevFlushEndTime); - this.isNeedFlush = false; this.channelCache.setNeedFlush(table, false); }); },