Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server][controller] Try ungraceful close when the graceful one faile… #828

Merged
merged 3 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Timer;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.VeniceResourceCloseResult;
Expand Down Expand Up @@ -399,26 +400,28 @@ public void close(boolean gracefulClose) {
if (isClosed) {
return;
}
long startTime = System.currentTimeMillis();
logger.info("Closing VeniceWriter for topic: {}", topicName);
try {
// If {@link #broadcastEndOfPush(Map)} was already called, the {@link #endAllSegments(boolean)}
// will not do anything (it's idempotent). Segments should not be ended if there are still data missing.
if (gracefulClose) {
endAllSegments(true);
}
// DO NOT call the {@link #PubSubProducerAdapter.close(int)} version from here.
// For non-shared producer mode gracefulClose will flush the producer

producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
OPEN_VENICE_WRITER_COUNT.decrementAndGet();
} catch (Exception e) {
logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", topicName, e);
VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();
logger.info("Closing VeniceWriter for topic: {}, gracefulness: {}", topicName, gracefulClose);
try (Timer ignore = Timer.run(
elapsedTimeInMs -> logger.info("Closed VeniceWriter for topic: {} in {} ms", topicName, elapsedTimeInMs))) {
lluwm marked this conversation as resolved.
Show resolved Hide resolved
try {
// If {@link #broadcastEndOfPush(Map)} was already called, the {@link #endAllSegments(boolean)}
// will not do anything (it's idempotent). Segments should not be ended if there are still data missing.
if (gracefulClose) {
endAllSegments(true);
lluwm marked this conversation as resolved.
Show resolved Hide resolved
}
// DO NOT call the {@link #PubSubProducerAdapter.close(int)} version from here.
// For non-shared producer mode gracefulClose will flush the producer

producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
lluwm marked this conversation as resolved.
Show resolved Hide resolved
OPEN_VENICE_WRITER_COUNT.decrementAndGet();
} catch (Exception e) {
handleExceptionInClose(e, gracefulClose);
lluwm marked this conversation as resolved.
Show resolved Hide resolved
} finally {
threadPoolExecutor.shutdown();
isClosed = true;
}
}
threadPoolExecutor.shutdown();
isClosed = true;
logger.info("Closed VeniceWriter for topic: {} in {} ms", topicName, System.currentTimeMillis() - startTime);
}
}

Expand All @@ -428,37 +431,56 @@ public CompletableFuture<VeniceResourceCloseResult> closeAsync(boolean gracefulC
if (isClosed) {
return VeniceResourceCloseResult.ALREADY_CLOSED;
}
long startTime = System.currentTimeMillis();
logger.info("Closing VeniceWriter for topic: {}", topicName);
try {
// try to end all segments before closing the producer
if (gracefulClose) {
CompletableFuture<Void> endSegmentsFuture =
CompletableFuture.runAsync(() -> endAllSegments(true), threadPoolExecutor);
try {
endSegmentsFuture.get(Math.max(100, closeTimeOutInMs / 2), TimeUnit.MILLISECONDS);
} catch (Exception e) {
// cancel the endSegmentsFuture if it's not done in time
if (!endSegmentsFuture.isDone()) {
endSegmentsFuture.cancel(true);
logger.info("Closing VeniceWriter for topic: {}, gracefulness: {}", topicName, gracefulClose);
try (Timer ignore = Timer.run(
elapsedTimeInMs -> logger.info("Closed VeniceWriter for topic: {} in {} ms", topicName, elapsedTimeInMs))) {
try {
// try to end all segments before closing the producer.
if (gracefulClose) {
CompletableFuture<Void> endSegmentsFuture =
CompletableFuture.runAsync(() -> endAllSegments(true), threadPoolExecutor);
try {
endSegmentsFuture.get(Math.max(100, closeTimeOutInMs / 2), TimeUnit.MILLISECONDS);
} catch (Exception e) {
// cancel the endSegmentsFuture if it's not done in time.
if (!endSegmentsFuture.isDone()) {
endSegmentsFuture.cancel(true);
}
logger.warn("Swallowed an exception while trying to end all segments for topic: {}", topicName, e);
}
logger.warn("Swallowed an exception while trying to end all segments for topic: {}", topicName, e);
}
producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
OPEN_VENICE_WRITER_COUNT.decrementAndGet();
} catch (Exception e) {
handleExceptionInClose(e, gracefulClose);
} finally {
threadPoolExecutor.shutdown();
isClosed = true;
}
producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
OPEN_VENICE_WRITER_COUNT.decrementAndGet();
} catch (Exception e) {
logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", topicName, e);
VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();
}
threadPoolExecutor.shutdown();
isClosed = true;
logger.info("Closed VeniceWriter for topic: {} in {} ms", topicName, System.currentTimeMillis() - startTime);
return VeniceResourceCloseResult.SUCCESS;
}

}, threadPoolExecutor);
}

/**
 * Handles an exception raised while closing this writer.
 *
 * <p>The exception is logged and counted as a failed close. If the failed attempt was a graceful
 * one, the producer is closed once more with the graceful flag set to {@code false}; a failure of
 * that second attempt is logged and counted as well, and is never propagated to the caller.
 *
 * @param e the exception thrown by the original close attempt
 * @param gracefulClose whether the attempt that failed was a graceful close
 */
void handleExceptionInClose(Exception e, boolean gracefulClose) {
  logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", topicName, e);
  VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();

  // An ungraceful close that failed has no further fallback.
  if (!gracefulClose) {
    return;
  }

  // The graceful close failed: give the producer one more chance to close, this time ungracefully.
  try {
    producerAdapter.close(topicName, closeTimeOutInMs, false);
    OPEN_VENICE_WRITER_COUNT.decrementAndGet();
  } catch (Exception retryFailure) {
    // Even the ungraceful close failed; give up, swallow the exception and move on.
    logger.warn("Exception in ungraceful close for topic: {}", topicName, retryFailure);
    VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();
  }
}

@Override
public void close() {
close(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES;
import static com.linkedin.venice.writer.VeniceWriter.VENICE_DEFAULT_LOGICAL_TS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -48,12 +50,15 @@
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.VeniceProperties;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.errors.TimeoutException;
import org.mockito.ArgumentCaptor;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -533,4 +538,26 @@ public void testSendHeartbeat(boolean addLeaderCompleteHeader, LeaderCompleteSta
}
}
}

/**
 * Unit test for the retry mechanism of VeniceWriter.close(boolean) and closeAsync(boolean).
 *
 * <p>A mocked producer is set up to throw a TimeoutException whenever its close method is invoked
 * with the last argument equal to {@code true}. Two writers backed by that same mock are closed,
 * one through close() and one through closeAsync(), and the test then checks how many times the
 * producer was closed with the last argument equal to {@code false}.
 *
 * @param gracefulClose flag forwarded to close()/closeAsync(); supplied by the "True-and-False"
 *        data provider (presumably once with each value, confirm against DataProviderUtils)
 * @throws ExecutionException if the future returned by closeAsync() completes exceptionally
 * @throws InterruptedException if the test thread is interrupted while waiting on that future
 */
@Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = 10 * Time.MS_PER_SECOND)
public void testVeniceWriterCloseRetry(boolean gracefulClose) throws ExecutionException, InterruptedException {
PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class);
lluwm marked this conversation as resolved.
Show resolved Hide resolved
// Every producer close whose last argument is true fails with a TimeoutException.
doThrow(new TimeoutException()).when(mockedProducer).close(anyString(), anyInt(), eq(true));

String testTopic = "test";
VeniceWriterOptions veniceWriterOptions = new VeniceWriterOptions.Builder(testTopic).setPartitionCount(1).build();
VeniceWriter<Object, Object, Object> writer =
new VeniceWriter(veniceWriterOptions, VeniceProperties.empty(), mockedProducer);

// First writer: synchronous close. If the producer throws the TimeoutException, the VeniceWriter
// is expected to retry the producer close with the last argument set to false.
writer.close(gracefulClose);

// Second writer on the same mock: asynchronous close; get() waits for it to finish.
writer = new VeniceWriter(veniceWriterOptions, VeniceProperties.empty(), mockedProducer);
writer.closeAsync(gracefulClose).get();

// Across both writers, the producer close with the last argument false must have been called exactly twice.
verify(mockedProducer, times(2)).close(anyString(), anyInt(), eq(false));
}
}