Skip to content

Commit

Permalink
SNOW-1512047 Introduce independent per-table flushes when interleavin…
Browse files Browse the repository at this point in the history
…g is disabled (#788)
  • Loading branch information
sfc-gh-alhuang authored Jul 26, 2024
1 parent e808bbd commit 164b030
Show file tree
Hide file tree
Showing 7 changed files with 350 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import java.util.Iterator;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;

/**
* In-memory cache that stores the active channels for a given Streaming Ingest client, and the
Expand All @@ -23,6 +26,20 @@ class ChannelCache<T> {
String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>
cache = new ConcurrentHashMap<>();

/** Flush information for each table including last flush time and if flush is needed */
static class FlushInfo {
final long lastFlushTime;
final boolean needFlush;

FlushInfo(long lastFlushTime, boolean needFlush) {
this.lastFlushTime = lastFlushTime;
this.needFlush = needFlush;
}
}

/** Flush information for each table, only used when max chunks in blob is 1 */
private final ConcurrentHashMap<String, FlushInfo> tableFlushInfo = new ConcurrentHashMap<>();

/**
* Add a channel to the channel cache
*
Expand All @@ -33,6 +50,11 @@ void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
this.cache.computeIfAbsent(
channel.getFullyQualifiedTableName(), v -> new ConcurrentHashMap<>());

// Update the last flush time for the table, add jitter to avoid all channels flush at the same
// time when the blobs are not interleaved
this.tableFlushInfo.putIfAbsent(
channel.getFullyQualifiedTableName(), new FlushInfo(System.currentTimeMillis(), false));

SnowflakeStreamingIngestChannelInternal<T> oldChannel =
channels.put(channel.getName(), channel);
// Invalidate old channel if it exits to block new inserts and return error to users earlier
Expand All @@ -44,13 +66,84 @@ void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
}

/**
* Returns an iterator over the (table, channels) in this map.
* Get the last flush time for a table
*
* @param fullyQualifiedTableName fully qualified table name
* @return last flush time in milliseconds
*/
Long getLastFlushTime(String fullyQualifiedTableName) {
FlushInfo tableFlushInfo = this.tableFlushInfo.get(fullyQualifiedTableName);
if (tableFlushInfo == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format("Last flush time for table %s not found", fullyQualifiedTableName));
}
return tableFlushInfo.lastFlushTime;
}

/**
* Set the last flush time for a table as the current time
*
* @return
* @param fullyQualifiedTableName fully qualified table name
* @param lastFlushTime last flush time in milliseconds
*/
Iterator<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
iterator() {
return this.cache.entrySet().iterator();
void setLastFlushTime(String fullyQualifiedTableName, Long lastFlushTime) {
this.tableFlushInfo.compute(
fullyQualifiedTableName,
(k, v) -> {
if (v == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format("Last flush time for table %s not found", fullyQualifiedTableName));
}
return new FlushInfo(lastFlushTime, v.needFlush);
});
}

/**
* Get need flush flag for a table
*
* @param fullyQualifiedTableName fully qualified table name
* @return need flush flag
*/
boolean getNeedFlush(String fullyQualifiedTableName) {
FlushInfo tableFlushInfo = this.tableFlushInfo.get(fullyQualifiedTableName);
if (tableFlushInfo == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format("Need flush flag for table %s not found", fullyQualifiedTableName));
}
return tableFlushInfo.needFlush;
}

/**
* Set need flush flag for a table
*
* @param fullyQualifiedTableName fully qualified table name
* @param needFlush need flush flag
*/
void setNeedFlush(String fullyQualifiedTableName, boolean needFlush) {
this.tableFlushInfo.compute(
fullyQualifiedTableName,
(k, v) -> {
if (v == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format("Need flush flag for table %s not found", fullyQualifiedTableName));
}
return new FlushInfo(v.lastFlushTime, needFlush);
});
}

/** Returns an immutable set view of the mappings contained in the channel cache. */
Set<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
entrySet() {
return Collections.unmodifiableSet(cache.entrySet());
}

/** Returns an immutable set view of the keys contained in the channel cache. */
Set<String> keySet() {
return Collections.unmodifiableSet(cache.keySet());
}

/** Close all channels in the channel cache */
Expand Down
150 changes: 117 additions & 33 deletions src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
Expand All @@ -31,6 +32,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
Expand Down Expand Up @@ -100,12 +102,15 @@ List<List<ChannelData<T>>> getData() {
// Reference to register service
private final RegisterService<T> registerService;

// Indicates whether we need to schedule a flush
@VisibleForTesting volatile boolean isNeedFlush;

// Latest flush time
/**
* Client level last flush time and need flush flag. This two variables are used when max chunk in
* blob is not 1. When max chunk in blob is 1, flush service ignores these variables and uses
* table level last flush time and need flush flag. See {@link ChannelCache.FlushInfo}.
*/
@VisibleForTesting volatile long lastFlushTime;

@VisibleForTesting volatile boolean isNeedFlush;

// Indicates whether it's running as part of the test
private final boolean isTestMode;

Expand Down Expand Up @@ -162,36 +167,65 @@ private CompletableFuture<Void> statsFuture() {

/**
* @param isForce if true will flush regardless of other conditions
* @param timeDiffMillis Time in milliseconds since the last flush
* @param tablesToFlush list of tables to flush
* @param flushStartTime the time when the flush started
* @return
*/
private CompletableFuture<Void> distributeFlush(boolean isForce, long timeDiffMillis) {
private CompletableFuture<Void> distributeFlush(
boolean isForce, Set<String> tablesToFlush, Long flushStartTime) {
return CompletableFuture.runAsync(
() -> {
logFlushTask(isForce, timeDiffMillis);
distributeFlushTasks();
logFlushTask(isForce, tablesToFlush, flushStartTime);
distributeFlushTasks(tablesToFlush);
long prevFlushEndTime = System.currentTimeMillis();
this.lastFlushTime = prevFlushEndTime;
this.isNeedFlush = false;
this.lastFlushTime = System.currentTimeMillis();
return;
tablesToFlush.forEach(
table -> {
this.channelCache.setLastFlushTime(table, prevFlushEndTime);
this.channelCache.setNeedFlush(table, false);
});
},
this.flushWorker);
}

/** If tracing is enabled, print always else, check if it needs flush or is forceful. */
private void logFlushTask(boolean isForce, long timeDiffMillis) {
private void logFlushTask(boolean isForce, Set<String> tablesToFlush, long flushStartTime) {
boolean isNeedFlush =
this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1
? tablesToFlush.stream().anyMatch(channelCache::getNeedFlush)
: this.isNeedFlush;
long currentTime = System.currentTimeMillis();
final String logInfo;
if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) {
logInfo =
String.format(
"Tables=[%s]",
tablesToFlush.stream()
.map(
table ->
String.format(
"(name=%s, isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s)",
table,
channelCache.getNeedFlush(table),
flushStartTime - channelCache.getLastFlushTime(table),
currentTime - channelCache.getLastFlushTime(table)))
.collect(Collectors.joining(", ")));
} else {
logInfo =
String.format(
"isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s",
isNeedFlush, flushStartTime - this.lastFlushTime, currentTime - this.lastFlushTime);
}

final String flushTaskLogFormat =
String.format(
"Submit forced or ad-hoc flush task on client=%s, isForce=%s,"
+ " isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s",
this.owningClient.getName(),
isForce,
this.isNeedFlush,
timeDiffMillis,
System.currentTimeMillis() - this.lastFlushTime);
"Submit forced or ad-hoc flush task on client=%s, isForce=%s, %s",
this.owningClient.getName(), isForce, logInfo);
if (logger.isTraceEnabled()) {
logger.logTrace(flushTaskLogFormat);
}
if (!logger.isTraceEnabled() && (this.isNeedFlush || isForce)) {
if (!logger.isTraceEnabled() && (isNeedFlush || isForce)) {
logger.logDebug(flushTaskLogFormat);
}
}
Expand All @@ -207,27 +241,65 @@ private CompletableFuture<Void> registerFuture() {
}

/**
* Kick off a flush job and distribute the tasks if one of the following conditions is met:
* <li>Flush is forced by the users
* <li>One or more buffers have reached the flush size
* <li>Periodical background flush when a time interval has reached
* Kick off a flush job and distribute the tasks. The flush service behaves differently based on
* the max chunks in blob:
*
* <ul>
* <li>The max chunks in blob is not 1 (interleaving is allowed), every channel will be flushed
* together if one of the following conditions is met:
* <ul>
* <li>Flush is forced by the users
* <li>One or more buffers have reached the flush size
* <li>Periodical background flush when a time interval has reached
* </ul>
* <li>The max chunks in blob is 1 (interleaving is not allowed), a channel will be flushed if
* one of the following conditions is met:
* <ul>
* <li>Flush is forced by the users
* <li>One or more buffers with the same target table as the channel have reached the
* flush size
* <li>Periodical background flush of the target table when a time interval has reached
* </ul>
* </ul>
*
* @param isForce
* @return Completable future that will return when the blobs are registered successfully, or null
* if none of the conditions is met above
*/
CompletableFuture<Void> flush(boolean isForce) {
long timeDiffMillis = System.currentTimeMillis() - this.lastFlushTime;
final long flushStartTime = System.currentTimeMillis();
final long flushingInterval =
this.owningClient.getParameterProvider().getCachedMaxClientLagInMs();

final Set<String> tablesToFlush;
if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) {
tablesToFlush =
this.channelCache.keySet().stream()
.filter(
key ->
isForce
|| flushStartTime - this.channelCache.getLastFlushTime(key)
>= flushingInterval
|| this.channelCache.getNeedFlush(key))
.collect(Collectors.toSet());
} else {
if (isForce
|| (!DISABLE_BACKGROUND_FLUSH
&& !isTestMode()
&& (this.isNeedFlush || flushStartTime - this.lastFlushTime >= flushingInterval))) {
tablesToFlush = this.channelCache.keySet();
} else {
tablesToFlush = null;
}
}

if (isForce
|| (!DISABLE_BACKGROUND_FLUSH
&& !isTestMode()
&& (this.isNeedFlush
|| timeDiffMillis
>= this.owningClient.getParameterProvider().getCachedMaxClientLagInMs()))) {

&& tablesToFlush != null
&& !tablesToFlush.isEmpty())) {
return this.statsFuture()
.thenCompose((v) -> this.distributeFlush(isForce, timeDiffMillis))
.thenCompose((v) -> this.distributeFlush(isForce, tablesToFlush, flushStartTime))
.thenCompose((v) -> this.registerFuture());
}
return this.statsFuture();
Expand Down Expand Up @@ -310,12 +382,17 @@ private void createWorkers() {
/**
* Distribute the flush tasks by iterating through all the channels in the channel cache and kick
* off a build blob work when certain size has reached or we have reached the end
*
* @param tablesToFlush list of tables to flush
*/
void distributeFlushTasks() {
void distributeFlushTasks(Set<String> tablesToFlush) {
Iterator<
Map.Entry<
String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
itr = this.channelCache.iterator();
itr =
this.channelCache.entrySet().stream()
.filter(e -> tablesToFlush.contains(e.getKey()))
.iterator();
List<Pair<BlobData<T>, CompletableFuture<BlobMetadata>>> blobs = new ArrayList<>();
List<ChannelData<T>> leftoverChannelsDataPerTable = new ArrayList<>();

Expand Down Expand Up @@ -607,9 +684,16 @@ void shutdown() throws InterruptedException {
}
}

/** Set the flag to indicate that a flush is needed */
void setNeedFlush() {
/**
* Set the flag to indicate that a flush is needed
*
* @param fullyQualifiedTableName the fully qualified table name
*/
void setNeedFlush(String fullyQualifiedTableName) {
this.isNeedFlush = true;
if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) {
this.channelCache.setNeedFlush(fullyQualifiedTableName, true);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
Expand Down Expand Up @@ -413,7 +413,7 @@ public InsertValidationResponse insertRows(
// if a large number of rows are inserted
if (this.rowBuffer.getSize()
>= this.owningClient.getParameterProvider().getMaxChannelSizeInBytes()) {
this.owningClient.setNeedFlush();
this.owningClient.setNeedFlush(this.channelFlushContext.getFullyQualifiedTableName());
}

return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,8 +740,8 @@ CompletableFuture<Void> flush(boolean closing) {
}

/** Set the flag to indicate that a flush is needed */
void setNeedFlush() {
this.flushService.setNeedFlush();
void setNeedFlush(String fullyQualifiedTableName) {
this.flushService.setNeedFlush(fullyQualifiedTableName);
}

/** Remove the channel in the channel cache if the channel sequencer matches */
Expand Down
Loading

0 comments on commit 164b030

Please sign in to comment.