Skip to content

Commit

Permalink
Add logs to debug producer thread leak
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Oct 31, 2024
1 parent 5b19053 commit c406077
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@

import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -307,6 +306,7 @@ void abortLingeringTransactions(
pscSinkContext.getNumberOfParallelInstances(),
this::getOrCreateTransactionalProducer,
producerPool::add)) {
LOG.info("Aborting lingering transactions with prefixes {}, startCheckpointId {}", prefixesToAbort, startCheckpointId);
transactionAborter.abortLingeringTransactions(prefixesToAbort, startCheckpointId);
} catch (ProducerException e) {
throw new RuntimeException("Failed to abort lingering transactions", e);
Expand Down Expand Up @@ -347,12 +347,14 @@ private FlinkPscInternalProducer<byte[], byte[]> getOrCreateTransactionalProduce
FlinkPscInternalProducer<byte[], byte[]> producer = producerPool.poll();
try {
if (producer == null) {
LOG.info("Creating brand-new transactional producer {}", transactionalId);
producer = new FlinkPscInternalProducer<>(pscProducerConfig, transactionalId);
closer.register(producer);
producer.initTransactions();
if (!isMetricsInitialized)
initPscAndFlinkMetrics(producer);
} else {
LOG.info("Reusing transactional producer {}", producer.getTransactionalId());
producer.initTransactionId(transactionalId);
}
} catch (ConfigurationException | ClientException | TopicUriSyntaxException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package com.pinterest.flink.connector.psc.sink;

import com.pinterest.psc.exception.producer.ProducerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.Closeable;
Expand Down Expand Up @@ -45,6 +47,7 @@
* transactions.
*/
class TransactionAborter implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(TransactionAborter.class);
private final int subtaskId;
private final int parallelism;
private final Function<String, FlinkPscInternalProducer<byte[], byte[]>> producerFactory;
Expand Down Expand Up @@ -98,15 +101,15 @@ private void abortTransactionsWithPrefix(String prefix, long startCheckpointId)
*/
private int abortTransactionOfSubtask(String prefix, long startCheckpointId, int subtaskId) throws ProducerException {
int numTransactionAborted = 0;
int numCalled = 0;
int numCalled1 = 0;
for (long checkpointId = startCheckpointId; ; checkpointId++, numTransactionAborted++) {
// initTransactions fences all old transactions with the same id by bumping the epoch
String transactionalId =
TransactionalIdFactory.buildTransactionalId(prefix, subtaskId, checkpointId);
if (producer == null) {
LOG.info("producer = producerFactory.apply(transactionalId): {}", transactionalId);
producer = producerFactory.apply(transactionalId);
} else {
LOG.info("producer.initTransactionId(transactionalId): {}", transactionalId);
producer.initTransactionId(transactionalId);
}
producer.flush();
Expand All @@ -128,7 +131,9 @@ private int abortTransactionOfSubtask(String prefix, long startCheckpointId, int
}

public void close() {
LOG.info("Closing transaction aborter");
if (producer != null) {
LOG.info("CloseAction: {}", closeAction);
closeAction.accept(producer);
}
}
Expand Down

0 comments on commit c406077

Please sign in to comment.