From 27e1bcb8f124762bade650d579b7c07896dc4e63 Mon Sep 17 00:00:00 2001
From: Alec Huang
Date: Tue, 9 Jul 2024 15:55:50 -0700
Subject: [PATCH] Modify log

---
 .../streaming/internal/ChannelCache.java      | 45 +++++--------
 .../streaming/internal/FlushService.java      | 66 +++++++++++--------
 .../streaming/internal/ChannelCacheTest.java  | 12 ++--
 .../streaming/internal/FlushServiceTest.java  | 51 +++++++++-----
 4 files changed, 96 insertions(+), 78 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java
index 782c5e7cf..14bfd43c8 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java
@@ -4,8 +4,9 @@
 
 package net.snowflake.ingest.streaming.internal;
 
-import java.util.Iterator;
+import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -43,8 +44,7 @@ void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
     // Update the last flush time for the table, add jitter to avoid all channels flush at the same
     // time when the blobs are not interleaved
     this.lastFlushTime.putIfAbsent(
-        channel.getFullyQualifiedTableName(),
-        System.currentTimeMillis() + (long) (Math.random() * 1000));
+        channel.getFullyQualifiedTableName(), System.currentTimeMillis() + getRandomFlushJitter());
 
     SnowflakeStreamingIngestChannelInternal<T> oldChannel =
         channels.put(channel.getName(), channel);
@@ -62,8 +62,8 @@ void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
    * @param fullyQualifiedTableName fully qualified table name
    * @return last flush time in milliseconds
    */
-  Long getLastFlushTime(String fullyQualifiedTableName) {
-    return this.lastFlushTime.get(fullyQualifiedTableName);
+  Optional<Long> getLastFlushTime(String fullyQualifiedTableName) {
+    return Optional.ofNullable(this.lastFlushTime.get(fullyQualifiedTableName));
   }
 
   /**
@@ -73,7 +73,7 @@ Long getLastFlushTime(String fullyQualifiedTableName) {
    * @param lastFlushTime last flush time in milliseconds
    */
   void setLastFlushTime(String fullyQualifiedTableName, Long lastFlushTime) {
-    this.lastFlushTime.put(fullyQualifiedTableName, lastFlushTime);
+    this.lastFlushTime.put(fullyQualifiedTableName, lastFlushTime + getRandomFlushJitter());
   }
 
   /**
@@ -96,28 +96,15 @@ void setNeedFlush(String fullyQualifiedTableName, Boolean needFlush) {
     this.needFlush.put(fullyQualifiedTableName, needFlush);
   }
 
-  /**
-   * Returns an iterator over the (table, channels) in this map.
-   *
-   * @return
-   */
-  Iterator<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
-      iterator() {
-    return this.cache.entrySet().iterator();
+  /** Returns an immutable set view of the mappings contained in the channel cache. */
+  Set<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
+      entrySet() {
+    return Collections.unmodifiableSet(cache.entrySet());
   }
 
-  /**
-   * Returns an iterator over the (table, channels) in this map, filtered by the given table name
-   * set
-   *
-   * @param tableNames the set of table names to filter
-   * @return
-   */
-  Iterator<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
-      iterator(Set<String> tableNames) {
-    return this.cache.entrySet().stream()
-        .filter(entry -> tableNames.contains(entry.getKey()))
-        .iterator();
+  /** Returns an immutable set view of the keys contained in the channel cache. */
+  Set<String> keySet() {
+    return Collections.unmodifiableSet(cache.keySet());
   }
 
   /** Close all channels in the channel cache */
@@ -169,9 +156,7 @@ int getSize() {
     return cache.size();
   }
 
-  public Set<
-          Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
-      entrySet() {
-    return cache.entrySet();
+  private long getRandomFlushJitter() {
+    return (long) (Math.random() * 1000);
   }
 }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
index 894f513d0..0128e0a76 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
@@ -202,19 +202,19 @@ private CompletableFuture<Void> statsFuture() {
   /**
    * @param isForce if true will flush regardless of other conditions
    * @param tablesToFlush list of tables to flush
-   * @param timeDiffMillis time difference in milliseconds
+   * @param flushStartTime the time when the flush started
    * @return
    */
   private CompletableFuture<Void> distributeFlush(
-      boolean isForce, Set<String> tablesToFlush, Long timeDiffMillis) {
+      boolean isForce, Set<String> tablesToFlush, Long flushStartTime) {
     return CompletableFuture.runAsync(
         () -> {
-          logFlushTask(isForce, tablesToFlush, timeDiffMillis);
+          logFlushTask(isForce, tablesToFlush, flushStartTime);
           distributeFlushTasks(tablesToFlush);
-          this.lastFlushTime = System.currentTimeMillis();
+          long flushEndTime = System.currentTimeMillis();
           tablesToFlush.forEach(
               table -> {
-                this.channelCache.setLastFlushTime(table, this.lastFlushTime);
+                this.channelCache.setLastFlushTime(table, flushEndTime);
                 this.channelCache.setNeedFlush(table, false);
               });
         },
@@ -222,18 +222,30 @@ private CompletableFuture<Void> distributeFlush(
   }
 
   /** If tracing is enabled, print always else, check if it needs flush or is forceful. */
-  private void logFlushTask(boolean isForce, Set<String> tablesToFlush, long timeDiffMillis) {
+  private void logFlushTask(boolean isForce, Set<String> tablesToFlush, long flushStartTime) {
     boolean isNeedFlush = tablesToFlush.stream().anyMatch(channelCache::getNeedFlush);
+    long currentTime = System.currentTimeMillis();
+
+    final String tablesToFlushLogFormat =
+        tablesToFlush.stream()
+            .map(
+                table ->
+                    String.format(
+                        "(name=%s, isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s)",
+                        table,
+                        channelCache.getNeedFlush(table),
+                        channelCache.getLastFlushTime(table).isPresent()
+                            ? flushStartTime - channelCache.getLastFlushTime(table).get()
+                            : "N/A",
+                        channelCache.getLastFlushTime(table).isPresent()
+                            ? currentTime - channelCache.getLastFlushTime(table).get()
+                            : "N/A"))
+            .collect(Collectors.joining(", "));
 
     final String flushTaskLogFormat =
         String.format(
-            "Submit forced or ad-hoc flush task on client=%s, isForce=%s,"
-                + " isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s",
-            this.owningClient.getName(),
-            isForce,
-            isNeedFlush,
-            timeDiffMillis,
-            System.currentTimeMillis() - this.lastFlushTime);
+            "Submit forced or ad-hoc flush task on client=%s, isForce=%s," + " Tables=[%s]",
+            this.owningClient.getName(), isForce, tablesToFlushLogFormat);
     if (logger.isTraceEnabled()) {
       logger.logTrace(flushTaskLogFormat);
     }
@@ -279,31 +291,30 @@ private CompletableFuture<Void> registerFuture() {
    * if none of the conditions is met above
    */
   CompletableFuture<Void> flush(boolean isForce) {
-    long currentTime = System.currentTimeMillis();
-    long timeDiffMillis = currentTime - this.lastFlushTime;
+    long flushStartTime = System.currentTimeMillis();
 
     Set<String> tablesToFlush =
-        this.channelCache.entrySet().stream()
+        this.channelCache.keySet().stream()
            .filter(
-                entry ->
+                key ->
                    isForce
-                        || currentTime - this.channelCache.getLastFlushTime(entry.getKey())
-                            >= this.owningClient.getParameterProvider().getCachedMaxClientLagInMs()
-                        || this.channelCache.getNeedFlush(entry.getKey()))
-            .map(Map.Entry::getKey)
+                        || (this.channelCache.getLastFlushTime(key).isPresent()
+                            && flushStartTime - this.channelCache.getLastFlushTime(key).get()
+                                >= this.owningClient
+                                    .getParameterProvider()
+                                    .getCachedMaxClientLagInMs())
+                        || this.channelCache.getNeedFlush(key))
            .collect(Collectors.toSet());
 
     // Flush every table together when it's interleaving chunk is allowed
     if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() != 1
         && !tablesToFlush.isEmpty()) {
-      tablesToFlush.addAll(
-          this.channelCache.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toSet()));
+      tablesToFlush.addAll(this.channelCache.keySet());
     }
 
     if (isForce || (!DISABLE_BACKGROUND_FLUSH && !isTestMode() && !tablesToFlush.isEmpty())) {
-
       return this.statsFuture()
-          .thenCompose((v) -> this.distributeFlush(isForce, tablesToFlush, timeDiffMillis))
+          .thenCompose((v) -> this.distributeFlush(isForce, tablesToFlush, flushStartTime))
          .thenCompose((v) -> this.registerFuture());
     }
     return this.statsFuture();
@@ -393,7 +404,10 @@ void distributeFlushTasks(Set<String> tablesToFlush) {
     Iterator<
             Map.Entry<
                 String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
-        itr = this.channelCache.iterator(tablesToFlush);
+        itr =
+            this.channelCache.entrySet().stream()
+                .filter(e -> tablesToFlush.contains(e.getKey()))
+                .iterator();
     List<Pair<BlobData<T>, CompletableFuture<BlobMetadata>>> blobs = new ArrayList<>();
     List<ChannelData<T>> leftoverChannelsDataPerTable = new ArrayList<>();
 
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java
index 947908ef9..db1d737ba 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java
@@ -1,3 +1,7 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
 package net.snowflake.ingest.streaming.internal;
 
 import static java.time.ZoneOffset.UTC;
@@ -95,7 +99,7 @@ public void testAddChannel() {
         UTC);
     cache.addChannel(channel);
     Assert.assertEquals(1, cache.getSize());
-    Assert.assertTrue(channel == cache.iterator().next().getValue().get(channelName));
+    Assert.assertTrue(channel == cache.entrySet().iterator().next().getValue().get(channelName));
 
     SnowflakeStreamingIngestChannelInternal channelDup =
         new SnowflakeStreamingIngestChannelInternal<>(
@@ -117,7 +121,7 @@ public void testAddChannel() {
     Assert.assertTrue(channelDup.isValid());
     Assert.assertEquals(1, cache.getSize());
     ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<StubChunkData>> channels =
-        cache.iterator().next().getValue();
+        cache.entrySet().iterator().next().getValue();
     Assert.assertEquals(1, channels.size());
     Assert.assertTrue(channelDup == channels.get(channelName));
     Assert.assertFalse(channel == channelDup);
@@ -130,7 +134,7 @@ public void testIterator() {
     Iterator<
             Map.Entry<
                String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<StubChunkData>>>>
-        iter = cache.iterator();
+        iter = cache.entrySet().iterator();
     Map.Entry<
            String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<StubChunkData>>>
@@ -160,7 +164,7 @@ public void testCloseAllChannels() {
     Iterator<
            Map.Entry<
                String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<StubChunkData>>>>
-        iter = cache.iterator();
+        iter = cache.entrySet().iterator();
     while (iter.hasNext()) {
       for (SnowflakeStreamingIngestChannelInternal channel : iter.next().getValue().values()) {
         Assert.assertTrue(channel.isClosed());
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
index eaefb8cb4..906a52625 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
@@ -38,6 +38,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
@@ -435,6 +436,7 @@ public void testGetFilePath() {
   @Test
   public void testFlush() throws Exception {
     int numChannels = 4;
+    Long maxLastFlushTime = Long.MAX_VALUE - 1000L; // -1000L to avoid jitter overflow
     TestContext<List<List<Object>>> testContext = testContextFactory.create();
     addChannel1(testContext);
     FlushService flushService = testContext.flushService;
@@ -453,7 +455,7 @@ public void testFlush() throws Exception {
         .forEach(
             i -> {
               addChannel(testContext, i, 1L);
-              channelCache.setLastFlushTime(getFullyQualifiedTableName(i), Long.MAX_VALUE);
+              channelCache.setLastFlushTime(getFullyQualifiedTableName(i), maxLastFlushTime);
             });
 
     // isNeedFlush = true flushes
     flushService.setNeedFlush(getFullyQualifiedTableName(0));
     flushService.flush(false).get();
     Mockito.verify(flushService, Mockito.times(2)).distributeFlushTasks(Mockito.any());
     Assert.assertNotEquals(
-        Long.MAX_VALUE, channelCache.getLastFlushTime(getFullyQualifiedTableName(0)).longValue());
+        Optional.of(maxLastFlushTime),
+        channelCache.getLastFlushTime(getFullyQualifiedTableName(0)));
     IntStream.range(0, numChannels)
         .forEach(
             i -> {
               Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i)));
-              Assert.assertEquals(
+              assertTimeDiffwithinThreshold(
                   channelCache.getLastFlushTime(getFullyQualifiedTableName(0)),
-                  channelCache.getLastFlushTime(getFullyQualifiedTableName(i)));
+                  channelCache.getLastFlushTime(getFullyQualifiedTableName(i)),
+                  1000L);
             });
 
     // lastFlushTime causes flush
     flushService.flush(false).get();
     Mockito.verify(flushService, Mockito.times(3)).distributeFlushTasks(Mockito.any());
     Assert.assertNotEquals(
-        Long.MAX_VALUE, channelCache.getLastFlushTime(getFullyQualifiedTableName(0)).longValue());
+        Optional.of(maxLastFlushTime),
+        channelCache.getLastFlushTime(getFullyQualifiedTableName(0)));
     IntStream.range(0, numChannels)
         .forEach(
             i -> {
               Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i)));
-              Assert.assertEquals(
+              assertTimeDiffwithinThreshold(
                   channelCache.getLastFlushTime(getFullyQualifiedTableName(0)),
-                  channelCache.getLastFlushTime(getFullyQualifiedTableName(i)));
+                  channelCache.getLastFlushTime(getFullyQualifiedTableName(i)),
+                  1000L);
             });
   }
 
   @Test
   public void testNonInterleaveFlush() throws ExecutionException, InterruptedException {
     int numChannels = 4;
+    Long maxLastFlushTime = Long.MAX_VALUE - 1000L; // -1000L to avoid jitter overflow
     TestContext<List<List<Object>>> testContext = testContextFactory.create();
     FlushService flushService = testContext.flushService;
     ChannelCache channelCache = testContext.channelCache;
@@ -502,7 +509,7 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedExcep
         .forEach(
             i -> {
               addChannel(testContext, i, 1L);
-              channelCache.setLastFlushTime(getFullyQualifiedTableName(i), Long.MAX_VALUE);
+              channelCache.setLastFlushTime(getFullyQualifiedTableName(i), maxLastFlushTime);
               if (i % 2 == 0) {
                 flushService.setNeedFlush(getFullyQualifiedTableName(i));
               }
@@ -515,12 +522,13 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedExcep
               Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i)));
               if (i % 2 == 0) {
                 Assert.assertNotEquals(
-                    Long.MAX_VALUE,
-                    channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue());
+                    Optional.of(maxLastFlushTime),
+                    channelCache.getLastFlushTime(getFullyQualifiedTableName(i)));
               } else {
-                Assert.assertEquals(
-                    Long.MAX_VALUE,
-                    channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue());
+                assertTimeDiffwithinThreshold(
+                    Optional.of(maxLastFlushTime),
+                    channelCache.getLastFlushTime(getFullyQualifiedTableName(i)),
+                    1000L);
               }
             });
 
@@ -529,7 +537,7 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedExcep
         .forEach(
             i -> {
               channelCache.setLastFlushTime(
-                  getFullyQualifiedTableName(i), i % 2 == 0 ? 0L : Long.MAX_VALUE);
+                  getFullyQualifiedTableName(i), i % 2 == 0 ? 0L : maxLastFlushTime);
             });
     flushService.flush(false).get();
     Mockito.verify(flushService, Mockito.times(2)).distributeFlushTasks(Mockito.any());
@@ -539,11 +547,12 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedExcep
               Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i)));
               if (i % 2 == 0) {
                 Assert.assertNotEquals(
-                    0L, channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue());
+                    Optional.of(0L), channelCache.getLastFlushTime(getFullyQualifiedTableName(i)));
               } else {
-                Assert.assertEquals(
-                    Long.MAX_VALUE,
-                    channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue());
+                assertTimeDiffwithinThreshold(
+                    Optional.of(maxLastFlushTime),
+                    channelCache.getLastFlushTime(getFullyQualifiedTableName(i)),
+                    1000L);
               }
             });
   }
@@ -1167,4 +1176,10 @@ private Timer setupTimer(long expectedLatencyMs) {
   private String getFullyQualifiedTableName(int tableId) {
     return String.format("db1.PUBLIC.table%d", tableId);
   }
+
+  private void assertTimeDiffwithinThreshold(
+      Optional<Long> time1, Optional<Long> time2, long threshold) {
+    Assert.assertTrue(
+        time1.isPresent() && time2.isPresent() && Math.abs(time1.get() - time2.get()) <= threshold);
+  }
 }
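
Note (not part of the patch): a minimal, self-contained Java sketch of the behavior the patch introduces, i.e. per-table flush selection driven by a jittered last-flush time and an explicit need-flush flag. The class FlushDecisionSketch and its fields are hypothetical stand-ins, not the library's ChannelCache/FlushService API; the lag parameter corresponds conceptually to the cached max client lag used above.

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Illustrative sketch only: mirrors the per-table flush decision and the flush-time jitter
// introduced by the patch, using plain maps instead of the real ChannelCache/FlushService.
public class FlushDecisionSketch {
  private final ConcurrentHashMap<String, Long> lastFlushTime = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, Boolean> needFlush = new ConcurrentHashMap<>();

  // Like the jittered setLastFlushTime above: store the time plus up to 1s of random jitter
  // so all tables do not become flush-eligible at exactly the same instant.
  void setLastFlushTime(String table, long timeMs) {
    lastFlushTime.put(table, timeMs + (long) (Math.random() * 1000));
  }

  // Like the Optional-returning getLastFlushTime above: unknown tables yield Optional.empty().
  Optional<Long> getLastFlushTime(String table) {
    return Optional.ofNullable(lastFlushTime.get(table));
  }

  void setNeedFlush(String table, boolean flag) {
    needFlush.put(table, flag);
  }

  // Like the reworked flush(false) selection: a table is picked when its last flush is older
  // than the maximum allowed lag, or when it was explicitly marked as needing a flush.
  Set<String> tablesToFlush(long nowMs, long maxClientLagMs) {
    return lastFlushTime.keySet().stream()
        .filter(
            table ->
                getLastFlushTime(table).map(t -> nowMs - t >= maxClientLagMs).orElse(false)
                    || needFlush.getOrDefault(table, false))
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    FlushDecisionSketch sketch = new FlushDecisionSketch();
    long now = System.currentTimeMillis();
    sketch.setLastFlushTime("db1.PUBLIC.table0", now - 5_000); // stale: eligible by lag
    sketch.setLastFlushTime("db1.PUBLIC.table1", now); // fresh: not eligible by lag
    sketch.setNeedFlush("db1.PUBLIC.table1", true); // but explicitly marked for flush
    System.out.println(sketch.tablesToFlush(now, 1_000)); // prints both table names
  }
}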