Skip to content

Commit

Permalink
Address comments resolving potential block on endAllSegments and inte…
Browse files Browse the repository at this point in the history
…gration test
  • Loading branch information
lluwm committed Jan 25, 2024
1 parent 1135a4b commit 805e2fa
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,25 @@ public static ControlMessage generateHeartbeatMessage(CheckSumType checkSumType)
*/
@Override
public void close(boolean gracefulClose) {
  // Close only through closeAsync(...). The two-argument close(...) is deprecated because its END_OF_SEGMENT send
  // can block indefinitely, so it must not be invoked here in addition to the asynchronous path.
  try {
    closeAsync(gracefulClose).get();
  } catch (ExecutionException e) {
    // Best-effort close: the failure is logged rather than propagated to the caller.
    logger.warn("Future completed exceptionally in closing VeniceWriter for topic: {}", topicName, e);
  } catch (InterruptedException e) {
    // Restore the interrupt flag so that callers further up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    logger.warn("Interrupted while waiting for VeniceWriter to close for topic: {}", topicName, e);
  }
}

/**
* Close the {@link VeniceWriter}.
*
* Deprecated because sending the END_OF_SEGMENT control message to a non-existent topic can block indefinitely:
* this method calls
* {@link #sendMessage(KeyProvider, KafkaMessageEnvelopeProvider, int, PubSubProducerCallback, boolean)}.get()
* without a timeout.
*
* @param gracefulClose whether to end the segments and send END_OF_SEGMENT control message.
* @param retryOnGracefulCloseFailure whether to retry on graceful close failure.
*/
@Deprecated
public void close(boolean gracefulClose, boolean retryOnGracefulCloseFailure) {
synchronized (closeLock) {
if (isClosed) {
Expand Down Expand Up @@ -436,6 +452,16 @@ public CompletableFuture<VeniceResourceCloseResult> closeAsync(boolean gracefulC
public CompletableFuture<VeniceResourceCloseResult> closeAsync(
boolean gracefulClose,
boolean retryOnGracefulCloseFailure) {
/*
* If the VeniceWriter is already closed, return a completed future. This avoids a thread pool
* RejectedExecutionException when a previous closeAsync has already run and the thread pool is already terminated.
*/
synchronized (closeLock) {
if (isClosed) {
return CompletableFuture.completedFuture(VeniceResourceCloseResult.ALREADY_CLOSED);
}
}

return CompletableFuture.supplyAsync(() -> {
synchronized (closeLock) {
if (isClosed) {
Expand Down Expand Up @@ -481,6 +507,10 @@ void handleExceptionInClose(Exception e, boolean gracefulClose, boolean retryOnG
// For graceful close, swallow the exception and give another try to close it ungracefully.
try {
if (gracefulClose && retryOnGracefulCloseFailure) {
logger.info(
"Ungracefully closing the VeniceWriter for topic: {}, closeTimeOut: {} ms",
topicName,
closeTimeOutInMs);
producerAdapter.close(topicName, closeTimeOutInMs, false);
OPEN_VENICE_WRITER_COUNT.decrementAndGet();
}
Expand Down Expand Up @@ -1593,7 +1623,7 @@ private String getSizeReport(int serializedKeySize, int serializedValueSize, int
* @param partition the Kafka partition to write to.
* @param debugInfo arbitrary key/value pairs of information that will be propagated alongside the control message.
*/
private void sendStartOfSegment(int partition, Map<String, String> debugInfo) {
public void sendStartOfSegment(int partition, Map<String, String> debugInfo) {
ControlMessage controlMessage = new ControlMessage();
controlMessage.controlMessageType = ControlMessageType.START_OF_SEGMENT.getValue();
StartOfSegment startOfSegment = new StartOfSegment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ private void createTopic(String topicName, int numPartitions, int replicationFac
}
}

private KafkaKey getDummyKey() {
/**
 * Builds a {@link KafkaKey} of type {@link MessageType#PUT} for use as a placeholder key.
 *
 * @return a new PUT key whose bytes are the UTF-8 encoding of the string returned by
 *         {@code Utils.getUniqueString("key-")}.
 */
public static KafkaKey getDummyKey() {
  // Encode with an explicit charset so the key bytes do not depend on the JVM's default charset.
  byte[] keyBytes = Utils.getUniqueString("key-").getBytes(java.nio.charset.StandardCharsets.UTF_8);
  return new KafkaKey(MessageType.PUT, keyBytes);
}

private KafkaMessageEnvelope getDummyVal() {
public static KafkaMessageEnvelope getDummyVal() {
KafkaMessageEnvelope messageEnvelope = new KafkaMessageEnvelope();
messageEnvelope.producerMetadata = new ProducerMetadata();
messageEnvelope.producerMetadata.messageTimestamp = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package com.linkedin.venice.writer;

import static com.linkedin.venice.kafka.TopicManager.DEFAULT_KAFKA_OPERATION_TIMEOUT_MS;
import static com.linkedin.venice.utils.Time.MS_PER_SECOND;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.kafka.TopicManager;
Expand All @@ -22,6 +27,8 @@
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Time;
Expand All @@ -33,10 +40,13 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand All @@ -45,6 +55,7 @@

@Test
public class VeniceWriterTest {
private static final Logger LOGGER = LogManager.getLogger(VeniceWriterTest.class);
private PubSubBrokerWrapper pubSubBrokerWrapper;
private TopicManager topicManager;
private PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory;
Expand Down Expand Up @@ -142,4 +153,72 @@ public void testThreadSafetyForPutMessages() throws ExecutionException, Interrup
100,
veniceWriter -> veniceWriter.put(new KafkaKey(MessageType.PUT, "blah".getBytes()), "blah".getBytes(), 1, null));
}

/**
 * Verifies that closing a {@link VeniceWriter} releases a thread that is stuck producing to a missing topic:
 * <ol>
 *   <li>Build a VeniceWriter for a topic that is never created.</li>
 *   <li>From a dedicated thread, send a START_OF_SEGMENT control message to that topic.</li>
 *   <li>That thread is expected to block inside the sendMessage() call.</li>
 *   <li>The main thread closes the VeniceWriter (with 'doFlush' either true or false) and expects the
 *       'sendMessageThread' to be unblocked with a {@link VeniceException}.</li>
 * </ol>
 *
 * @param doFlush value passed to {@code veniceWriter.close(...)}; supplied by the "True-and-False" data provider.
 */
@Test(timeOut = 60 * MS_PER_SECOND, dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
public void testVeniceWriterClose(boolean doFlush) {
  final String topicName = Utils.getUniqueString("version-topic");
  final int partitionCount = 1;
  // The topic is deliberately left uncreated so that the control message send below is expected to block.

  final CountDownLatch senderStarted = new CountDownLatch(1);

  final Properties writerProperties = new Properties();
  writerProperties.put(ConfigKeys.KAFKA_BOOTSTRAP_SERVERS, pubSubBrokerWrapper.getAddress());
  writerProperties.put(ConfigKeys.PARTITIONER_CLASS, DefaultVenicePartitioner.class.getName());

  try (VeniceWriter<KafkaKey, byte[], byte[]> veniceWriter =
      TestUtils.getVeniceWriterFactory(writerProperties, pubSubProducerAdapterFactory)
          .createVeniceWriter(
              new VeniceWriterOptions.Builder(topicName).setUseKafkaKeySerializer(true)
                  .setPartitionCount(partitionCount)
                  .build())) {
    final ExecutorService senderExecutor = Executors.newSingleThreadExecutor();

    // Task run on the sender thread: signal start-up, then attempt the send that should only return by throwing.
    final Runnable blockedSend = () -> {
      Thread.currentThread().setName("sendMessageThread");
      senderStarted.countDown();
      try {
        veniceWriter.sendStartOfSegment(0, null);
        fail("sendMessage on non-existent topic should have blocked the executing thread");
      } catch (VeniceException sendFailure) {
        LOGGER.info("As expected an exception has been received from sendMessage()", sendFailure);
        final String failureMessage = sendFailure.getMessage();
        assertNotNull(failureMessage, "Exception thrown by sendMessage does not have a message");
        final String expectedFragment = String.format(
            "Got an error while trying to produce message into Kafka. Topic: '%s'",
            veniceWriter.getTopicName());
        assertTrue(failureMessage.contains(expectedFragment));
        assertTrue(ExceptionUtils.recursiveMessageContains(sendFailure, "Producer closed while send in progress"));
        assertTrue(ExceptionUtils.recursiveMessageContains(sendFailure, "Requested metadata update after close"));
        LOGGER.info("All expectations were met in thread: {}", Thread.currentThread().getName());
      }
    };
    final Future<?> sendMessageFuture = senderExecutor.submit(blockedSend);

    try {
      senderStarted.await();
      // Give the sender extra time to get into the blocking send before the writer is closed underneath it.
      Utils.sleep(5000);
      veniceWriter.close(doFlush);

      // Surfaces any assertion failure raised on the sender thread.
      sendMessageFuture.get();
    } catch (Exception closeOrSendFailure) {
      fail("Producer closing should have succeeded without an exception", closeOrSendFailure);
    } finally {
      senderExecutor.shutdownNow();
    }
  }
}
}

0 comments on commit 805e2fa

Please sign in to comment.