From 5ab0954d50b43f48ebba55a76577f2c3b5e73bf9 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Thu, 31 Oct 2024 21:10:48 -0400 Subject: [PATCH] Add even more logs to debug producerLeak --- .../flink/connector/psc/sink/PscCommitter.java | 2 +- .../flink/connector/psc/sink/PscWriter.java | 12 +++++++----- .../flink/connector/psc/sink/Recyclable.java | 5 +++++ .../flink/connector/psc/sink/TransactionAborter.java | 3 ++- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscCommitter.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscCommitter.java index 4f75934..0012c7a 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscCommitter.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscCommitter.java @@ -63,7 +63,7 @@ public void commit(Collection> requests) for (CommitRequest request : requests) { final PscCommittable committable = request.getCommittable(); final String transactionalId = committable.getTransactionalId(); - LOG.debug("Committing transaction {}", transactionalId); + LOG.info("Committing transaction {}", transactionalId); Optional>> recyclable = committable.getProducer(); FlinkPscInternalProducer producer; diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscWriter.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscWriter.java index 08d1648..1c98b13 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscWriter.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscWriter.java @@ -182,7 +182,7 @@ class PscWriter checkNotNull(recoveredStates, "recoveredStates"), lastCheckpointId + 1); this.currentProducer = getTransactionalProducer(lastCheckpointId + 1); this.currentProducer.beginTransaction(); - LOG.info("producerPool.size(): " + producerPool.size()); + LOG.info("producerPool.size() in thread={}: " + producerPool.size(), Thread.currentThread().getId()); } else if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE || deliveryGuarantee == DeliveryGuarantee.NONE) { this.currentProducer = new FlinkPscInternalProducer<>(this.pscProducerConfig, null); @@ -223,15 +223,17 @@ public void flush(boolean endOfInput) throws IOException, InterruptedException { @Override public Collection prepareCommit() { + LOG.info("Entering prepareCommit()"); if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) { final List committables; try { + LOG.info("Committing producer {}", currentProducer); committables = Collections.singletonList( PscCommittable.of(currentProducer, producerPool::add)); } catch (ProducerException e) { throw new RuntimeException(e); } - LOG.debug("Committing {} committables.", committables); + LOG.info("Committing {} committables.", committables); return committables; } return Collections.emptyList(); @@ -345,8 +347,8 @@ private FlinkPscInternalProducer getTransactionalProducer(long c private FlinkPscInternalProducer getOrCreateTransactionalProducer( String transactionalId) { - LOG.info("producerPool.size() in getOrCreateTransactionalProducer: " + producerPool.size()); - LOG.info("producerPool: " + producerPool); + LOG.info("producerPool.size() in thread={} for getOrCreateTransactionalProducer: " + producerPool.size(), Thread.currentThread().getId()); + LOG.info("producerPool in thread={}: " + producerPool, Thread.currentThread().getId()); FlinkPscInternalProducer producer = producerPool.poll(); try { if (producer == null) { @@ -357,7 +359,7 @@ private FlinkPscInternalProducer getOrCreateTransactionalProduce if (!isMetricsInitialized) initPscAndFlinkMetrics(producer); } else { - LOG.info("Reusing transactional producer {}", producer.getTransactionalId()); + LOG.info("Reusing transactional producer {} and initing transactionalId={}", producer.getTransactionalId(), transactionalId); producer.initTransactionId(transactionalId); } } catch (ConfigurationException | ClientException | TopicUriSyntaxException e) { diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/Recyclable.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/Recyclable.java index 2b8a4fc..43085de 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/Recyclable.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/Recyclable.java @@ -17,6 +17,9 @@ package com.pinterest.flink.connector.psc.sink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.Closeable; import java.util.function.Consumer; @@ -24,6 +27,7 @@ import static org.apache.flink.util.Preconditions.checkState; class Recyclable implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(Recyclable.class); private T object; private final Consumer recycler; @@ -43,6 +47,7 @@ boolean isRecycled() { @Override public void close() { + LOG.info("Closing recyclable: " + object); recycler.accept(object); object = null; } diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/TransactionAborter.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/TransactionAborter.java index aa506a7..bb6abe5 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/TransactionAborter.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/TransactionAborter.java @@ -83,6 +83,7 @@ void abortLingeringTransactions(List prefixesToAbort, long startCheckpoi * is responsible for all even and subtask 1 for all odd subtasks. */ private void abortTransactionsWithPrefix(String prefix, long startCheckpointId) throws ProducerException { + LOG.info("Aborting transactions in subtask {} with prefix: {}", this.subtaskId, prefix); for (int subtaskId = this.subtaskId; ; subtaskId += parallelism) { if (abortTransactionOfSubtask(prefix, startCheckpointId, subtaskId) == 0) { // If Flink didn't abort any transaction for current subtask, then we assume that no @@ -134,7 +135,7 @@ public void close() { LOG.info("Closing transaction aborter"); Thread.dumpStack(); if (producer != null) { - LOG.info("CloseAction: {}", closeAction); + LOG.info("CloseAction: {}", producer); closeAction.accept(producer); } }