SNOW-1512047 Introduce independent per-table flushes when interleaving is disabled #788

Merged Jul 26, 2024 · 7 commits
Changes from all commits
src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java
@@ -1,12 +1,15 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import java.util.Iterator;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;

/**
* In-memory cache that stores the active channels for a given Streaming Ingest client, and the
@@ -23,6 +26,20 @@ class ChannelCache<T> {
String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>
cache = new ConcurrentHashMap<>();

/** Flush information for each table, including the last flush time and whether a flush is needed */
static class FlushInfo {
final long lastFlushTime;
final boolean needFlush;

FlushInfo(long lastFlushTime, boolean needFlush) {
this.lastFlushTime = lastFlushTime;
this.needFlush = needFlush;
}
}

/** Flush information for each table, only used when max chunks in blob is 1 */
private final ConcurrentHashMap<String, FlushInfo> tableFlushInfo = new ConcurrentHashMap<>();
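
FlushInfo is deliberately immutable: the setters below swap in a whole new instance through a single ConcurrentHashMap.compute call, which runs under the map's per-key lock, so a concurrent reader can never observe a lastFlushTime from one update paired with a needFlush from another. A minimal sketch of the pattern (the table name is illustrative):

// Atomically replace the entry; both fields change as one unit under the
// map's per-key lock.
tableFlushInfo.compute(
    "MY_DB.MY_SCHEMA.T1", // hypothetical fully qualified table name
    (k, v) -> new FlushInfo(System.currentTimeMillis(), v != null && v.needFlush));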

/**
* Add a channel to the channel cache
*
@@ -33,6 +50,11 @@ void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
this.cache.computeIfAbsent(
channel.getFullyQualifiedTableName(), v -> new ConcurrentHashMap<>());

// Update the last flush time for the table, add jitter to avoid all channels flushing at the
// same time when the blobs are not interleaved
this.tableFlushInfo.putIfAbsent(
channel.getFullyQualifiedTableName(), new FlushInfo(System.currentTimeMillis(), false));

SnowflakeStreamingIngestChannelInternal<T> oldChannel =
channels.put(channel.getName(), channel);
// Invalidate old channel if it exists to block new inserts and return error to users earlier
@@ -44,13 +66,84 @@ void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
}

/**
* Returns an iterator over the (table, channels) in this map.
* Get the last flush time for a table
*
* @param fullyQualifiedTableName fully qualified table name
* @return last flush time in milliseconds
*/
Long getLastFlushTime(String fullyQualifiedTableName) {
FlushInfo tableFlushInfo = this.tableFlushInfo.get(fullyQualifiedTableName);
if (tableFlushInfo == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format("Last flush time for table %s not found", fullyQualifiedTableName));
}
return tableFlushInfo.lastFlushTime;
}

/**
* Set the last flush time for a table
*
* @return
* @param fullyQualifiedTableName fully qualified table name
* @param lastFlushTime last flush time in milliseconds
*/
Iterator<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
iterator() {
return this.cache.entrySet().iterator();
void setLastFlushTime(String fullyQualifiedTableName, Long lastFlushTime) {
this.tableFlushInfo.compute(
fullyQualifiedTableName,
(k, v) -> {
if (v == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format("Last flush time for table %s not found", fullyQualifiedTableName));
}
return new FlushInfo(lastFlushTime, v.needFlush);
});
}

/**
* Get need flush flag for a table
*
* @param fullyQualifiedTableName fully qualified table name
* @return need flush flag
*/
boolean getNeedFlush(String fullyQualifiedTableName) {
FlushInfo tableFlushInfo = this.tableFlushInfo.get(fullyQualifiedTableName);
if (tableFlushInfo == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format("Need flush flag for table %s not found", fullyQualifiedTableName));
}
return tableFlushInfo.needFlush;
}

/**
* Set need flush flag for a table
*
* @param fullyQualifiedTableName fully qualified table name
* @param needFlush need flush flag
*/
void setNeedFlush(String fullyQualifiedTableName, boolean needFlush) {
this.tableFlushInfo.compute(
fullyQualifiedTableName,
(k, v) -> {
if (v == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format("Need flush flag for table %s not found", fullyQualifiedTableName));
}
return new FlushInfo(v.lastFlushTime, needFlush);
});
}

/** Returns an unmodifiable set view of the mappings contained in the channel cache. */
Set<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
entrySet() {
return Collections.unmodifiableSet(cache.entrySet());
}

/** Returns an unmodifiable set view of the keys contained in the channel cache. */
Set<String> keySet() {
return Collections.unmodifiableSet(cache.keySet());
}
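
As a usage note, both views are live but read-only: a caller that tries to mutate one fails fast, while later changes to the underlying cache still show through, since Collections.unmodifiableSet wraps rather than copies. A hypothetical caller (instance and table names are illustrative):

Set<String> tables = channelCache.keySet();
tables.remove("MY_DB.MY_SCHEMA.T1"); // throws UnsupportedOperationException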

/** Close all channels in the channel cache */
src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -31,6 +32,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
@@ -100,12 +102,15 @@ List<List<ChannelData<T>>> getData() {
// Reference to register service
private final RegisterService<T> registerService;

// Indicates whether we need to schedule a flush
@VisibleForTesting volatile boolean isNeedFlush;

// Latest flush time
/**
* Client level last flush time and need flush flag. These two variables are used when max chunks
* in blob is not 1. When max chunks in blob is 1, the flush service ignores these variables and
* uses table level last flush time and need flush flag. See {@link ChannelCache.FlushInfo}.
*/
@VisibleForTesting volatile long lastFlushTime;

@VisibleForTesting volatile boolean isNeedFlush;

// Indicates whether it's running as part of the test
private final boolean isTestMode;

@@ -162,36 +167,65 @@ private CompletableFuture<Void> statsFuture() {

/**
* @param isForce if true will flush regardless of other conditions
* @param timeDiffMillis Time in milliseconds since the last flush
* @param tablesToFlush list of tables to flush
* @param flushStartTime the time when the flush started
* @return future that completes once the flush tasks have been distributed
*/
private CompletableFuture<Void> distributeFlush(boolean isForce, long timeDiffMillis) {
private CompletableFuture<Void> distributeFlush(
boolean isForce, Set<String> tablesToFlush, Long flushStartTime) {
return CompletableFuture.runAsync(
() -> {
logFlushTask(isForce, timeDiffMillis);
distributeFlushTasks();
logFlushTask(isForce, tablesToFlush, flushStartTime);
distributeFlushTasks(tablesToFlush);
long prevFlushEndTime = System.currentTimeMillis();
this.lastFlushTime = prevFlushEndTime;
this.isNeedFlush = false;
this.lastFlushTime = System.currentTimeMillis();
return;
tablesToFlush.forEach(
table -> {
this.channelCache.setLastFlushTime(table, prevFlushEndTime);
this.channelCache.setNeedFlush(table, false);
});
},
this.flushWorker);
}

/** If tracing is enabled, always log; otherwise log only when a flush is needed or forced. */
private void logFlushTask(boolean isForce, long timeDiffMillis) {
private void logFlushTask(boolean isForce, Set<String> tablesToFlush, long flushStartTime) {
boolean isNeedFlush =
this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1
? tablesToFlush.stream().anyMatch(channelCache::getNeedFlush)
: this.isNeedFlush;
long currentTime = System.currentTimeMillis();
final String logInfo;
if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) {
logInfo =
String.format(
"Tables=[%s]",
tablesToFlush.stream()
.map(
table ->
String.format(
"(name=%s, isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s)",
table,
channelCache.getNeedFlush(table),
flushStartTime - channelCache.getLastFlushTime(table),
currentTime - channelCache.getLastFlushTime(table)))
.collect(Collectors.joining(", ")));
} else {
logInfo =
String.format(
"isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s",
isNeedFlush, flushStartTime - this.lastFlushTime, currentTime - this.lastFlushTime);
}

final String flushTaskLogFormat =
String.format(
"Submit forced or ad-hoc flush task on client=%s, isForce=%s,"
+ " isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s",
this.owningClient.getName(),
isForce,
this.isNeedFlush,
timeDiffMillis,
System.currentTimeMillis() - this.lastFlushTime);
"Submit forced or ad-hoc flush task on client=%s, isForce=%s, %s",
this.owningClient.getName(), isForce, logInfo);
if (logger.isTraceEnabled()) {
logger.logTrace(flushTaskLogFormat);
}
if (!logger.isTraceEnabled() && (this.isNeedFlush || isForce)) {
if (!logger.isTraceEnabled() && (isNeedFlush || isForce)) {
logger.logDebug(flushTaskLogFormat);
}
}
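
Putting the format strings together, a debug line from the per-table path would look roughly like the following (client name, table name, and millisecond values are illustrative):

Submit forced or ad-hoc flush task on client=MY_CLIENT, isForce=false, Tables=[(name=MY_DB.MY_SCHEMA.T1, isNeedFlush=true, timeDiffMillis=1004, currentDiffMillis=1012)]

and from the interleaved path:

Submit forced or ad-hoc flush task on client=MY_CLIENT, isForce=false, isNeedFlush=true, timeDiffMillis=1004, currentDiffMillis=1012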
@@ -207,27 +241,65 @@ private CompletableFuture<Void> registerFuture() {
}

/**
* Kick off a flush job and distribute the tasks if one of the following conditions is met:
* <li>Flush is forced by the users
* <li>One or more buffers have reached the flush size
* <li>Periodical background flush when a time interval has reached
* Kick off a flush job and distribute the tasks. The flush service behaves differently based on
* the max chunks in blob:
*
* <ul>
* <li>When max chunks in blob is not 1 (interleaving is allowed), all channels are flushed
* together if one of the following conditions is met:
* <ul>
* <li>Flush is forced by the user
* <li>One or more buffers have reached the flush size
* <li>Periodic background flush when the flush interval has elapsed
* </ul>
* <li>When max chunks in blob is 1 (interleaving is not allowed), a channel is flushed if one of
* the following conditions is met:
* <ul>
* <li>Flush is forced by the user
* <li>One or more buffers with the same target table as the channel have reached the
* flush size
* <li>Periodic background flush of the target table when the flush interval has elapsed
* </ul>
* </ul>
*
* @param isForce if true, flush regardless of other conditions
* @return completable future that completes when the blobs are registered successfully, or null
* if none of the above conditions is met
*/
CompletableFuture<Void> flush(boolean isForce) {
long timeDiffMillis = System.currentTimeMillis() - this.lastFlushTime;
final long flushStartTime = System.currentTimeMillis();
final long flushingInterval =
this.owningClient.getParameterProvider().getCachedMaxClientLagInMs();

final Set<String> tablesToFlush;
if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) {
tablesToFlush =
this.channelCache.keySet().stream()
.filter(
key ->
isForce
|| flushStartTime - this.channelCache.getLastFlushTime(key)
>= flushingInterval
|| this.channelCache.getNeedFlush(key))
.collect(Collectors.toSet());
} else {
if (isForce
|| (!DISABLE_BACKGROUND_FLUSH
&& !isTestMode()
&& (this.isNeedFlush || flushStartTime - this.lastFlushTime >= flushingInterval))) {
tablesToFlush = this.channelCache.keySet();
} else {
tablesToFlush = null;
}
}
Collaborator:
Why do we need to do this, if the previous code block already picked up the minimal set of tables needing flush?

Contributor:
+1, even if interleaving is enabled, I'd prefer to keep the above logic for flushing and wait until the MaxClientLag is reached for each channel

Contributor Author:
I aimed to maintain the original interleaving behavior, where all channels are flushed if any channel needs it. With independent flushing intervals, we might miss the chance to combine multiple chunks into the same BDEC. A potential workaround is to discretize timestamps and reduce jitter on lastFlushTime in interleaving mode. This can increase the chances of combining multiple chunks into the same blob. What do you think?
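
A minimal sketch of that discretization idea, purely illustrative and not part of this PR: rounding each lastFlushTime down to a fixed bucket makes tables whose flushes land in the same bucket become due together, raising the odds that their chunks share a blob.

// Hypothetical helper: align flush timestamps to a bucket boundary so nearby
// tables share the same flush deadline. The bucket size is an assumption.
static long discretizeFlushTime(long timestampMillis, long bucketMillis) {
  return (timestampMillis / bucketMillis) * bucketMillis;
}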


if (isForce
|| (!DISABLE_BACKGROUND_FLUSH
&& !isTestMode()
&& (this.isNeedFlush
|| timeDiffMillis
>= this.owningClient.getParameterProvider().getCachedMaxClientLagInMs()))) {

&& tablesToFlush != null
&& !tablesToFlush.isEmpty())) {
return this.statsFuture()
.thenCompose((v) -> this.distributeFlush(isForce, timeDiffMillis))
.thenCompose((v) -> this.distributeFlush(isForce, tablesToFlush, flushStartTime))
.thenCompose((v) -> this.registerFuture());
}
return this.statsFuture();
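
For context, the per-table path only activates when each blob is limited to a single chunk. A hedged sketch of such a client setup; the parameter key is inferred from the getter names above and the builder calls are assumptions about the public factory API, so exact names may differ by SDK version:

import java.util.Collections;
import java.util.Properties;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;

Properties props = new Properties();
// connection properties (account, user, private key, ...) elided

SnowflakeStreamingIngestClient client =
    SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT")
        .setProperties(props)
        // assumed parameter key, inferred from getMaxChunksInBlobAndRegistrationRequest()
        .setParameterOverrides(
            Collections.singletonMap("MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST", 1))
        .build();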
@@ -310,12 +382,17 @@ private void createWorkers() {
/**
* Distribute the flush tasks by iterating through all the channels in the channel cache and kick
* off a blob build job when a certain size has been reached or we have reached the end
*
* @param tablesToFlush set of fully qualified table names to flush
*/
void distributeFlushTasks() {
void distributeFlushTasks(Set<String> tablesToFlush) {
Iterator<
Map.Entry<
String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
itr = this.channelCache.iterator();
itr =
this.channelCache.entrySet().stream()
.filter(e -> tablesToFlush.contains(e.getKey()))
.iterator();
List<Pair<BlobData<T>, CompletableFuture<BlobMetadata>>> blobs = new ArrayList<>();
List<ChannelData<T>> leftoverChannelsDataPerTable = new ArrayList<>();

@@ -607,9 +684,16 @@ void shutdown() throws InterruptedException {
}
}

/** Set the flag to indicate that a flush is needed */
void setNeedFlush() {
/**
* Set the flag to indicate that a flush is needed
*
* @param fullyQualifiedTableName the fully qualified table name
*/
void setNeedFlush(String fullyQualifiedTableName) {
this.isNeedFlush = true;
if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) {
this.channelCache.setNeedFlush(fullyQualifiedTableName, true);
}
}

src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -413,7 +413,7 @@ public InsertValidationResponse insertRows(
// if a large number of rows are inserted
if (this.rowBuffer.getSize()
>= this.owningClient.getParameterProvider().getMaxChannelSizeInBytes()) {
this.owningClient.setNeedFlush();
this.owningClient.setNeedFlush(this.channelFlushContext.getFullyQualifiedTableName());
}

return response;
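
From a caller's perspective (channel, rows, and offset token are illustrative), a large insert into one channel now trips the need-flush flag for that channel's table only, leaving other tables on their own flush schedules:

// Illustrative: if this insert pushes the buffer past the max channel size,
// only T1 is marked for flush when max chunks in blob is 1.
InsertValidationResponse response = channelForT1.insertRows(rows, "offset-42");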
src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
@@ -740,8 +740,8 @@ CompletableFuture<Void> flush(boolean closing) {
}

/** Set the flag to indicate that a flush is needed */
void setNeedFlush() {
this.flushService.setNeedFlush();
void setNeedFlush(String fullyQualifiedTableName) {
this.flushService.setNeedFlush(fullyQualifiedTableName);
}

/** Remove the channel in the channel cache if the channel sequencer matches */