[server][controller] Try ungraceful close when the graceful one failed (#828)

* [server][controller] Try ungraceful close when the graceful one failed in VeniceWriter

In VeniceWriter.close(), when gracefulClose is set to true, the Kafka producer calls the underlying close method,
which tries to flush the buffered data before closing (see ApacheKafkaProducerAdapter.close). However, when an
exception happens (e.g. a timeout), today VeniceWriter only logs the exception, moves on, and considers the
VeniceWriter closed while it actually is not. (A leaked VeniceWriter can cause several issues,
e.g. a stuck consumer thread.)

This change adds a retry with an ungraceful close when the gracefulClose flag is set to true and the graceful close
failed. The ungraceful close skips the flushing step, so it should always succeed (e.g. in StoreIngestionTask.kill()).
Cases where gracefulClose is set to false are unaffected.
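
In essence, the new close path boils down to the following condensed sketch (a simplification of the VeniceWriter
diff below, not the verbatim implementation; producerAdapter, topicName, closeTimeOutInMs and the retry flag are
the names used in the diff):

    // Condensed sketch of the graceful-close-with-ungraceful-fallback pattern:
    try {
      if (gracefulClose) {
        endAllSegments(true); // send END_OF_SEGMENT control messages
      }
      // A graceful close flushes buffered records, which can time out.
      producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
    } catch (Exception e) {
      logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", topicName, e);
      if (gracefulClose && retryOnGracefulCloseFailure) {
        // The retry skips the flush, so it is not expected to block or fail.
        producerAdapter.close(topicName, closeTimeOutInMs, false);
      }
    }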
lluwm authored Jan 26, 2024
1 parent cb2a0a5 commit c1d289e
Showing 4 changed files with 211 additions and 43 deletions.
@@ -56,6 +56,7 @@
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Timer;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.VeniceResourceCloseResult;
@@ -395,70 +396,131 @@ public static ControlMessage generateHeartbeatMessage(CheckSumType checkSumType)
*/
@Override
public void close(boolean gracefulClose) {
try {
closeAsync(gracefulClose).get();
} catch (ExecutionException | InterruptedException e) {
logger.warn("Future completed exceptionally in closing VeniceWriter for topic: {}", topicName, e);
}
}

/**
 * Close the {@link VeniceWriter}.
 *
 * This method is deprecated because sending an END_OF_SEGMENT control message to a non-existent topic can
 * block indefinitely, as it calls
 * {@link #sendMessage(KeyProvider, KafkaMessageEnvelopeProvider, int, PubSubProducerCallback, boolean)}.get()
 * without a timeout.
 *
 * @param gracefulClose whether to end the segments and send an END_OF_SEGMENT control message.
 * @param retryOnGracefulCloseFailure whether to retry with an ungraceful close if the graceful close fails.
 */
@Deprecated
public void close(boolean gracefulClose, boolean retryOnGracefulCloseFailure) {
synchronized (closeLock) {
if (isClosed) {
return;
}
-      long startTime = System.currentTimeMillis();
-      logger.info("Closing VeniceWriter for topic: {}", topicName);
-      try {
-        // If {@link #broadcastEndOfPush(Map)} was already called, the {@link #endAllSegments(boolean)}
-        // will not do anything (it's idempotent). Segments should not be ended if there are still data missing.
-        if (gracefulClose) {
-          endAllSegments(true);
-        }
-        // DO NOT call the {@link #PubSubProducerAdapter.close(int) version from here.}
-        // For non-shared producer mode gracefulClose will flush the producer
-
-        producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
-        OPEN_VENICE_WRITER_COUNT.decrementAndGet();
-      } catch (Exception e) {
-        logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", topicName, e);
-        VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();
-      }
-      threadPoolExecutor.shutdown();
-      isClosed = true;
-      logger.info("Closed VeniceWriter for topic: {} in {} ms", topicName, System.currentTimeMillis() - startTime);
+      logger.info("Closing VeniceWriter for topic: {}, gracefulness: {}", topicName, gracefulClose);
+      try (Timer ignore = Timer.run(
+          elapsedTimeInMs -> logger.info("Closed VeniceWriter for topic: {} in {} ms", topicName, elapsedTimeInMs))) {
+        try {
+          // If {@link #broadcastEndOfPush(Map)} was already called, the {@link #endAllSegments(boolean)}
+          // will not do anything (it's idempotent). Segments should not be ended if there is still data missing.
+          if (gracefulClose) {
+            endAllSegments(true);
+          }
+          // DO NOT call the {@link PubSubProducerAdapter#close(int)} version from here.
+          // For non-shared producer mode, gracefulClose will flush the producer.
+
+          producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
+          OPEN_VENICE_WRITER_COUNT.decrementAndGet();
+        } catch (Exception e) {
+          handleExceptionInClose(e, gracefulClose, retryOnGracefulCloseFailure);
+        } finally {
+          threadPoolExecutor.shutdown();
+          isClosed = true;
+        }
+      }
}
}

public CompletableFuture<VeniceResourceCloseResult> closeAsync(boolean gracefulClose) {
return closeAsync(gracefulClose, true);
}

public CompletableFuture<VeniceResourceCloseResult> closeAsync(
boolean gracefulClose,
boolean retryOnGracefulCloseFailure) {
    /*
     * If the VeniceWriter is already closed, return a completed future. This avoids a thread pool
     * RejectedExecutionException when a previous closeAsync call has already run and the threadPool is terminated.
     */
synchronized (closeLock) {
if (isClosed) {
return CompletableFuture.completedFuture(VeniceResourceCloseResult.ALREADY_CLOSED);
}
}

return CompletableFuture.supplyAsync(() -> {
synchronized (closeLock) {
if (isClosed) {
return VeniceResourceCloseResult.ALREADY_CLOSED;
}
-        long startTime = System.currentTimeMillis();
-        logger.info("Closing VeniceWriter for topic: {}", topicName);
-        try {
-          // try to end all segments before closing the producer
-          if (gracefulClose) {
-            CompletableFuture<Void> endSegmentsFuture =
-                CompletableFuture.runAsync(() -> endAllSegments(true), threadPoolExecutor);
-            try {
-              endSegmentsFuture.get(Math.max(100, closeTimeOutInMs / 2), TimeUnit.MILLISECONDS);
-            } catch (Exception e) {
-              // cancel the endSegmentsFuture if it's not done in time
-              if (!endSegmentsFuture.isDone()) {
-                endSegmentsFuture.cancel(true);
-              }
-              logger.warn("Swallowed an exception while trying to end all segments for topic: {}", topicName, e);
-            }
-          }
-          producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
-          OPEN_VENICE_WRITER_COUNT.decrementAndGet();
-        } catch (Exception e) {
-          logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", topicName, e);
-          VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();
-        }
-        threadPoolExecutor.shutdown();
-        isClosed = true;
-        logger.info("Closed VeniceWriter for topic: {} in {} ms", topicName, System.currentTimeMillis() - startTime);
+        logger.info("Closing VeniceWriter for topic: {}, gracefulness: {}", topicName, gracefulClose);
+        try (Timer ignore = Timer.run(
+            elapsedTimeInMs -> logger.info("Closed VeniceWriter for topic: {} in {} ms", topicName, elapsedTimeInMs))) {
+          try {
+            // try to end all segments before closing the producer.
+            if (gracefulClose) {
+              CompletableFuture<Void> endSegmentsFuture =
+                  CompletableFuture.runAsync(() -> endAllSegments(true), threadPoolExecutor);
+              try {
+                endSegmentsFuture.get(Math.max(100, closeTimeOutInMs / 2), TimeUnit.MILLISECONDS);
+              } catch (Exception e) {
+                // cancel the endSegmentsFuture if it's not done in time.
+                if (!endSegmentsFuture.isDone()) {
+                  endSegmentsFuture.cancel(true);
+                }
+                logger.warn("Swallowed an exception while trying to end all segments for topic: {}", topicName, e);
+              }
+            }
+            producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
+            OPEN_VENICE_WRITER_COUNT.decrementAndGet();
+          } catch (Exception e) {
+            handleExceptionInClose(e, gracefulClose, retryOnGracefulCloseFailure);
+          } finally {
+            threadPoolExecutor.shutdown();
+            isClosed = true;
+          }
+        }
return VeniceResourceCloseResult.SUCCESS;
}

}, threadPoolExecutor);
}
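
Since close(boolean) above now just calls closeAsync(gracefulClose).get(), a caller that wants to bound the total
wait can apply its own timeout to the returned future. A hypothetical caller-side sketch (the writer variable and
the 30-second bound are illustrative, not part of this change):

    // Hypothetical usage: request a graceful close, but cap the wait on the caller side.
    CompletableFuture<VeniceResourceCloseResult> closeFuture = writer.closeAsync(true);
    try {
      // SUCCESS on a normal close, ALREADY_CLOSED if another caller got there first.
      VeniceResourceCloseResult result = closeFuture.get(30, TimeUnit.SECONDS);
    } catch (java.util.concurrent.TimeoutException | ExecutionException | InterruptedException e) {
      // The writer itself already falls back to an ungraceful close if the graceful close fails.
    }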

void handleExceptionInClose(Exception e, boolean gracefulClose, boolean retryOnGracefulCloseFailure) {
logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", topicName, e);
VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();

// For graceful close, swallow the exception and give another try to close it ungracefully.
try {
if (gracefulClose && retryOnGracefulCloseFailure) {
logger.info(
"Ungracefully closing the VeniceWriter for topic: {}, closeTimeOut: {} ms",
topicName,
closeTimeOutInMs);
producerAdapter.close(topicName, closeTimeOutInMs, false);
OPEN_VENICE_WRITER_COUNT.decrementAndGet();
}
} catch (Exception ex) {
// If even the ungraceful close fails, give up, swallow the exception, and move on.
logger.warn("Exception in ungraceful close for topic: {}", topicName, ex);
VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();
}
}

@Override
public void close() {
close(true);
@@ -1561,7 +1623,7 @@ private String getSizeReport(int serializedKeySize, int serializedValueSize, int
* @param partition the Kafka partition to write to.
* @param debugInfo arbitrary key/value pairs of information that will be propagated alongside the control message.
*/
-  private void sendStartOfSegment(int partition, Map<String, String> debugInfo) {
+  public void sendStartOfSegment(int partition, Map<String, String> debugInfo) {
ControlMessage controlMessage = new ControlMessage();
controlMessage.controlMessageType = ControlMessageType.START_OF_SEGMENT.getValue();
StartOfSegment startOfSegment = new StartOfSegment();
@@ -9,10 +9,12 @@
import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES;
import static com.linkedin.venice.writer.VeniceWriter.VENICE_DEFAULT_LOGICAL_TS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -48,12 +50,15 @@
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.VeniceProperties;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.errors.TimeoutException;
import org.mockito.ArgumentCaptor;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
@@ -533,4 +538,26 @@ public void testSendHeartbeat(boolean addLeaderCompleteHeader, LeaderCompleteSta
}
}
}

  // Unit test for the retry mechanism in the VeniceWriter.close(true) code path.
@Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = 10 * Time.MS_PER_SECOND)
public void testVeniceWriterCloseRetry(boolean gracefulClose) throws ExecutionException, InterruptedException {
PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class);
doThrow(new TimeoutException()).when(mockedProducer).close(anyString(), anyInt(), eq(true));

String testTopic = "test";
VeniceWriterOptions veniceWriterOptions = new VeniceWriterOptions.Builder(testTopic).setPartitionCount(1).build();
VeniceWriter<Object, Object, Object> writer =
new VeniceWriter(veniceWriterOptions, VeniceProperties.empty(), mockedProducer);

    // Verify that if the producer throws a TimeoutException, the VeniceWriter will retry the close with
    // doFlush = false, for both the close() and closeAsync() paths.
writer.close(gracefulClose);

writer = new VeniceWriter(veniceWriterOptions, VeniceProperties.empty(), mockedProducer);
writer.closeAsync(gracefulClose).get();

    // Verify that the producer close with doFlush = false was called twice (once per writer).
verify(mockedProducer, times(2)).close(anyString(), anyInt(), eq(false));
}
}
Original file line number Diff line number Diff line change
@@ -114,11 +114,11 @@ private void createTopic(String topicName, int numPartitions, int replicationFac
}
}

-  private KafkaKey getDummyKey() {
+  public static KafkaKey getDummyKey() {
return new KafkaKey(MessageType.PUT, Utils.getUniqueString("key-").getBytes());
}

-  private KafkaMessageEnvelope getDummyVal() {
+  public static KafkaMessageEnvelope getDummyVal() {
KafkaMessageEnvelope messageEnvelope = new KafkaMessageEnvelope();
messageEnvelope.producerMetadata = new ProducerMetadata();
messageEnvelope.producerMetadata.messageTimestamp = 0;
@@ -1,8 +1,13 @@
package com.linkedin.venice.writer;

import static com.linkedin.venice.kafka.TopicManager.DEFAULT_KAFKA_OPERATION_TIMEOUT_MS;
import static com.linkedin.venice.utils.Time.MS_PER_SECOND;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.kafka.TopicManager;
@@ -22,6 +27,8 @@
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Time;
@@ -33,10 +40,13 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -45,6 +55,7 @@

@Test
public class VeniceWriterTest {
private static final Logger LOGGER = LogManager.getLogger(VeniceWriterTest.class);
private PubSubBrokerWrapper pubSubBrokerWrapper;
private TopicManager topicManager;
private PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory;
@@ -142,4 +153,72 @@ public void testThreadSafetyForPutMessages() throws ExecutionException, Interrup
100,
veniceWriter -> veniceWriter.put(new KafkaKey(MessageType.PUT, "blah".getBytes()), "blah".getBytes(), 1, null));
}

/**
 * This test does the following steps:
 * 1. Creates a VeniceWriter with a topic that does not exist.
 * 2. Creates a new thread to send a SOS control message to this non-existent topic.
 * 3. The new thread should block on the sendMessage() call.
 * 4. The main thread closes the VeniceWriter (regardless of whether the 'doFlush' flag is true or false) and
 *    expects the 'sendMessageThread' to unblock.
 */
@Test(timeOut = 60 * MS_PER_SECOND, dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
public void testVeniceWriterClose(boolean doFlush) {
String topicName = Utils.getUniqueString("version-topic");
int partitionCount = 1;

    // Intentionally do not create the topic "version-topic", so that the control message send will block.
// topicManager.createTopic(pubSubTopic, partitionCount, 1, true);

CountDownLatch countDownLatch = new CountDownLatch(1);

Properties properties = new Properties();
properties.put(ConfigKeys.KAFKA_BOOTSTRAP_SERVERS, pubSubBrokerWrapper.getAddress());
properties.put(ConfigKeys.PARTITIONER_CLASS, DefaultVenicePartitioner.class.getName());

try (VeniceWriter<KafkaKey, byte[], byte[]> veniceWriter =
TestUtils.getVeniceWriterFactory(properties, pubSubProducerAdapterFactory)
.createVeniceWriter(
new VeniceWriterOptions.Builder(topicName).setUseKafkaKeySerializer(true)
.setPartitionCount(partitionCount)
.build())) {
ExecutorService executor = Executors.newSingleThreadExecutor();

Future<?> sendMessageFuture = executor.submit(() -> {
Thread.currentThread().setName("sendMessageThread");
countDownLatch.countDown();
try {
// send to non-existent topic and block.
veniceWriter.sendStartOfSegment(0, null);
fail("sendMessage on non-existent topic should have blocked the executing thread");
} catch (VeniceException e) {
LOGGER.info("As expected an exception has been received from sendMessage()", e);
assertNotNull(e.getMessage(), "Exception thrown by sendMessage does not have a message");
assertTrue(
e.getMessage()
.contains(
String.format(
"Got an error while trying to produce message into Kafka. Topic: '%s'",
veniceWriter.getTopicName())));
assertTrue(ExceptionUtils.recursiveMessageContains(e, "Producer closed while send in progress"));
assertTrue(ExceptionUtils.recursiveMessageContains(e, "Requested metadata update after close"));
LOGGER.info("All expectations were met in thread: {}", Thread.currentThread().getName());
}
});

try {
countDownLatch.await();
      // Still wait for some time to make sure the blocking sendMessage call is inside Kafka before closing.
Utils.sleep(5000);
veniceWriter.close(doFlush);

      // This is necessary to check whether the expectations in the sendMessage thread were met.
sendMessageFuture.get();
} catch (Exception e) {
fail("Producer closing should have succeeded without an exception", e);
} finally {
executor.shutdownNow();
}
}
}
}
