Commit: Modify log
sfc-gh-alhuang committed Jul 9, 2024
1 parent be25b1e commit 27e1bcb
Showing 4 changed files with 96 additions and 78 deletions.
net/snowflake/ingest/streaming/internal/ChannelCache.java
@@ -4,8 +4,9 @@

package net.snowflake.ingest.streaming.internal;

-import java.util.Iterator;
+import java.util.Collections;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

@@ -43,8 +44,7 @@ void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
// Update the last flush time for the table, add jitter to avoid all channels flush at the same
// time when the blobs are not interleaved
this.lastFlushTime.putIfAbsent(
-        channel.getFullyQualifiedTableName(),
-        System.currentTimeMillis() + (long) (Math.random() * 1000));
+        channel.getFullyQualifiedTableName(), System.currentTimeMillis() + getRandomFlushJitter());

SnowflakeStreamingIngestChannelInternal<T> oldChannel =
channels.put(channel.getName(), channel);
@@ -62,8 +62,8 @@ void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
* @param fullyQualifiedTableName fully qualified table name
* @return last flush time in milliseconds
*/
-  Long getLastFlushTime(String fullyQualifiedTableName) {
-    return this.lastFlushTime.get(fullyQualifiedTableName);
+  Optional<Long> getLastFlushTime(String fullyQualifiedTableName) {
+    return Optional.ofNullable(this.lastFlushTime.get(fullyQualifiedTableName));
}
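
For callers, the Optional return makes the "table never flushed" case explicit instead of risking an unboxing NullPointerException on a null Long. A minimal standalone sketch of the lookup pattern, using a plain ConcurrentHashMap in place of the channel cache (class and table names here are illustrative, not from this commit):

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

class LastFlushTimes {
  private final ConcurrentHashMap<String, Long> lastFlushTime = new ConcurrentHashMap<>();

  // Wrapping the nullable map lookup keeps the missing-entry case explicit.
  Optional<Long> getLastFlushTime(String fullyQualifiedTableName) {
    return Optional.ofNullable(lastFlushTime.get(fullyQualifiedTableName));
  }

  public static void main(String[] args) {
    LastFlushTimes times = new LastFlushTimes();
    // No entry yet: Optional.empty() rather than a null that would NPE on unboxing.
    System.out.println(times.getLastFlushTime("db1.PUBLIC.table0").isPresent()); // false
  }
}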

/**
@@ -73,7 +73,7 @@ Long getLastFlushTime(String fullyQualifiedTableName) {
* @param lastFlushTime last flush time in milliseconds
*/
void setLastFlushTime(String fullyQualifiedTableName, Long lastFlushTime) {
-    this.lastFlushTime.put(fullyQualifiedTableName, lastFlushTime);
+    this.lastFlushTime.put(fullyQualifiedTableName, lastFlushTime + getRandomFlushJitter());
}
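
Both write paths above now fold in up to one second of random jitter, so channels created or flushed at the same instant do not all become flush-eligible on the same tick when their blobs are not interleaved. A standalone sketch of the effect; the 1000 ms bound mirrors getRandomFlushJitter below, everything else is illustrative:

public class JitterDemo {
  // Mirrors getRandomFlushJitter(): a uniform offset in [0, 1000) milliseconds.
  private static long getRandomFlushJitter() {
    return (long) (Math.random() * 1000);
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    // Three tables registered at the same instant still get distinct flush baselines.
    for (int i = 0; i < 3; i++) {
      System.out.printf("table%d baseline: %d%n", i, now + getRandomFlushJitter());
    }
  }
}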

/**
@@ -96,28 +96,15 @@ void setNeedFlush(String fullyQualifiedTableName, Boolean needFlush) {
this.needFlush.put(fullyQualifiedTableName, needFlush);
}

-  /**
-   * Returns an iterator over the (table, channels) in this map.
-   *
-   * @return
-   */
-  Iterator<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
-      iterator() {
-    return this.cache.entrySet().iterator();
+  /** Returns an immutable set view of the mappings contained in the channel cache. */
+  Set<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
+      entrySet() {
+    return Collections.unmodifiableSet(cache.entrySet());
}

-  /**
-   * Returns an iterator over the (table, channels) in this map, filtered by the given table name
-   * set
-   *
-   * @param tableNames the set of table names to filter
-   * @return
-   */
-  Iterator<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
-      iterator(Set<String> tableNames) {
-    return this.cache.entrySet().stream()
-        .filter(entry -> tableNames.contains(entry.getKey()))
-        .iterator();
+  /** Returns an immutable set view of the keys contained in the channel cache. */
+  Set<String> keySet() {
+    return Collections.unmodifiableSet(cache.keySet());
}
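
Replacing the raw iterators with unmodifiable views lets callers traverse the cache freely while structural changes stay behind the cache's own methods. A standalone sketch of how such a view behaves (illustrative types and values):

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ViewDemo {
  public static void main(String[] args) {
    ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
    cache.put("db1.PUBLIC.table0", "channel0");

    Set<String> keys = Collections.unmodifiableSet(cache.keySet());
    try {
      keys.remove("db1.PUBLIC.table0"); // mutation through the view is rejected
    } catch (UnsupportedOperationException e) {
      System.out.println("view is read-only");
    }

    // The view is read-through: updates made via the map still show up in it.
    cache.put("db1.PUBLIC.table1", "channel1");
    System.out.println(keys.size()); // 2
  }
}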

/** Close all channels in the channel cache */
@@ -169,9 +156,7 @@ int getSize() {
return cache.size();
}

-  public Set<
-          Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
-      entrySet() {
-    return cache.entrySet();
+  private long getRandomFlushJitter() {
+    return (long) (Math.random() * 1000);
}
}
net/snowflake/ingest/streaming/internal/FlushService.java
@@ -202,38 +202,50 @@ private CompletableFuture<Void> statsFuture() {
/**
* @param isForce if true will flush regardless of other conditions
* @param tablesToFlush list of tables to flush
-   * @param timeDiffMillis time difference in milliseconds
+   * @param flushStartTime the time when the flush started
* @return
*/
private CompletableFuture<Void> distributeFlush(
-      boolean isForce, Set<String> tablesToFlush, Long timeDiffMillis) {
+      boolean isForce, Set<String> tablesToFlush, Long flushStartTime) {
return CompletableFuture.runAsync(
() -> {
-          logFlushTask(isForce, tablesToFlush, timeDiffMillis);
+          logFlushTask(isForce, tablesToFlush, flushStartTime);
distributeFlushTasks(tablesToFlush);
-          this.lastFlushTime = System.currentTimeMillis();
+          long flushEndTime = System.currentTimeMillis();
tablesToFlush.forEach(
table -> {
-                this.channelCache.setLastFlushTime(table, this.lastFlushTime);
+                this.channelCache.setLastFlushTime(table, flushEndTime);
this.channelCache.setNeedFlush(table, false);
});
},
this.flushWorker);
}

/** If tracing is enabled, print always else, check if it needs flush or is forceful. */
-  private void logFlushTask(boolean isForce, Set<String> tablesToFlush, long timeDiffMillis) {
+  private void logFlushTask(boolean isForce, Set<String> tablesToFlush, long flushStartTime) {
boolean isNeedFlush = tablesToFlush.stream().anyMatch(channelCache::getNeedFlush);
+    long currentTime = System.currentTimeMillis();

+    final String tablesToFlushLogFormat =
+        tablesToFlush.stream()
+            .map(
+                table ->
+                    String.format(
+                        "(name=%s, isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s)",
+                        table,
+                        channelCache.getNeedFlush(table),
+                        channelCache.getLastFlushTime(table).isPresent()
+                            ? flushStartTime - channelCache.getLastFlushTime(table).get()
+                            : "N/A",
+                        channelCache.getLastFlushTime(table).isPresent()
+                            ? currentTime - channelCache.getLastFlushTime(table).get()
+                            : "N/A"))
+            .collect(Collectors.joining(", "));

final String flushTaskLogFormat =
String.format(
"Submit forced or ad-hoc flush task on client=%s, isForce=%s,"
+ " isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s",
this.owningClient.getName(),
isForce,
isNeedFlush,
timeDiffMillis,
System.currentTimeMillis() - this.lastFlushTime);
"Submit forced or ad-hoc flush task on client=%s, isForce=%s," + " Tables=[%s]",
this.owningClient.getName(), isForce, tablesToFlushLogFormat);
if (logger.isTraceEnabled()) {
logger.logTrace(flushTaskLogFormat);
}
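
With the reworked format, each selected table contributes its own tuple, so a single log line shows per-table staleness instead of one client-wide timeDiffMillis. A hypothetical rendering of the message built from the format strings above (all values invented for illustration):

Submit forced or ad-hoc flush task on client=CLIENT_1, isForce=false, Tables=[(name=db1.PUBLIC.table0, isNeedFlush=true, timeDiffMillis=1013, currentDiffMillis=1021), (name=db1.PUBLIC.table1, isNeedFlush=false, timeDiffMillis=N/A, currentDiffMillis=N/A)]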
@@ -279,31 +291,30 @@ private CompletableFuture<Void> registerFuture() {
* if none of the conditions is met above
*/
CompletableFuture<Void> flush(boolean isForce) {
-    long currentTime = System.currentTimeMillis();
-    long timeDiffMillis = currentTime - this.lastFlushTime;
+    long flushStartTime = System.currentTimeMillis();

Set<String> tablesToFlush =
-        this.channelCache.entrySet().stream()
+        this.channelCache.keySet().stream()
.filter(
-            entry ->
+            key ->
isForce
-                    || currentTime - this.channelCache.getLastFlushTime(entry.getKey())
-                        >= this.owningClient.getParameterProvider().getCachedMaxClientLagInMs()
-                    || this.channelCache.getNeedFlush(entry.getKey()))
-            .map(Map.Entry::getKey)
+                    || (this.channelCache.getLastFlushTime(key).isPresent()
+                        && flushStartTime - this.channelCache.getLastFlushTime(key).get()
+                            >= this.owningClient
+                                .getParameterProvider()
+                                .getCachedMaxClientLagInMs())
+                    || this.channelCache.getNeedFlush(key))
.collect(Collectors.toSet());

// Flush every table together when it's interleaving chunk is allowed
if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() != 1
&& !tablesToFlush.isEmpty()) {
-      tablesToFlush.addAll(
-          this.channelCache.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toSet()));
+      tablesToFlush.addAll(this.channelCache.keySet());
}

if (isForce || (!DISABLE_BACKGROUND_FLUSH && !isTestMode() && !tablesToFlush.isEmpty())) {

return this.statsFuture()
-          .thenCompose((v) -> this.distributeFlush(isForce, tablesToFlush, timeDiffMillis))
+          .thenCompose((v) -> this.distributeFlush(isForce, tablesToFlush, flushStartTime))
.thenCompose((v) -> this.registerFuture());
}
return this.statsFuture();
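
A table is selected for flush when any of three conditions holds: the flush is forced, the table has a recorded last flush time older than the configured max client lag, or the table is explicitly marked as needing a flush. A standalone sketch of that predicate (method and parameter names are illustrative stand-ins for the logic above):

import java.util.Optional;

public class FlushPredicateDemo {
  // Mirrors the selection logic in flush(): forced, stale, or explicitly marked.
  static boolean shouldFlush(
      boolean isForce,
      Optional<Long> lastFlushTime,
      boolean needFlush,
      long flushStartTime,
      long maxClientLagMs) {
    return isForce
        || (lastFlushTime.isPresent()
            && flushStartTime - lastFlushTime.get() >= maxClientLagMs)
        || needFlush;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    // Stale: last flushed two seconds ago against a one-second max lag.
    System.out.println(shouldFlush(false, Optional.of(now - 2000), false, now, 1000)); // true
    // No recorded flush time and no flags set: not selected.
    System.out.println(shouldFlush(false, Optional.empty(), false, now, 1000)); // false
  }
}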
@@ -393,7 +404,10 @@ void distributeFlushTasks(Set<String> tablesToFlush) {
Iterator<
Map.Entry<
String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
-        itr = this.channelCache.iterator(tablesToFlush);
+        itr =
+            this.channelCache.entrySet().stream()
+                .filter(e -> tablesToFlush.contains(e.getKey()))
+                .iterator();
List<Pair<BlobData<T>, CompletableFuture<BlobMetadata>>> blobs = new ArrayList<>();
List<ChannelData<T>> leftoverChannelsDataPerTable = new ArrayList<>();
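
With iterator(Set) removed from ChannelCache, the table filter now lives at the call site as a stream over the entry-set view. A standalone sketch of the same shape (illustrative value types):

import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class FilteredIteratorDemo {
  public static void main(String[] args) {
    ConcurrentHashMap<String, Integer> cache = new ConcurrentHashMap<>();
    cache.put("db1.PUBLIC.table0", 1);
    cache.put("db1.PUBLIC.table1", 2);
    Set<String> tablesToFlush = Set.of("db1.PUBLIC.table0");

    // Filter entries by key, then iterate, as distributeFlushTasks now does.
    Iterator<Map.Entry<String, Integer>> itr =
        cache.entrySet().stream()
            .filter(e -> tablesToFlush.contains(e.getKey()))
            .iterator();
    while (itr.hasNext()) {
      System.out.println(itr.next().getKey()); // db1.PUBLIC.table0
    }
  }
}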

net/snowflake/ingest/streaming/internal/ChannelCacheTest.java
@@ -1,3 +1,7 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
package net.snowflake.ingest.streaming.internal;

import static java.time.ZoneOffset.UTC;
@@ -95,7 +99,7 @@ public void testAddChannel() {
UTC);
cache.addChannel(channel);
Assert.assertEquals(1, cache.getSize());
-    Assert.assertTrue(channel == cache.iterator().next().getValue().get(channelName));
+    Assert.assertTrue(channel == cache.entrySet().iterator().next().getValue().get(channelName));

SnowflakeStreamingIngestChannelInternal<StubChunkData> channelDup =
new SnowflakeStreamingIngestChannelInternal<>(
@@ -117,7 +121,7 @@ public void testAddChannel() {
Assert.assertTrue(channelDup.isValid());
Assert.assertEquals(1, cache.getSize());
ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<StubChunkData>> channels =
-        cache.iterator().next().getValue();
+        cache.entrySet().iterator().next().getValue();
Assert.assertEquals(1, channels.size());
Assert.assertTrue(channelDup == channels.get(channelName));
Assert.assertFalse(channel == channelDup);
@@ -130,7 +134,7 @@ public void testIterator() {
Map.Entry<
String,
ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<StubChunkData>>>>
-        iter = cache.iterator();
+        iter = cache.entrySet().iterator();
Map.Entry<
String,
ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<StubChunkData>>>
@@ -160,7 +164,7 @@ public void testCloseAllChannels() {
Map.Entry<
String,
ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<StubChunkData>>>>
-        iter = cache.iterator();
+        iter = cache.entrySet().iterator();
while (iter.hasNext()) {
for (SnowflakeStreamingIngestChannelInternal<?> channel : iter.next().getValue().values()) {
Assert.assertTrue(channel.isClosed());
net/snowflake/ingest/streaming/internal/FlushServiceTest.java
@@ -38,6 +38,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
@@ -435,6 +436,7 @@ public void testGetFilePath() {
@Test
public void testFlush() throws Exception {
int numChannels = 4;
+    Long maxLastFlushTime = Long.MAX_VALUE - 1000L; // -1000L to avoid jitter overflow
TestContext<List<List<Object>>> testContext = testContextFactory.create();
addChannel1(testContext);
FlushService<?> flushService = testContext.flushService;
@@ -453,43 +455,48 @@ public void testFlush() throws Exception {
.forEach(
i -> {
addChannel(testContext, i, 1L);
-              channelCache.setLastFlushTime(getFullyQualifiedTableName(i), Long.MAX_VALUE);
+              channelCache.setLastFlushTime(getFullyQualifiedTableName(i), maxLastFlushTime);
});

// isNeedFlush = true flushes
flushService.setNeedFlush(getFullyQualifiedTableName(0));
flushService.flush(false).get();
Mockito.verify(flushService, Mockito.times(2)).distributeFlushTasks(Mockito.any());
Assert.assertNotEquals(
-        Long.MAX_VALUE, channelCache.getLastFlushTime(getFullyQualifiedTableName(0)).longValue());
+        Optional.of(maxLastFlushTime),
+        channelCache.getLastFlushTime(getFullyQualifiedTableName(0)));
IntStream.range(0, numChannels)
.forEach(
i -> {
Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i)));
-              Assert.assertEquals(
+              assertTimeDiffwithinThreshold(
                  channelCache.getLastFlushTime(getFullyQualifiedTableName(0)),
-                  channelCache.getLastFlushTime(getFullyQualifiedTableName(i)));
+                  channelCache.getLastFlushTime(getFullyQualifiedTableName(i)),
+                  1000L);
});

// lastFlushTime causes flush
channelCache.setLastFlushTime(getFullyQualifiedTableName(0), 0L);
flushService.flush(false).get();
Mockito.verify(flushService, Mockito.times(3)).distributeFlushTasks(Mockito.any());
Assert.assertNotEquals(
-        Long.MAX_VALUE, channelCache.getLastFlushTime(getFullyQualifiedTableName(0)).longValue());
+        Optional.of(maxLastFlushTime),
+        channelCache.getLastFlushTime(getFullyQualifiedTableName(0)));
IntStream.range(0, numChannels)
.forEach(
i -> {
Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i)));
-              Assert.assertEquals(
+              assertTimeDiffwithinThreshold(
                  channelCache.getLastFlushTime(getFullyQualifiedTableName(0)),
-                  channelCache.getLastFlushTime(getFullyQualifiedTableName(i)));
+                  channelCache.getLastFlushTime(getFullyQualifiedTableName(i)),
+                  1000L);
});
}
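
The 1000 ms headroom in maxLastFlushTime exists because setLastFlushTime now adds up to 1000 ms of jitter, and Long.MAX_VALUE plus any positive offset wraps around to a negative value. A standalone illustration of the overflow being avoided:

public class OverflowDemo {
  public static void main(String[] args) {
    long maxJitter = 1000L;
    System.out.println(Long.MAX_VALUE + maxJitter);         // wraps to a negative value
    System.out.println(Long.MAX_VALUE - 1000L + maxJitter); // stays at Long.MAX_VALUE
  }
}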

@Test
public void testNonInterleaveFlush() throws ExecutionException, InterruptedException {
int numChannels = 4;
+    Long maxLastFlushTime = Long.MAX_VALUE - 1000L; // -1000L to avoid jitter overflow
TestContext<List<List<Object>>> testContext = testContextFactory.create();
FlushService<?> flushService = testContext.flushService;
ChannelCache<?> channelCache = testContext.channelCache;
@@ -502,7 +509,7 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedException {
.forEach(
i -> {
addChannel(testContext, i, 1L);
-              channelCache.setLastFlushTime(getFullyQualifiedTableName(i), Long.MAX_VALUE);
+              channelCache.setLastFlushTime(getFullyQualifiedTableName(i), maxLastFlushTime);
if (i % 2 == 0) {
flushService.setNeedFlush(getFullyQualifiedTableName(i));
}
@@ -515,12 +522,13 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedException {
Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i)));
if (i % 2 == 0) {
Assert.assertNotEquals(
-                    Long.MAX_VALUE,
-                    channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue());
+                    Optional.of(maxLastFlushTime),
+                    channelCache.getLastFlushTime(getFullyQualifiedTableName(i)));
} else {
-                Assert.assertEquals(
-                    Long.MAX_VALUE,
-                    channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue());
+                assertTimeDiffwithinThreshold(
+                    Optional.of(maxLastFlushTime),
+                    channelCache.getLastFlushTime(getFullyQualifiedTableName(i)),
+                    1000L);
}
});

@@ -529,7 +537,7 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedException {
.forEach(
i -> {
channelCache.setLastFlushTime(
-                  getFullyQualifiedTableName(i), i % 2 == 0 ? 0L : Long.MAX_VALUE);
+                  getFullyQualifiedTableName(i), i % 2 == 0 ? 0L : maxLastFlushTime);
});
flushService.flush(false).get();
Mockito.verify(flushService, Mockito.times(2)).distributeFlushTasks(Mockito.any());
@@ -539,11 +547,12 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedException {
Assert.assertFalse(channelCache.getNeedFlush(getFullyQualifiedTableName(i)));
if (i % 2 == 0) {
Assert.assertNotEquals(
-                    0L, channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue());
+                    Optional.of(0L), channelCache.getLastFlushTime(getFullyQualifiedTableName(i)));
} else {
-                Assert.assertEquals(
-                    Long.MAX_VALUE,
-                    channelCache.getLastFlushTime(getFullyQualifiedTableName(i)).longValue());
+                assertTimeDiffwithinThreshold(
+                    Optional.of(maxLastFlushTime),
+                    channelCache.getLastFlushTime(getFullyQualifiedTableName(i)),
+                    1000L);
}
});
}
@@ -1167,4 +1176,10 @@ private Timer setupTimer(long expectedLatencyMs) {
private String getFullyQualifiedTableName(int tableId) {
return String.format("db1.PUBLIC.table%d", tableId);
}

+  private void assertTimeDiffwithinThreshold(
+      Optional<Long> time1, Optional<Long> time2, long threshold) {
+    Assert.assertTrue(
+        time1.isPresent() && time2.isPresent() && Math.abs(time1.get() - time2.get()) <= threshold);
+  }
}
