Skip to content

Commit

Permalink
fix FlushService merge
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Jul 30, 2024
1 parent f383146 commit c9a45ae
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,12 @@ private CompletableFuture<Void> distributeFlush(
/** If tracing is enabled, print always else, check if it needs flush or is forceful. */
private void logFlushTask(boolean isForce, Set<String> tablesToFlush, long flushStartTime) {
boolean isNeedFlush =
this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1
this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1
? tablesToFlush.stream().anyMatch(channelCache::getNeedFlush)
: this.isNeedFlush;
long currentTime = System.currentTimeMillis();
final String logInfo;
if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) {
if (this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1) {
logInfo =
String.format(
"Tables=[%s]",
Expand Down Expand Up @@ -272,7 +272,7 @@ CompletableFuture<Void> flush(boolean isForce) {
this.owningClient.getParameterProvider().getCachedMaxClientLagInMs();

final Set<String> tablesToFlush;
if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) {
if (this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1) {
tablesToFlush =
this.channelCache.keySet().stream()
.filter(
Expand Down Expand Up @@ -694,7 +694,7 @@ void shutdown() throws InterruptedException {
*/
void setNeedFlush(String fullyQualifiedTableName) {
this.isNeedFlush = true;
if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) {
if (this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1) {
this.channelCache.setNeedFlush(fullyQualifiedTableName, true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private abstract static class TestContext<T> implements AutoCloseable {
}

void setParameterOverride(Map<String, Object> parameterOverride) {
this.parameterProvider = new ParameterProvider(parameterOverride, null);
this.parameterProvider = new ParameterProvider(parameterOverride, null, isIcebergMode);
}

ChannelData<T> flushChannel(String name) {
Expand Down Expand Up @@ -455,10 +455,17 @@ public void testGetFilePath() {
}

@Test
public void testFlush() throws Exception {
public void testInterleaveFlush() throws Exception {
if (isIcebergMode) {
// Interleaved blob is not supported in iceberg mode
return;
}
int numChannels = 4;
Long maxLastFlushTime = Long.MAX_VALUE - 1000L; // -1000L to avoid jitter overflow
TestContext<List<List<Object>>> testContext = testContextFactory.create();
testContext.setParameterOverride(
Collections.singletonMap(
ParameterProvider.MAX_CHUNKS_IN_BLOB, ParameterProvider.MAX_CHUNKS_IN_BLOB_DEFAULT));
addChannel1(testContext);
FlushService<?> flushService = testContext.flushService;
ChannelCache<?> channelCache = testContext.channelCache;
Expand Down Expand Up @@ -523,7 +530,7 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedExcep
ChannelCache<?> channelCache = testContext.channelCache;
Mockito.when(flushService.isTestMode()).thenReturn(false);
testContext.setParameterOverride(
Collections.singletonMap(ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, 1));
Collections.singletonMap(ParameterProvider.MAX_CHUNKS_IN_BLOB, 1));

// Test need flush
IntStream.range(0, numChannels)
Expand Down

0 comments on commit c9a45ae

Please sign in to comment.