Skip to content

Commit

Permalink
Add even more logs to debug producerLeak
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Nov 1, 2024
1 parent a44f0b6 commit 5ab0954
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void commit(Collection<CommitRequest<PscCommittable>> requests)
for (CommitRequest<PscCommittable> request : requests) {
final PscCommittable committable = request.getCommittable();
final String transactionalId = committable.getTransactionalId();
LOG.debug("Committing transaction {}", transactionalId);
LOG.info("Committing transaction {}", transactionalId);
Optional<Recyclable<? extends FlinkPscInternalProducer<?, ?>>> recyclable =
committable.getProducer();
FlinkPscInternalProducer<?, ?> producer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class PscWriter<IN>
checkNotNull(recoveredStates, "recoveredStates"), lastCheckpointId + 1);
this.currentProducer = getTransactionalProducer(lastCheckpointId + 1);
this.currentProducer.beginTransaction();
LOG.info("producerPool.size(): " + producerPool.size());
LOG.info("producerPool.size() in thread={}: " + producerPool.size(), Thread.currentThread().getId());
} else if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE
|| deliveryGuarantee == DeliveryGuarantee.NONE) {
this.currentProducer = new FlinkPscInternalProducer<>(this.pscProducerConfig, null);
Expand Down Expand Up @@ -223,15 +223,17 @@ public void flush(boolean endOfInput) throws IOException, InterruptedException {

@Override
public Collection<PscCommittable> prepareCommit() {
LOG.info("Entering prepareCommit()");
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
final List<PscCommittable> committables;
try {
LOG.info("Committing producer {}", currentProducer);
committables = Collections.singletonList(
PscCommittable.of(currentProducer, producerPool::add));
} catch (ProducerException e) {
throw new RuntimeException(e);
}
LOG.debug("Committing {} committables.", committables);
LOG.info("Committing {} committables.", committables);
return committables;
}
return Collections.emptyList();
Expand Down Expand Up @@ -345,8 +347,8 @@ private FlinkPscInternalProducer<byte[], byte[]> getTransactionalProducer(long c

private FlinkPscInternalProducer<byte[], byte[]> getOrCreateTransactionalProducer(
String transactionalId) {
LOG.info("producerPool.size() in getOrCreateTransactionalProducer: " + producerPool.size());
LOG.info("producerPool: " + producerPool);
LOG.info("producerPool.size() in thread={} for getOrCreateTransactionalProducer: " + producerPool.size(), Thread.currentThread().getId());
LOG.info("producerPool in thread={}: " + producerPool, Thread.currentThread().getId());
FlinkPscInternalProducer<byte[], byte[]> producer = producerPool.poll();
try {
if (producer == null) {
Expand All @@ -357,7 +359,7 @@ private FlinkPscInternalProducer<byte[], byte[]> getOrCreateTransactionalProduce
if (!isMetricsInitialized)
initPscAndFlinkMetrics(producer);
} else {
LOG.info("Reusing transactional producer {}", producer.getTransactionalId());
LOG.info("Reusing transactional producer {} and initing transactionalId={}", producer.getTransactionalId(), transactionalId);
producer.initTransactionId(transactionalId);
}
} catch (ConfigurationException | ClientException | TopicUriSyntaxException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@

package com.pinterest.flink.connector.psc.sink;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.function.Consumer;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

class Recyclable<T> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(Recyclable.class);
private T object;
private final Consumer<T> recycler;

Expand All @@ -43,6 +47,7 @@ boolean isRecycled() {

@Override
public void close() {
LOG.info("Closing recyclable: " + object);
recycler.accept(object);
object = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ void abortLingeringTransactions(List<String> prefixesToAbort, long startCheckpoi
* is responsible for all even and subtask 1 for all odd subtasks.
*/
private void abortTransactionsWithPrefix(String prefix, long startCheckpointId) throws ProducerException {
LOG.info("Aborting transactions in subtask {} with prefix: {}", this.subtaskId, prefix);
for (int subtaskId = this.subtaskId; ; subtaskId += parallelism) {
if (abortTransactionOfSubtask(prefix, startCheckpointId, subtaskId) == 0) {
// If Flink didn't abort any transaction for current subtask, then we assume that no
Expand Down Expand Up @@ -134,7 +135,7 @@ public void close() {
LOG.info("Closing transaction aborter");
Thread.dumpStack();
if (producer != null) {
LOG.info("CloseAction: {}", closeAction);
LOG.info("CloseAction: {}", producer);
closeAction.accept(producer);
}
}
Expand Down

0 comments on commit 5ab0954

Please sign in to comment.