Per table flush
sfc-gh-alhuang committed Jul 3, 2024
1 parent 321f852 commit f3ef51c
Showing 5 changed files with 269 additions and 52 deletions.
@@ -1,11 +1,12 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
@@ -23,6 +24,12 @@ class ChannelCache<T> {
String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>
cache = new ConcurrentHashMap<>();

// Last flush time for each table, keyed by fully qualified table name.
private final ConcurrentHashMap<String, Long> lastFlushTime = new ConcurrentHashMap<>();

// Need-flush flag for each table, keyed by fully qualified table name.
private final ConcurrentHashMap<String, Boolean> needFlush = new ConcurrentHashMap<>();

/**
* Add a channel to the channel cache
*
@@ -33,6 +40,12 @@ void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
this.cache.computeIfAbsent(
channel.getFullyQualifiedTableName(), v -> new ConcurrentHashMap<>());

// Update the last flush time for the table; add jitter to avoid all channels flushing at the
// same time when the blobs are not interleaved
this.lastFlushTime.putIfAbsent(
channel.getFullyQualifiedTableName(),
System.currentTimeMillis() + (long) (Math.random() * 1000));

SnowflakeStreamingIngestChannelInternal<T> oldChannel =
channels.put(channel.getName(), channel);
// Invalidate the old channel if it exists, to block new inserts and return errors to users earlier
@@ -43,6 +56,46 @@ void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
}
}
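
The jitter above staggers each table's initial flush deadline by up to one second, so tables registered at the same moment do not all become flush-eligible simultaneously when blobs are not interleaved. A minimal standalone sketch of the effect (the table names are hypothetical; the one-second bound mirrors the code above):

import java.util.concurrent.ConcurrentHashMap;

public class JitterSketch {
  public static void main(String[] args) {
    ConcurrentHashMap<String, Long> lastFlushTime = new ConcurrentHashMap<>();
    for (String table : new String[] {"db.schema.t1", "db.schema.t2", "db.schema.t3"}) {
      // Offset each table's baseline by a random amount in [0, 1000) ms,
      // mirroring the putIfAbsent call in addChannel above.
      lastFlushTime.putIfAbsent(
          table, System.currentTimeMillis() + (long) (Math.random() * 1000));
    }
    // Each table now reaches its max-client-lag deadline at a slightly different time.
    lastFlushTime.forEach((table, ts) -> System.out.println(table + " -> " + ts));
  }
}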

/**
* Get the last flush time for a table
*
* @param fullyQualifiedTableName fully qualified table name
* @return last flush time in milliseconds
*/
Long getLastFlushTime(String fullyQualifiedTableName) {
return this.lastFlushTime.get(fullyQualifiedTableName);
}

/**
* Set the last flush time for a table
*
* @param fullyQualifiedTableName fully qualified table name
* @param lastFlushTime last flush time in milliseconds
*/
void setLastFlushTime(String fullyQualifiedTableName, Long lastFlushTime) {
this.lastFlushTime.put(fullyQualifiedTableName, lastFlushTime);
}

/**
* Get need flush flag for a table
*
* @param fullyQualifiedTableName fully qualified table name
* @return need flush flag
*/
Boolean getNeedFlush(String fullyQualifiedTableName) {
return this.needFlush.getOrDefault(fullyQualifiedTableName, false);
}

/**
* Set need flush flag for a table
*
* @param fullyQualifiedTableName fully qualified table name
* @param needFlush need flush flag
*/
void setNeedFlush(String fullyQualifiedTableName, Boolean needFlush) {
this.needFlush.put(fullyQualifiedTableName, needFlush);
}

/**
* Returns an iterator over the (table, channels) in this map.
*
@@ -53,6 +106,20 @@ void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
return this.cache.entrySet().iterator();
}

/**
* Returns an iterator over the (table, channels) in this map, filtered by the given table name
* set
*
* @param tableNames the set of table names to filter
* @return an iterator over the (table, channels) entries whose table name is in the given set
*/
Iterator<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
iterator(Set<String> tableNames) {
return this.cache.entrySet().stream()
.filter(entry -> tableNames.contains(entry.getKey()))
.iterator();
}
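
A quick usage sketch of the filtered-iterator shape used above: a ConcurrentHashMap's entry-set stream is weakly consistent, so iterating it while channels are added or removed is safe, and the filter keeps only the requested tables. (Integer stands in for the per-table channel map to keep the sketch self-contained.)

import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class FilteredIteratorSketch {
  public static void main(String[] args) {
    ConcurrentHashMap<String, Integer> cache = new ConcurrentHashMap<>();
    cache.put("db.schema.t1", 1);
    cache.put("db.schema.t2", 2);
    Set<String> tablesToFlush = Collections.singleton("db.schema.t2");
    // Same stream-filter-iterator pattern as ChannelCache.iterator(Set<String>).
    Iterator<Map.Entry<String, Integer>> itr =
        cache.entrySet().stream()
            .filter(entry -> tablesToFlush.contains(entry.getKey()))
            .iterator();
    while (itr.hasNext()) {
      System.out.println(itr.next()); // prints only db.schema.t2=2
    }
  }
}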

/** Close all channels in the channel cache */
void closeAllChannels() {
this.cache
@@ -101,4 +168,10 @@ void invalidateChannelIfSequencersMatch(
int getSize() {
return cache.size();
}

public Set<
Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
entrySet() {
return cache.entrySet();
}
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -35,6 +36,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
@@ -108,9 +110,6 @@ List<List<ChannelData<T>>> getData() {
// Reference to register service
private final RegisterService<T> registerService;

// Indicates whether we need to schedule a flush
@VisibleForTesting volatile boolean isNeedFlush;

// Latest flush time
@VisibleForTesting volatile long lastFlushTime;

@@ -141,7 +140,6 @@ List<List<ChannelData<T>>> getData() {
this.targetStage = targetStage;
this.counter = new AtomicLong(0);
this.registerService = new RegisterService<>(client, isTestMode);
this.isNeedFlush = false;
this.lastFlushTime = System.currentTimeMillis();
this.isTestMode = isTestMode;
this.latencyTimerContextMap = new ConcurrentHashMap<>();
@@ -175,7 +173,6 @@ List<List<ChannelData<T>>> getData() {

this.registerService = new RegisterService<>(client, isTestMode);
this.counter = new AtomicLong(0);
this.isNeedFlush = false;
this.lastFlushTime = System.currentTimeMillis();
this.isTestMode = isTestMode;
this.latencyTimerContextMap = new ConcurrentHashMap<>();
@@ -204,36 +201,43 @@ private CompletableFuture<Void> statsFuture() {

/**
* @param isForce if true will flush regardless of other conditions
* @param timeDiffMillis Time in milliseconds since the last flush
* @param tablesToFlush set of tables to flush
* @param timeDiffMillis time difference in milliseconds since the last flush
* @return completable future that completes once the flush tasks have been distributed
*/
private CompletableFuture<Void> distributeFlush(boolean isForce, long timeDiffMillis) {
private CompletableFuture<Void> distributeFlush(
boolean isForce, Set<String> tablesToFlush, Long timeDiffMillis) {
return CompletableFuture.runAsync(
() -> {
logFlushTask(isForce, timeDiffMillis);
distributeFlushTasks();
this.isNeedFlush = false;
logFlushTask(isForce, tablesToFlush, timeDiffMillis);
distributeFlushTasks(tablesToFlush);
this.lastFlushTime = System.currentTimeMillis();
return;
tablesToFlush.forEach(
table -> {
this.channelCache.setLastFlushTime(table, this.lastFlushTime);
this.channelCache.setNeedFlush(table, false);
});
},
this.flushWorker);
}

/** If tracing is enabled, always log; otherwise log only when a flush is needed or forced. */
private void logFlushTask(boolean isForce, long timeDiffMillis) {
private void logFlushTask(boolean isForce, Set<String> tablesToFlush, long timeDiffMillis) {
boolean isNeedFlush = tablesToFlush.stream().anyMatch(channelCache::getNeedFlush);

final String flushTaskLogFormat =
String.format(
"Submit forced or ad-hoc flush task on client=%s, isForce=%s,"
+ " isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s",
this.owningClient.getName(),
isForce,
this.isNeedFlush,
isNeedFlush,
timeDiffMillis,
System.currentTimeMillis() - this.lastFlushTime);
if (logger.isTraceEnabled()) {
logger.logTrace(flushTaskLogFormat);
}
if (!logger.isTraceEnabled() && (this.isNeedFlush || isForce)) {
if (!logger.isTraceEnabled() && (isNeedFlush || isForce)) {
logger.logDebug(flushTaskLogFormat);
}
}
@@ -249,27 +253,57 @@ private CompletableFuture<Void> registerFuture() {
}

/**
* Kick off a flush job and distribute the tasks if one of the following conditions is met:
* <li>Flush is forced by the users
* <li>One or more buffers have reached the flush size
* <li>Periodical background flush when a time interval has reached
* Kick off a flush job and distribute the tasks. The flush service behaves differently based on
* the max chunks in blob:
*
* <ul>
*   <li>When the max chunks in blob is not 1 (interleaving is allowed), every channel is flushed
*       together if one of the following conditions is met:
*       <ul>
*         <li>Flush is forced by the users
*         <li>One or more buffers have reached the flush size
*         <li>Periodic background flush when the flush interval has elapsed
*       </ul>
*   <li>When the max chunks in blob is 1 (interleaving is not allowed), a channel is flushed if
*       one of the following conditions is met:
*       <ul>
*         <li>Flush is forced by the users
*         <li>One or more buffers with the same target table as the channel have reached the
*             flush size
*         <li>Periodic background flush of the target table when the flush interval has elapsed
*       </ul>
* </ul>
*
* @param isForce if true, flush regardless of the other conditions
* @return completable future that completes when the blobs are registered successfully; if none
*     of the conditions above is met, only the stats future is returned
*/
CompletableFuture<Void> flush(boolean isForce) {
long timeDiffMillis = System.currentTimeMillis() - this.lastFlushTime;
long currentTime = System.currentTimeMillis();
long timeDiffMillis = currentTime - this.lastFlushTime;

Set<String> tablesToFlush =
this.channelCache.entrySet().stream()
.filter(
entry ->
isForce
|| currentTime - this.channelCache.getLastFlushTime(entry.getKey())
>= this.owningClient.getParameterProvider().getCachedMaxClientLagInMs()
|| this.channelCache.getNeedFlush(entry.getKey()))
.map(Map.Entry::getKey)
.collect(Collectors.toSet());

// Flush all tables together when chunk interleaving is allowed
if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() != 1
&& !tablesToFlush.isEmpty()) {
tablesToFlush.addAll(
this.channelCache.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toSet()));
}

if (isForce
|| (!DISABLE_BACKGROUND_FLUSH
&& !isTestMode()
&& (this.isNeedFlush
|| timeDiffMillis
>= this.owningClient.getParameterProvider().getCachedMaxClientLagInMs()))) {
if (isForce || (!DISABLE_BACKGROUND_FLUSH && !isTestMode() && !tablesToFlush.isEmpty())) {

return this.statsFuture()
.thenCompose((v) -> this.distributeFlush(isForce, timeDiffMillis))
.thenCompose((v) -> this.distributeFlush(isForce, tablesToFlush, timeDiffMillis))
.thenCompose((v) -> this.registerFuture());
}
return this.statsFuture();
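
To restate the selection logic above: a table qualifies when the flush is forced, when the max client lag has elapsed since that table's own last flush, or when its need-flush flag is set; and if interleaving is allowed (max chunks in blob != 1) and at least one table qualifies, every table is flushed together. A standalone sketch of the per-table predicate, with a hypothetical constant standing in for getCachedMaxClientLagInMs():

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class FlushSelectionSketch {
  // Hypothetical stand-in for the client's cached max-client-lag parameter.
  static final long MAX_CLIENT_LAG_MS = 1_000;

  static Set<String> tablesToFlush(
      boolean isForce,
      long now,
      Map<String, Long> lastFlushTime,
      Map<String, Boolean> needFlush) {
    return lastFlushTime.keySet().stream()
        .filter(
            table ->
                isForce
                    || now - lastFlushTime.get(table) >= MAX_CLIENT_LAG_MS
                    || needFlush.getOrDefault(table, false))
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    Map<String, Long> lastFlushTime = new HashMap<>();
    lastFlushTime.put("t1", now - 2_000); // lag exceeded -> selected
    lastFlushTime.put("t2", now);         // fresh, no flag -> not selected
    Map<String, Boolean> needFlush = new HashMap<>();
    needFlush.put("t2", false);
    System.out.println(tablesToFlush(false, now, lastFlushTime, needFlush)); // [t1]
  }
}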
@@ -352,12 +386,14 @@ private void createWorkers() {
/**
* Distribute the flush tasks by iterating through the channels in the channel cache and kicking
* off a blob-building task when a certain size is reached or we reach the end
*
* @param tablesToFlush set of tables to flush
*/
void distributeFlushTasks() {
void distributeFlushTasks(Set<String> tablesToFlush) {
Iterator<
Map.Entry<
String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
itr = this.channelCache.iterator();
itr = this.channelCache.iterator(tablesToFlush);
List<Pair<BlobData<T>, CompletableFuture<BlobMetadata>>> blobs = new ArrayList<>();
List<ChannelData<T>> leftoverChannelsDataPerTable = new ArrayList<>();

@@ -389,11 +425,11 @@ void distributeFlushTasks() {
blobPath);
break;
} else {
ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>> table =
itr.next().getValue();
Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>
tableEntry = itr.next();
// Use parallel stream since getData could be the performance bottleneck when we have a
// high number of channels
table.values().parallelStream()
tableEntry.getValue().values().parallelStream()
.forEach(
channel -> {
if (channel.isValid()) {
@@ -630,9 +666,13 @@ void shutdown() throws InterruptedException {
}
}

/** Set the flag to indicate that a flush is needed */
void setNeedFlush() {
this.isNeedFlush = true;
/**
* Set the flag to indicate that a flush is needed for the given table
*
* @param fullyQualifiedTableName the fully qualified table name
*/
void setNeedFlush(String fullyQualifiedTableName) {
this.channelCache.setNeedFlush(fullyQualifiedTableName, true);
}

/**
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -413,7 +413,7 @@ public InsertValidationResponse insertRows(
// if a large number of rows are inserted
if (this.rowBuffer.getSize()
>= this.owningClient.getParameterProvider().getMaxChannelSizeInBytes()) {
this.owningClient.setNeedFlush();
this.owningClient.setNeedFlush(this.channelFlushContext.getFullyQualifiedTableName());
}

return response;
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -820,8 +820,8 @@ CompletableFuture<Void> flush(boolean closing) {
}

/** Set the flag to indicate that a flush is needed for the given table */
void setNeedFlush() {
this.flushService.setNeedFlush();
void setNeedFlush(String fullyQualifiedTableName) {
this.flushService.setNeedFlush(fullyQualifiedTableName);
}
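
Taken together, the files above show the per-table trigger path: insertRows notices that a channel's buffer has crossed the max channel size, passes the channel's fully qualified table name to the client's setNeedFlush, which forwards to the flush service, which marks the table in the channel cache; the background flush then picks up only that table (or all tables when interleaving is allowed). A simplified sketch of the delegation chain, with stand-in classes rather than the real ones:

import java.util.concurrent.ConcurrentHashMap;

public class NeedFlushPathSketch {
  static class ChannelCache {
    final ConcurrentHashMap<String, Boolean> needFlush = new ConcurrentHashMap<>();
    void setNeedFlush(String table, boolean flag) { needFlush.put(table, flag); }
  }

  static class FlushService {
    final ChannelCache cache = new ChannelCache();
    void setNeedFlush(String table) { cache.setNeedFlush(table, true); }
  }

  static class Client {
    final FlushService flushService = new FlushService();
    void setNeedFlush(String table) { flushService.setNeedFlush(table); }
  }

  public static void main(String[] args) {
    Client client = new Client();
    // What a channel does once its buffer crosses the size threshold:
    client.setNeedFlush("db.schema.t1");
    System.out.println(client.flushService.cache.needFlush); // {db.schema.t1=true}
  }
}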

/** Remove the channel in the channel cache if the channel sequencer matches */